Skip to content

Commit

Permalink
add name for children (#205)
Browse files Browse the repository at this point in the history
add rustfmt.toml
fix two warnings

Co-authored-by: Mahmut Bulut <vertexclique@gmail.com>
  • Loading branch information
attila-lin and vertexclique committed May 11, 2020
1 parent f27dc39 commit 4af8a48
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 21 deletions.
Empty file added rustfmt.toml
Empty file.
2 changes: 1 addition & 1 deletion src/bastion-executor/src/load_balancer.rs
Expand Up @@ -164,7 +164,7 @@ pub fn stats() -> &'static Stats {
#[inline]
pub fn core_retrieval() -> &'static usize {
lazy_static! {
static ref CORE_COUNT: usize = { placement::get_core_ids().unwrap().len() };
static ref CORE_COUNT: usize = placement::get_core_ids().unwrap().len();
}

&*CORE_COUNT
Expand Down
2 changes: 1 addition & 1 deletion src/bastion-executor/src/sleepers.rs
Expand Up @@ -37,7 +37,7 @@ impl Sleepers {

if !self.notified.swap(false, Ordering::SeqCst) {
*sleep += 1;
self.wake.wait(sleep).unwrap();
let _ = self.wake.wait(sleep).unwrap();
}
}

Expand Down
14 changes: 8 additions & 6 deletions src/bastion/examples/broadcast_message.rs
Expand Up @@ -47,9 +47,8 @@ fn response_supervisor(supervisor: Supervisor) -> Supervisor {
}

fn input_group(children: Children) -> Children {
children
.with_redundancy(1)
.with_exec(move |ctx: BastionContext| async move {
children.with_name("input").with_redundancy(1).with_exec(
move |ctx: BastionContext| async move {
println!("[Input] Worker started!");

let data = vec!["A B C", "A C C", "B C C"];
Expand All @@ -61,11 +60,13 @@ fn input_group(children: Children) -> Children {
}

Ok(())
})
},
)
}

fn process_group(children: Children) -> Children {
children
.with_name("process")
.with_redundancy(3)
.with_dispatcher(
// Declare a dispatcher to use. All instantiated actors will be registered in
Expand Down Expand Up @@ -94,7 +95,7 @@ fn process_group(children: Children) -> Children {
*value += 1;
}

println!("[Processing] Worker #{:?} processed data. Result: `{:?}`", ctx.current().id(), counter);
println!("[Processing] Worker {} #{:?} processed data. Result: `{:?}`", ctx.current().name(), ctx.current().id(), counter);

// Push hashmap with data to the next actor group
let group_name = "Response".to_string();
Expand All @@ -113,6 +114,7 @@ fn process_group(children: Children) -> Children {

fn response_group(children: Children) -> Children {
children
.with_name("response")
.with_redundancy(1)
.with_dispatcher(
// We will re-use the dispatcher to make the example easier to understand
Expand All @@ -137,7 +139,7 @@ fn response_group(children: Children) -> Children {
let message = Arc::try_unwrap(raw_message).unwrap();
msg! { message,
ref data: HashMap<&str, u32> => {
println!("[Response] Worker received `{:?}`", data);
println!("[Response] Worker {} received `{:?}`", ctx.current().name(), data);

for (key, value) in data.iter() {
let current_value = counter.entry(key).or_insert(0);
Expand Down
20 changes: 18 additions & 2 deletions src/bastion/src/child_ref.rs
Expand Up @@ -16,12 +16,23 @@ use std::sync::Arc;
pub struct ChildRef {
id: BastionId,
sender: Sender,
name: String,
path: Arc<BastionPath>,
}

impl ChildRef {
pub(crate) fn new(id: BastionId, sender: Sender, path: Arc<BastionPath>) -> ChildRef {
ChildRef { id, sender, path }
pub(crate) fn new(
id: BastionId,
sender: Sender,
name: String,
path: Arc<BastionPath>,
) -> ChildRef {
ChildRef {
id,
sender,
name,
path,
}
}

/// Returns the identifier of the children group element this
Expand Down Expand Up @@ -284,6 +295,11 @@ impl ChildRef {
pub fn path(&self) -> &Arc<BastionPath> {
&self.path
}

/// Return the [`name`] of the child
pub fn name(&self) -> &str {
&self.name
}
}

impl PartialEq for ChildRef {
Expand Down
28 changes: 24 additions & 4 deletions src/bastion/src/children.rs
Expand Up @@ -89,6 +89,8 @@ pub struct Children {
started: bool,
// List of dispatchers attached to each actor in the group.
dispatchers: Vec<Arc<Box<Dispatcher>>>,
// The name of children
name: Option<String>,
}

impl Children {
Expand All @@ -101,6 +103,7 @@ impl Children {
let pre_start_msgs = Vec::new();
let started = false;
let dispatchers = Vec::new();
let name = None;

Children {
bcast,
Expand All @@ -111,6 +114,7 @@ impl Children {
pre_start_msgs,
started,
dispatchers,
name,
}
}

Expand Down Expand Up @@ -156,6 +160,14 @@ impl Children {
&self.callbacks
}

pub(crate) fn name(&self) -> String {
if let Some(name) = &self.name {
name.clone()
} else {
"__Anonymous__".into()
}
}

pub(crate) fn as_ref(&self) -> ChildrenRef {
trace!(
"Children({}): Creating new ChildrenRef({}).",
Expand All @@ -171,7 +183,7 @@ impl Children {
for (id, (sender, _)) in &self.launched {
trace!("Children({}): Creating new ChildRef({}).", self.id(), id);
// TODO: clone or ref?
let child = ChildRef::new(id.clone(), sender.clone(), path.clone());
let child = ChildRef::new(id.clone(), sender.clone(), self.name(), path.clone());
children.push(child);
}

Expand All @@ -184,6 +196,12 @@ impl Children {
ChildrenRef::new(id, sender, path, children, dispatchers)
}

/// Sets the name of this children group.
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}

/// Sets the closure taking a [`BastionContext`] and returning a
/// [`Future`] that will be used by every element of this children
/// group.
Expand Down Expand Up @@ -237,7 +255,7 @@ impl Children {
self
}

/// Sets the number of number of elements this children group will
/// Sets the number of elements this children group will
/// contain. Each element will call the closure passed in
/// [`with_exec`] and run the returned future until it stops,
/// panics or another element in the group stops or panics.
Expand Down Expand Up @@ -456,7 +474,7 @@ impl Children {
let id = bcast.id().clone();
let sender = bcast.sender().clone();
let path = bcast.path().clone();
let child_ref = ChildRef::new(id.clone(), sender.clone(), path);
let child_ref = ChildRef::new(id.clone(), sender.clone(), self.name(), path);

let children = self.as_ref();
let supervisor = self.bcast.parent().clone().into_supervisor();
Expand Down Expand Up @@ -670,6 +688,8 @@ impl Children {

pub(crate) fn launch_elems(&mut self) {
debug!("Children({}): Launching elements.", self.id());

let name = self.name();
for _ in 0..self.redundancy {
let parent = Parent::children(self.as_ref());
let bcast = Broadcast::new(parent, BastionPathElement::Child(BastionId::new()));
Expand All @@ -678,7 +698,7 @@ impl Children {
let id = bcast.id().clone();
let sender = bcast.sender().clone();
let path = bcast.path().clone();
let child_ref = ChildRef::new(id.clone(), sender.clone(), path);
let child_ref = ChildRef::new(id.clone(), sender.clone(), name.clone(), path);

let children = self.as_ref();
let supervisor = self.bcast.parent().clone().into_supervisor();
Expand Down
21 changes: 14 additions & 7 deletions src/bastion/src/dispatcher.rs
Expand Up @@ -443,7 +443,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

assert_eq!(instance.actors.contains_key(&child_ref), false);

Expand All @@ -457,7 +458,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

instance.register(&child_ref, "my::test::module".to_string());
assert_eq!(instance.actors.contains_key(&child_ref), true);
Expand All @@ -473,7 +475,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

instance.notify(&child_ref, NotificationType::Register);
let handler_was_called = handler.was_called();
Expand Down Expand Up @@ -540,7 +543,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

let dispatcher_type = DispatcherType::Named("test".to_string());
let local_dispatcher = Arc::new(Box::new(Dispatcher::with_type(dispatcher_type.clone())));
Expand All @@ -561,7 +565,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

let dispatcher_type = DispatcherType::Named("test".to_string());
let local_dispatcher = Arc::new(Box::new(Dispatcher::with_type(dispatcher_type.clone())));
Expand All @@ -583,7 +588,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

let dispatcher_type = DispatcherType::Named("test".to_string());
let handler = Box::new(CustomHandler::new(false));
Expand All @@ -607,7 +613,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

let dispatcher_type = DispatcherType::Named("test".to_string());
let handler = Box::new(CustomHandler::new(false));
Expand Down

0 comments on commit 4af8a48

Please sign in to comment.