Skip to content

Commit

Permalink
[refactor] #3412: Move transaction gossiping in separate actor
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <shanin1000@yandex.ru>
  • Loading branch information
Erigara authored and SamHSmith committed May 4, 2023
1 parent a709d41 commit 3a1a84a
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 121 deletions.
12 changes: 12 additions & 0 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use iroha_config::{
};
use iroha_core::{
block_sync::{BlockSynchronizer, BlockSynchronizerHandle},
gossiper::{TransactionGossiper, TransactionGossiperHandle},
handler::ThreadHandler,
kura::Kura,
prelude::{World, WorldStateView},
Expand Down Expand Up @@ -116,6 +117,7 @@ impl Drop for Iroha {
struct NetworkRelay {
sumeragi: Arc<Sumeragi>,
block_sync: BlockSynchronizerHandle,
gossiper: TransactionGossiperHandle,
network: IrohaNetwork,
shutdown_notify: Arc<Notify>,
#[cfg(debug_assertions)]
Expand Down Expand Up @@ -158,6 +160,7 @@ impl NetworkRelay {
self.sumeragi.incoming_message(data.into_v1());
}
BlockSync(data) => self.block_sync.message(data.into_v1()).await,
TransactionGossiper(data) => self.gossiper.gossip(*data).await,
Health => {}
}
}
Expand Down Expand Up @@ -293,11 +296,20 @@ impl Iroha {
)
.start();

let gossiper = TransactionGossiper::from_configuration(
&config.sumeragi,
network.clone(),
Arc::clone(&queue),
Arc::clone(&sumeragi),
)
.start();

let freeze_status = Arc::new(AtomicBool::new(false));

NetworkRelay {
sumeragi: Arc::clone(&sumeragi),
block_sync,
gossiper,
network: network.clone(),
shutdown_notify: Arc::clone(&notify_shutdown),
#[cfg(debug_assertions)]
Expand Down
152 changes: 152 additions & 0 deletions core/src/gossiper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//! Gossiper is actor which is responsible for transaction gossiping

use std::{sync::Arc, time::Duration};

use iroha_config::sumeragi::Configuration;
use iroha_data_model::transaction::{
AcceptedTransaction, Transaction, TransactionLimits, VersionedAcceptedTransaction,
VersionedSignedTransaction,
};
use iroha_p2p::Broadcast;
use parity_scale_codec::{Decode, Encode};
use tokio::sync::mpsc;

use crate::{queue::Queue, sumeragi::Sumeragi, IrohaNetwork, NetworkMessage};

/// [`Gossiper`] actor handle.
#[derive(Clone)]
pub struct TransactionGossiperHandle {
message_sender: mpsc::Sender<TransactionGossip>,
}

impl TransactionGossiperHandle {
/// Send [`TransactionGossip`] to actor
pub async fn gossip(&self, gossip: TransactionGossip) {
self.message_sender
.send(gossip)
.await
.expect("Gossiper must handle messages until there is at least one handle to it")
}
}

/// Actor to gossip transactions and receive transaction gossips
pub struct TransactionGossiper {
/// The size of batch that is being gossiped. Smaller size leads
/// to longer time to synchronise, useful if you have high packet loss.
gossip_batch_size: u32,
/// The time between gossiping. More frequent gossiping shortens
/// the time to sync, but can overload the network.
gossip_period: Duration,
/// Address of queue
queue: Arc<Queue>,
/// [`iroha_p2p::Network`] actor handle
network: IrohaNetwork,
/// Sumearagi
sumeragi: Arc<Sumeragi>,
/// Limits that all transactions need to obey, in terms of size
/// of WASM blob and number of instructions.
transaction_limits: TransactionLimits,
}

impl TransactionGossiper {
/// Start [`Self`] actor.
pub fn start(self) -> TransactionGossiperHandle {
let (message_sender, message_receiver) = mpsc::channel(1);
tokio::task::spawn(self.run(message_receiver));
TransactionGossiperHandle { message_sender }
}

/// Construct [`Self`] from configuration
pub fn from_configuration(
// Currently we are using configuration parameters from sumeragi not to break configuration
configuartion: &Configuration,
network: IrohaNetwork,
queue: Arc<Queue>,
sumeragi: Arc<Sumeragi>,
) -> Self {
Self {
queue,
sumeragi,
network,
transaction_limits: configuartion.transaction_limits,
gossip_batch_size: configuartion.gossip_batch_size,
gossip_period: Duration::from_millis(configuartion.gossip_period_ms),
}
}

async fn run(self, mut message_receiver: mpsc::Receiver<TransactionGossip>) {
let mut gossip_period = tokio::time::interval(self.gossip_period);
#[allow(clippy::arithmetic_side_effects)]
loop {
tokio::select! {
_ = gossip_period.tick() => self.gossip_transactions(),
transaction_gossip = message_receiver.recv() => {
let Some(transaction_gossip) = transaction_gossip else {
iroha_logger::info!("All handler to Gossiper are dropped. Shutting down...");
break;
};
self.handle_transaction_gossip(transaction_gossip);
}
}
tokio::task::yield_now().await;
}
}

fn gossip_transactions(&self) {
let txs = self
.queue
.n_random_transactions(self.gossip_batch_size, &self.sumeragi.wsv_mutex_access());

if txs.is_empty() {
iroha_logger::debug!("Nothing to gossip");
return;
}

iroha_logger::trace!(tx_count = txs.len(), "Gossiping transactions");
self.network.broadcast(Broadcast {
data: NetworkMessage::TransactionGossiper(Box::new(TransactionGossip::new(txs))),
});
}

fn handle_transaction_gossip(&self, TransactionGossip { txs }: TransactionGossip) {
iroha_logger::trace!(size = txs.len(), "Received new transaction gossip");
for tx in txs {
match AcceptedTransaction::accept::<false>(tx.into_v1(), &self.transaction_limits) {
Ok(tx) => match self
.queue
.push(tx.into(), &self.sumeragi.wsv_mutex_access())
{
Ok(_) => {}
Err(crate::queue::Failure {
tx,
err: crate::queue::Error::InBlockchain,
}) => {
iroha_logger::debug!(tx_hash = %tx.hash(), "Transaction already in blockchain, ignoring...")
}
Err(crate::queue::Failure { tx, err }) => {
iroha_logger::error!(?err, tx_hash = %tx.hash(), "Failed to enqueue transaction.")
}
},
Err(err) => iroha_logger::error!(%err, "Transaction rejected"),
}
}
}
}

/// Message for gossiping batches of transactions.
#[derive(Decode, Encode, Debug, Clone)]
pub struct TransactionGossip {
/// Batch of transactions.
pub txs: Vec<VersionedSignedTransaction>,
}

impl TransactionGossip {
/// Constructor.
pub fn new(txs: Vec<VersionedAcceptedTransaction>) -> Self {
Self {
// Converting into non-accepted transaction because it's not possible
// to guarantee that the sending peer checked transaction limits
txs: txs.into_iter().map(Into::into).collect(),
}
}
}
4 changes: 4 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod block;
pub mod block_sync;
pub mod gossiper;
pub mod kura;
pub mod modules;
pub mod queue;
Expand All @@ -14,6 +15,7 @@ pub mod wsv;
use core::time::Duration;

use dashmap::{DashMap, DashSet};
use gossiper::TransactionGossip;
use iroha_data_model::{permission::Permissions, prelude::*};
use parity_scale_codec::{Decode, Encode};
use tokio::sync::broadcast;
Expand Down Expand Up @@ -58,6 +60,8 @@ pub enum NetworkMessage {
SumeragiPacket(Box<SumeragiPacket>),
/// Block sync message
BlockSync(Box<BlockSyncMessage>),
/// Transaction gossiper message
TransactionGossiper(Box<TransactionGossip>),
/// Health check message
Health,
}
Expand Down
Loading

0 comments on commit 3a1a84a

Please sign in to comment.