Skip to content

Commit

Permalink
Introduces multiple broker fixes
Browse files Browse the repository at this point in the history
1. Unsubscribe from broker on actor stop
2. Support multiple subscriptions from the same actor type (previously a TODO)
3. Fixes a bug where broker always put self as an actor id.

Signed-off-by: Egor Ivkov <e.o.ivkov@gmail.com>
  • Loading branch information
e-ivkov committed Aug 12, 2021
1 parent 2716e57 commit 9c148c3
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 78 deletions.
4 changes: 4 additions & 0 deletions iroha/src/block_sync.rs
Expand Up @@ -103,6 +103,10 @@ impl<S: SumeragiTrait, W: WorldTrait> Actor for BlockSynchronizer<S, W> {
self.broker.subscribe::<ContinueSync, _>(ctx);
ctx.notify_every::<ReceiveUpdates>(self.gossip_period);
}

fn broker(&self) -> Option<&broker::Broker> {
Some(&self.broker)
}
}

#[async_trait::async_trait]
Expand Down
4 changes: 4 additions & 0 deletions iroha/src/kura.rs
Expand Up @@ -116,6 +116,10 @@ impl<W: WorldTrait> Actor for Kura<W> {
}
}
}

fn broker(&self) -> Option<&broker::Broker> {
Some(&self.broker)
}
}

#[async_trait::async_trait]
Expand Down
4 changes: 4 additions & 0 deletions iroha/src/queue.rs
Expand Up @@ -84,6 +84,10 @@ impl<W: WorldTrait> Actor for Queue<W> {
self.broker
.subscribe::<VersionedAcceptedTransaction, _>(ctx);
}

fn broker(&self) -> Option<&broker::Broker> {
Some(&self.broker)
}
}

#[async_trait::async_trait]
Expand Down
4 changes: 4 additions & 0 deletions iroha/src/sumeragi/mod.rs
Expand Up @@ -213,6 +213,10 @@ impl<Q: QueueTrait, G: GenesisNetworkTrait, W: WorldTrait> Actor for Sumeragi<Q,
self.broker.subscribe::<Init, _>(ctx);
self.broker.subscribe::<CommitBlock, _>(ctx);
}

fn broker(&self) -> Option<&broker::Broker> {
Some(&self.broker)
}
}

#[async_trait::async_trait]
Expand Down
10 changes: 5 additions & 5 deletions iroha/test_network/tests/sumeragi_with_mock.rs
Expand Up @@ -600,7 +600,7 @@ async fn all_peers_commit_block() {

let mut channels = network
.peers()
.map(|peer| peer.broker.subscribe_with_channel::<Stored>())
.map(|peer| peer.broker.subscribe_with_channel::<Stored>().0)
.collect::<Vec<_>>();

// Send tx to leader
Expand All @@ -626,7 +626,7 @@ async fn change_view_on_commit_timeout() {

let mut channels = network
.peers()
.map(|peer| peer.broker.subscribe_with_channel::<Stored>())
.map(|peer| peer.broker.subscribe_with_channel::<Stored>().0)
.collect::<Vec<_>>();

// send to leader
Expand Down Expand Up @@ -664,7 +664,7 @@ async fn change_view_on_tx_receipt_timeout() {

let mut channels = network
.peers()
.map(|peer| peer.broker.subscribe_with_channel::<Stored>())
.map(|peer| peer.broker.subscribe_with_channel::<Stored>().0)
.collect::<Vec<_>>();

// send to not leader
Expand Down Expand Up @@ -706,7 +706,7 @@ async fn change_view_on_block_creation_timeout() {

let mut channels = network
.peers()
.map(|peer| peer.broker.subscribe_with_channel::<Stored>())
.map(|peer| peer.broker.subscribe_with_channel::<Stored>().0)
.collect::<Vec<_>>();

// send to not leader
Expand Down Expand Up @@ -740,7 +740,7 @@ async fn not_enough_votes() {

let mut channels = network
.peers()
.map(|peer| peer.broker.subscribe_with_channel::<Stored>())
.map(|peer| peer.broker.subscribe_with_channel::<Stored>().0)
.collect::<Vec<_>>();

// send to not leader
Expand Down

0 comments on commit 9c148c3

Please sign in to comment.