Skip to content

Commit

Permalink
fix(sumeragi): Use proper view_change_index on init block load (#4612)
Browse files Browse the repository at this point in the history
* fix(sumeragi): Use proper `view_change_index` on init block load
* fix(sumeragi): Exit on shutdown signal

Signed-off-by: Shanin Roman <shanin1000@yandex.ru>
  • Loading branch information
Erigara committed May 22, 2024
1 parent 5b591db commit 4396006
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 39 deletions.
34 changes: 16 additions & 18 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl Sumeragi {

*state_block.world.trusted_peers_ids = block.as_ref().commit_topology().clone();
self.commit_block(block, state_block);
return Err(EarlyReturn::GenesisBlockReceivedAndCommitted);
return Ok(());
}
Err(mpsc::TryRecvError::Disconnected) => return Err(EarlyReturn::Disconnected),
_ => (),
Expand Down Expand Up @@ -827,25 +827,25 @@ pub(crate) fn run(
sumeragi.connect_peers(&sumeragi.current_topology);

let span = span!(tracing::Level::TRACE, "genesis").entered();
let is_genesis_peer = if state.view().height() == 0
|| state.view().latest_block_hash().is_none()
{
if let Some(genesis) = genesis_network.genesis {
sumeragi.sumeragi_init_commit_genesis(genesis, &genesis_network.public_key, &state);
true
} else {
sumeragi
.init_listen_for_genesis(
let is_genesis_peer =
if state.view().height() == 0 || state.view().latest_block_hash().is_none() {
if let Some(genesis) = genesis_network.genesis {
sumeragi.sumeragi_init_commit_genesis(genesis, &genesis_network.public_key, &state);
true
} else {
if let Err(err) = sumeragi.init_listen_for_genesis(
&genesis_network.public_key,
&state,
&mut shutdown_receiver,
)
.unwrap_or_else(|err| assert_ne!(EarlyReturn::Disconnected, err, "Disconnected"));
) {
info!(?err, "Sumeragi Thread is being shut down.");
return;
}
false
}
} else {
false
}
} else {
false
};
};
span.exit();

info!(
Expand Down Expand Up @@ -1038,8 +1038,6 @@ fn add_signatures<const EXPECT_VALID: bool>(
/// FromResidual`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EarlyReturn {
/// Genesis block received and committed
GenesisBlockReceivedAndCommitted,
/// Shutdown message received.
ShutdownMessageReceived,
/// Disconnected
Expand Down
48 changes: 27 additions & 21 deletions core/src/sumeragi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ impl SumeragiHandle {
block: &SignedBlock,
state_block: &mut StateBlock<'_>,
events_sender: &EventsSender,
mut current_topology: Topology,
) -> Topology {
recreate_topology: RecreateTopologyByViewChangeIndex,
) -> RecreateTopologyByViewChangeIndex {
// NOTE: topology need to be updated up to block's view_change_index
current_topology.rotate_all_n(block.header().view_change_index);
let current_topology = recreate_topology(block.header().view_change_index);

let block = ValidBlock::validate(
block.clone(),
Expand Down Expand Up @@ -105,11 +105,10 @@ impl SumeragiHandle {
let _ = events_sender.send(e);
});

Topology::recreate_topology(
block.as_ref(),
0,
state_block.world.peers().cloned().collect(),
)
let peers = state_block.world.peers().cloned().collect();
Box::new(move |view_change_index| {
Topology::recreate_topology(block.as_ref(), view_change_index, peers)
})
}

/// Start [`Sumeragi`] actor and return handle to it.
Expand Down Expand Up @@ -139,7 +138,7 @@ impl SumeragiHandle {
let (message_sender, message_receiver) = mpsc::sync_channel(100);

let blocks_iter;
let mut current_topology;
let mut recreate_topology: RecreateTopologyByViewChangeIndex;

{
let state_view = state.view();
Expand All @@ -151,41 +150,45 @@ impl SumeragiHandle {
)
});

current_topology = match state_view.height() {
0 => Topology::new(
sumeragi_config
recreate_topology = match state_view.height() {
// View change index of the next block doesn't affect init topology
0 => {
let peers = sumeragi_config
.trusted_peers
.value()
.clone()
.into_non_empty_vec(),
),
.into_non_empty_vec();
Box::new(move |_view_change_index| Topology::new(peers))
}
height => {
let block_ref = kura.get_block_by_height(height).expect(
"Sumeragi could not load block that was reported as present. \
Please check that the block storage was not disconnected.",
);
Topology::recreate_topology(
&block_ref,
0,
state_view.world.peers_ids().iter().cloned().collect(),
)
let peers = state_view.world.peers_ids().iter().cloned().collect();
Box::new(move |view_change_index| {
Topology::recreate_topology(&block_ref, view_change_index, peers)
})
}
};
}

for block in blocks_iter {
let mut state_block = state.block();
current_topology = Self::replay_block(
recreate_topology = Self::replay_block(
&common_config.chain_id,
&genesis_network.public_key,
&block,
&mut state_block,
&events_sender,
current_topology,
recreate_topology,
);
state_block.commit();
}

// There is no more blocks so we pick 0 as view change index
let current_topology = recreate_topology(0);

info!("Sumeragi has finished loading blocks and setting up the state");

#[cfg(debug_assertions)]
Expand Down Expand Up @@ -243,6 +246,9 @@ impl SumeragiHandle {
}
}

/// Closure to get topology recreated at certain view change index
type RecreateTopologyByViewChangeIndex = Box<dyn FnOnce(u64) -> Topology>;

/// The interval at which sumeragi checks if there are tx in the
/// `queue`. And will create a block if is leader and the voting is
/// not already in progress.
Expand Down

0 comments on commit 4396006

Please sign in to comment.