Skip to content

Commit

Permalink
Merge #3594
Browse files Browse the repository at this point in the history
3594: Propagate unreliable peers upwards from the trie accumulator to the block synchronizer r=alsrdn a=alsrdn

The trie accumulator uses a snapshot of the peer list provided by the block synchronizer through the global state synchronizer in order to get the required tries.

Because we didn't provide the block synchronizer any feedback on what the peers that dind't have the global state data were, the block synchronizer may re-issue the global state sync request with the same peer set as for the previous failed attempt. This may lead to delays in getting the global state data.

This PR adds:
* the ability to propagate the unreliable peers (that didn't have the requested data) upwards to the global state synchronizer and then to the block synchronizer. This gives feedback that the original peer list provided should be changed if another global sync request is made.
* block synchronizer now demotes the peers unreliable received from the global state synchronizer
* a test to verify that the peer list is changed based on the feedback from the global state sync components.
* slight change into how `PeerQuality` is handled when demoting and promoting peers; now peers with `Reliable` or `Unreliable` quality will not be set as `Unknown` when demoting or promoting respectively.

These changes reduce the risk that the global state synchronizer gets stuck asking the same peers all over again.

Further potential improvements that can be made:
* Promote peers that were successful when downloading global state. We only demote the peers now.
* Use a reputation based peer list (similar to the one used in the block synchronizer) in the trie accumulator rather than going through peers one by one.

Fixes: #3563

Co-authored-by: Alexandru Sardan <alexandru@casperlabs.io>
  • Loading branch information
casperlabs-bors-ng[bot] and alsrdn committed Feb 2, 2023
2 parents 6e9055b + baa3461 commit 749cab9
Show file tree
Hide file tree
Showing 13 changed files with 584 additions and 79 deletions.
45 changes: 37 additions & 8 deletions node/src/components/block_synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ mod peer_list;
mod signature_acquisition;
mod trie_accumulator;

#[cfg(test)]
mod tests;

use std::sync::Arc;

use datasize::DataSize;
Expand Down Expand Up @@ -66,11 +69,15 @@ pub(crate) use execution_results_acquisition::ExecutionResultsChecksum;
use global_state_synchronizer::GlobalStateSynchronizer;
pub(crate) use global_state_synchronizer::{
Error as GlobalStateSynchronizerError, Event as GlobalStateSynchronizerEvent,
Response as GlobalStateSynchronizerResponse,
};
use metrics::Metrics;
pub(crate) use need_next::NeedNext;
use trie_accumulator::TrieAccumulator;
pub(crate) use trie_accumulator::{Error as TrieAccumulatorError, Event as TrieAccumulatorEvent};
pub(crate) use trie_accumulator::{
Error as TrieAccumulatorError, Event as TrieAccumulatorEvent,
Response as TrieAccumulatorResponse,
};

static BLOCK_SYNCHRONIZER_STATUS: Lazy<BlockSynchronizerStatus> = Lazy::new(|| {
BlockSynchronizerStatus::new(
Expand Down Expand Up @@ -894,21 +901,43 @@ impl BlockSynchronizer {
fn global_state_synced(
&mut self,
block_hash: BlockHash,
result: Result<Digest, GlobalStateSynchronizerError>,
result: Result<GlobalStateSynchronizerResponse, GlobalStateSynchronizerError>,
) {
let root_hash = match result {
Ok(hash) => hash,
let (maybe_root_hash, unreliable_peers) = match result {
Ok(response) => (Some(*response.hash()), response.unreliable_peers()),
Err(error) => {
debug!(%error, "BlockSynchronizer: failed to sync global state");
return;
match error {
GlobalStateSynchronizerError::TrieAccumulator(unreliable_peers)
| GlobalStateSynchronizerError::PutTrie(_, unreliable_peers) => {
(None, unreliable_peers)
}
GlobalStateSynchronizerError::NoPeersAvailable(_) => {
// This should never happen. Before creating a sync request,
// the block synchronizer will request another set of peers
// (both random and from the accumulator).
debug!(
"BlockSynchronizer: global state sync request was issued with no peers"
);
(None, Vec::new())
}
}
}
};

if let Some(builder) = &mut self.historical {
if builder.block_hash() != block_hash {
debug!(%block_hash, "BlockSynchronizer: not currently synchronising block");
} else if let Err(error) = builder.register_global_state(root_hash) {
error!(%block_hash, %error, "BlockSynchronizer: failed to apply global state");
debug!(%block_hash, "BlockSynchronizer: not currently synchronizing block");
} else {
if let Some(root_hash) = maybe_root_hash {
if let Err(error) = builder.register_global_state(root_hash) {
error!(%block_hash, %error, "BlockSynchronizer: failed to apply global state");
}
}
// Demote all the peers where we didn't find the required global state tries
for peer in unreliable_peers.iter() {
builder.demote_peer(Some(*peer));
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions node/src/components/block_synchronizer/block_builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#[cfg(test)]
mod tests;

use std::{
collections::HashMap,
fmt::{Display, Formatter},
Expand Down
7 changes: 7 additions & 0 deletions node/src/components/block_synchronizer/block_builder/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use super::*;

impl BlockBuilder {
pub(crate) fn peer_list(&self) -> &PeerList {
&self.peer_list
}
}
9 changes: 6 additions & 3 deletions node/src/components/block_synchronizer/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use casper_execution_engine::core::engine_state;

use super::GlobalStateSynchronizerEvent;
use crate::{
components::{block_synchronizer::GlobalStateSynchronizerError, fetcher::FetchResult},
components::{
block_synchronizer::{GlobalStateSynchronizerError, GlobalStateSynchronizerResponse},
fetcher::FetchResult,
},
effect::requests::BlockSynchronizerRequest,
types::{
ApprovalsHashes, Block, BlockExecutionResultsOrChunk, BlockHash, BlockHeader, Deploy,
Expand Down Expand Up @@ -45,7 +48,7 @@ pub(crate) enum Event {
GlobalStateSynced {
block_hash: BlockHash,
#[serde(skip_serializing)]
result: Result<Digest, GlobalStateSynchronizerError>,
result: Result<GlobalStateSynchronizerResponse, GlobalStateSynchronizerError>,
},
GotExecutionResultsChecksum {
block_hash: BlockHash,
Expand Down Expand Up @@ -121,7 +124,7 @@ impl Display for Event {
block_hash: _,
result,
} => match result {
Ok(root_hash) => write!(f, "synced global state under root {}", root_hash),
Ok(response) => write!(f, "synced global state under root {}", response.hash()),
Err(error) => write!(f, "failed to sync global state: {}", error),
},
Event::GotExecutionResultsChecksum {
Expand Down
87 changes: 74 additions & 13 deletions node/src/components/block_synchronizer/global_state_synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use casper_execution_engine::{core::engine_state, storage::trie::TrieRaw};
use casper_hashing::Digest;
use casper_types::Timestamp;

use super::{TrieAccumulator, TrieAccumulatorError, TrieAccumulatorEvent};
use super::{TrieAccumulator, TrieAccumulatorError, TrieAccumulatorEvent, TrieAccumulatorResponse};
use crate::{
components::Component,
effect::{
Expand All @@ -25,28 +25,52 @@ use crate::{
},
reactor,
types::{BlockHash, NodeId, TrieOrChunk},
utils::DisplayIter,
NodeRng,
};

const COMPONENT_NAME: &str = "global_state_synchronizer";

#[derive(Debug, Clone, Error)]
pub(crate) enum Error {
#[error(transparent)]
TrieAccumulator(TrieAccumulatorError),
#[error("ContractRuntime failed to put a trie into global state: {0}")]
PutTrie(engine_state::Error),
#[error("trie accumulator encountered an error while fetching a trie; unreliable peers {}", DisplayIter::new(.0))]
TrieAccumulator(Vec<NodeId>),
#[error("ContractRuntime failed to put a trie into global state: {0}; unreliable peers {}", DisplayIter::new(.1))]
PutTrie(engine_state::Error, Vec<NodeId>),
#[error("no peers available to ask for a trie: {0}")]
NoPeersAvailable(Digest),
}

#[derive(Debug, Clone)]
pub(crate) struct Response {
hash: Digest,
unreliable_peers: Vec<NodeId>,
}

impl Response {
pub(crate) fn new(hash: Digest, unreliable_peers: Vec<NodeId>) -> Self {
Self {
hash,
unreliable_peers,
}
}

pub(crate) fn hash(&self) -> &Digest {
&self.hash
}

pub(crate) fn unreliable_peers(self) -> Vec<NodeId> {
self.unreliable_peers
}
}

#[derive(Debug, From, Serialize)]
pub(crate) enum Event {
#[from]
Request(SyncGlobalStateRequest),
FetchedTrie {
trie_hash: Digest,
trie_accumulator_result: Result<Box<TrieRaw>, TrieAccumulatorError>,
trie_accumulator_result: Result<TrieAccumulatorResponse, TrieAccumulatorError>,
},
PutTrieResult {
trie_hash: Digest,
Expand All @@ -63,7 +87,8 @@ pub(crate) enum Event {
struct RequestState {
block_hashes: HashSet<BlockHash>,
peers: HashSet<NodeId>,
responders: Vec<Responder<Result<Digest, Error>>>,
responders: Vec<Responder<Result<Response, Error>>>,
unreliable_peers: HashSet<NodeId>,
}

impl RequestState {
Expand All @@ -74,6 +99,7 @@ impl RequestState {
block_hashes,
peers: request.peers,
responders: vec![request.responder],
unreliable_peers: HashSet::new(),
}
}

Expand All @@ -88,7 +114,7 @@ impl RequestState {
}

/// Consumes this request state and sends the response on all responders.
fn respond(self, response: Result<Digest, Error>) -> Effects<Event> {
fn respond(self, response: Result<Response, Error>) -> Effects<Event> {
self.responders
.into_iter()
.flat_map(|responder| responder.respond(response.clone()).ignore())
Expand Down Expand Up @@ -289,7 +315,7 @@ impl GlobalStateSynchronizer {
fn handle_fetched_trie<REv>(
&mut self,
trie_hash: Digest,
trie_accumulator_result: Result<Box<TrieRaw>, TrieAccumulatorError>,
trie_accumulator_result: Result<TrieAccumulatorResponse, TrieAccumulatorError>,
effect_builder: EffectBuilder<REv>,
) -> Effects<Event>
where
Expand All @@ -306,12 +332,37 @@ impl GlobalStateSynchronizer {
};

let trie_raw = match trie_accumulator_result {
Ok(trie_raw) => trie_raw,
Ok(response) => {
for root_hash in request_root_hashes.iter() {
if let Some(request_state) = self.request_states.get_mut(root_hash) {
request_state
.unreliable_peers
.extend(response.unreliable_peers());
}
}
response.trie()
}
Err(error) => {
debug!(%error, "error fetching a trie");
let mut effects = Effects::new();
effects.extend(request_root_hashes.into_iter().flat_map(|root_hash| {
self.cancel_request(root_hash, Error::TrieAccumulator(error.clone()))
if let Some(request_state) = self.request_states.get_mut(&root_hash) {
match &error {
TrieAccumulatorError::Absent(_, _, unreliable_peers)
| TrieAccumulatorError::PeersExhausted(_, unreliable_peers) => {
request_state.unreliable_peers.extend(unreliable_peers);
}
TrieAccumulatorError::NoPeers(_) => {
// Trie accumulator did not have any peers to download from
// so the request will be canceled with no peers to report
}
}
let unreliable_peers =
request_state.unreliable_peers.iter().copied().collect();
self.cancel_request(root_hash, Error::TrieAccumulator(unreliable_peers))
} else {
Effects::new()
}
}));
// continue fetching other requests if any
effects.extend(self.parallel_fetch(effect_builder));
Expand Down Expand Up @@ -340,7 +391,10 @@ impl GlobalStateSynchronizer {

fn finish_request(&mut self, trie_hash: Digest) -> Effects<Event> {
match self.request_states.remove(&trie_hash) {
Some(request_state) => request_state.respond(Ok(trie_hash)),
Some(request_state) => {
let unreliable_peers = request_state.unreliable_peers.iter().copied().collect();
request_state.respond(Ok(Response::new(trie_hash, unreliable_peers)))
}
None => Effects::new(),
}
}
Expand Down Expand Up @@ -382,7 +436,14 @@ impl GlobalStateSynchronizer {
Err(error) => {
warn!(%trie_hash, %error, "couldn't put trie into global state");
for root_hash in request_root_hashes {
effects.extend(self.cancel_request(root_hash, Error::PutTrie(error.clone())));
if let Some(request_state) = self.request_states.get_mut(&root_hash) {
let unreliable_peers =
request_state.unreliable_peers.iter().copied().collect();
effects.extend(self.cancel_request(
root_hash,
Error::PutTrie(error.clone(), unreliable_peers),
));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn random_test_trie(rng: &mut TestRng) -> TrieRaw {
fn random_sync_global_state_request(
rng: &mut TestRng,
num_random_peers: usize,
responder: Responder<Result<Digest, Error>>,
responder: Responder<Result<Response, Error>>,
) -> (SyncGlobalStateRequest, TrieRaw) {
let block = Block::random(rng);
let trie = random_test_trie(rng);
Expand Down Expand Up @@ -238,7 +238,11 @@ async fn trie_accumulator_error_cancels_request() {
assert_eq!(global_state_synchronizer.in_flight.len(), 1);

// Simulate a trie_accumulator error for the first trie
let trie_accumulator_result = Err(TrieAccumulatorError::Absent(trie_hash1, 0));
let trie_accumulator_result = Err(TrieAccumulatorError::Absent(
trie_hash1,
0,
peers1.iter().cloned().collect(),
));
let mut effects = global_state_synchronizer.handle_fetched_trie(
trie_hash1,
trie_accumulator_result,
Expand Down Expand Up @@ -289,7 +293,7 @@ async fn successful_trie_fetch_puts_trie_to_store() {
.await;

// Simulate a successful trie fetch
let trie_accumulator_result = Ok(Box::new(trie.clone()));
let trie_accumulator_result = Ok(TrieAccumulatorResponse::new(trie.clone(), Vec::new()));
let mut effects = global_state_synchronizer.handle_fetched_trie(
state_root_hash,
trie_accumulator_result,
Expand Down Expand Up @@ -373,7 +377,10 @@ async fn missing_trie_node_children_triggers_fetch() {
.await;

// Simulate a successful trie fetch from the accumulator
let trie_accumulator_result = Ok(Box::new(request_trie.clone()));
let trie_accumulator_result = Ok(TrieAccumulatorResponse::new(
request_trie.clone(),
Vec::new(),
));
let mut effects = global_state_synchronizer.handle_fetched_trie(
state_root_hash,
trie_accumulator_result,
Expand Down Expand Up @@ -437,7 +444,10 @@ async fn missing_trie_node_children_triggers_fetch() {

// Now handle a successful fetch from the trie_accumulator for one of the missing children.
let trie_hash = missing_trie_nodes_hashes[num_missing_trie_nodes - 1];
let trie_accumulator_result = Ok(Box::new(missing_tries[num_missing_trie_nodes - 1].clone()));
let trie_accumulator_result = Ok(TrieAccumulatorResponse::new(
missing_tries[num_missing_trie_nodes - 1].clone(),
Vec::new(),
));
let mut effects = global_state_synchronizer.handle_fetched_trie(
trie_hash,
trie_accumulator_result,
Expand Down Expand Up @@ -524,7 +534,7 @@ async fn stored_trie_finalizes_request() {

// Handle a successful fetch from the trie_accumulator for one of the missing children.
let trie_hash = Digest::hash(trie.inner());
let trie_accumulator_result = Ok(Box::new(trie.clone()));
let trie_accumulator_result = Ok(TrieAccumulatorResponse::new(trie.clone(), Vec::new()));
let mut effects = global_state_synchronizer.handle_fetched_trie(
trie_hash,
trie_accumulator_result,
Expand Down
Loading

0 comments on commit 749cab9

Please sign in to comment.