Skip to content

Commit

Permalink
Rework commit with broker bug fixes
Browse files Browse the repository at this point in the history
Signed-off-by: i1i1 <vanyarybin1@live.ru>
  • Loading branch information
i1i1 committed Aug 16, 2021
1 parent 025819f commit 7a0b936
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 108 deletions.
51 changes: 51 additions & 0 deletions iroha_actor/src/actor_id.rs
@@ -0,0 +1,51 @@
use std::{
fmt::{self, Debug, Display},
hash::Hash,
sync::atomic::{AtomicUsize, Ordering},
};

static ACTOR_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);

#[derive(Clone, Copy)]
pub struct ActorId {
pub name: Option<&'static str>,
pub id: usize,
}

impl ActorId {
pub fn new(name: Option<&'static str>) -> Self {
Self {
name,
id: ACTOR_ID_COUNTER.fetch_add(1, Ordering::SeqCst),
}
}
}

impl Display for ActorId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(name) = &self.name {
write!(f, "{}:{}", name, self.id)
} else {
write!(f, "<unknown>:{}", self.id)
}
}
}

impl Debug for ActorId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Display::fmt(&self, f)
}
}

impl Hash for ActorId {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state)
}
}

impl PartialEq for ActorId {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for ActorId {}
232 changes: 169 additions & 63 deletions iroha_actor/src/broker.rs
Expand Up @@ -9,8 +9,7 @@
/// struct Message1(String);
/// impl Message for Message1 { type Result = (); }
///
/// #[derive(Clone)]
/// struct Message2(String);
/// #[derive(Clone)] struct Message2(String);
/// impl Message for Message2 { type Result = (); }
///
/// struct Actor1(Broker);
Expand Down Expand Up @@ -54,21 +53,21 @@
/// })
/// ```
use std::any::{Any, TypeId};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use dashmap::{mapref::entry::Entry, DashMap};
use futures::future;
use futures::{prelude::*, stream::FuturesUnordered};

use super::*;

type TypeMap<V> = DashMap<TypeId, V>;
type MessageId = TypeId;
type MessageMap<V> = DashMap<MessageId, V>;
type ActorMap<V> = HashMap<ActorId, V>;
type BrokerRecipient = Box<dyn Any + Sync + Send + 'static>;

/// Broker type. Can be cloned and shared between many actors.
///
/// TODO: There might be several actors of one type. We should handle this case!
#[derive(Debug)]
pub struct Broker(Arc<TypeMap<Vec<(TypeId, BrokerRecipient)>>>);
pub struct Broker(Arc<MessageMap<ActorMap<BrokerRecipient>>>);

impl Clone for Broker {
fn clone(&self) -> Self {
Expand All @@ -85,71 +84,90 @@ impl Default for Broker {
impl Broker {
/// Default constructor for broker
pub fn new() -> Self {
Self(Arc::new(DashMap::new()))
Self(Arc::new(MessageMap::new()))
}

fn message_entry(&'_ self, id: TypeId) -> Entry<'_, TypeId, Vec<(TypeId, BrokerRecipient)>> {
fn entry(&'_ self, id: TypeId) -> Entry<'_, TypeId, ActorMap<BrokerRecipient>> {
self.0.entry(id)
}

/// Number of subscribers for specific message
pub fn subscribers<M: BrokerMessage + Send + Sync>(&self) -> usize {
let entry = if let Entry::Occupied(entry) = self.entry(TypeId::of::<M>()) {
entry
} else {
return 0;
};

entry
.get()
.iter()
.filter_map(|(id, recipient)| Some((id, recipient.downcast_ref::<Recipient<M>>()?)))
.fold(0, |p, (_, n)| p + if n.0.is_closed() { 0 } else { 1 })
}

/// Send message via broker
#[allow(clippy::missing_panics_doc)]
pub async fn issue_send<M: BrokerMessage + Send + Sync>(&self, m: M) {
let entry = if let Entry::Occupied(entry) = self.message_entry(TypeId::of::<M>()) {
let mut entry = if let Entry::Occupied(entry) = self.entry(TypeId::of::<M>()) {
entry
} else {
return;
};
let send = entry.get().iter().filter_map(|(_, recipient)| {
recipient
.downcast_ref::<Recipient<M>>()
.map(|recipient| recipient.send(m.clone()))
});
future::join_all(send).await;
}

fn subscribe_recipient<M: BrokerMessage>(&self, recipient: Recipient<M>) {
let mut entry = self
.message_entry(TypeId::of::<M>())
.or_insert_with(|| Vec::with_capacity(1));
if entry
let closed = entry
.get()
.iter()
.any(|(actor_id, _)| *actor_id == TypeId::of::<Self>())
{
return;
.filter_map(|(id, recipient)| Some((id, recipient.downcast_ref::<Recipient<M>>()?)))
.map(|(id, recipient)| {
let m = m.clone();
async move {
if recipient.0.is_closed() {
return Some(*id);
}

recipient.send(m).await;
None
}
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await
.into_iter()
.flatten();

let entry = entry.get_mut();

for c in closed {
entry.remove(&c);
}
entry.push((TypeId::of::<Self>(), Box::new(recipient)));
}

fn subscribe_recipient<M: BrokerMessage>(&self, recipient: Recipient<M>, a: ActorId) {
self.entry(TypeId::of::<M>())
.or_insert_with(|| ActorMap::with_capacity(1))
.insert(a, Box::new(recipient));
}

/// Subscribe actor to specific message type
pub fn subscribe<M: BrokerMessage, A: Actor + ContextHandler<M>>(&self, ctx: &mut Context<A>) {
self.subscribe_recipient(ctx.recipient::<M>())
self.subscribe_recipient(ctx.recipient::<M>(), ctx.actor_id)
}

/// Subscribe with channel to specific message type
pub fn subscribe_with_channel<M: BrokerMessage + Debug>(&self) -> mpsc::Receiver<M> {
let (sender, receiver) = mpsc::channel(100);
self.subscribe_recipient(sender.into());
self.subscribe_recipient(sender.into(), ActorId::new(None));
receiver
}

/// Unsubscribe actor to this specific message type
pub fn unsubscribe<M: BrokerMessage, A: Actor + ContextHandler<M>>(
&self,
_ctx: &mut Context<A>,
ctx: &mut Context<A>,
) {
let mut entry = if let Entry::Occupied(entry) = self.message_entry(TypeId::of::<M>()) {
entry
} else {
return;
};

if let Some(pos) = entry
.get()
.iter()
.position(|(actor_id, _)| actor_id == &TypeId::of::<Self>())
{
entry.get_mut().remove(pos);
}
self.0
.get_mut(&TypeId::of::<M>())
.map(|mut entry| entry.remove(&ctx.actor_id));
}
}

Expand All @@ -158,26 +176,114 @@ pub trait BrokerMessage: Message<Result = ()> + Clone + 'static + Send {}

impl<M: Message<Result = ()> + Clone + 'static + Send> BrokerMessage for M {}

#[tokio::test]
async fn two_channels_subscribe_to_same_message() {
#[derive(Clone, Debug)]
struct Message1;
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;

#[tokio::test]
async fn unsubscribe_on_stop() {
#[derive(Clone, Debug)]
struct Actor1(Broker);

#[derive(Clone, Copy, Debug)]
struct Message1;
impl Message for Message1 {
type Result = ();
}

#[derive(Clone, Copy, Debug)]
struct Stop;
impl Message for Stop {
type Result = ();
}

#[async_trait::async_trait]
impl Actor for Actor1 {
async fn on_start(&mut self, ctx: &mut Context<Self>) {
self.0.subscribe::<Message1, _>(ctx);
self.0.subscribe::<Stop, _>(ctx);
}
}

#[async_trait::async_trait]
impl Handler<Message1> for Actor1 {
type Result = ();
async fn handle(&mut self, _: Message1) {}
}

#[async_trait::async_trait]
impl ContextHandler<Stop> for Actor1 {
type Result = ();
async fn handle(&mut self, ctx: &mut Context<Self>, _: Stop) {
ctx.stop_now();
}
}

let broker = Broker::new();
Actor1(broker.clone()).start().await;
Actor1(broker.clone()).start().await;
let mut rec = broker.subscribe_with_channel::<Message1>();

time::sleep(Duration::from_millis(100)).await;
assert_eq!(
(
broker.subscribers::<Message1>(),
broker.subscribers::<Stop>()
),
(3, 2)
);

broker.issue_send(Message1).await;
time::sleep(Duration::from_millis(100)).await;

broker.issue_send(Stop).await;
time::sleep(Duration::from_millis(100)).await;

assert_eq!(
(
broker.subscribers::<Message1>(),
broker.subscribers::<Stop>()
),
(1, 0)
);

impl Message for Message1 {
type Result = ();
tokio::time::timeout(Duration::from_millis(10), rec.recv())
.await
.unwrap()
.unwrap();
drop(rec);

assert_eq!(
(
broker.subscribers::<Message1>(),
broker.subscribers::<Stop>()
),
(0, 0)
);
}

let broker = Broker::new();
let mut receiver1 = broker.subscribe_with_channel::<Message1>();
let mut receiver2 = broker.subscribe_with_channel::<Message1>();

broker.issue_send(Message1).await;
let Message1: Message1 = tokio::time::timeout(Duration::from_millis(100), receiver1.recv())
.await
.unwrap()
.unwrap();
let Message1: Message1 = tokio::time::timeout(Duration::from_millis(100), receiver2.recv())
.await
.unwrap()
.unwrap();
#[tokio::test]
async fn two_channels_subscribe_to_same_message() {
#[derive(Clone, Debug)]
struct Message1;

impl Message for Message1 {
type Result = ();
}

let broker = Broker::new();
let mut receiver1 = broker.subscribe_with_channel::<Message1>();
let mut receiver2 = broker.subscribe_with_channel::<Message1>();

broker.issue_send(Message1).await;
let Message1: Message1 = tokio::time::timeout(Duration::from_millis(100), receiver1.recv())
.await
.unwrap()
.unwrap();
let Message1: Message1 = tokio::time::timeout(Duration::from_millis(100), receiver2.recv())
.await
.unwrap()
.unwrap();
}
}

0 comments on commit 7a0b936

Please sign in to comment.