Commit 816f0ed

chore: improve the creation of the p2p handlers and managers (#3467)
1 parent 13a3cc3 commit 816f0ed
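
In short: the consensus manager's `router()`/`run()` pair and the state sync manager's `build_axum_router`/`start_state_sync_manager` free functions are replaced by a uniform two-phase API. A `build` step returns the receive-side axum `Router` together with a manager handle, and a later `start` step takes the `Transport` (plus, for consensus, the topology watcher) and spawns the event loops. The advert `mpsc` channel that previously had to be threaded between the two state sync entry points is now internal to the returned `StateSyncManager`, and the transport becomes a `start` argument instead of a field captured at construction time.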

File tree: 6 files changed (+107 -115 lines changed)

rs/p2p/consensus_manager/src/lib.rs

Lines changed: 21 additions & 14 deletions
```diff
@@ -28,6 +28,22 @@ mod sender;
 type StartConsensusManagerFn =
     Box<dyn FnOnce(Arc<dyn Transport>, watch::Receiver<SubnetTopology>) -> Vec<Shutdown>>;
 
+pub struct AbortableBroadcastChannelManager(Vec<StartConsensusManagerFn>);
+
+impl AbortableBroadcastChannelManager {
+    pub fn start(
+        self,
+        transport: Arc<dyn Transport>,
+        topology_watcher: watch::Receiver<SubnetTopology>,
+    ) -> Vec<Shutdown> {
+        let mut ret = vec![];
+        for client in self.0 {
+            ret.append(&mut client(transport.clone(), topology_watcher.clone()));
+        }
+        ret
+    }
+}
+
 /// Same order of magnitude as the number of active artifacts.
 const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000;
 
@@ -113,20 +129,11 @@ impl AbortableBroadcastChannelBuilder {
         (outbound_tx, inbound_rx)
     }
 
-    pub fn router(&mut self) -> Router {
-        self.router.take().unwrap_or_default()
-    }
-
-    pub fn run(
-        self,
-        transport: Arc<dyn Transport>,
-        topology_watcher: watch::Receiver<SubnetTopology>,
-    ) -> Vec<Shutdown> {
-        let mut ret = vec![];
-        for client in self.clients {
-            ret.append(&mut client(transport.clone(), topology_watcher.clone()));
-        }
-        ret
+    pub fn build(self) -> (Router, AbortableBroadcastChannelManager) {
+        (
+            self.router.unwrap(),
+            AbortableBroadcastChannelManager(self.clients),
+        )
     }
 }
```
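
For callers this splits the old `router()` + `run()` flow into an explicit build-then-start sequence. A minimal caller-side sketch (the `builder`, `transport`, and `topology_watcher` values are assumed to already exist; the test in the next file shows a concrete use):

```rust
// Phase 1: consume the builder; get the receive-side Router (to be
// merged into the node's HTTP router) and a deferred start handle.
let (router, manager) = builder.build();

// ... merge `router` and construct the transport from the merged router ...

// Phase 2: once the transport exists, spawn one event loop per
// registered client; each loop yields a Shutdown handle.
let shutdowns: Vec<Shutdown> = manager.start(transport, topology_watcher);
```

One behavioral change worth noting: `build` calls `self.router.unwrap()`, so it panics when no channel was ever registered, whereas the old `router()` silently fell back to `Router::default()`.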

rs/p2p/consensus_manager/tests/test.rs

Lines changed: 5 additions & 4 deletions
```diff
@@ -345,17 +345,18 @@ fn load_test(
     for i in 0..num_peers {
         let node = node_test_id(i);
         let processor = TestConsensus::new(log.clone(), node, 256 * (i as usize + 1), i % 2 == 0);
-        let (jh, mut cm) =
+        let (jh, cm) =
             start_consensus_manager(no_op_logger(), rt.handle().clone(), processor.clone());
         jhs.push(jh);
-        nodes.push((node, cm.router()));
-        cms.push((node, cm));
+        let (r, m) = cm.build();
+        nodes.push((node, r));
+        cms.push((node, m));
         node_advert_map.insert(node, processor);
     }
     let (nodes, topology_watcher) = fully_connected_localhost_subnet(rt.handle(), log, id, nodes);
     for ((node1, transport), (node2, cm)) in nodes.into_iter().zip(cms.into_iter()) {
         assert!(node1 == node2);
-        cm.run(transport, topology_watcher.clone());
+        cm.start(transport, topology_watcher.clone());
     }
 
     rt.block_on(async move {
```

rs/p2p/state_sync_manager/src/lib.rs

Lines changed: 42 additions & 44 deletions
```diff
@@ -47,25 +47,23 @@ const ADVERT_BROADCAST_INTERVAL: Duration = Duration::from_secs(5);
 const ADVERT_BROADCAST_TIMEOUT: Duration =
     ADVERT_BROADCAST_INTERVAL.saturating_sub(Duration::from_secs(2));
 
-pub fn build_axum_router<T: 'static>(
-    state_sync: Arc<dyn StateSyncClient<Message = T>>,
-    log: ReplicaLogger,
+pub fn build_state_sync_manager<T: Send + 'static>(
+    log: &ReplicaLogger,
     metrics_registry: &MetricsRegistry,
-) -> (
-    Router,
-    tokio::sync::mpsc::Receiver<(StateSyncArtifactId, NodeId)>,
-) {
+    rt_handle: &tokio::runtime::Handle,
+    state_sync: Arc<dyn StateSyncClient<Message = T>>,
+) -> (Router, StateSyncManager<T>) {
     let metrics = StateSyncManagerHandlerMetrics::new(metrics_registry);
     let shared_chunk_state = Arc::new(StateSyncChunkHandler::new(
         log.clone(),
-        state_sync,
+        state_sync.clone(),
         metrics.clone(),
     ));
 
-    let (tx, rx) = tokio::sync::mpsc::channel(20);
-    let advert_handler_state = Arc::new(StateSyncAdvertHandler::new(log, tx));
+    let (advert_sender, advert_receiver) = tokio::sync::mpsc::channel(20);
+    let advert_handler_state = Arc::new(StateSyncAdvertHandler::new(log.clone(), advert_sender));
 
-    let app = Router::new()
+    let router = Router::new()
         .route(STATE_SYNC_CHUNK_PATH, any(state_sync_chunk_handler))
         .with_state(shared_chunk_state)
         .route(
@@ -74,45 +72,37 @@ pub fn build_axum_router<T: 'static>(
         )
         .with_state(advert_handler_state);
 
-    (app, rx)
-}
-
-pub fn start_state_sync_manager<T: Send + 'static>(
-    log: &ReplicaLogger,
-    metrics: &MetricsRegistry,
-    rt: &Handle,
-    transport: Arc<dyn Transport>,
-    state_sync: Arc<dyn StateSyncClient<Message = T>>,
-    advert_receiver: tokio::sync::mpsc::Receiver<(StateSyncArtifactId, NodeId)>,
-) -> Shutdown {
-    let state_sync_manager_metrics = StateSyncManagerMetrics::new(metrics);
+    let state_sync_manager_metrics = StateSyncManagerMetrics::new(metrics_registry);
     let manager = StateSyncManager {
         log: log.clone(),
-        rt: rt.clone(),
+        rt: rt_handle.clone(),
         metrics: state_sync_manager_metrics,
-        transport,
         state_sync,
         advert_receiver,
         ongoing_state_sync: None,
     };
-    Shutdown::spawn_on_with_cancellation(
-        |cancellation: CancellationToken| manager.run(cancellation),
-        rt,
-    )
+    (router, manager)
 }
 
-struct StateSyncManager<T> {
+pub struct StateSyncManager<T> {
     log: ReplicaLogger,
     rt: Handle,
     metrics: StateSyncManagerMetrics,
-    transport: Arc<dyn Transport>,
     state_sync: Arc<dyn StateSyncClient<Message = T>>,
     advert_receiver: tokio::sync::mpsc::Receiver<(StateSyncArtifactId, NodeId)>,
     ongoing_state_sync: Option<OngoingStateSyncHandle>,
 }
 
 impl<T: 'static + Send> StateSyncManager<T> {
-    async fn run(mut self, cancellation: CancellationToken) {
+    pub fn start(self, transport: Arc<dyn Transport>) -> Shutdown {
+        let rt_handle = self.rt.clone();
+        Shutdown::spawn_on_with_cancellation(
+            |cancellation: CancellationToken| self.run(cancellation, transport),
+            &rt_handle,
+        )
+    }
+
+    async fn run(mut self, cancellation: CancellationToken, transport: Arc<dyn Transport>) {
         let mut interval = tokio::time::interval(ADVERT_BROADCAST_INTERVAL);
         interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
         let mut advertise_task = JoinSet::new();
@@ -127,15 +117,15 @@ impl<T: 'static + Send> StateSyncManager<T> {
                     Self::send_state_adverts(
                         self.rt.clone(),
                         self.state_sync.clone(),
-                        self.transport.clone(),
+                        transport.clone(),
                         self.metrics.clone(),
                         cancellation.clone(),
                     ),
                     &self.rt
                 );
             },
             Some((advert, peer_id)) = self.advert_receiver.recv() =>{
-                self.handle_advert(advert, peer_id).await;
+                self.handle_advert(advert, peer_id, transport.clone()).await;
             }
             Some(_) = advertise_task.join_next() => {}
         }
@@ -146,7 +136,12 @@ impl<T: 'static + Send> StateSyncManager<T> {
         }
     }
 
-    async fn handle_advert(&mut self, artifact_id: StateSyncArtifactId, peer_id: NodeId) {
+    async fn handle_advert(
+        &mut self,
+        artifact_id: StateSyncArtifactId,
+        peer_id: NodeId,
+        transport: Arc<dyn Transport>,
+    ) {
         self.metrics.adverts_received_total.inc();
         // Remove ongoing state sync if finished or try to add peer if ongoing.
         if let Some(ongoing) = &mut self.ongoing_state_sync {
@@ -184,7 +179,7 @@ impl<T: 'static + Send> StateSyncManager<T> {
             self.metrics.ongoing_state_sync_metrics.clone(),
             Arc::new(Mutex::new(chunkable)),
             artifact_id.clone(),
-            self.transport.clone(),
+            transport,
         );
         // Add peer that initiated this state sync to ongoing state sync.
         ongoing
@@ -330,14 +325,17 @@ mod tests {
         };
 
         let (handler_tx, handler_rx) = tokio::sync::mpsc::channel(100);
-        start_state_sync_manager(
-            &log,
-            &MetricsRegistry::default(),
-            rt.handle(),
-            Arc::new(t) as Arc<_>,
-            Arc::new(s) as Arc<_>,
-            handler_rx,
-        );
+        let metrics = StateSyncManagerMetrics::new(&MetricsRegistry::default());
+
+        let manager = StateSyncManager {
+            log: log.clone(),
+            advert_receiver: handler_rx,
+            ongoing_state_sync: None,
+            metrics,
+            state_sync: Arc::new(s) as Arc<_>,
+            rt: rt.handle().clone(),
+        };
+        let _ = manager.start(Arc::new(t) as Arc<_>);
         rt.block_on(async move {
             handler_tx.send((id, NODE_1)).await.unwrap();
             handler_tx.send((old_id, NODE_2)).await.unwrap();
```
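
The state sync side follows the same pattern: construction no longer needs a transport, and `start` threads it through `run` and `handle_advert` rather than storing it as a struct field. A minimal caller-side sketch (the `log`, `metrics_registry`, `rt_handle`, `state_sync`, and `transport` values are assumed; the test helper in the next file does exactly this):

```rust
// Phase 1: build the chunk/advert HTTP handlers plus the not-yet-running
// manager; no transport is required at this point.
let (router, manager) = ic_state_sync_manager::build_state_sync_manager(
    &log,
    &metrics_registry,
    &rt_handle,
    state_sync,
);

// ... serve `router` through the transport layer ...

// Phase 2: hand the transport to the manager and spawn its event loop.
let shutdown = manager.start(transport);
```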

rs/p2p/state_sync_manager/tests/common.rs

Lines changed: 5 additions & 12 deletions
```diff
@@ -418,25 +418,18 @@ pub fn create_node(
         disconnected: Arc::new(AtomicBool::new(false)),
     });
 
-    let (router, rx) = ic_state_sync_manager::build_axum_router(
-        state_sync.clone(),
-        log.clone(),
+    let (router, manager) = ic_state_sync_manager::build_state_sync_manager(
+        &log,
         &MetricsRegistry::default(),
+        rt,
+        state_sync.clone(),
     );
     let transport = transport_router.add_peer(
         NodeId::from(PrincipalId::new_node_test_id(node_num)),
         router,
         link.0,
         link.1,
     );
-    let shutdown = ic_state_sync_manager::start_state_sync_manager(
-        &log,
-        &MetricsRegistry::default(),
-        rt,
-        Arc::new(transport),
-        state_sync.clone(),
-        rx,
-    );
-
+    let shutdown = manager.start(Arc::new(transport));
     (state_sync, shutdown)
 }
```

rs/p2p/test_utils/src/turmoil.rs

Lines changed: 19 additions & 20 deletions
```diff
@@ -358,19 +358,21 @@ pub fn add_transport_to_sim<F>(
     let this_ip = turmoil::lookup(peer.to_string());
     let custom_udp = CustomUdp::new(this_ip, udp_listener);
 
-    let state_sync_rx = if let Some(ref state_sync) = state_sync_client_clone {
-        let (state_sync_router, state_sync_rx) = ic_state_sync_manager::build_axum_router(
-            state_sync.clone(),
-            log.clone(),
-            &MetricsRegistry::default(),
-        );
+    let state_sync_manager = if let Some(ref state_sync) = state_sync_client_clone {
+        let (state_sync_router, state_sync_manager) =
+            ic_state_sync_manager::build_state_sync_manager(
+                &log,
+                &MetricsRegistry::default(),
+                &tokio::runtime::Handle::current(),
+                state_sync.clone(),
+            );
         router = Some(router.unwrap_or_default().merge(state_sync_router));
-        Some(state_sync_rx)
+        Some(state_sync_manager)
     } else {
         None
     };
 
-    let _artifact_processor_jh = if let Some(consensus) = consensus_manager_clone {
+    let con = if let Some(consensus) = consensus_manager_clone {
         let bouncer_factory = Arc::new(consensus.clone().read().unwrap().clone());
         let downloader = FetchArtifact::new(
             log.clone(),
@@ -388,9 +390,11 @@ pub fn add_transport_to_sim<F>(
             consensus.clone(),
             consensus.clone().read().unwrap().clone(),
         );
-        router = Some(router.unwrap_or_default().merge(consensus_builder.router()));
 
-        Some(artifact_processor_jh)
+        let (consensus_router, manager) = consensus_builder.build();
+        router = Some(router.unwrap_or_default().merge(consensus_router));
+
+        Some((artifact_processor_jh, manager))
     } else {
         None
     };
@@ -407,17 +411,12 @@ pub fn add_transport_to_sim<F>(
         router.unwrap_or_default(),
     ));
 
-    consensus_builder.run(transport.clone(), topology_watcher_clone.clone());
+    if let Some((_, con_manager)) = con {
+        con_manager.start(transport.clone(), topology_watcher_clone.clone());
+    }
 
-    if let Some(state_sync_rx) = state_sync_rx {
-        ic_state_sync_manager::start_state_sync_manager(
-            &log,
-            &MetricsRegistry::default(),
-            &tokio::runtime::Handle::current(),
-            transport.clone(),
-            state_sync_client_clone.unwrap().clone(),
-            state_sync_rx,
-        );
+    if let Some(state_sync_manager) = state_sync_manager {
+        state_sync_manager.start(transport.clone());
     }
 
     post_setup_future_clone(peer, transport).await;
```

rs/replica/setup_ic_network/src/lib.rs

Lines changed: 15 additions & 21 deletions
```diff
@@ -131,7 +131,7 @@ pub fn setup_consensus_and_p2p(
 ) {
     let consensus_pool_cache = consensus_pool.read().unwrap().get_cache();
 
-    let (ingress_pool, ingress_sender, join_handles, mut p2p_consensus) = start_consensus(
+    let (ingress_pool, ingress_sender, join_handles, p2p_consensus) = start_consensus(
         log,
         metrics_registry,
         rt_handle,
@@ -158,18 +158,21 @@ pub fn setup_consensus_and_p2p(
         max_certified_height_tx,
     );
 
-    // StateSync receive side => handler definition
-    let (state_sync_router, state_sync_manager_rx) = ic_state_sync_manager::build_axum_router(
-        state_sync_client.clone(),
-        log.clone(),
-        metrics_registry,
-    );
+    // StateSync receive side + handler definition
+    let (state_sync_manager_router, state_sync_manager_runner) =
+        ic_state_sync_manager::build_state_sync_manager(
+            log,
+            metrics_registry,
+            rt_handle,
+            state_sync_client.clone(),
+        );
 
-    // Consensus receive side => handler definition
+    // Consensus receive side + handler definition
+    let (consensus_manager_router, consensus_manager_runner) = p2p_consensus.build();
 
     // Merge all receive side handlers => router
-    let p2p_router = state_sync_router
-        .merge(p2p_consensus.router())
+    let p2p_router = state_sync_manager_router
+        .merge(consensus_manager_router)
         .layer(TraceLayer::new_for_http());
     // Quic transport
     let (_, topology_watcher) = ic_peer_manager::start_peer_manager(
@@ -200,17 +203,8 @@ pub fn setup_consensus_and_p2p(
     ));
 
     // Start the main event loops for StateSync and Consensus
-
-    let _state_sync_manager = ic_state_sync_manager::start_state_sync_manager(
-        log,
-        metrics_registry,
-        rt_handle,
-        quic_transport.clone(),
-        state_sync_client,
-        state_sync_manager_rx,
-    );
-
-    let _cancellation_token = p2p_consensus.run(quic_transport, topology_watcher);
+    let _state_sync_manager = state_sync_manager_runner.start(quic_transport.clone());
+    let _cancellation_token = consensus_manager_runner.start(quic_transport, topology_watcher);
 
     (ingress_pool, ingress_sender, join_handles)
 }
```
