Skip to content

Commit

Permalink
chore(p2p): Add graceful shutdown to P2P sender
Browse files Browse the repository at this point in the history
  • Loading branch information
DSharifi committed Feb 16, 2024
1 parent 85d9936 commit 5efe31e
Show file tree
Hide file tree
Showing 5 changed files with 369 additions and 204 deletions.
2 changes: 1 addition & 1 deletion rs/p2p/consensus_manager/BUILD.bazel
Expand Up @@ -22,14 +22,14 @@ DEPENDENCIES = [
"@crate_index//:rand",
"@crate_index//:slog",
"@crate_index//:tokio",
"@crate_index//:tokio-util",
]

DEV_DEPENDENCIES = [
"//rs/p2p/test_utils",
"//rs/test_utilities/logger",
"//rs/types/types_test_utils",
"@crate_index//:mockall",
"@crate_index//:tokio-util",
"@crate_index//:turmoil",
]

Expand Down
2 changes: 1 addition & 1 deletion rs/p2p/consensus_manager/Cargo.toml
Expand Up @@ -22,11 +22,11 @@ prometheus = { workspace = true }
rand = "0.8.5"
slog = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }

[dev-dependencies]
ic-p2p-test-utils = { path = "../test_utils" }
ic-test-utilities-logger = { path = "../../test_utilities/logger" }
ic-types-test-utils = { path = "../../types/types_test_utils" }
mockall = { workspace = true }
tokio-util = { workspace = true }
turmoil = { workspace = true }
10 changes: 9 additions & 1 deletion rs/p2p/consensus_manager/src/lib.rs
Expand Up @@ -27,6 +27,7 @@ use tokio::{
watch,
},
};
use tokio_util::sync::CancellationToken;

mod metrics;
mod receiver;
Expand All @@ -40,6 +41,7 @@ pub struct ConsensusManagerBuilder {
rt_handle: Handle,
clients: Vec<StartConsensusManagerFn>,
router: Option<Router>,
cancellation_token: CancellationToken,
}

impl ConsensusManagerBuilder {
Expand All @@ -50,6 +52,7 @@ impl ConsensusManagerBuilder {
rt_handle,
clients: Vec::new(),
router: None,
cancellation_token: CancellationToken::new(),
}
}

Expand All @@ -68,6 +71,7 @@ impl ConsensusManagerBuilder {
let log = self.log.clone();
let rt_handle = self.rt_handle.clone();
let metrics_registry = self.metrics_registry.clone();
let cancellation_token = self.cancellation_token.child_token();

let builder = move |transport: Arc<dyn Transport>, topology_watcher| {
start_consensus_manager(
Expand All @@ -81,6 +85,7 @@ impl ConsensusManagerBuilder {
sender,
transport,
topology_watcher,
cancellation_token,
)
};

Expand All @@ -97,10 +102,11 @@ impl ConsensusManagerBuilder {
self,
transport: Arc<dyn Transport>,
topology_watcher: watch::Receiver<SubnetTopology>,
) {
) -> CancellationToken {
for client in self.clients {
client(transport.clone(), topology_watcher.clone());
}
self.cancellation_token
}
}

Expand All @@ -117,6 +123,7 @@ fn start_consensus_manager<Artifact, Pool>(
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
transport: Arc<dyn Transport>,
topology_watcher: watch::Receiver<SubnetTopology>,
cancellation_token: CancellationToken,
) where
Pool: 'static + Send + Sync + ValidatedPoolReader<Artifact>,
Artifact: ArtifactKind,
Expand All @@ -130,6 +137,7 @@ fn start_consensus_manager<Artifact, Pool>(
raw_pool.clone(),
transport.clone(),
adverts_to_send,
cancellation_token,
);

ConsensusManagerReceiver::run(
Expand Down

0 comments on commit 5efe31e

Please sign in to comment.