Skip to content

Commit

Permalink
[network] Add health-checker network interface
Browse files Browse the repository at this point in the history
+ Some types (`Ping2`, `Pong2`) are temporarily suffixed with "2". We'll
remove this once the refactor is complete.
  • Loading branch information
phlip9 committed Oct 31, 2019
1 parent 0d82ee3 commit cc12920
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 1 deletion.
1 change: 1 addition & 0 deletions network/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ fn main() {
"src/proto/mempool.proto",
"src/proto/network.proto",
"src/proto/state_synchronizer.proto",
"src/proto/health_checker.proto",
];

let includes = [
Expand Down
21 changes: 21 additions & 0 deletions network/src/proto/health_checker.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

syntax = "proto3";

package health_checker;

message HealthCheckerMsg {
oneof message {
Ping ping = 1;
Pong pong = 2;
}
}

message Ping {
uint32 nonce = 1;
}

message Pong {
uint32 nonce = 1;
}
7 changes: 7 additions & 0 deletions network/src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ mod mempool {
mod state_synchronizer {
include!(concat!(env!("OUT_DIR"), "/state_synchronizer.rs"));
}
mod health_checker {
include!(concat!(env!("OUT_DIR"), "/health_checker.rs"));
}

use ::libra_types::proto::types;

Expand All @@ -25,6 +28,10 @@ pub use self::{
consensus_msg::Message as ConsensusMsg_oneof, Block, ConsensusMsg, Proposal, RequestBlock,
RespondBlock, SyncInfo, VoteMsg, VoteProposal,
},
health_checker::{
health_checker_msg::Message as HealthCheckerMsg_oneof, HealthCheckerMsg, Ping as Ping2,
Pong as Pong2,
},
mempool::MempoolSyncMsg,
network::{
identity_msg::Role as IdentityMsg_Role, DiscoveryMsg, FullNodePayload, IdentityMsg, Note,
Expand Down
205 changes: 205 additions & 0 deletions network/src/validator_network/health_checker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! Interface between HealthChecker and Network layers.

use crate::{
error::NetworkError,
interface::{NetworkNotification, NetworkRequest},
proto::{HealthCheckerMsg, HealthCheckerMsg_oneof, Ping2, Pong2},
protocols::rpc::{self, error::RpcError},
validator_network::Event,
ProtocolId,
};
use channel;
use futures::{
stream::Map,
task::{Context, Poll},
Stream, StreamExt,
};
use libra_types::PeerId;
use pin_project::pin_project;
use prost::Message as _;
use std::{pin::Pin, time::Duration};

/// Protocol id for HealthChecker RPC calls
#[allow(dead_code)]
pub const HEALTH_CHECKER_RPC_PROTOCOL: &[u8] = b"/libra/health-checker/rpc/0.1.0";

/// The interface from Network to HealthChecker layer.
///
/// `HealthCheckerNetworkEvents` is a `Stream` of `NetworkNotification` where the
/// raw `Bytes` rpc messages are deserialized into
/// `HealthCheckerMsg` types. `HealthCheckerNetworkEvents` is a thin wrapper
/// around an `channel::Receiver<NetworkNotification>`.
#[pin_project]
pub struct HealthCheckerNetworkEvents {
#[pin]
inner: Map<
channel::Receiver<NetworkNotification>,
fn(NetworkNotification) -> Result<Event<HealthCheckerMsg>, NetworkError>,
>,
}

impl HealthCheckerNetworkEvents {
pub fn new(receiver: channel::Receiver<NetworkNotification>) -> Self {
let inner = receiver.map::<_, fn(_) -> _>(|notification| match notification {
NetworkNotification::NewPeer(peer_id) => Ok(Event::NewPeer(peer_id)),
NetworkNotification::LostPeer(peer_id) => Ok(Event::LostPeer(peer_id)),
NetworkNotification::RecvRpc(peer_id, rpc_req) => {
let req_msg = HealthCheckerMsg::decode(rpc_req.data.as_ref())?;
Ok(Event::RpcRequest((peer_id, req_msg, rpc_req.res_tx)))
}
NetworkNotification::RecvMessage(_, _) => {
unreachable!("HealthChecker does not currently use DirectSend");
}
});

Self { inner }
}
}

impl Stream for HealthCheckerNetworkEvents {
type Item = Result<Event<HealthCheckerMsg>, NetworkError>;

fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next(context)
}
}

/// The interface from HealthChecker to Networking layer.
///
/// This is a thin wrapper around an `channel::Sender<NetworkRequest>`, so it is
/// easy to clone and send off to a separate task. For example, the rpc requests
/// return Futures that encapsulate the whole flow, from sending the request to
/// remote, to finally receiving the response and deserializing. It therefore
/// makes the most sense to make the rpc call on a separate async task, which
/// requires the `HealthCheckerNetworkSender` to be `Clone` and `Send`.
#[derive(Clone)]
pub struct HealthCheckerNetworkSender {
inner: channel::Sender<NetworkRequest>,
}

impl HealthCheckerNetworkSender {
#[allow(dead_code)]
pub fn new(inner: channel::Sender<NetworkRequest>) -> Self {
Self { inner }
}

/// Send a HealthChecker Ping RPC request to remote peer `recipient`. Returns
/// the remote peer's future `Pong` reply.
///
/// The rpc request can be canceled at any point by dropping the returned
/// future.
#[allow(dead_code)]
pub async fn ping(
&mut self,
recipient: PeerId,
req_msg: Ping2,
timeout: Duration,
) -> Result<Pong2, RpcError> {
let protocol = ProtocolId::from_static(HEALTH_CHECKER_RPC_PROTOCOL);
let req_msg_enum = HealthCheckerMsg {
message: Some(HealthCheckerMsg_oneof::Ping(req_msg)),
};
let res_msg_enum = rpc::utils::unary_rpc(
self.inner.clone(),
recipient,
protocol,
req_msg_enum,
timeout,
)
.await?;

if let Some(HealthCheckerMsg_oneof::Pong(response)) = res_msg_enum.message {
Ok(response)
} else {
// TODO: context
Err(RpcError::InvalidRpcResponse)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{protocols::rpc::InboundRpcRequest, utils::MessageExt};
use futures::{channel::oneshot, executor::block_on, future::try_join, sink::SinkExt};

// `HealthCheckerNetworkEvents` should deserialize inbound RPC requests
#[test]
fn test_health_checker_inbound_rpc() {
let (mut network_reqs_tx, network_reqs_rx) = channel::new_test(8);
let mut stream = HealthCheckerNetworkEvents::new(network_reqs_rx);

// build rpc request
let req_msg = Ping2 { nonce: 1234 };
let req_msg_enum = HealthCheckerMsg {
message: Some(HealthCheckerMsg_oneof::Ping(req_msg)),
};
let req_data = req_msg_enum.clone().to_bytes().unwrap();

let (res_tx, _) = oneshot::channel();
let rpc_req = InboundRpcRequest {
protocol: ProtocolId::from_static(HEALTH_CHECKER_RPC_PROTOCOL),
data: req_data,
res_tx,
};

// mock receiving rpc request
let peer_id = PeerId::random();
let event = NetworkNotification::RecvRpc(peer_id, rpc_req);
block_on(network_reqs_tx.send(event)).unwrap();

// request should be properly deserialized
let (res_tx, _) = oneshot::channel();
let expected_event = Event::RpcRequest((peer_id, req_msg_enum.clone(), res_tx));
let event = block_on(stream.next()).unwrap().unwrap();
assert_eq!(event, expected_event);
}

// When health_checker sends an rpc request, network should get a
// `NetworkRequest::SendRpc` with the serialized request.
#[test]
fn test_health_checker_outbound_rpc() {
let (network_reqs_tx, mut network_reqs_rx) = channel::new_test(8);
let mut sender = HealthCheckerNetworkSender::new(network_reqs_tx);

// send ping rpc request
let peer_id = PeerId::random();
let req_msg = Ping2 { nonce: 1234 };
let f_res_msg = sender.ping(peer_id, req_msg.clone(), Duration::from_secs(5));

// build rpc response
let res_msg = Pong2 { nonce: 1234 };
let res_msg_enum = HealthCheckerMsg {
message: Some(HealthCheckerMsg_oneof::Pong(res_msg.clone())),
};
let res_data = res_msg_enum.to_bytes().unwrap();

// the future response
let f_recv = async move {
match network_reqs_rx.next().await.unwrap() {
NetworkRequest::SendRpc(recv_peer_id, req) => {
assert_eq!(recv_peer_id, peer_id);
assert_eq!(req.protocol.as_ref(), HEALTH_CHECKER_RPC_PROTOCOL);

// check request deserializes
let req_msg_enum = HealthCheckerMsg::decode(req.data.as_ref()).unwrap();
assert_eq!(
req_msg_enum.message,
Some(HealthCheckerMsg_oneof::Ping(req_msg))
);

// remote replies with some response message
req.res_tx.send(Ok(res_data)).unwrap();
Ok(())
}
event => panic!("Unexpected event: {:?}", event),
}
};

let (recv_res_msg, _) = block_on(try_join(f_res_msg, f_recv)).unwrap();
assert_eq!(recv_res_msg, res_msg);
}
}
4 changes: 3 additions & 1 deletion network/src/validator_network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod network_builder;

mod admission_control;
mod consensus;
mod health_checker;
mod mempool;
mod state_synchronizer;
#[cfg(test)]
Expand All @@ -25,6 +26,7 @@ pub use consensus::{
ConsensusNetworkEvents, ConsensusNetworkSender, CONSENSUS_DIRECT_SEND_PROTOCOL,
CONSENSUS_RPC_PROTOCOL,
};
pub use health_checker::HealthCheckerNetworkEvents;
use libra_types::PeerId;
pub use mempool::{MempoolNetworkEvents, MempoolNetworkSender, MEMPOOL_DIRECT_SEND_PROTOCOL};
pub use state_synchronizer::{
Expand All @@ -45,7 +47,7 @@ pub enum Event<TMessage> {
/// New inbound direct-send message from peer.
Message((PeerId, TMessage)),
/// New inbound rpc request. The request is fulfilled by sending the
/// serialized response `Bytes` over the `onshot::Sender`, where the network
/// serialized response `Bytes` over the `oneshot::Sender`, where the network
/// layer will handle sending the response over-the-wire.
RpcRequest((PeerId, TMessage, oneshot::Sender<Result<Bytes, RpcError>>)),
/// Peer which we have a newly established connection with.
Expand Down

0 comments on commit cc12920

Please sign in to comment.