Skip to content

Commit

Permalink
Merge pull request #54 from constellation-rs/unique-pids
Browse files Browse the repository at this point in the history
ensure pids are unique
  • Loading branch information
alecmocatta committed Jan 21, 2020
2 parents 742da09 + 04febb5 commit ec4c2b5
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 212 deletions.
5 changes: 5 additions & 0 deletions .editorconfig
@@ -0,0 +1,5 @@
[*]
tab_width = 4

[.*]
tab_width = 4
10 changes: 4 additions & 6 deletions .mergify.yml
Expand Up @@ -4,11 +4,10 @@ pull_request_rules:
- base=master
- status-success=tests
- status-success=ci/dockercloud
- label!=work-in-progress
- "label!=work in progress"
- "#approved-reviews-by>=1"
- "#review-requested=0"
- "#changes-requested-reviews-by=0"
- "#commented-reviews-by=0"
actions:
merge:
method: merge
Expand All @@ -19,11 +18,10 @@ pull_request_rules:
- base=master
- status-success=tests
- status-success=ci/dockercloud
- label!=work-in-progress
- "label!=work in progress"
- author=alecmocatta # https://github.com/Mergifyio/mergify-engine/issues/451
- "#review-requested=0"
- "#changes-requested-reviews-by=0"
- "#commented-reviews-by=0"
actions:
merge:
method: merge
Expand All @@ -38,10 +36,10 @@ pull_request_rules:
- "title~=^WIP: .*"
actions:
label:
add: ["work-in-progress"]
add: ["work in progress"]
- name: auto remove wip label
conditions:
- "-title~=^WIP: .*"
actions:
label:
remove: ["work-in-progress"]
remove: ["work in progress"]
1 change: 0 additions & 1 deletion constellation-internal/Cargo.toml
Expand Up @@ -18,7 +18,6 @@ distribute_binaries = []
no_alloc = ["alloc_counter"]

[dependencies]
aes = "0.3"
alloc_counter = { optional = true, version = "0.0", default-features = false, features = ["std", "no_alloc"] }
ansi_term = "0.12"
bincode = "1.0"
Expand Down
94 changes: 94 additions & 0 deletions constellation-internal/src/ext.rs
@@ -1,3 +1,97 @@
mod owningorref {
use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize};
use std::{
fmt::{self, Debug}, ops::Deref
};

pub enum OwningOrRef<'a, T>
where
T: Deref,
{
Owning(T),
Ref(&'a T::Target),
}

impl<'a, T> OwningOrRef<'a, T>
where
T: Deref,
{
pub fn into_inner(self) -> Option<T> {
match self {
Self::Owning(a) => Some(a),
Self::Ref(_) => None,
}
}
}

impl<'a, T> Clone for OwningOrRef<'a, T>
where
T: Deref + Clone,
{
fn clone(&self) -> Self {
match self {
Self::Owning(a) => Self::Owning(a.clone()),
Self::Ref(a) => Self::Ref(Clone::clone(a)),
}
}
}
impl<'a, T> Copy for OwningOrRef<'a, T> where T: Deref + Copy {}

impl<'a, T> Debug for OwningOrRef<'a, T>
where
T: Deref + Debug,
T::Target: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Owning(a) => a.fmt(f),
Self::Ref(a) => a.fmt(f),
}
}
}

impl<'a, T> Deref for OwningOrRef<'a, T>
where
T: Deref,
{
type Target = T::Target;

fn deref(&self) -> &Self::Target {
match self {
Self::Owning(a) => &**a,
Self::Ref(a) => a,
}
}
}

impl<'a, T> Serialize for OwningOrRef<'a, T>
where
T: Deref,
T::Target: Serialize,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
(**self).serialize(serializer)
}
}
impl<'de, 'a, T> Deserialize<'de> for OwningOrRef<'a, T>
where
T: Deref + Deserialize<'de>,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
T::deserialize(deserializer).map(Self::Owning)
}
}
}
pub use self::owningorref::OwningOrRef;

//////////////////////////////////////////////////////////////////////////////////////////////////////////////////

mod bufferedstream {
use std::io::{self, Read, Write};
#[derive(Debug)]
Expand Down
46 changes: 13 additions & 33 deletions constellation-internal/src/format.rs
@@ -1,10 +1,10 @@
use super::{DeployOutputEvent, Pid, ToHex};
use aes::{block_cipher_trait::BlockCipher, Aes128};
use rand::{self, Rng, SeedableRng};
use std::{
borrow, fmt, fs, io::{self, Write}, os::{self, unix::io::IntoRawFd}
borrow, convert::TryInto, fmt, fs, io::{self, Write}, os::{self, unix::io::IntoRawFd}
};

use super::{DeployOutputEvent, Pid};

const STDOUT: os::unix::io::RawFd = 1;
const STDERR: os::unix::io::RawFd = 2;

Expand Down Expand Up @@ -252,19 +252,16 @@ impl Style {

pub(crate) fn pretty_pid(
pid: &Pid, bold: bool, style_support: StyleSupport,
) -> ansi_term::ANSIGenericString<str> {
// impl std::fmt::Display + 'a {
let key: [u8; 16] = [0; 16];

let bytes = encrypt(pid.0, key);
let decrypted_data = decrypt(bytes, key);
assert_eq!(&pid.0, &decrypted_data);

let x = bytes.to_hex().take(7).collect::<String>();
let mut rng = rand::rngs::SmallRng::from_seed([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], bytes[8],
bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
]);
) -> impl std::fmt::Display {
let mut format = pid.format();
let x = format.clone().collect::<String>();
let mut rng = rand::rngs::SmallRng::from_seed(
(0..16)
.map(|_| format.next().map_or(0, |x| (x as u32).try_into().unwrap()))
.collect::<Vec<u8>>()[..]
.try_into()
.unwrap(),
);
let (r, g, b) = loop {
let (r_, g_, b_): (u8, u8, u8) = rng.gen();
let (r, g, b) = (u16::from(r_), u16::from(g_), u16::from(b_));
Expand All @@ -279,20 +276,3 @@ pub(crate) fn pretty_pid(
}
color.paint(x)
}

//////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub(crate) fn encrypt(input: [u8; 16], key: [u8; 16]) -> [u8; 16] {
let key = key.into();
let mut block = input.into();
let cipher = Aes128::new(&key);
cipher.encrypt_block(&mut block);
block.into()
}
pub(crate) fn decrypt(input: [u8; 16], key: [u8; 16]) -> [u8; 16] {
let key = key.into();
let mut block = input.into();
let cipher = Aes128::new(&key);
cipher.decrypt_block(&mut block);
block.into()
}
61 changes: 21 additions & 40 deletions constellation-internal/src/lib.rs
Expand Up @@ -33,7 +33,7 @@ use nix::{fcntl, libc, sys::signal, unistd};
use palaver::file::{copy, memfd_create};
use serde::{Deserialize, Serialize};
use std::{
convert::{TryFrom, TryInto}, env, error::Error, ffi::{CString, OsString}, fmt::{self, Debug, Display}, fs::File, io::{self, Read, Seek, Write}, net, ops, os::unix::{
convert::{TryFrom, TryInto}, env, error::Error, ffi::{CString, OsString}, fmt::{self, Debug, Display}, fs::File, io::{self, Read, Seek, Write}, net::{IpAddr, SocketAddr}, ops, os::unix::{
ffi::OsStringExt, io::{AsRawFd, FromRawFd, IntoRawFd}
}, process::abort, sync::{Arc, Mutex}
};
Expand All @@ -60,53 +60,34 @@ pub use format::*;
///
/// All inter-process communication occurs after [Sender](Sender)s and [Receiver](Receiver)s have been created with their remotes' `Pid`s. Thus `Pid`s are the primary form of addressing in a `constellation` cluster.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct Pid([u8; 16]);
pub struct Pid {
ip: IpAddr,
port: u16,
key: u128,
}
impl Pid {
pub(crate) fn new(ip: net::IpAddr, port: u16) -> Self {
match ip {
net::IpAddr::V4(ip) => {
let ip = ip.octets();
Self([
ip[0],
ip[1],
ip[2],
ip[3],
(port >> 8).try_into().unwrap(),
(port & 0xff).try_into().unwrap(),
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
])
}
_ => unimplemented!(),
}
pub(crate) fn new(ip: IpAddr, port: u16) -> Self {
assert_ne!(port, 0);
let key = rand::random();
Self { ip, port, key }
}

pub(crate) fn addr(&self) -> net::SocketAddr {
net::SocketAddr::new(
[self.0[0], self.0[1], self.0[2], self.0[3]].into(),
((u16::from(self.0[4])) << 8) | (u16::from(self.0[5])),
)
pub(crate) fn addr(&self) -> SocketAddr {
SocketAddr::new(self.ip, self.port)
}

fn format<'a>(&'a self) -> impl Iterator<Item = char> + 'a {
let key: [u8; 16] = [0; 16];
encrypt(self.0, key)
fn format<'a>(&'a self) -> impl Iterator<Item = char> + Clone + 'a {
self.key
.to_le_bytes()
.to_hex()
.take(7)
.collect::<Vec<_>>()
.into_iter()
}
}
impl Display for Pid {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.format().take(7).collect::<String>())
write!(f, "{}", self.format().collect::<String>())
}
}
impl Debug for Pid {
Expand All @@ -117,16 +98,16 @@ impl Debug for Pid {
}
}
pub trait PidInternal {
fn new(ip: net::IpAddr, port: u16) -> Pid;
fn addr(&self) -> net::SocketAddr;
fn new(ip: IpAddr, port: u16) -> Pid;
fn addr(&self) -> SocketAddr;
}
#[doc(hidden)]
impl PidInternal for Pid {
fn new(ip: net::IpAddr, port: u16) -> Self {
fn new(ip: IpAddr, port: u16) -> Self {
Self::new(ip, port)
}

fn addr(&self) -> net::SocketAddr {
fn addr(&self) -> SocketAddr {
Self::addr(self)
}
}
Expand Down
25 changes: 23 additions & 2 deletions constellation-internal/src/msg.rs
@@ -1,7 +1,28 @@
use crate::Resources;
use ::serde::{Deserialize, Serialize};
#[cfg(not(feature = "distribute_binaries"))]
use std::marker::PhantomData;
use std::{ffi::OsString, net::SocketAddr};
use std::{
ffi::OsString, net::{IpAddr, SocketAddr}
};

use crate::{Pid, Resources};

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct SchedulerArg {
pub ip: IpAddr,
pub scheduler: Pid,
}

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct SpawnArg<T> {
pub bridge: Pid,
pub spawn: Option<SpawnArgSub<T>>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct SpawnArgSub<T> {
pub parent: Pid,
pub f: T,
}

#[derive(Debug)]
pub struct FabricRequest<A, B>
Expand Down
10 changes: 8 additions & 2 deletions src/bin/constellation/bridge.rs
Expand Up @@ -37,7 +37,9 @@ use std::{

use constellation::FutureExt1;
use constellation_internal::{
abort_on_unwind, abort_on_unwind_1, forbid_alloc, map_bincode_err, msg::{bincode_deserialize_from, bincode_serialize_into, BridgeRequest, FabricRequest}, BufferedStream, DeployInputEvent, DeployOutputEvent, ExitStatus, Fd, Pid, ProcessInputEvent, ProcessOutputEvent, Resources, TrySpawnError
abort_on_unwind, abort_on_unwind_1, forbid_alloc, map_bincode_err, msg::{
bincode_deserialize_from, bincode_serialize_into, BridgeRequest, FabricRequest, SpawnArg
}, BufferedStream, DeployInputEvent, DeployOutputEvent, ExitStatus, Fd, Pid, ProcessInputEvent, ProcessOutputEvent, Resources, TrySpawnError
};

const SCHEDULER_FD: Fd = 4;
Expand Down Expand Up @@ -278,7 +280,11 @@ fn manage_connection(
.map_err(map_bincode_err)
.map_err(drop)?;
assert_eq!(request.arg.len(), 0);
bincode::serialize_into(&mut request.arg, &constellation::pid()).unwrap();
let spawn_arg = SpawnArg::<()> {
bridge: constellation::pid(),
spawn: None,
};
bincode::serialize_into(&mut request.arg, &spawn_arg).unwrap();
let resources = request
.resources
.or_else(|| {
Expand Down
5 changes: 4 additions & 1 deletion src/bin/constellation/main.rs
Expand Up @@ -84,7 +84,7 @@ use palaver::{
file::{copy_fd, execve, fexecve, move_fds}, socket::{socket, SockFlag}, valgrind
};
use std::{
collections::HashMap, convert::{TryFrom, TryInto}, env, io, net::{IpAddr, SocketAddr, TcpListener}, process, sync, thread
collections::HashMap, convert::{TryFrom, TryInto}, env, io, io::Seek, net::{IpAddr, SocketAddr, TcpListener}, process, sync, thread
};
#[cfg(unix)]
use std::{
Expand Down Expand Up @@ -258,6 +258,9 @@ fn main() {
}
.port();
let process_id = Pid::new(ip, port);
let _ = (&request.arg).seek(std::io::SeekFrom::End(0)).unwrap();
bincode::serialize_into(&request.arg, &process_id).unwrap();
let _ = (&request.arg).seek(std::io::SeekFrom::Start(0)).unwrap();

let args = request
.args
Expand Down

0 comments on commit ec4c2b5

Please sign in to comment.