From 4455a7ec8a67c99c24f75b378a26f9b9ada60cfc Mon Sep 17 00:00:00 2001 From: Rostislav Rumenov Date: Tue, 19 Sep 2023 11:40:12 +0000 Subject: [PATCH] chore: do some small readability changes --- rs/p2p/consensus_manager/src/lib.rs | 57 +++++++++++------------------ 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 352e07f1243..9e04b6f47f0 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -201,11 +201,14 @@ impl ConsensusManagerMetrics { } } +type ValidatedPoolReaderRef = Arc + Send + Sync>>; +type ReceivedAdvertSender = Sender<(AdvertUpdate, NodeId, ConnId)>; + #[allow(unused)] pub fn build_axum_router( log: ReplicaLogger, rt: Handle, - pool: Arc + Send + Sync>>, + pool: ValidatedPoolReaderRef, ) -> (Router, Receiver<(AdvertUpdate, NodeId, ConnId)>) where Artifact: ArtifactKind + Serialize + for<'a> Deserialize<'a> + Send + 'static, @@ -214,25 +217,17 @@ where ::Message: Serialize + for<'a> Deserialize<'a> + Send, { let (update_tx, update_rx) = tokio::sync::mpsc::channel(100); - let rpc_handler_state = Arc::new(RpcHandler { pool }); - let update_handler_state = Arc::new(update_tx); let router = Router::new() .route(&format!("/{}/rpc", Artifact::TAG), any(rpc_handler)) - .with_state(rpc_handler_state) + .with_state(pool) .route(&format!("/{}/update", Artifact::TAG), any(update_handler)) - .with_state(update_handler_state); + .with_state(update_tx); (router, update_rx) } -/// Transport endpoint that is used to serve artifacts. Expects Artifact ID in the request -/// and responds with the Artifact message. -struct RpcHandler { - pool: Arc + Send + Sync>>, -} - async fn rpc_handler( - State(state): State>>, + State(pool): State>, payload: Bytes, ) -> Result where @@ -243,9 +238,8 @@ where { let id: Artifact::Id = bincode::deserialize(&payload).map_err(|_| StatusCode::BAD_REQUEST)?; - let jh = tokio::task::spawn_blocking(move || { - state.pool.read().unwrap().get_validated_by_identifier(&id) - }); + let jh = + tokio::task::spawn_blocking(move || pool.read().unwrap().get_validated_by_identifier(&id)); let msg = jh .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? @@ -256,10 +250,8 @@ where Ok(bytes) } -type ReceivedAdvertSender = State, NodeId, ConnId)>>>; - async fn update_handler( - State(sender): ReceivedAdvertSender, + State(sender): State>, Extension(peer): Extension, Extension(conn_id): Extension, payload: Bytes, @@ -281,14 +273,10 @@ where Ok(()) } -fn build_rpc_handler_request(id: &Artifact::Id) -> Request -where - Artifact: ArtifactKind, - ::Id: Serialize, -{ +fn build_rpc_handler_request(uri_prefix: &str, id: &T) -> Request { Request::builder() - .uri(format!("/{}/rpc", Artifact::TAG)) - .body(Bytes::from(bincode::serialize(&id).unwrap())) + .uri(format!("/{}/rpc", uri_prefix)) + .body(Bytes::from(bincode::serialize(id).unwrap())) .unwrap() } @@ -330,7 +318,7 @@ impl ConsensusManager, N where Pool: 'static + Send + Sync + ValidatedPoolReader, Artifact: ArtifactKind + Serialize + for<'a> Deserialize<'a> + Send + 'static, - ::Id: Serialize + for<'a> Deserialize<'a> + Clone + Eq + Send + Hash, + ::Id: Serialize + for<'a> Deserialize<'a> + Clone + Send + Hash + Eq, ::Message: Serialize + for<'a> Deserialize<'a> + Send, ::Attribute: Serialize + for<'a> Deserialize<'a> + Send, { @@ -548,7 +536,7 @@ where .copied(); if let Some(peer) = random_peer { - let request = build_rpc_handler_request::(&id); + let request = build_rpc_handler_request(Artifact::TAG.into(), &id); let transport = transport.clone(); download_js.spawn(timeout(Duration::from_secs(5), async move { transport.rpc(&peer, request).await @@ -719,7 +707,7 @@ async fn send_advert_to_all_peers( pool_reader: Arc + Send + Sync>>, ) where Artifact: ArtifactKind + Serialize + for<'a> Deserialize<'a> + Send + 'static, - ::Id: Serialize + for<'a> Deserialize<'a> + Clone + Eq + Send + Hash, + ::Id: Serialize + for<'a> Deserialize<'a> + Clone + Send, ::Message: Serialize + for<'a> Deserialize<'a> + Send, ::Attribute: Serialize + for<'a> Deserialize<'a> + Send, { @@ -768,7 +756,7 @@ async fn send_advert_to_all_peers( let is_completed = completed_transmissions.get(&peer).is_some_and(|c| *c == connection_id); if !is_completed { - let task = send_advert_to_peer::(transport.clone(), connection_id, body.clone(), peer); + let task = send_advert_to_peer(transport.clone(), connection_id, body.clone(), peer, Artifact::TAG.into()); in_progress_transmissions.spawn_on(peer, task, &rt_handle); } } @@ -800,17 +788,18 @@ async fn send_advert_to_all_peers( /// For 10k tasks and 40 peers ~100Mb /// Note: If we start pushing adverts we probably want to just try pushing once /// and revert back to the advert if the inital push fails. -async fn send_advert_to_peer( +async fn send_advert_to_peer( transport: Arc, connection_id: ConnId, message: Bytes, peer: NodeId, + uri_prefix: &str, ) -> ConnId { let mut backoff = get_backoff_policy(); loop { let request = Request::builder() - .uri(format!("/{}/update", Artifact::TAG)) + .uri(format!("/{}/update", uri_prefix)) .body(message.clone()) .expect("Building from typed values"); @@ -824,9 +813,8 @@ async fn send_advert_to_peer( } #[derive(Deserialize, Serialize)] -pub enum Data +pub enum Data where - Artifact: ArtifactKind, ::Id: Serialize + for<'a> Deserialize<'a>, ::Message: Serialize, ::Attribute: Serialize + for<'a> Deserialize<'a>, @@ -836,9 +824,8 @@ where } #[derive(Deserialize, Serialize)] -pub struct AdvertUpdate +pub struct AdvertUpdate where - Artifact: ArtifactKind, ::Id: Serialize + for<'a> Deserialize<'a>, ::Message: Serialize + for<'a> Deserialize<'a>, ::Attribute: Serialize + for<'a> Deserialize<'a>,