Skip to content

Commit

Permalink
Move blocks from kura to wsv
Browse files Browse the repository at this point in the history
Signed-off-by: i1i1 <vanyarybin1@live.ru>
  • Loading branch information
i1i1 committed Apr 29, 2021
1 parent 1db9e69 commit fdcb2dc
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 118 deletions.
82 changes: 82 additions & 0 deletions iroha/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,59 @@ impl VersionedCommittedBlock {
pub fn hash(&self) -> Hash {
self.as_inner_v1().hash()
}

/// Returns header of valid block
pub const fn header(&self) -> &BlockHeader {
&self.as_inner_v1().header
}

/// Signatures that are verified with the `hash` of this block as `payload`.
pub fn verified_signatures(&self) -> Vec<Signature> {
self.as_inner_v1().verified_signatures()
}

/// Filters transactions by predicate
pub fn filter_tx_values_by_payload<'a>(
&'a self,
predicate: impl FnOnce(&'a Payload) -> bool + Copy + 'a,
) -> impl Iterator<Item = TransactionValue> + 'a {
let block = self.as_inner_v1();
block
.rejected_transactions
.iter()
.filter(move |tx| predicate(tx.payload()))
.cloned()
.map(TransactionValue::RejectedTransaction)
.chain(
block
.transactions
.iter()
.filter(move |tx| predicate(tx.payload()))
.cloned()
.map(VersionedAcceptedTransaction::from)
.map(Into::into)
.map(TransactionValue::Transaction),
)
}

/// Consumes block and returns iterator over transaction values
pub fn iter_tx_values(&'_ self) -> impl Iterator<Item = TransactionValue> + '_ {
let block = self.as_inner_v1();
block
.rejected_transactions
.iter()
.cloned()
.map(TransactionValue::RejectedTransaction)
.chain(
block
.transactions
.iter()
.cloned()
.map(VersionedAcceptedTransaction::from)
.map(Into::into)
.map(TransactionValue::Transaction),
)
}
}

/// When Kura receives `ValidBlock`, the block is stored and
Expand All @@ -506,6 +559,35 @@ impl CommittedBlock {
pub fn hash(&self) -> Hash {
self.header.hash()
}

/// Signatures that are verified with the `hash` of this block as `payload`.
pub fn verified_signatures(&self) -> Vec<Signature> {
self.signatures.verified(self.hash().as_ref())
}
}

impl From<CommittedBlock> for ValidBlock {
fn from(
CommittedBlock {
header,
rejected_transactions,
transactions,
signatures,
}: CommittedBlock,
) -> Self {
Self {
header,
rejected_transactions,
transactions,
signatures,
}
}
}

impl From<VersionedCommittedBlock> for VersionedValidBlock {
fn from(block: VersionedCommittedBlock) -> Self {
ValidBlock::from(block.into_inner_v1()).into()
}
}

impl From<&VersionedCommittedBlock> for Vec<Event> {
Expand Down
36 changes: 17 additions & 19 deletions iroha/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use iroha_logger::{log, InstrumentFutures};

use self::{config::BlockSyncConfiguration, message::*};
use crate::{
kura::Kura,
sumeragi::{Role, Sumeragi},
VersionedValidBlock,
wsv::WorldStateView,
VersionedCommittedBlock,
};

/// The state of `BlockSynchronizer`.
Expand All @@ -20,13 +20,13 @@ enum State {
Idle,
/// Synchronization is in progress: validating and committing blocks.
/// Contains a vector of blocks left to commit and an id of the peer from which the blocks were requested.
InProgress(Vec<VersionedValidBlock>, PeerId),
InProgress(Vec<VersionedCommittedBlock>, PeerId),
}

/// Structure responsible for block synchronization between peers.
#[derive(Debug)]
pub struct BlockSynchronizer {
kura: Arc<RwLock<Kura>>,
wsv: Arc<WorldStateView>,
sumeragi: Arc<RwLock<Sumeragi>>,
peer_id: PeerId,
state: State,
Expand All @@ -39,13 +39,13 @@ impl BlockSynchronizer {
/// Constructs `BlockSync`
pub fn from_configuration(
config: &BlockSyncConfiguration,
kura: Arc<RwLock<Kura>>,
wsv: Arc<WorldStateView>,
sumeragi: Arc<RwLock<Sumeragi>>,
peer_id: PeerId,
n_topology_shifts_before_reshuffle: u32,
) -> BlockSynchronizer {
Self {
kura,
wsv,
peer_id,
sumeragi,
state: State::Idle,
Expand All @@ -61,17 +61,15 @@ impl BlockSynchronizer {
#[log]
pub fn start(&self) {
let gossip_period = self.gossip_period;
let kura = Arc::clone(&self.kura);
let wsv = Arc::clone(&self.wsv);
let peer_id = self.peer_id.clone();
let sumeragi = Arc::clone(&self.sumeragi);
drop(task::spawn(
async move {
loop {
task::sleep(gossip_period).await;
let message = Message::LatestBlock(
kura.read().await.latest_block_hash(),
peer_id.clone(),
);
let message =
Message::LatestBlock(wsv.latest_block_hash().await, peer_id.clone());
drop(
futures::future::join_all(
sumeragi
Expand Down Expand Up @@ -102,7 +100,7 @@ impl BlockSynchronizer {
.sumeragi
.read()
.await
.network_topology_current_or_genesis(block);
.network_topology_current_or_genesis(&block.clone().into());
if block.header().number_of_view_changes < self.n_topology_shifts_before_reshuffle {
network_topology.shift_peers_by_n(block.header().number_of_view_changes);
} else {
Expand All @@ -111,7 +109,7 @@ impl BlockSynchronizer {
block.header().number_of_view_changes,
)
}
if self.kura.read().await.latest_block_hash() == block.header().previous_block_hash
if self.wsv.latest_block_hash().await == block.header().previous_block_hash
&& network_topology
.filter_signatures_by_roles(
&[Role::ValidatingPeer, Role::Leader, Role::ProxyTail],
Expand All @@ -124,15 +122,15 @@ impl BlockSynchronizer {
self.sumeragi
.write()
.await
.commit_block(block.clone())
.commit_block(block.clone().into())
.await;
} else {
self.state = State::Idle;
}
} else {
self.state = State::Idle;
if let Err(e) = Message::GetBlocksAfter(
self.kura.read().await.latest_block_hash(),
self.wsv.latest_block_hash().await,
self.peer_id.clone(),
)
.send_to(&peer_id)
Expand All @@ -157,7 +155,7 @@ pub mod message {
use parity_scale_codec::{Decode, Encode};

use super::{BlockSynchronizer, State};
use crate::{block::VersionedValidBlock, torii::uri};
use crate::{block::VersionedCommittedBlock, torii::uri};

declare_versioned_with_scale!(VersionedMessage 1..2);

Expand Down Expand Up @@ -194,15 +192,15 @@ pub mod message {
/// Request for blocks after the block with `Hash` for the peer with `PeerId`.
GetBlocksAfter(Hash, PeerId),
/// The response to `GetBlocksAfter`. Contains the requested blocks and the id of the peer who shared them.
ShareBlocks(Vec<VersionedValidBlock>, PeerId),
ShareBlocks(Vec<VersionedCommittedBlock>, PeerId),
}

impl Message {
/// Handles the incoming message.
pub async fn handle(&self, block_sync: &mut BlockSynchronizer) {
match self {
Message::LatestBlock(hash, peer) => {
let latest_block_hash = block_sync.kura.read().await.latest_block_hash();
let latest_block_hash = block_sync.wsv.latest_block_hash().await;
if *hash != latest_block_hash {
if let Err(err) =
Message::GetBlocksAfter(latest_block_hash, block_sync.peer_id.clone())
Expand All @@ -221,7 +219,7 @@ pub mod message {
return;
}

if let Some(blocks) = block_sync.kura.read().await.blocks_after(*hash) {
if let Some(blocks) = block_sync.wsv.blocks_after(*hash).await {
#[allow(clippy::cast_possible_truncation)]
if let Some(blocks_batch) =
blocks.chunks(block_sync.batch_size as usize).next()
Expand Down
Loading

0 comments on commit fdcb2dc

Please sign in to comment.