Skip to content

Commit

Permalink
[consensus] push RPC processing into event processor to make network …
Browse files Browse the repository at this point in the history
…thread non-blocking
  • Loading branch information
zekun000 committed Nov 12, 2019
1 parent 5cc9ebb commit c9fc19e
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 36 deletions.
24 changes: 20 additions & 4 deletions consensus/src/chained_bft/event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,22 @@ use consensus_types::{
use failure::ResultExt;
use libra_crypto::hash::TransactionAccumulatorHasher;
use libra_logger::prelude::*;
use libra_prost_ext::MessageExt;
use libra_types::crypto_proxies::{
LedgerInfoWithSignatures, ValidatorChangeEventWithProof, ValidatorVerifier,
};
use mirai_annotations::{
debug_checked_precondition, debug_checked_precondition_eq, debug_checked_verify,
debug_checked_verify_eq,
};
use network::proto::{ConsensusMsg, ConsensusMsg_oneof};

use crate::chained_bft::network::IncomingBlockRetrievalRequest;
use consensus_types::block_retrieval::{BlockRetrievalResponse, BlockRetrievalStatus};
#[cfg(test)]
use safety_rules::ConsensusState;
use safety_rules::SafetyRules;
use std::convert::TryInto;
use std::time::Instant;
use std::{sync::Arc, time::Duration};
use termion::color::*;
Expand Down Expand Up @@ -792,7 +795,7 @@ impl<T: Payload> EventProcessor<T> {
///
/// The current version of the function is not really async, but keeping it this way for
/// future possible changes.
pub async fn process_block_retrieval(&self, request: IncomingBlockRetrievalRequest<T>) {
pub async fn process_block_retrieval(&self, request: IncomingBlockRetrievalRequest) {
let mut blocks = vec![];
let mut status = BlockRetrievalStatus::Succeeded;
let mut id = request.req.block_id();
Expand All @@ -810,9 +813,22 @@ impl<T: Payload> EventProcessor<T> {
status = BlockRetrievalStatus::IdNotFound;
}

if let Err(e) = request
.response_sender
.send(BlockRetrievalResponse::new(status, blocks))
let response = BlockRetrievalResponse::new(status, blocks);
if let Err(e) = response
.try_into()
.and_then(|proto| {
let bytes = ConsensusMsg {
message: Some(ConsensusMsg_oneof::RespondBlock(proto)),
}
.to_bytes()?;
Ok(bytes)
})
.and_then(|response_data| {
request
.response_sender
.send(Ok(response_data))
.map_err(|e| format_err!("{:?}", e))
})
{
error!("Failed to return the requested block: {:?}", e);
}
Expand Down
37 changes: 32 additions & 5 deletions consensus/src/chained_bft/event_processor_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use crate::{
util::time_service::{ClockTimeService, TimeService},
};
use channel;
use consensus_types::block_retrieval::{BlockRetrievalRequest, BlockRetrievalStatus};
use consensus_types::block_retrieval::{
BlockRetrievalRequest, BlockRetrievalResponse, BlockRetrievalStatus,
};
use consensus_types::{
block::{
block_test_utils::{
Expand All @@ -50,9 +52,10 @@ use libra_types::crypto_proxies::{
random_validator_verifier, LedgerInfoWithSignatures, ValidatorSigner, ValidatorVerifier,
};
use network::{
proto::ConsensusMsg_oneof,
proto::{ConsensusMsg, ConsensusMsg_oneof},
validator_network::{ConsensusNetworkEvents, ConsensusNetworkSender},
};
use prost::Message as _;
use safety_rules::{ConsensusState, OnDiskStorage, SafetyRules};
use std::{collections::HashMap, convert::TryFrom, path::PathBuf, sync::Arc, time::Duration};
use tempfile::NamedTempFile;
Expand Down Expand Up @@ -704,7 +707,15 @@ fn process_block_retrieval() {
.process_block_retrieval(single_block_request)
.await;
match rx1.await {
Ok(response) => {
Ok(Ok(bytes)) => {
let msg = ConsensusMsg::decode(bytes).unwrap();
let response = match msg.message {
Some(ConsensusMsg_oneof::RespondBlock(proto)) => {
BlockRetrievalResponse::<TestPayload>::try_from(proto)
}
_ => panic!("block retrieval failure"),
}
.unwrap();
assert_eq!(response.status(), BlockRetrievalStatus::Succeeded);
assert_eq!(response.blocks().get(0).unwrap().id(), block_id);
}
Expand All @@ -722,7 +733,15 @@ fn process_block_retrieval() {
.process_block_retrieval(missing_block_request)
.await;
match rx2.await {
Ok(response) => {
Ok(Ok(bytes)) => {
let msg = ConsensusMsg::decode(bytes).unwrap();
let response = match msg.message {
Some(ConsensusMsg_oneof::RespondBlock(proto)) => {
BlockRetrievalResponse::<TestPayload>::try_from(proto)
}
_ => panic!("block retrieval failure"),
}
.unwrap();
assert_eq!(response.status(), BlockRetrievalStatus::IdNotFound);
assert!(response.blocks().is_empty());
}
Expand All @@ -739,7 +758,15 @@ fn process_block_retrieval() {
.process_block_retrieval(many_block_request)
.await;
match rx3.await {
Ok(response) => {
Ok(Ok(bytes)) => {
let msg = ConsensusMsg::decode(bytes).unwrap();
let response = match msg.message {
Some(ConsensusMsg_oneof::RespondBlock(proto)) => {
BlockRetrievalResponse::<TestPayload>::try_from(proto)
}
_ => panic!("block retrieval failure"),
}
.unwrap();
assert_eq!(response.status(), BlockRetrievalStatus::NotEnoughBlocks);
assert_eq!(block_id, response.blocks().get(0).unwrap().id());
assert_eq!(
Expand Down
25 changes: 7 additions & 18 deletions consensus/src/chained_bft/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ use consensus_types::{
use failure::{self};
use futures::{channel::oneshot, stream::select, SinkExt, Stream, StreamExt, TryStreamExt};
use libra_logger::prelude::*;
use libra_prost_ext::MessageExt;
use libra_types::account_address::AccountAddress;
use libra_types::crypto_proxies::ValidatorChangeEventWithProof;
use libra_types::crypto_proxies::{LedgerInfoWithSignatures, ValidatorVerifier};
use libra_types::proto::types::ValidatorChangeEventWithProof as ValidatorChangeEventWithProofProto;
use network::{
proto::{
ConsensusMsg, ConsensusMsg_oneof, Proposal, RequestBlock, RequestEpoch, RespondBlock,
ConsensusMsg, ConsensusMsg_oneof, Proposal, RequestBlock, RequestEpoch,
SyncInfo as SyncInfoProto, VoteMsg as VoteMsgProto,
},
validator_network::{ConsensusNetworkEvents, ConsensusNetworkSender, Event, RpcError},
Expand All @@ -40,17 +39,17 @@ use std::{
/// The block retrieval request is used internally for implementing RPC: the callback is executed
/// for carrying the response
#[derive(Debug)]
pub struct IncomingBlockRetrievalRequest<T> {
pub struct IncomingBlockRetrievalRequest {
pub req: BlockRetrievalRequest,
pub response_sender: oneshot::Sender<BlockRetrievalResponse<T>>,
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
}

/// Just a convenience struct to keep all the network proxy receiving queues in one place.
/// Will be returned by the networking trait upon startup.
pub struct NetworkReceivers<T> {
pub proposals: libra_channel::Receiver<AccountAddress, ProposalMsg<T>>,
pub votes: libra_channel::Receiver<AccountAddress, VoteMsg>,
pub block_retrieval: libra_channel::Receiver<AccountAddress, IncomingBlockRetrievalRequest<T>>,
pub block_retrieval: libra_channel::Receiver<AccountAddress, IncomingBlockRetrievalRequest>,
pub sync_info_msgs: libra_channel::Receiver<AccountAddress, (SyncInfo, AccountAddress)>,
pub epoch_change: libra_channel::Receiver<AccountAddress, LedgerInfoWithSignatures>,
pub different_epoch: libra_channel::Receiver<AccountAddress, (u64, AccountAddress)>,
Expand Down Expand Up @@ -257,7 +256,7 @@ pub struct NetworkTask<T> {
epoch: u64,
proposal_tx: libra_channel::Sender<AccountAddress, ProposalMsg<T>>,
vote_tx: libra_channel::Sender<AccountAddress, VoteMsg>,
block_request_tx: libra_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest<T>>,
block_request_tx: libra_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>,
sync_info_tx: libra_channel::Sender<AccountAddress, (SyncInfo, AccountAddress)>,
epoch_change_tx: libra_channel::Sender<AccountAddress, LedgerInfoWithSignatures>,
different_epoch_tx: libra_channel::Sender<AccountAddress, (u64, AccountAddress)>,
Expand Down Expand Up @@ -485,21 +484,11 @@ impl<T: Payload> NetworkTask<T> {
) -> failure::Result<()> {
let req = BlockRetrievalRequest::try_from(request_msg)?;
debug!("Received block retrieval request {}", req);
let (tx, rx) = oneshot::channel();
let req_with_callback = IncomingBlockRetrievalRequest {
req,
response_sender: tx,
response_sender: callback,
};
self.block_request_tx.push(peer_id, req_with_callback)?;
let response = rx.await?;
let response_serialized = RespondBlock::try_from(response)?;
let response_msg = ConsensusMsg {
message: Some(ConsensusMsg_oneof::RespondBlock(response_serialized)),
};
let response_data = response_msg.to_bytes()?;
callback
.send(Ok(response_data))
.map_err(|_| format_err!("handling inbound rpc call timed out"))
self.block_request_tx.push(peer_id, req_with_callback)
}

async fn process_epoch_change(
Expand Down
43 changes: 34 additions & 9 deletions consensus/src/chained_bft/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use consensus_types::{
vote_msg::VoteMsg,
};
use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};
use libra_prost_ext::MessageExt;
use libra_types::block_info::BlockInfo;
use network::{
interface::{NetworkNotification, NetworkRequest},
Expand Down Expand Up @@ -333,7 +334,7 @@ use consensus_types::block_retrieval::{
};
#[cfg(test)]
use libra_types::crypto_proxies::random_validator_verifier;
use std::convert::TryFrom;
use std::convert::{TryFrom, TryInto};

#[test]
fn test_network_api() {
Expand Down Expand Up @@ -409,7 +410,7 @@ fn test_rpc() {
let peers: Vec<_> = signers.iter().map(|signer| signer.author()).collect();
for peer in peers.iter() {
let (network_reqs_tx, network_reqs_rx) = channel::new_test(8);
let (consensus_tx, consensus_rx) = channel::new_test(8);
let (consensus_tx, consensus_rx) = channel::new_test(1);
let network_sender = ConsensusNetworkSender::new(network_reqs_tx);
let network_events = ConsensusNetworkEvents::new(consensus_rx);

Expand All @@ -431,18 +432,42 @@ fn test_rpc() {
let receiver_1 = receivers.remove(1);
let genesis = Arc::new(Block::<u64>::make_genesis_block());
let genesis_clone = Arc::clone(&genesis);
let node0 = nodes[0].clone();
let peer1 = peers[1];
let vote_msg = VoteMsg::new(
Vote::new(
VoteData::new(BlockInfo::random(1), BlockInfo::random(0)),
peers[0],
placeholder_ledger_info(),
&signers[0],
),
test_utils::placeholder_sync_info(),
);

// verify request block rpc
let mut block_retrieval = receiver_1.block_retrieval;
let on_request_block = async move {
while let Some(request) = block_retrieval.next().await {
request
.response_sender
.send(BlockRetrievalResponse::new(
BlockRetrievalStatus::Succeeded,
vec![Block::clone(genesis_clone.as_ref())],
))
.unwrap();
// make sure the network task is not blocked during RPC
// we limit the network notification queue size to 1 so if it's blocked,
// we can not process 2 votes and the test will timeout
node0.send_vote(vote_msg.clone(), vec![peer1]).await;
node0.send_vote(vote_msg.clone(), vec![peer1]).await;
playground
.wait_for_messages(2, NetworkPlayground::votes_only)
.await;
let response = BlockRetrievalResponse::new(
BlockRetrievalStatus::Succeeded,
vec![Block::clone(genesis_clone.as_ref())],
);
let bytes = ConsensusMsg {
message: Some(ConsensusMsg_oneof::RespondBlock(
response.try_into().unwrap(),
)),
}
.to_bytes()
.unwrap();
request.response_sender.send(Ok(bytes)).unwrap();
}
};
runtime.executor().spawn(on_request_block);
Expand Down

0 comments on commit c9fc19e

Please sign in to comment.