Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server + bench] Now start the batch maker, and integrate into bench #1154

Merged
merged 3 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions network_utils/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ pub trait RwChannel<'a> {
}

/// The result of spawning a server is oneshot channel to kill it and a handle to track completion.
pub struct SpawnedServer {
pub struct SpawnedServer<S> {
state: Arc<S>,
tx_cancellation: futures::channel::oneshot::Sender<()>,
handle: tokio::task::JoinHandle<Result<(), std::io::Error>>,
local_addr: SocketAddr,
}

impl SpawnedServer {
impl<S> SpawnedServer<S> {
pub fn state(&self) -> &Arc<S> {
&self.state
}

pub async fn join(self) -> Result<(), std::io::Error> {
// Note that dropping `self.complete` would terminate the server.
self.handle.await??;
Expand Down Expand Up @@ -85,9 +90,9 @@ pub async fn connect(
/// Run a server for this protocol and the given message handler.
pub async fn spawn_server<S>(
address: &str,
state: S,
state: Arc<S>,
buffer_size: usize,
) -> Result<SpawnedServer, std::io::Error>
) -> Result<SpawnedServer<S>, std::io::Error>
where
S: MessageHandler<TcpDataStream> + Send + Sync + 'static,
{
Expand All @@ -104,11 +109,12 @@ where

let handle = tokio::spawn(run_tcp_server(
listener,
state,
state.clone(),
rx_cancellation,
buffer_size,
));
Ok(SpawnedServer {
state,
tx_cancellation,
handle,
local_addr,
Expand Down Expand Up @@ -187,14 +193,13 @@ impl<'a> RwChannel<'a> for TcpDataStream {
// Server implementation for TCP.
async fn run_tcp_server<S>(
listener: TcpListener,
state: S,
state: Arc<S>,
mut exit_future: futures::channel::oneshot::Receiver<()>,
_buffer_size: usize,
) -> Result<(), std::io::Error>
where
S: MessageHandler<TcpDataStream> + Send + Sync + 'static,
{
let guarded_state = Arc::new(state);
loop {
let stream;

Expand All @@ -206,7 +211,7 @@ where
}
}

let guarded_state = guarded_state.clone();
let guarded_state = state.clone();
tokio::spawn(async move {
let framed = TcpDataStream::from_tcp_stream(stream, _buffer_size);
guarded_state.handle_messages(framed).await
Expand Down
2 changes: 1 addition & 1 deletion network_utils/src/unit_tests/transport_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn test_server() -> Result<(usize, usize), std::io::Error> {
let counter = Arc::new(AtomicUsize::new(0));
let mut received = 0;

let server = spawn_server(&address, TestService::new(counter.clone()), 100).await?;
let server = spawn_server(&address, Arc::new(TestService::new(counter.clone())), 100).await?;

let mut client = connect(address.clone(), 1000).await?;
client.write_data(b"abcdef").await?;
Expand Down
62 changes: 60 additions & 2 deletions sui/src/microbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ use std::collections::{HashSet, VecDeque};
use std::time::{Duration, Instant};
use structopt::StructOpt;
use sui_adapter::genesis;
use sui_core::authority_client::AuthorityClient;
use sui_core::{authority::*, authority_server::AuthorityServer};
use sui_network::{network::NetworkClient, transport};
use sui_types::batch::UpdateItem;
use sui_types::crypto::{get_key_pair, AuthoritySignature, KeyPair, PublicKeyBytes, Signature};
use sui_types::SUI_FRAMEWORK_ADDRESS;
use sui_types::{base_types::*, committee::*, messages::*, object::Object, serialize::*};
Expand Down Expand Up @@ -260,7 +262,10 @@ impl ClientServerBenchmark {
(state, transactions)
}

async fn spawn_server(&self, state: AuthorityState) -> transport::SpawnedServer {
async fn spawn_server(
&self,
state: AuthorityState,
) -> transport::SpawnedServer<AuthorityServer> {
let server = AuthorityServer::new(self.host.clone(), self.port, self.buffer_size, state);
server.spawn().await.unwrap()
}
Expand Down Expand Up @@ -292,6 +297,59 @@ impl ClientServerBenchmark {
Duration::from_micros(self.send_timeout_us),
Duration::from_micros(self.recv_timeout_us),
);

// We spawn a second client that listens to the batch interface
let client_batch = NetworkClient::new(
self.host.clone(),
self.port,
self.buffer_size,
Duration::from_micros(self.send_timeout_us),
Duration::from_micros(self.recv_timeout_us),
);

let _batch_client_handle = tokio::task::spawn(async move {
let authority_client = AuthorityClient::new(client_batch);

let mut start = 0;

loop {
let receiver = authority_client
.handle_batch_streaming_as_stream(BatchInfoRequest {
start,
end: start + 10_000,
})
.await;

if let Err(e) = &receiver {
error!("Listener error: {:?}", e);
break;
}
let mut receiver = receiver.unwrap();

info!("Start batch listener at sequence: {}.", start);
while let Some(item) = receiver.next().await {
match item {
Ok(BatchInfoResponseItem(UpdateItem::Transaction((
_tx_seq,
_tx_digest,
)))) => {
start = _tx_seq + 1;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have run this locally and confirmed that it is functioning as a stream now. 👍

We are only generating transaction load and not batch load so far, is that right or are we making batches somewhere, as alluded to in the title?

Copy link
Collaborator Author

@gdanezis gdanezis Mar 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see batches when I increase the number of transactions in the microbench. See Client received batch up to sequence ...:

george@george-XPS-13-9310:~/Projects/fastnft$ cargo run --release --bin=microbench -- --num-transactions 1000000 --batch-size 100 --use-move   Compiling sui_core v0.1.0 (/home/george/Projects/fastnft/sui_core)
   Compiling sui v0.1.0 (/home/george/Projects/fastnft/sui)
    Finished release [optimized] target(s) in 40.83s
     Running `target/release/microbench --num-transactions 1000000 --batch-size 100 --use-move`
2022-03-31T09:19:01.534738Z  INFO microbench: Starting benchmark: TransactionsAndCerts
2022-03-31T09:19:01.534796Z  INFO microbench: Preparing accounts.
2022-03-31T09:19:01.535815Z  INFO microbench: Init Authority.
2022-03-31T09:19:01.535996Z  INFO microbench: Open database on path: "/tmp/DB_19AA4FBA3E838077C1929FF3FB6B53E7FF7CFBD7"
2022-03-31T09:19:01.763730Z  INFO microbench: Generate empty store with Genesis.
2022-03-31T09:19:07.675950Z  INFO microbench: Preparing transactions.
2022-03-31T09:19:10.508837Z  INFO sui_network::transport: Listening to TCP traffic on 127.0.0.1:9555
2022-03-31T09:19:10.508919Z  INFO microbench: Sending requests.
2022-03-31T09:19:10.508942Z  INFO microbench: Number of TCP connections: 8
2022-03-31T09:19:10.509096Z  INFO microbench: Start batch listener at sequence: 0.
2022-03-31T09:19:10.509153Z  INFO sui_network::network: Sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:10.509392Z  INFO sui_network::network: Sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:10.509425Z  INFO microbench: Client received batch up to sequence 0
2022-03-31T09:19:10.509668Z  INFO sui_network::network: Sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:10.509902Z  INFO sui_network::network: Sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:10.510135Z  INFO sui_network::network: Sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:10.510453Z  INFO sui_network::network: Sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:10.510718Z  INFO sui_network::network: Sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:10.510995Z  INFO sui_network::network: Sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:14.988668Z  INFO microbench: Client received batch up to sequence 1000
2022-03-31T09:19:15.958129Z  INFO sui_core::authority_server: 127.0.0.1:9555 has processed 5000 packets
2022-03-31T09:19:19.672465Z  INFO microbench: Client received batch up to sequence 2000
2022-03-31T09:19:21.763921Z  INFO sui_core::authority_server: 127.0.0.1:9555 has processed 10000 packets
2022-03-31T09:19:24.736939Z  INFO microbench: Client received batch up to sequence 3000
2022-03-31T09:19:27.950491Z  INFO sui_core::authority_server: 127.0.0.1:9555 has processed 15000 packets
2022-03-31T09:19:30.056884Z  INFO microbench: Client received batch up to sequence 4000
2022-03-31T09:19:34.156146Z  INFO sui_network::network: Done sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:34.158119Z  INFO microbench: Client received batch up to sequence 5000
2022-03-31T09:19:34.160644Z  INFO microbench: Client received batch up to sequence 6000
2022-03-31T09:19:34.163405Z  INFO microbench: Client received batch up to sequence 7000
2022-03-31T09:19:34.164014Z  INFO microbench: Client received batch up to sequence 8000
2022-03-31T09:19:34.166854Z  INFO microbench: Client received batch up to sequence 9000
2022-03-31T09:19:34.369739Z  INFO sui_network::network: Done sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:34.848897Z  INFO sui_network::network: Done sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:34.923066Z  INFO sui_network::network: Done sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:34.949682Z  INFO sui_network::network: Done sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:34.988530Z  INFO sui_network::network: Done sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:35.006763Z  INFO sui_network::network: Done sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:35.046621Z  INFO sui_core::authority_server: 127.0.0.1:9555 has processed 20000 packets
2022-03-31T09:19:35.046776Z  INFO sui_network::network: Done sending TCP requests to 127.0.0.1:9555
2022-03-31T09:19:35.046929Z  INFO microbench: Received 20000 responses.
2022-03-31T09:19:35.047350Z  INFO microbench: Start batch listener at sequence: 10000.
2022-03-31T09:19:35.047553Z  INFO microbench: Client received batch up to sequence 10000
2022-03-31T09:19:37.330513Z  WARN microbench: Completed benchmark for TransactionsAndCerts
Total time: 24537974us, items: 1000000, tx/sec: 40753.16079477466

Maybe what makes the difference is --num-transactions 1000000 --batch-size 100. The default values are too low number of transactions, and too high level of batching them to observe a follower batch.

Ok(BatchInfoResponseItem(UpdateItem::Batch(_signed_batch))) => {
info!(
"Client received batch up to sequence {}",
_signed_batch.batch.next_sequence_number
);
}
Err(err) => {
error!("{:?}", err);
break;
}
}
}
}
});

info!("Sending requests.");
if self.single_operation {
// Send batches one by one
Expand Down Expand Up @@ -354,7 +412,7 @@ fn make_transfer_transaction(

SingleTransactionKind::Call(MoveCall {
package: framework_obj_ref,
module: ident_str!("GAS").to_owned(),
module: ident_str!("SUI").to_owned(),
function: ident_str!("transfer").to_owned(),
type_arguments: Vec::new(),
object_arguments: vec![object_ref],
Expand Down
2 changes: 1 addition & 1 deletion sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl SuiCommand {
}

pub struct SuiNetwork {
pub spawned_authorities: Vec<SpawnedServer>,
pub spawned_authorities: Vec<SpawnedServer<AuthorityServer>>,
}

impl SuiNetwork {
Expand Down
2 changes: 1 addition & 1 deletion sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub use authority_store::{AuthorityStore, GatewayStore};

pub mod authority_notifier;

const MAX_ITEMS_LIMIT: u64 = 10_000;
const MAX_ITEMS_LIMIT: u64 = 100_000;
const BROADCAST_CAPACITY: usize = 10_000;

/// a Trait object for `signature::Signer` that is:
Expand Down
59 changes: 57 additions & 2 deletions sui_core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

use async_trait::async_trait;
use futures::channel::mpsc::{channel, Receiver};
use futures::SinkExt;
use futures::Stream;
use futures::{SinkExt, StreamExt};
use std::io;
use sui_network::network::{parse_recv_bytes, NetworkClient};
use sui_network::transport::TcpDataStream;
use sui_types::batch::UpdateItem;
use sui_types::{error::SuiError, messages::*, serialize::*};

Expand Down Expand Up @@ -141,7 +143,7 @@ impl AuthorityAPI for AuthorityClient {
loop {
let next_data = tcp_stream.read_data().await.transpose();
let data_result = parse_recv_bytes(next_data);
match deserialize_batch_info(data_result) {
match data_result.and_then(deserialize_batch_info) {
Ok(batch_info_response_item) => {
// send to the caller via the channel
let _ = tx_output.send(Ok(batch_info_response_item.clone())).await;
Expand Down Expand Up @@ -172,3 +174,56 @@ impl AuthorityAPI for AuthorityClient {
Ok(tr_output)
}
}

impl AuthorityClient {
/// Handle Batch information requests for this authority.
pub async fn handle_batch_streaming_as_stream(
&self,
request: BatchInfoRequest,
) -> Result<impl Stream<Item = Result<BatchInfoResponseItem, SuiError>>, io::Error> {
let tcp_stream = self
.0
.connect_for_stream(serialize_batch_request(&request))
.await?;

let mut error_count = 0;
let TcpDataStream { framed_read, .. } = tcp_stream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have never seen this syntax before, this is useful.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah if you own something outright (not just & or &mut), you can break it up subject to some visibility constraints I think.


let stream = framed_read
.map(|item| {
item
// Convert io error to SuiCLient error
.map_err(|err| SuiError::ClientIoError {
error: format!("io error: {:?}", err),
})
// If no error try to deserialize
.and_then(|bytes| match deserialize_message(&bytes[..]) {
Ok(SerializedMessage::Error(error)) => Err(SuiError::ClientIoError {
error: format!("io error: {:?}", error),
}),
Ok(message) => Ok(message),
Err(_) => Err(SuiError::InvalidDecoding),
})
// If deserialized try to parse as Batch Item
.and_then(deserialize_batch_info)
})
// Establish conditions to stop taking from the stream
.take_while(move |item| {
let flag = match item {
Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch))) => {
signed_batch.batch.next_sequence_number < request.end
}
Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, _digest)))) => {
*seq < request.end
}
Err(_e) => {
// TODO: record e
error_count += 1;
error_count < MAX_ERRORS
}
};
futures::future::ready(flag)
});
Ok(stream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhhh. This is how we would do this functionally. I see the map function allows us to turn a "stream" into an iterator, and this is possible because the framed_read part of the tcpDataStream is iterable. I attempted to iterate on the entire stream and that doesn't work.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the read part is a stream, and the write part is a sink. They offer interesting combinators to build more fancy streams and sinks.

}
}
14 changes: 10 additions & 4 deletions sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ mod server_tests;
set it, or a dynamic mechanism to adapt it according to observed workload.
*/
const CHUNK_SIZE: usize = 36;
const MIN_BATCH_SIZE: u64 = 1000;
const MAX_DELAY_MILLIS: u64 = 5_000; // 5 sec

pub struct AuthorityServer {
server: NetworkServer,
Expand Down Expand Up @@ -61,7 +63,6 @@ impl AuthorityServer {
max_delay: Duration,
) -> SuiResult<tokio::task::JoinHandle<SuiResult<()>>> {
// Start the batching subsystem, and register the handles with the authority.
// let last_batch = self.state.init_batches_from_database()?;
let local_server = self.clone();

let _batch_join_handle = tokio::task::spawn(async move {
Expand All @@ -74,12 +75,17 @@ impl AuthorityServer {
Ok(_batch_join_handle)
}

pub async fn spawn(self) -> Result<SpawnedServer, io::Error> {
pub async fn spawn(self) -> Result<SpawnedServer<AuthorityServer>, io::Error> {
let address = format!("{}:{}", self.server.base_address, self.server.base_port);
let buffer_size = self.server.buffer_size;
let guarded_state = Arc::new(self);

// Launch server for the appropriate protocol.
spawn_server(&address, self, buffer_size).await
// Start the batching subsystem
let _join_handle = guarded_state
.spawn_batch_subsystem(MIN_BATCH_SIZE, Duration::from_millis(MAX_DELAY_MILLIS))
.await;

spawn_server(&address, guarded_state, buffer_size).await
}

async fn handle_batch_streaming<'a, 'b, A>(
Expand Down
11 changes: 4 additions & 7 deletions sui_types/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,11 @@ pub fn deserialize_transaction_info(
}

pub fn deserialize_batch_info(
message: Result<SerializedMessage, SuiError>,
message: SerializedMessage,
) -> Result<BatchInfoResponseItem, SuiError> {
match message {
Ok(message) => match message {
SerializedMessage::BatchInfoResp(resp) => Ok(*resp),
SerializedMessage::Error(error) => Err(*error),
_ => Err(SuiError::UnexpectedMessage),
},
Err(e) => Err(e),
SerializedMessage::BatchInfoResp(resp) => Ok(*resp),
SerializedMessage::Error(error) => Err(*error),
_ => Err(SuiError::UnexpectedMessage),
}
}
4 changes: 2 additions & 2 deletions test_utils/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Sequencer {
let input_server = InputServer { tx_input };
sui_network::transport::spawn_server(
&sequencer.input_address.to_string(),
input_server,
Arc::new(input_server),
sequencer.buffer_size,
)
.await
Expand All @@ -73,7 +73,7 @@ impl Sequencer {
let subscriber_server = SubscriberServer::new(tx_subscriber, store);
sui_network::transport::spawn_server(
&sequencer.subscriber_address.to_string(),
subscriber_server,
Arc::new(subscriber_server),
sequencer.buffer_size,
)
.await
Expand Down