Skip to content

Commit

Permalink
refactor: Make the error code when adding chunks sane
Browse files Browse the repository at this point in the history
  • Loading branch information
rumenov committed Jan 12, 2024
1 parent 5e42368 commit 2e614cf
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 94 deletions.
12 changes: 7 additions & 5 deletions rs/interfaces/src/p2p/state_sync.rs
@@ -1,14 +1,15 @@
//! The file contains the synchronous interface used from P2P, to drive the StateSync protocol.
use ic_types::artifact::StateSyncArtifactId;
use phantom_newtype::Id;
use thiserror::Error;

pub type Chunk = Vec<u8>;

/// Error codes returned by the `Chunkable` interface.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum ArtifactErrorCode {
ChunksMoreNeeded,
ChunkVerificationFailed,
#[derive(Clone, Debug, PartialEq, Eq, Hash, Error)]
pub enum AddChunkError {
#[error("bad chunk")]
Invalid,
}

/// The chunk type.
Expand All @@ -17,7 +18,8 @@ pub type ChunkId = Id<ChunkIdTag, u32>;

pub trait Chunkable<T> {
fn chunks_to_download(&self) -> Box<dyn Iterator<Item = ChunkId>>;
fn add_chunk(&mut self, chunk_id: ChunkId, chunk: Chunk) -> Result<T, ArtifactErrorCode>;
fn add_chunk(&mut self, chunk_id: ChunkId, chunk: Chunk) -> Result<(), AddChunkError>;
fn completed(&self) -> Option<T>;
}

pub trait StateSyncClient: Send + Sync {
Expand Down
30 changes: 13 additions & 17 deletions rs/p2p/state_sync_manager/src/ongoing.rs
Expand Up @@ -20,7 +20,7 @@ use crate::metrics::OngoingStateSyncMetrics;
use crate::routes::{build_chunk_handler_request, parse_chunk_handler_response};

use ic_async_utils::JoinMap;
use ic_interfaces::p2p::state_sync::{ArtifactErrorCode, ChunkId, Chunkable, StateSyncClient};
use ic_interfaces::p2p::state_sync::{ChunkId, Chunkable, StateSyncClient};
use ic_logger::{error, info, ReplicaLogger};
use ic_quic_transport::Transport;
use ic_types::{artifact::StateSyncArtifactId, NodeId};
Expand Down Expand Up @@ -329,9 +329,16 @@ impl<T: 'static + Send> OngoingStateSync<T> {
}
};

let chunk_add_result = tokio::task::spawn_blocking(move || {
let result = tokio::task::spawn_blocking(move || {
let chunk = parse_chunk_handler_response(response, chunk_id, metrics)?;
Ok(tracker.lock().unwrap().add_chunk(chunk_id, chunk))
let mut tracker_guard = tracker.lock().unwrap();
tracker_guard.add_chunk(chunk_id, chunk).map_err(|err| {
DownloadChunkError::RequestError {
chunk_id,
err: err.to_string(),
}
})?;
Ok(tracker_guard.completed())
})
.await
.map_err(|err| DownloadChunkError::RequestError {
Expand All @@ -340,17 +347,6 @@ impl<T: 'static + Send> OngoingStateSync<T> {
})
.and_then(std::convert::identity);

let result = match chunk_add_result {
Ok(Ok(msg)) => Ok(Some(msg)),
Ok(Err(ArtifactErrorCode::ChunksMoreNeeded)) => Ok(None),
Ok(Err(ArtifactErrorCode::ChunkVerificationFailed)) => {
Err(DownloadChunkError::RequestError {
chunk_id,
err: String::from("Chunk verification failed."),
})
}
Err(err) => Err(err),
};
DownloadResult { peer_id, result }
}
}
Expand All @@ -377,6 +373,7 @@ mod tests {

use axum::http::{Response, StatusCode};
use bytes::{Bytes, BytesMut};
use ic_interfaces::p2p::state_sync::AddChunkError;
use ic_metrics::MetricsRegistry;
use ic_p2p_test_utils::mocks::{MockChunkable, MockStateSync, MockTransport};
use ic_test_utilities_logger::with_test_replica_logger;
Expand Down Expand Up @@ -457,7 +454,7 @@ mod tests {
c.expect_chunks_to_download()
.returning(|| Box::new(std::iter::once(ChunkId::from(1))));
c.expect_add_chunk()
.return_const(Err(ArtifactErrorCode::ChunkVerificationFailed));
.return_const(Err(AddChunkError::Invalid));

let rt = Runtime::new().unwrap();
let ongoing = start_ongoing_state_sync(
Expand Down Expand Up @@ -504,8 +501,7 @@ mod tests {
// Endless iterator
c.expect_chunks_to_download()
.returning(|| Box::new(std::iter::once(ChunkId::from(1))));
c.expect_add_chunk()
.return_const(Err(ArtifactErrorCode::ChunksMoreNeeded));
c.expect_add_chunk().return_const(Ok(()));

let rt = Runtime::new().unwrap();
let ongoing = start_ongoing_state_sync(
Expand Down
28 changes: 13 additions & 15 deletions rs/p2p/state_sync_manager/tests/common.rs
Expand Up @@ -9,9 +9,7 @@ use std::{
time::Duration,
};

use ic_interfaces::p2p::state_sync::{
ArtifactErrorCode, Chunk, ChunkId, Chunkable, StateSyncClient,
};
use ic_interfaces::p2p::state_sync::{AddChunkError, Chunk, ChunkId, Chunkable, StateSyncClient};
use ic_logger::ReplicaLogger;
use ic_memory_transport::TransportRouter;
use ic_metrics::MetricsRegistry;
Expand Down Expand Up @@ -281,11 +279,7 @@ impl Chunkable<StateSyncMessage> for FakeChunkable {
Box::new(to_download.into_iter().map(ChunkId::from))
}

fn add_chunk(
&mut self,
chunk_id: ChunkId,
chunk: Chunk,
) -> Result<StateSyncMessage, ArtifactErrorCode> {
fn add_chunk(&mut self, chunk_id: ChunkId, chunk: Chunk) -> Result<(), AddChunkError> {
for set in self.chunk_sets.iter_mut() {
if set.is_empty() {
continue;
Expand All @@ -302,11 +296,15 @@ impl Chunkable<StateSyncMessage> for FakeChunkable {
self.local_state.add_chunk(chunk_id, chunk.len())
}

Ok(())
}

fn completed(&self) -> Option<StateSyncMessage> {
let elems = self.chunk_sets.iter().map(|set| set.len()).sum::<usize>();
if elems == 0 {
Ok(state_sync_artifact(self.syncing_state.clone()))
Some(state_sync_artifact(self.syncing_state.clone()))
} else {
Err(ArtifactErrorCode::ChunksMoreNeeded)
None
}
}
}
Expand Down Expand Up @@ -342,14 +340,14 @@ impl Chunkable<StateSyncMessage> for SharableMockChunkable {
self.chunks_to_download_calls.fetch_add(1, Ordering::SeqCst);
self.mock.lock().unwrap().chunks_to_download()
}
fn add_chunk(
&mut self,
chunk_id: ChunkId,
chunk: Chunk,
) -> Result<StateSyncMessage, ArtifactErrorCode> {
fn add_chunk(&mut self, chunk_id: ChunkId, chunk: Chunk) -> Result<(), AddChunkError> {
self.add_chunks_calls.fetch_add(1, Ordering::SeqCst);
self.mock.lock().unwrap().add_chunk(chunk_id, chunk)
}

fn completed(&self) -> Option<StateSyncMessage> {
self.mock.lock().unwrap().completed()
}
}

#[derive(Clone, Default)]
Expand Down
9 changes: 4 additions & 5 deletions rs/p2p/state_sync_manager/tests/test.rs
Expand Up @@ -11,7 +11,7 @@ use crate::common::{
SharableMockChunkable, State,
};
use common::SharableMockStateSync;
use ic_interfaces::p2p::state_sync::{ArtifactErrorCode, ChunkId};
use ic_interfaces::p2p::state_sync::{AddChunkError, ChunkId};
use ic_logger::info;
use ic_memory_transport::TransportRouter;
use ic_p2p_test_utils::{
Expand Down Expand Up @@ -670,9 +670,8 @@ fn test_state_sync_abortion() {
c2.get_mut()
.expect_chunks_to_download()
.returning(|| Box::new(vec![ChunkId::from(1)].into_iter()) as Box<_>);
c2.get_mut()
.expect_add_chunk()
.return_const(Err(ArtifactErrorCode::ChunksMoreNeeded));
c2.get_mut().expect_add_chunk().return_const(Ok(()));
c2.get_mut().expect_completed().return_const(None);
{
let c2 = c2.clone();
s2.get_mut()
Expand Down Expand Up @@ -733,7 +732,7 @@ fn test_state_sync_abortion() {
c2.clear();
c2.get_mut()
.expect_add_chunk()
.returning(|_, _| Err(ArtifactErrorCode::ChunkVerificationFailed));
.returning(|_, _| Err(AddChunkError::Invalid));
c2.get_mut()
.expect_chunks_to_download()
.returning(|| Box::new(vec![ChunkId::from(1)].into_iter()) as Box<_>);
Expand Down
5 changes: 3 additions & 2 deletions rs/p2p/test_utils/src/mocks.rs
Expand Up @@ -4,7 +4,7 @@ use axum::http::{Request, Response};
use bytes::Bytes;
use ic_interfaces::p2p::{
consensus::{PriorityFnAndFilterProducer, ValidatedPoolReader},
state_sync::{ArtifactErrorCode, Chunk, ChunkId, Chunkable, StateSyncClient},
state_sync::{AddChunkError, Chunk, ChunkId, Chunkable, StateSyncClient},
};
use ic_quic_transport::{ConnId, SendError, Transport};
use ic_types::artifact::PriorityFn;
Expand Down Expand Up @@ -58,7 +58,8 @@ mock! {

impl<T> Chunkable<T> for Chunkable<T> {
fn chunks_to_download(&self) -> Box<dyn Iterator<Item = ChunkId>>;
fn add_chunk(&mut self, chunk_id: ChunkId, chunk: Chunk) -> Result<T, ArtifactErrorCode>;
fn add_chunk(&mut self, chunk_id: ChunkId, chunk: Chunk) -> Result<(), AddChunkError>;
fn completed(&self) -> Option<T>;
}
}

Expand Down

0 comments on commit 2e614cf

Please sign in to comment.