Skip to content

Commit

Permalink
Merge branch 'rumenov/rmfetradr' into 'master'
Browse files Browse the repository at this point in the history
chore: do some small readability  changes

 

See merge request dfinity-lab/public/ic!14866
  • Loading branch information
rumenov committed Sep 19, 2023
2 parents bf09c1b + 4455a7e commit 81f89f4
Showing 1 changed file with 22 additions and 35 deletions.
57 changes: 22 additions & 35 deletions rs/p2p/consensus_manager/src/lib.rs
Expand Up @@ -201,11 +201,14 @@ impl ConsensusManagerMetrics {
}
}

type ValidatedPoolReaderRef<T> = Arc<RwLock<dyn ValidatedPoolReader<T> + Send + Sync>>;
type ReceivedAdvertSender<A> = Sender<(AdvertUpdate<A>, NodeId, ConnId)>;

#[allow(unused)]
pub fn build_axum_router<Artifact: ArtifactKind>(
log: ReplicaLogger,
rt: Handle,
pool: Arc<RwLock<dyn ValidatedPoolReader<Artifact> + Send + Sync>>,
pool: ValidatedPoolReaderRef<Artifact>,
) -> (Router, Receiver<(AdvertUpdate<Artifact>, NodeId, ConnId)>)
where
Artifact: ArtifactKind + Serialize + for<'a> Deserialize<'a> + Send + 'static,
Expand All @@ -214,25 +217,17 @@ where
<Artifact as ArtifactKind>::Message: Serialize + for<'a> Deserialize<'a> + Send,
{
let (update_tx, update_rx) = tokio::sync::mpsc::channel(100);
let rpc_handler_state = Arc::new(RpcHandler { pool });
let update_handler_state = Arc::new(update_tx);
let router = Router::new()
.route(&format!("/{}/rpc", Artifact::TAG), any(rpc_handler))
.with_state(rpc_handler_state)
.with_state(pool)
.route(&format!("/{}/update", Artifact::TAG), any(update_handler))
.with_state(update_handler_state);
.with_state(update_tx);

(router, update_rx)
}

/// Transport endpoint that is used to serve artifacts. Expects Artifact ID in the request
/// and responds with the Artifact message.
struct RpcHandler<Artifact> {
pool: Arc<RwLock<dyn ValidatedPoolReader<Artifact> + Send + Sync>>,
}

async fn rpc_handler<Artifact: ArtifactKind>(
State(state): State<Arc<RpcHandler<Artifact>>>,
State(pool): State<ValidatedPoolReaderRef<Artifact>>,
payload: Bytes,
) -> Result<Bytes, StatusCode>
where
Expand All @@ -243,9 +238,8 @@ where
{
let id: Artifact::Id = bincode::deserialize(&payload).map_err(|_| StatusCode::BAD_REQUEST)?;

let jh = tokio::task::spawn_blocking(move || {
state.pool.read().unwrap().get_validated_by_identifier(&id)
});
let jh =
tokio::task::spawn_blocking(move || pool.read().unwrap().get_validated_by_identifier(&id));
let msg = jh
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
Expand All @@ -256,10 +250,8 @@ where
Ok(bytes)
}

type ReceivedAdvertSender<A> = State<Arc<Sender<(AdvertUpdate<A>, NodeId, ConnId)>>>;

async fn update_handler<Artifact: ArtifactKind>(
State(sender): ReceivedAdvertSender<Artifact>,
State(sender): State<ReceivedAdvertSender<Artifact>>,
Extension(peer): Extension<NodeId>,
Extension(conn_id): Extension<ConnId>,
payload: Bytes,
Expand All @@ -281,14 +273,10 @@ where
Ok(())
}

fn build_rpc_handler_request<Artifact: ArtifactKind>(id: &Artifact::Id) -> Request<Bytes>
where
Artifact: ArtifactKind,
<Artifact as ArtifactKind>::Id: Serialize,
{
fn build_rpc_handler_request<T: Serialize>(uri_prefix: &str, id: &T) -> Request<Bytes> {
Request::builder()
.uri(format!("/{}/rpc", Artifact::TAG))
.body(Bytes::from(bincode::serialize(&id).unwrap()))
.uri(format!("/{}/rpc", uri_prefix))
.body(Bytes::from(bincode::serialize(id).unwrap()))
.unwrap()
}

Expand Down Expand Up @@ -330,7 +318,7 @@ impl<Artifact, Pool> ConsensusManager<Artifact, Pool, (AdvertUpdate<Artifact>, N
where
Pool: 'static + Send + Sync + ValidatedPoolReader<Artifact>,
Artifact: ArtifactKind + Serialize + for<'a> Deserialize<'a> + Send + 'static,
<Artifact as ArtifactKind>::Id: Serialize + for<'a> Deserialize<'a> + Clone + Eq + Send + Hash,
<Artifact as ArtifactKind>::Id: Serialize + for<'a> Deserialize<'a> + Clone + Send + Hash + Eq,
<Artifact as ArtifactKind>::Message: Serialize + for<'a> Deserialize<'a> + Send,
<Artifact as ArtifactKind>::Attribute: Serialize + for<'a> Deserialize<'a> + Send,
{
Expand Down Expand Up @@ -548,7 +536,7 @@ where
.copied();

if let Some(peer) = random_peer {
let request = build_rpc_handler_request::<Artifact>(&id);
let request = build_rpc_handler_request(Artifact::TAG.into(), &id);
let transport = transport.clone();
download_js.spawn(timeout(Duration::from_secs(5), async move {
transport.rpc(&peer, request).await
Expand Down Expand Up @@ -719,7 +707,7 @@ async fn send_advert_to_all_peers<Artifact>(
pool_reader: Arc<RwLock<dyn ValidatedPoolReader<Artifact> + Send + Sync>>,
) where
Artifact: ArtifactKind + Serialize + for<'a> Deserialize<'a> + Send + 'static,
<Artifact as ArtifactKind>::Id: Serialize + for<'a> Deserialize<'a> + Clone + Eq + Send + Hash,
<Artifact as ArtifactKind>::Id: Serialize + for<'a> Deserialize<'a> + Clone + Send,
<Artifact as ArtifactKind>::Message: Serialize + for<'a> Deserialize<'a> + Send,
<Artifact as ArtifactKind>::Attribute: Serialize + for<'a> Deserialize<'a> + Send,
{
Expand Down Expand Up @@ -768,7 +756,7 @@ async fn send_advert_to_all_peers<Artifact>(
let is_completed = completed_transmissions.get(&peer).is_some_and(|c| *c == connection_id);

if !is_completed {
let task = send_advert_to_peer::<Artifact>(transport.clone(), connection_id, body.clone(), peer);
let task = send_advert_to_peer(transport.clone(), connection_id, body.clone(), peer, Artifact::TAG.into());
in_progress_transmissions.spawn_on(peer, task, &rt_handle);
}
}
Expand Down Expand Up @@ -800,17 +788,18 @@ async fn send_advert_to_all_peers<Artifact>(
/// For 10k tasks and 40 peers ~100Mb
/// Note: If we start pushing adverts we probably want to just try pushing once
/// and revert back to the advert if the inital push fails.
async fn send_advert_to_peer<Artifact: ArtifactKind>(
async fn send_advert_to_peer(
transport: Arc<dyn Transport>,
connection_id: ConnId,
message: Bytes,
peer: NodeId,
uri_prefix: &str,
) -> ConnId {
let mut backoff = get_backoff_policy();

loop {
let request = Request::builder()
.uri(format!("/{}/update", Artifact::TAG))
.uri(format!("/{}/update", uri_prefix))
.body(message.clone())
.expect("Building from typed values");

Expand All @@ -824,9 +813,8 @@ async fn send_advert_to_peer<Artifact: ArtifactKind>(
}

#[derive(Deserialize, Serialize)]
pub enum Data<Artifact>
pub enum Data<Artifact: ArtifactKind>
where
Artifact: ArtifactKind,
<Artifact as ArtifactKind>::Id: Serialize + for<'a> Deserialize<'a>,
<Artifact as ArtifactKind>::Message: Serialize,
<Artifact as ArtifactKind>::Attribute: Serialize + for<'a> Deserialize<'a>,
Expand All @@ -836,9 +824,8 @@ where
}

#[derive(Deserialize, Serialize)]
pub struct AdvertUpdate<Artifact>
pub struct AdvertUpdate<Artifact: ArtifactKind>
where
Artifact: ArtifactKind,
<Artifact as ArtifactKind>::Id: Serialize + for<'a> Deserialize<'a>,
<Artifact as ArtifactKind>::Message: Serialize + for<'a> Deserialize<'a>,
<Artifact as ArtifactKind>::Attribute: Serialize + for<'a> Deserialize<'a>,
Expand Down

0 comments on commit 81f89f4

Please sign in to comment.