Skip to content

Commit

Permalink
Merge branch 'tim/state-sync-mgr' into 'master'
Browse files Browse the repository at this point in the history
[NET-1440] state sync manager

Adds the state sync manager.

There are still a few cleanup tasks to do but I would like to do them as separate MRs and have an initial version merged. Refer to the tracking [ticket](https://dfinity.atlassian.net/browse/NET-1437) 

See merge request dfinity-lab/public/ic!12596
  • Loading branch information
tthebst committed Jun 23, 2023
2 parents 54907b0 + bdc6cf3 commit 50e3d40
Show file tree
Hide file tree
Showing 13 changed files with 976 additions and 0 deletions.
35 changes: 35 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ members = [
"rs/orchestrator/registry_replicator",
"rs/p2p",
"rs/p2p/peer_manager",
"rs/p2p/quic_transport",
"rs/p2p/state_sync_manager",
"rs/p2p/experimental/prototype",
"rs/replica/setup_ic_network",
"rs/phantom_newtype",
Expand Down
28 changes: 28 additions & 0 deletions rs/p2p/quic_transport/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
load("@rules_rust//rust:defs.bzl", "rust_library")

package(default_visibility = [
"//rs/p2p/quic_transport:__subpackages__",
"//rs/p2p/state_sync_manager:__subpackages__",
])

DEPENDENCIES = [
"//rs/types/types",
"@crate_index//:bytes",
"@crate_index//:http",
]

MACRO_DEPENDENCIES = [
"@crate_index//:async-trait",
]

ALIASES = {}

rust_library(
name = "quic_transport",
srcs = glob(["src/**/*.rs"]),
aliases = ALIASES,
crate_name = "ic_quic_transport",
proc_macro_deps = MACRO_DEPENDENCIES,
version = "0.8.0",
deps = DEPENDENCIES,
)
13 changes: 13 additions & 0 deletions rs/p2p/quic_transport/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "ic-quic-transport"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.36"
bytes = "1.0.1"
http = "0.2.9"
ic-types = { path = "../../types/types" }

49 changes: 49 additions & 0 deletions rs/p2p/quic_transport/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use async_trait::async_trait;
use bytes::Bytes;
use http::{Request, Response};
use ic_types::NodeId;

/// Errors returned by [`Transport`] operations.
#[derive(Debug)]
pub enum TransportError {
    /// There is no connection to the peer.
    Disconnected {
        /// Peer the operation was addressed to.
        peer_id: NodeId,
        /// Potential reason for not being connected.
        connection_error: Option<String>,
    },
    /// An I/O error occurred while communicating with the peer.
    Io {
        /// Peer the operation was addressed to.
        peer_id: NodeId,
        /// The underlying I/O error.
        error: std::io::Error,
    },
    /// Transport is shut down.
    Stopped,
}

impl std::fmt::Display for TransportError {
    /// Renders a human-readable description of the error.
    ///
    /// For `Disconnected` the optional `connection_error` is appended when it
    /// is present, so the reason for the missing connection is not lost when
    /// the error is logged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Disconnected {
                peer_id,
                connection_error,
            } => {
                write!(f, "Disconnected/No connection to peer {}.", peer_id)?;
                match connection_error {
                    Some(reason) => write!(f, " Reason: {}", reason),
                    None => Ok(()),
                }
            }
            Self::Io { peer_id, error } => {
                write!(f, "Io error with peer {}. Reason: {}", peer_id, error)
            }
            Self::Stopped => {
                write!(f, "Transport Stopped")
            }
        }
    }
}

// Makes `TransportError` usable with `?`, `Box<dyn Error>` and similar error
// plumbing. `source()` keeps its default (`None`) because the underlying causes
// are already rendered by `Display`; returning them as well would make error
// chain printers report them twice.
impl std::error::Error for TransportError {}

/// Request/response style transport between nodes.
///
/// Implementations must be `Send + Sync`, so a single instance can be shared
/// between tasks (e.g. behind an `Arc<dyn Transport>`).
#[async_trait]
pub trait Transport: Send + Sync {
/// Sends `request` to `peer` and resolves to the response, or to a
/// [`TransportError`] if the exchange could not be completed.
async fn rpc(
&self,
peer: &NodeId,
request: Request<Bytes>,
) -> Result<Response<Bytes>, TransportError>;

/// Broadcasts `msg` without waiting for any response.
///
/// NOTE(review): presumably delivered to all currently connected peers; no
/// implementation is visible in this file, so confirm against the
/// implementor.
fn broadcast(&self, msg: Request<Bytes>) -> Result<(), TransportError>;
}
42 changes: 42 additions & 0 deletions rs/p2p/state_sync_manager/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
load("@rules_rust//rust:defs.bzl", "rust_library")

package(default_visibility = [
"//rs/replica:__subpackages__",
])

DEPENDENCIES = [
"//rs/async_utils",
"//rs/interfaces",
"//rs/monitoring/logger",
"//rs/monitoring/metrics",
"//rs/protobuf",
"//rs/p2p/quic_transport",
"//rs/types/types",
"@crate_index//:axum",
"@crate_index//:base64",
"@crate_index//:bincode",
"@crate_index//:bytes",
"@crate_index//:futures",
"@crate_index//:prometheus",
"@crate_index//:prost",
"@crate_index//:rand_0_8_4",
"@crate_index//:serde",
"@crate_index//:slog",
"@crate_index//:tokio",
]

MACRO_DEPENDENCIES = [
"@crate_index//:async-trait",
]

ALIASES = {}

rust_library(
name = "state_sync_manager",
srcs = glob(["src/**/*.rs"]),
aliases = ALIASES,
crate_name = "ic_state_sync_manager",
proc_macro_deps = MACRO_DEPENDENCIES,
version = "0.8.0",
deps = DEPENDENCIES,
)
27 changes: 27 additions & 0 deletions rs/p2p/state_sync_manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "ic-state-sync-manager"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.36"
axum = "0.6.12"
base64 = "0.11.0"
bincode = "1.2.1"
bytes = "1.0.1"
futures = "0.3.10"
ic-async-utils = { path = "../../async_utils" }
ic-interfaces = { path = "../../interfaces" }
ic-logger = { path = "../../monitoring/logger" }
ic-metrics = { path = "../../monitoring/metrics" }
ic-protobuf = { path = "../../protobuf" }
ic-quic-transport = { path = "../quic_transport" }
ic-types = { path = "../../types/types" }
prometheus = { version = "0.12.0", features = [ "process" ] }
prost = "0.11.0"
rand = "0.8.5"
serde = { version = "1.0.99", features = [ "derive" ] }
slog = { version = "2.5.2", features = ["nested-values", "release_max_level_debug"] }
tokio = { version = "1.28.0", features = ["full"] }
170 changes: 170 additions & 0 deletions rs/p2p/state_sync_manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
//! State sync manager.
//!
//! Implements the necessary network logic for state sync:
//! - Periodic broadcasting of the latest state advert to all peers.
//! - Checking advertisements from peers against local state and
//! starting state sync if necessary.
//! - Adding peers to ongoing state sync if they advertise the same state.
//!
//! API:
//! - `/chunk` route takes `pb::GossipChunkRequest` and responds with `pb::ArtifactChunk`
//! if the chunk was found. It responds with NOT_FOUND if the chunk is not available.
//! - `/advert` accepts `pb::GossipAdvert` and returns nothing.
//!
//! GUARANTEES:
//! - There is only ever one active state sync.
//! - State sync is started for the advert that returned FETCH.
//! - State advert is periodically broadcast and there is no delivery guarantee.
use std::{
sync::{Arc, Mutex},
time::Duration,
};

use axum::{routing::any, Router};
use ic_interfaces::state_sync_client::StateSyncClient;
use ic_logger::{info, ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_quic_transport::Transport;
use ic_types::{artifact::StateSyncArtifactId, NodeId};
use metrics::{StateSyncManagerHandlerMetrics, StateSyncManagerMetrics};
use ongoing::OngoingStateSyncHandle;
use routes::{
build_advert_handler_request, state_sync_advert_handler, state_sync_chunk_handler,
StateSyncAdvertHandler, StateSyncChunkHandler, STATE_SYNC_ADVERT_PATH, STATE_SYNC_CHUNK_PATH,
};
use tokio::{runtime::Handle, select, task::JoinHandle};

use crate::ongoing::start_ongoing_state_sync;

mod metrics;
mod ongoing;
mod routes;

/// Builds the axum [`Router`] that exposes the state sync endpoints.
///
/// Two routes are registered, both accepting any HTTP method:
/// - `STATE_SYNC_CHUNK_PATH`, handled by `state_sync_chunk_handler`.
/// - `STATE_SYNC_ADVERT_PATH`, handled by `state_sync_advert_handler`.
///
/// Returns the router together with the receiving half of a bounded channel.
/// The sending half is owned by the advert handler; the receiver is meant to
/// be passed on to `start_state_sync_manager`.
pub fn build_axum_router(
    state_sync: Arc<dyn StateSyncClient>,
    log: ReplicaLogger,
    metrics_registry: &MetricsRegistry,
) -> (
    Router,
    tokio::sync::mpsc::Receiver<(StateSyncArtifactId, NodeId)>,
) {
    // Capacity of the bounded channel between the advert handler and the
    // consumer of the returned receiver.
    const ADVERT_CHANNEL_CAPACITY: usize = 20;

    // Both handlers report into the same set of handler metrics.
    let handler_metrics = StateSyncManagerHandlerMetrics::new(metrics_registry);

    let chunk_state = Arc::new(StateSyncChunkHandler::new(
        log.clone(),
        state_sync,
        handler_metrics.clone(),
    ));

    let (advert_sender, advert_receiver) = tokio::sync::mpsc::channel(ADVERT_CHANNEL_CAPACITY);
    let advert_state = Arc::new(StateSyncAdvertHandler::new(
        log,
        advert_sender,
        handler_metrics,
    ));

    // Each `with_state` call supplies the state for the route registered
    // right before it.
    let router = Router::new()
        .route(STATE_SYNC_CHUNK_PATH, any(state_sync_chunk_handler))
        .with_state(chunk_state)
        .route(STATE_SYNC_ADVERT_PATH, any(state_sync_advert_handler))
        .with_state(advert_state);

    (router, advert_receiver)
}

/// Spawns the state sync manager event loop on `rt`.
///
/// The manager starts without an ongoing state sync. `advert_receiver` is the
/// channel on which received `(artifact id, peer)` adverts arrive (see
/// `build_axum_router`).
///
/// Returns the [`JoinHandle`] of the spawned task.
pub fn start_state_sync_manager(
    log: ReplicaLogger,
    metrics: &MetricsRegistry,
    rt: &Handle,
    transport: Arc<dyn Transport>,
    state_sync: Arc<dyn StateSyncClient>,
    advert_receiver: tokio::sync::mpsc::Receiver<(StateSyncArtifactId, NodeId)>,
) -> JoinHandle<()> {
    // Building the future does not run anything yet; the event loop only
    // starts once the future is spawned below.
    let event_loop = StateSyncManager {
        metrics: StateSyncManagerMetrics::new(metrics),
        rt: rt.clone(),
        ongoing_state_sync: None,
        log,
        transport,
        state_sync,
        advert_receiver,
    }
    .run();

    rt.spawn(event_loop)
}

/// State owned by the state sync manager event loop (see `StateSyncManager::run`).
struct StateSyncManager {
/// Logger used by the manager and cloned into every ongoing state sync.
log: ReplicaLogger,
/// Runtime handle passed to `start_ongoing_state_sync` when a state sync is started.
rt: Handle,
/// Manager metrics (e.g. the `state_syncs` counter).
metrics: StateSyncManagerMetrics,
/// Transport used to broadcast adverts; also handed to ongoing state syncs.
transport: Arc<dyn Transport>,
/// Client that is asked for the latest local state and whether a state sync should start.
state_sync: Arc<dyn StateSyncClient>,
/// Incoming `(advertised state, advertising peer)` pairs.
advert_receiver: tokio::sync::mpsc::Receiver<(StateSyncArtifactId, NodeId)>,
/// Handle of the currently tracked state sync, `None` if there is none.
ongoing_state_sync: Option<OngoingStateSyncHandle>,
}

impl StateSyncManager {
    /// Period between two broadcasts of the latest local state advert.
    const ADVERT_BROADCAST_INTERVAL: Duration = Duration::from_secs(5);

    /// Event loop of the manager.
    ///
    /// Reacts to two kinds of events: the periodic timer (broadcast our
    /// latest state) and adverts received from peers. The loop has no exit
    /// condition.
    async fn run(mut self) {
        let mut advert_ticker = tokio::time::interval(Self::ADVERT_BROADCAST_INTERVAL);
        loop {
            select! {
                _ = advert_ticker.tick() => {
                    self.handle_advert_tick();
                },
                // A `None` from `recv` (channel closed) does not match the
                // pattern, so only the timer branch remains for that round.
                Some((artifact_id, peer)) = self.advert_receiver.recv() => {
                    self.handle_advert(artifact_id, peer).await;
                }
            }
        }
    }

    /// Processes the advert `artifact_id` received from `peer`.
    ///
    /// If a state sync is still running, `peer` is offered to it and nothing
    /// else happens. Otherwise a finished sync is cleaned up and the state
    /// sync client decides whether a new sync is started for `artifact_id`.
    async fn handle_advert(&mut self, artifact_id: StateSyncArtifactId, peer: NodeId) {
        if let Some(running) = self.ongoing_state_sync.as_mut() {
            // Try to add the peer to the state sync peer set. A failed send is
            // deliberately ignored.
            // NOTE(review): the peer is forwarded without comparing its advert
            // to the state being synced — confirm this is intended.
            let _ = running.sender.send(peer).await;

            if !running.jh.is_finished() {
                // `start_state_sync` should not be called if we have ongoing state sync!
                return;
            }
            // The previous state sync task has completed; forget its handle.
            self.ongoing_state_sync = None;
        }

        let chunkable = match self.state_sync.start_state_sync(&artifact_id) {
            Some(chunkable) => chunkable,
            None => return,
        };

        info!(
            self.log,
            "Starting state sync for height {}", artifact_id.height
        );
        self.metrics.state_syncs.inc();

        // Spawns a task that downloads the chunks according to the tracker.
        // Once it is done or timed out it finishes and drops the tracker.
        // Until the state is dropped the priority function guarantees to
        // never return FETCH again.
        let tracker = Arc::new(Mutex::new(chunkable));
        let started = start_ongoing_state_sync(
            self.log.clone(),
            &self.rt,
            self.metrics.ongoing_state_sync_metrics.clone(),
            tracker,
            artifact_id,
            self.state_sync.clone(),
            self.transport.clone(),
        );

        // The peer whose advert triggered this state sync is its first peer.
        started
            .sender
            .send(peer)
            .await
            .expect("Receive side is not dropped");
        self.ongoing_state_sync = Some(started);
    }

    /// Broadcasts an advert for the latest local state, if there is one.
    ///
    /// Broadcasting is best effort: a transport error is ignored.
    fn handle_advert_tick(&mut self) {
        let latest = match self.state_sync.latest_state() {
            Some(state_id) => state_id,
            None => return,
        };
        let advert_request = build_advert_handler_request(&latest);
        let _ = self.transport.broadcast(advert_request);
    }
}

0 comments on commit 50e3d40

Please sign in to comment.