Skip to content

Commit

Permalink
fix: view change updates
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
  • Loading branch information
mversic committed May 23, 2024
1 parent 79e3adf commit ad1aa08
Show file tree
Hide file tree
Showing 18 changed files with 623 additions and 452 deletions.
34 changes: 29 additions & 5 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ struct IrohaMainState {
/// A boolean value indicating whether or not the peers will receive data from the network.
/// Used in sumeragi testing.
#[cfg(debug_assertions)]
pub freeze_status: Arc<AtomicBool>,
pub freeze_status: FreezeStatus,
}

/// A state of [`Iroha`] for when the network is started, but [`Torii`] not yet.
Expand Down Expand Up @@ -118,14 +118,37 @@ pub enum StartError {
StartTorii,
}

/// Handle for freezing and unfreezing the network
#[derive(Clone)]
#[cfg(debug_assertions)]
pub struct FreezeStatus(Arc<AtomicBool>, PeerId);

#[cfg(debug_assertions)]
impl FreezeStatus {
pub(crate) fn new(peer_id: PeerId) -> Self {
Self(Arc::new(AtomicBool::new(false)), peer_id)
}

/// Stop listening for messages
pub fn freeze(&self) {
iroha_logger::warn!(peer_id=%self.1, "NetworkRelay is frozen");
self.0.store(true, Ordering::SeqCst);
}
/// Start listening for messages
pub fn unfreeze(&self) {
iroha_logger::warn!(peer_id=%self.1, "NetworkRelay is unfrozen");
self.0.store(false, Ordering::SeqCst);
}
}

struct NetworkRelay {
sumeragi: SumeragiHandle,
block_sync: BlockSynchronizerHandle,
gossiper: TransactionGossiperHandle,
network: IrohaNetwork,
shutdown_notify: Arc<Notify>,
#[cfg(debug_assertions)]
freeze_status: Arc<AtomicBool>,
freeze_status: FreezeStatus,
}

impl NetworkRelay {
Expand Down Expand Up @@ -156,7 +179,7 @@ impl NetworkRelay {
use iroha_core::NetworkMessage::*;

#[cfg(debug_assertions)]
if self.freeze_status.load(Ordering::SeqCst) {
if self.freeze_status.0.load(Ordering::SeqCst) {
return;
}

Expand Down Expand Up @@ -346,7 +369,8 @@ impl Iroha<ToriiNotStarted> {
.start();

#[cfg(debug_assertions)]
let freeze_status = Arc::new(AtomicBool::new(false));
let freeze_status = FreezeStatus::new(config.common.peer_id.clone());
Arc::new(AtomicBool::new(false));

let notify_shutdown = Arc::new(Notify::new());

Expand Down Expand Up @@ -564,7 +588,7 @@ impl Iroha<ToriiNotStarted> {
impl<T> Iroha<T> {
#[allow(missing_docs)]
#[cfg(debug_assertions)]
pub fn freeze_status(&self) -> &Arc<AtomicBool> {
pub fn freeze_status(&self) -> &FreezeStatus {
&self.main_state.freeze_status
}

Expand Down
2 changes: 0 additions & 2 deletions cli/src/samples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ pub fn get_config_toml(
let (public_key, private_key) = peer_key_pair.into_parts();
let (genesis_public_key, genesis_private_key) = genesis_key_pair.into_parts();

iroha_logger::info!(%public_key, "sample configuration public key");

let mut raw = toml::Table::new();
iroha_config::base::toml::Writer::new(&mut raw)
.write("chain_id", chain_id)
Expand Down
5 changes: 2 additions & 3 deletions client/tests/integration/extra_functional/unstable_network.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use core::sync::atomic::Ordering;
use std::thread;

use iroha_client::{
Expand Down Expand Up @@ -97,7 +96,7 @@ fn unstable_network(
for _i in 0..n_transactions {
// Make random peers faulty.
for f in freezers.choose_multiple(&mut rng, n_offline_peers as usize) {
f.store(true, Ordering::SeqCst);
f.freeze();
}

let quantity = Numeric::ONE;
Expand Down Expand Up @@ -129,7 +128,7 @@ fn unstable_network(

// Return all peers to normal function.
for f in &freezers {
f.store(false, Ordering::SeqCst);
f.unfreeze();
}
}
}
Empty file.
15 changes: 9 additions & 6 deletions core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ mod valid {
block: &SignedBlock,
topology: &Topology,
) -> Result<(), SignatureVerificationError> {
// TODO: ?
//let roles: &[Role] = if topology.view_change_index() >= 1 {
// &[Role::ValidatingPeer, Role::ObservingPeer]
//} else {
Expand Down Expand Up @@ -350,7 +351,7 @@ mod valid {
let proxy_tail_index = topology.proxy_tail_index();
let mut signatures = block.signatures().rev();

match signatures.next() {
let proxy_tail_signature = match signatures.next() {
Some(BlockSignature(signatory, signature))
if usize::try_from(*signatory)
.map_err(|_err| SignatureVerificationError::ProxyTailMissing)?
Expand All @@ -366,13 +367,15 @@ mod valid {
}

signature
.verify(topology.proxy_tail().public_key(), block.payload())
.map_err(|_err| SignatureVerificationError::ProxyTailMissing)?;
}
_ => {
return Err(SignatureVerificationError::ProxyTailMissing);
}
}
};

proxy_tail_signature
.verify(topology.proxy_tail().public_key(), block.payload())
.map_err(|_err| SignatureVerificationError::ProxyTailMissing)?;

Ok(())
}
Expand Down Expand Up @@ -426,7 +429,7 @@ mod valid {

if !block.header().is_genesis() {
if let Err(err) = Self::verify_leader_signature(&block, topology)
.map(|()| Self::verify_validator_signatures(&block, topology))
.and_then(|()| Self::verify_validator_signatures(&block, topology))
{
return WithEvents::new(Err((block, err.into())));
}
Expand Down Expand Up @@ -540,7 +543,7 @@ mod valid {
let prev_signatures = self.0.replace_signatures_unchecked(signatures);

if let Err(err) = Self::verify_leader_signature(self.as_ref(), topology)
.map(|()| Self::verify_validator_signatures(self.as_ref(), topology))
.and_then(|()| Self::verify_validator_signatures(self.as_ref(), topology))
{
self.0.replace_signatures_unchecked(prev_signatures);
WithEvents::new(Err(err))
Expand Down
14 changes: 9 additions & 5 deletions core/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ pub mod message {
let start_height = match prev_hash {
Some(hash) => match block_sync.kura.get_block_height_by_hash(hash) {
None => {
error!(?prev_hash, "Block hash not found");
error!(
peer_id=%block_sync.peer_id,
block=%hash,
"Block hash not found"
);
return;
}
Some(height) => height + 1, // It's get blocks *after*, so we add 1.
Expand Down Expand Up @@ -229,11 +233,11 @@ pub mod message {
}
}
Message::ShareBlocks(ShareBlocks { blocks, .. }) => {
use crate::sumeragi::message::BlockMessage;
use crate::sumeragi::message::BlockSyncUpdate;

for block in blocks.clone() {
block_sync
.sumeragi
.incoming_block_message(BlockMessage::BlockSyncUpdate(block.into()));
let msg = BlockSyncUpdate::from(&block);
block_sync.sumeragi.incoming_block_message(msg);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/smartcontracts/isi/triggers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub mod isi {
}
}

let last_block_estimation = state_transaction.latest_block_ref().map(|block| {
let last_block_estimation = state_transaction.latest_block().map(|block| {
block.header().timestamp()
+ Duration::from_millis(block.header().consensus_estimation_ms)
});
Expand Down
17 changes: 5 additions & 12 deletions core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,11 +1004,11 @@ pub trait StateReadOnly {
fn query_handle(&self) -> &LiveQueryStoreHandle;
fn new_tx_amounts(&self) -> &Mutex<Vec<f64>>;

// Block-related methods

/// Get a reference to the latest block. Returns none if genesis is not committed.
///
/// If you only need hash of the latest block prefer using [`Self::latest_block_hash`]
#[inline]
fn latest_block_ref(&self) -> Option<Arc<SignedBlock>> {
fn latest_block(&self) -> Option<Arc<SignedBlock>> {
self.kura().get_block_by_height(self.height())
}

Expand All @@ -1017,13 +1017,6 @@ pub trait StateReadOnly {
self.block_hashes().iter().nth_back(0).copied()
}

/// Return the view change index of the latest block
fn latest_block_view_change_index(&self) -> u32 {
self.kura()
.get_block_by_height(self.height())
.map_or(0, |block| block.header().view_change_index)
}

/// Return the hash of the block one before the latest block
fn prev_block_hash(&self) -> Option<HashOf<SignedBlock>> {
self.block_hashes().iter().nth_back(1).copied()
Expand Down Expand Up @@ -1097,7 +1090,7 @@ pub trait StateReadOnly {
}
}

/// Check if this [`SignedTransaction`] is already committed or rejected.
/// Check if [`SignedTransaction`] is already committed
#[inline]
fn has_transaction(&self, hash: HashOf<SignedTransaction>) -> bool {
self.transactions().get(&hash).is_some()
Expand Down Expand Up @@ -1262,7 +1255,7 @@ impl<'state> StateBlock<'state> {
fn create_time_event(&self, block: &CommittedBlock) -> TimeEvent {
use iroha_config::parameters::defaults::chain_wide::CONSENSUS_ESTIMATION as DEFAULT_CONSENSUS_ESTIMATION;

let prev_interval = self.latest_block_ref().map(|latest_block| {
let prev_interval = self.latest_block().map(|latest_block| {
let header = &latest_block.as_ref().header();

TimeInterval {
Expand Down

0 comments on commit ad1aa08

Please sign in to comment.