Conversation
emlautarom1
left a comment
There was a problem hiding this comment.
LGTM; a few small comments suggesting improvements to the tests.
| let sig_types = sig_types.clone(); | ||
| let st = st.clone(); |
There was a problem hiding this comment.
nit: This matches Charon, but I don't understand why we need the same data as both a Vec and a HashSet (we call contains in both cases).
| .await | ||
| .subscribe_internal(sub) | ||
| .await | ||
| .expect("subscribe_internal is infallible"); |
There was a problem hiding this comment.
Why did we make subscribe_internal return a Result if it's infallible? Can we update its definition to return the value directly?
| // Run concurrent exchanges: for each (node, sig_type) pair, spawn a task | ||
| let (result_tx, mut result_rx) = | ||
| mpsc::unbounded_channel::<(SigType, HashMap<PubKey, Vec<ParSignedData>>)>(); | ||
|
|
||
| let mut join_set = tokio::task::JoinSet::new(); | ||
| for (node_idx, ex) in exchangers.iter().enumerate() { | ||
| for &sig_type in &sig_types { | ||
| let ex = Arc::clone(ex); | ||
| let set = data_to_be_sent[node_idx].clone(); | ||
| let tx = result_tx.clone(); | ||
| join_set.spawn(async move { | ||
| let data = ex.exchange(sig_type, set).await.expect("exchange failed"); | ||
| let _ = tx.send((sig_type, data)); | ||
| }); | ||
| } | ||
| } | ||
| drop(result_tx); | ||
|
|
||
| // Collect results into actual: one entry per sig_type (last writer wins, | ||
| // all nodes return equivalent data for each sig_type) | ||
| let mut actual: SigTypeStore = HashMap::new(); | ||
| while let Some((sig_type, data)) = result_rx.recv().await { | ||
| actual.insert(sig_type, data); | ||
| } | ||
| while join_set.join_next().await.is_some() {} |
There was a problem hiding this comment.
This can be simplified by collecting the JoinSet results directly instead of going through an extra channel.
| // Run concurrent exchanges: for each (node, sig_type) pair, spawn a task | |
| let (result_tx, mut result_rx) = | |
| mpsc::unbounded_channel::<(SigType, HashMap<PubKey, Vec<ParSignedData>>)>(); | |
| let mut join_set = tokio::task::JoinSet::new(); | |
| for (node_idx, ex) in exchangers.iter().enumerate() { | |
| for &sig_type in &sig_types { | |
| let ex = Arc::clone(ex); | |
| let set = data_to_be_sent[node_idx].clone(); | |
| let tx = result_tx.clone(); | |
| join_set.spawn(async move { | |
| let data = ex.exchange(sig_type, set).await.expect("exchange failed"); | |
| let _ = tx.send((sig_type, data)); | |
| }); | |
| } | |
| } | |
| drop(result_tx); | |
| // Collect results into actual: one entry per sig_type (last writer wins, | |
| // all nodes return equivalent data for each sig_type) | |
| let mut actual: SigTypeStore = HashMap::new(); | |
| while let Some((sig_type, data)) = result_rx.recv().await { | |
| actual.insert(sig_type, data); | |
| } | |
| while join_set.join_next().await.is_some() {} | |
| // Run concurrent exchanges: for each (node, sig_type) pair, spawn a task | |
| let mut join_set = tokio::task::JoinSet::new(); | |
| for (node_idx, ex) in exchangers.iter().enumerate() { | |
| for &sig_type in &sig_types { | |
| let ex = Arc::clone(ex); | |
| let set = data_to_be_sent[node_idx].clone(); | |
| join_set.spawn(async move { | |
| let data = ex.exchange(sig_type, set).await.expect("exchange failed"); | |
| (sig_type, data) | |
| }); | |
| } | |
| } | |
| // Collect results into actual: one entry per sig_type (last writer wins, | |
| // all nodes return equivalent data for each sig_type) | |
| let actual: SigTypeStore = join_set.join_all().await.into_iter().collect(); |
|
|
||
| for stop_tx in stop_txs { | ||
| let _ = stop_tx.send(()); | ||
| } |
There was a problem hiding this comment.
Instead of having multiple stop_txs, we could share a single CancellationToken across all swarms.
We could also ignore cancellation entirely, since the spawned tasks will be dropped when the function completes. The current design does not wait for them to finish (there is no join on the handles), so cancellation would not make a difference either way.
No description provided.