Skip to content

Commit 3973797

Browse files
committed
Merge branch 'stefan/transmission_bug' into 'master'
fix: multiple transmissions for the same advert bug. See merge request dfinity-lab/public/ic!15944
2 parents d0951df + ed80cf6 commit 3973797

File tree

1 file changed

+17
-13
lines changed

1 file changed

+17
-13
lines changed

rs/p2p/consensus_manager/src/sender.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,19 @@ use std::{
88
use axum::http::Request;
99
use backoff::backoff::Backoff;
1010
use bytes::Bytes;
11-
use ic_async_utils::JoinMap;
1211
use ic_interfaces::{artifact_manager::ArtifactProcessorEvent, artifact_pool::ValidatedPoolReader};
1312
use ic_logger::{warn, ReplicaLogger};
1413
use ic_quic_transport::{ConnId, Transport};
1514
use ic_types::artifact::{Advert, ArtifactKind};
1615
use ic_types::NodeId;
1716
use serde::{Deserialize, Serialize};
18-
use tokio::{runtime::Handle, select, sync::mpsc::Receiver, task::JoinHandle, time};
17+
use tokio::{
18+
runtime::Handle,
19+
select,
20+
sync::mpsc::Receiver,
21+
task::{JoinHandle, JoinSet},
22+
time,
23+
};
1924

2025
use crate::{metrics::ConsensusManagerMetrics, AdvertUpdate, CommitId, Data, SlotNumber};
2126

@@ -210,9 +215,9 @@ where
210215
.expect("Serializing advert update")
211216
.into();
212217

213-
let mut in_progress_transmissions = JoinMap::new();
218+
let mut in_progress_transmissions = JoinSet::new();
214219
// stores the connection ID of the last successful transmission to a peer.
215-
let mut completed_transmissions: HashMap<NodeId, ConnId> = HashMap::new();
220+
let mut initiated_transmissions: HashMap<NodeId, ConnId> = HashMap::new();
216221
let mut periodic_check_interval = time::interval(Duration::from_secs(5));
217222

218223
loop {
@@ -222,20 +227,20 @@ where
222227
// spawn task for peers with higher conn id or not in completed transmissions.
223228
// add task to join map
224229
for (peer, connection_id) in transport.peers() {
225-
let is_completed = completed_transmissions.get(&peer).is_some_and(|c| *c == connection_id);
230+
let is_initiated = initiated_transmissions.get(&peer).is_some_and(|c| *c == connection_id);
226231

227-
if !is_completed {
232+
if !is_initiated {
228233
metrics.send_view_send_to_peer_total.inc();
229-
let task = send_advert_to_peer(transport.clone(), connection_id, body.clone(), peer, Artifact::TAG.into());
230-
in_progress_transmissions.spawn_on(peer, task, &rt_handle);
234+
let task = send_advert_to_peer(transport.clone(), body.clone(), peer, Artifact::TAG.into());
235+
in_progress_transmissions.spawn_on(task, &rt_handle);
236+
initiated_transmissions.insert(peer, connection_id);
231237
}
232238
}
233239
}
234240
Some(result) = in_progress_transmissions.join_next() => {
235241
match result {
236-
Ok((connection_id, peer)) => {
242+
Ok(_) => {
237243
metrics.send_view_send_to_peer_delivered_total.inc();
238-
completed_transmissions.insert(peer, connection_id);
239244
},
240245
Err(err) => {
241246
// Cancelling tasks is ok. Panicking tasks are not.
@@ -254,11 +259,10 @@ where
254259
/// If the peer is not reachable, it will retry with an exponential backoff.
255260
async fn send_advert_to_peer(
256261
transport: Arc<dyn Transport>,
257-
connection_id: ConnId,
258262
message: Bytes,
259263
peer: NodeId,
260264
uri_prefix: &str,
261-
) -> ConnId {
265+
) {
262266
let mut backoff = get_backoff_policy();
263267

264268
loop {
@@ -268,7 +272,7 @@ async fn send_advert_to_peer(
268272
.expect("Building from typed values");
269273

270274
if let Ok(()) = transport.push(&peer, request).await {
271-
return connection_id;
275+
return;
272276
}
273277

274278
let backoff_duration = backoff.next_backoff().unwrap_or(MAX_ELAPSED_TIME);

0 commit comments

Comments (0)