Skip to content

Commit

Permalink
Add tests and fix up Actor interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
johnteee committed Aug 11, 2021
1 parent ed11da8 commit 608f504
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fp_rust"
version = "0.2.5"
version = "0.2.6"
license = "MIT"
authors = ["JunYi JohnTeee Lee <johnteee@gmail.com>"]
edition = "2018"
Expand Down
136 changes: 125 additions & 11 deletions src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ It defines simple and practical hehaviors of `Actor` model.
*/
pub trait Actor<Msg, ContextValue, HandleType, Functor>: UniqueId<String> {
fn receive(&mut self, message: Msg, context: &mut HashMap<String, ContextValue>);
fn spawn_with_handle(&self, name: impl Into<String>, func: Functor) -> HandleType;
fn spawn_with_handle(&self, func: Functor) -> HandleType;

fn get_handle(&self) -> HandleType;
fn get_handle_child(&self, name: impl Into<String>) -> Option<HandleType>;
Expand Down Expand Up @@ -76,7 +76,14 @@ pub struct ActorAsync<Msg, ContextValue> {

context: Arc<Mutex<Box<HashMap<String, ContextValue>>>>,
queue: Arc<Mutex<BlockingQueue<Msg>>>,
effect: Arc<Mutex<dyn FnMut(Msg, &mut HashMap<String, ContextValue>) + Send + Sync + 'static>>,
effect: Arc<
Mutex<
dyn FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
+ Send
+ Sync
+ 'static,
>,
>,

join_handle: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
}
Expand All @@ -99,13 +106,19 @@ impl<Msg, ContextValue> Clone for ActorAsync<Msg, ContextValue> {

impl<Msg, ContextValue> ActorAsync<Msg, ContextValue> {
pub fn new(
effect: impl FnMut(Msg, &mut HashMap<String, ContextValue>) + Send + Sync + 'static,
effect: impl FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
+ Send
+ Sync
+ 'static,
) -> Self {
Self::new_with_options(effect, None, Arc::new(Mutex::new(BlockingQueue::new())))
}

pub fn new_with_options(
effect: impl FnMut(Msg, &mut HashMap<String, ContextValue>) + Send + Sync + 'static,
effect: impl FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
+ Send
+ Sync
+ 'static,
parent_handle: Option<Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static>>,
queue: Arc<Mutex<BlockingQueue<Msg>>>,
) -> Self {
Expand Down Expand Up @@ -226,30 +239,41 @@ impl<Msg, ContextValue>
Msg,
ContextValue,
Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static>,
Box<dyn FnMut(Msg, &mut HashMap<String, ContextValue>) + Send + Sync + 'static>,
Box<
dyn FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
+ Send
+ Sync
+ 'static,
>,
> for ActorAsync<Msg, ContextValue>
where
Msg: Send + 'static,
ContextValue: 'static,
Msg: Clone + Send + 'static,
ContextValue: Send + 'static,
{
fn receive(&mut self, message: Msg, context: &mut HashMap<String, ContextValue>) {
{
self.effect.lock().unwrap()(message, context);
let effect = self.effect.clone();
effect.lock().unwrap()(self, message, context);
}
}
fn spawn_with_handle(
&self,
name: impl Into<String>,
func: Box<dyn FnMut(Msg, &mut HashMap<String, ContextValue>) + Send + Sync + 'static>,
func: Box<
dyn FnMut(&mut ActorAsync<Msg, ContextValue>, Msg, &mut HashMap<String, ContextValue>)
+ Send
+ Sync
+ 'static,
>,
) -> Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static> {
let mut new_one = Self::new(func);
new_one.parent_handle = Some(self.get_handle());
{
self.children_handle_map
.lock()
.unwrap()
.insert(name.into(), new_one.get_handle());
.insert(new_one.get_id(), new_one.get_handle());
}
new_one.start();
return new_one.get_handle();
}
fn get_handle(&self) -> Arc<dyn Handle<Msg, ContextValue> + Send + Sync + 'static> {
Expand Down Expand Up @@ -277,4 +301,94 @@ where
#[test]
fn test_actor_common() {
// assert_eq!(false, Maybe::just(None::<bool>).or(false));

#[derive(Clone, Debug)]
enum Value {
// Str(String),
Int(i32),
VecStr(Vec<String>),
Spawn,
Shutdown,
}

let mut queue = BlockingQueue::<i32>::new();
let queue_thread = queue.clone();
let mut root = ActorAsync::new(
move |this: &mut ActorAsync<_, _>, msg: Value, context: &mut HashMap<String, Value>| {
match msg {
Value::Spawn => {
println!("Actor Spawn");
let mut queue_thread = queue_thread.clone();
let spawned = this.spawn_with_handle(Box::new(
move |this: &mut ActorAsync<_, _>, msg: Value, _| {
match msg {
Value::Int(v) => {
println!("Actor Child Int");
queue_thread.put(v * 10);
}
Value::Shutdown => {
println!("Actor Child Shutdown");
this.stop();
}
_ => {}
};
},
));
let list = context.get("children_ids").cloned();
let mut list = match list {
Some(Value::VecStr(list)) => list,
_ => Vec::new(),
};
list.push(spawned.get_id());
context.insert("children_ids".into(), Value::VecStr(list));
}
Value::Shutdown => {
println!("Actor Shutdown");
if let Some(Value::VecStr(ids)) = context.get("children_ids") {
for id in ids {
println!("Actor Shutdown id {:?}", id);
if let Some(handle) = this.get_handle_child(id) {
handle.send(Value::Shutdown);
}
}
}
this.stop();
}
Value::Int(v) => {
println!("Actor Int");
if let Some(Value::VecStr(ids)) = context.get("children_ids") {
for id in ids {
println!("Actor Int id {:?}", id);
if let Some(handle) = this.get_handle_child(id) {
handle.send(Value::Int(v));
}
}
}
}
_ => {}
}
},
);

let root_handle = root.get_handle();
root.start();

root_handle.send(Value::Spawn);
root_handle.send(Value::Int(10));
root_handle.send(Value::Spawn);
root_handle.send(Value::Int(20));
root_handle.send(Value::Spawn);
root_handle.send(Value::Int(30));

root_handle.send(Value::Shutdown);

let mut v = Vec::<i32>::new();
for _ in 1..7 {
let i = queue.take().unwrap();
println!("Actor {:?}", i);
v.push(i);
}
v.sort();

assert_eq!([100, 200, 200, 300, 300, 300], v.as_slice())
}

0 comments on commit 608f504

Please sign in to comment.