Skip to content

Commit

Permalink
[node] Follow latest & Safe client integration test (#1670)
Browse files Browse the repository at this point in the history
Modify follower protocol to allow follow of latest
Moved follower streaming to authority
Changed io to sui errors
Added integration test for safe client

[node] Basic gossip (#1676)
* Simple gossip logic
* Added gossip test + fix follower edge case at 0
* Add logging

Co-authored-by: George Danezis <george@danez.is>
  • Loading branch information
2 people authored and longbowlu committed May 12, 2022
1 parent 6093137 commit fc8852d
Show file tree
Hide file tree
Showing 17 changed files with 818 additions and 170 deletions.
4 changes: 2 additions & 2 deletions sui/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ async fn run_follower(network_client: NetworkClient) {
loop {
let receiver = authority_client
.handle_batch_stream(BatchInfoRequest {
start,
end: start + FOLLOWER_BATCH_SIZE,
start: Some(start),
length: FOLLOWER_BATCH_SIZE,
})
.await;

Expand Down
35 changes: 26 additions & 9 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use sui_adapter::adapter;
use sui_types::serialize::serialize_transaction_info;
use sui_types::{
base_types::*,
batch::UpdateItem,
batch::{TxSequenceNumber, UpdateItem},
committee::Committee,
crypto::AuthoritySignature,
error::{SuiError, SuiResult},
Expand Down Expand Up @@ -535,20 +535,37 @@ impl AuthorityState {
pub async fn handle_batch_info_request(
&self,
request: BatchInfoRequest,
) -> Result<(VecDeque<UpdateItem>, bool), SuiError> {
) -> Result<
(
VecDeque<UpdateItem>,
// Should subscribe, computer start, computed end
(bool, TxSequenceNumber, TxSequenceNumber),
),
SuiError,
> {
// Ensure the range contains some elements and end > start
if request.end <= request.start {
if request.length == 0 {
return Err(SuiError::InvalidSequenceRangeError);
};

// Ensure we are not doing too much work per request
if request.end - request.start > MAX_ITEMS_LIMIT {
if request.length > MAX_ITEMS_LIMIT {
return Err(SuiError::TooManyItemsError(MAX_ITEMS_LIMIT));
}

let (batches, transactions) = self
._database
.batches_and_transactions(request.start, request.end)?;
// If we do not have a start, pick the low watermark from the notifier.
let start = match request.start {
Some(start) => start,
None => {
self.last_batch()?
.expect("Authority is always initialized with a batch")
.batch
.next_sequence_number
}
};
let end = start + request.length;

let (batches, transactions) = self._database.batches_and_transactions(start, end)?;

let mut dq_batches = std::collections::VecDeque::from(batches);
let mut dq_transactions = std::collections::VecDeque::from(transactions);
Expand Down Expand Up @@ -577,15 +594,15 @@ impl AuthorityState {

// whether we have sent everything requested, or need to start
// live notifications.
let should_subscribe = request.end > last_batch_next_seq;
let should_subscribe = end > last_batch_next_seq;

// If any transactions are left they must be outside a batch
while let Some(current_transaction) = dq_transactions.pop_front() {
// Remember the last sequence sent
items.push_back(UpdateItem::Transaction(current_transaction));
}

Ok((items, should_subscribe))
Ok((items, (should_subscribe, start, end)))
}

pub async fn new(
Expand Down
4 changes: 4 additions & 0 deletions sui_core/src/authority/authority_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ impl TransactionNotifier {
})
}

pub fn low_watermark(&self) -> TxSequenceNumber {
self.low_watermark.load(std::sync::atomic::Ordering::SeqCst)
}

/// Get a ticket with a sequence number
pub fn ticket(self: &Arc<Self>) -> SuiResult<TransactionNotifierTicket> {
if self.is_closed.load(std::sync::atomic::Ordering::SeqCst) {
Expand Down
19 changes: 13 additions & 6 deletions sui_core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@
*/

use std::{collections::BTreeMap, sync::Arc};

use sui_types::{base_types::AuthorityName, error::SuiResult};

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI,
};

pub mod gossip;
use gossip::gossip_process;

pub struct ActiveAuthority<A> {
// The local authority state
pub authority: Arc<AuthorityState>,
pub state: Arc<AuthorityState>,
// The network interfaces to other authorities
pub net: AuthorityAggregator<A>,
pub net: Arc<AuthorityAggregator<A>>,
}

impl<A> ActiveAuthority<A> {
Expand All @@ -53,8 +55,8 @@ impl<A> ActiveAuthority<A> {
let committee = authority.committee.clone();

Ok(ActiveAuthority {
authority,
net: AuthorityAggregator::new(committee, authority_clients),
state: authority,
net: Arc::new(AuthorityAggregator::new(committee, authority_clients)),
})
}
}
Expand All @@ -65,6 +67,11 @@ where
{
// TODO: Active tasks go here + logic to spawn them all
pub async fn spawn_all_active_processes(self) -> Option<()> {
None
// Spawn a task to take care of gossip
let _gossip_join = tokio::task::spawn(async move {
gossip_process(&self, 4).await;
});

Some(())
}
}
199 changes: 199 additions & 0 deletions sui_core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use futures::{stream::FuturesUnordered, StreamExt};
use std::{collections::HashSet, sync::Arc, time::Duration};
use sui_types::{
base_types::AuthorityName,
batch::{TxSequenceNumber, UpdateItem},
error::SuiError,
messages::{
BatchInfoRequest, BatchInfoResponseItem, ConfirmationTransaction, TransactionInfoRequest,
},
};

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI, safe_client::SafeClient,
};

use futures::stream::FuturesOrdered;
use tracing::{error, info};

#[cfg(test)]
mod tests;

struct PeerGossip<A> {
peer_name: AuthorityName,
client: SafeClient<A>,
state: Arc<AuthorityState>,
max_seq: Option<TxSequenceNumber>,
aggregator: Arc<AuthorityAggregator<A>>,
}

const EACH_ITEM_DELAY_MS: u64 = 1_000;
const REQUEST_FOLLOW_NUM_DIGESTS: u64 = 100_000;
const REFRESH_FOLLOWER_PERIOD_SECS: u64 = 60;

use super::ActiveAuthority;

pub async fn gossip_process<A>(active_authority: &ActiveAuthority<A>, degree: usize)
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Number of tasks at most "degree" and no more than committee - 1
let target_num_tasks: usize = usize::min(
active_authority.state.committee.voting_rights.len() - 1,
degree,
);

// Keep track of names of active peers
let mut peer_names = HashSet::new();
let mut gossip_tasks = FuturesUnordered::new();

// TODO: provide a clean way to get out of the loop.
loop {
let mut k = 0;
while gossip_tasks.len() < target_num_tasks {
let name = active_authority.state.committee.sample();
if peer_names.contains(name) || *name == active_authority.state.name {
continue;
}
peer_names.insert(*name);
gossip_tasks.push(async move {
let peer_gossip = PeerGossip::new(*name, active_authority);
// Add more duration if we make more than 1 to ensure overlap
info!("Gossip: Start gossip from peer {:?}", *name);
peer_gossip
.spawn(Duration::from_secs(REFRESH_FOLLOWER_PERIOD_SECS + k * 15))
.await
});
k += 1;
}

// Let the peer gossip task finish
debug_assert!(!gossip_tasks.is_empty());
let (finished_name, _result) = gossip_tasks.select_next_some().await;
if let Err(err) = _result {
error!(
"Gossip: Peer {:?} finished with error: {}",
finished_name, err
);
} else {
info!("Gossip: End gossip from peer {:?}", finished_name);
}
peer_names.remove(&finished_name);
}
}

impl<A> PeerGossip<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub fn new(peer_name: AuthorityName, active_authority: &ActiveAuthority<A>) -> PeerGossip<A> {
PeerGossip {
peer_name,
client: active_authority.net.authority_clients[&peer_name].clone(),
state: active_authority.state.clone(),
max_seq: None,
aggregator: active_authority.net.clone(),
}
}

pub async fn spawn(mut self, duration: Duration) -> (AuthorityName, Result<(), SuiError>) {
let peer_name = self.peer_name;
let result = tokio::task::spawn(async move { self.gossip_timeout(duration).await })
.await
.map(|_| ())
.map_err(|_err| SuiError::GenericAuthorityError {
error: "Gossip Join Error".to_string(),
});

(peer_name, result)
}

async fn gossip_timeout(&mut self, duration: Duration) -> Result<(), SuiError> {
// Global timeout, we do not exceed this time in this task.
let mut timeout = Box::pin(tokio::time::sleep(duration));
let mut queue = FuturesOrdered::new();

let req = BatchInfoRequest {
start: self.max_seq,
length: REQUEST_FOLLOW_NUM_DIGESTS,
};

// Get a client
let mut streamx = Box::pin(self.client.handle_batch_stream(req).await?);

loop {
tokio::select! {
_ = &mut timeout => {
// No matter what happens we do not spend too much time
// for any peer.

break },

items = &mut streamx.next() => {
match items {
// Upon receiving a batch
Some(Ok(BatchInfoResponseItem(UpdateItem::Batch(_signed_batch)) )) => {
// Update the longer term seqeunce_number only after a batch that is signed
self.max_seq = Some(_signed_batch.batch.next_sequence_number);
},
// Upon receiving a trasnaction digest we store it, if it is not processed already.
Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((_seq, _digest))))) => {
if !self.state._database.effects_exists(&_digest)? {
queue.push(async move {
tokio::time::sleep(Duration::from_millis(EACH_ITEM_DELAY_MS)).await;
_digest
});

}

},
// When an error occurs we simply send back the error
Some(Err( err )) => {
return Err(err);
},
// The stream has closed, re-request:
None => {

let req = BatchInfoRequest {
start: self.max_seq,
length: REQUEST_FOLLOW_NUM_DIGESTS,
};

// Get a client
streamx = Box::pin(self.client.handle_batch_stream(req).await?);
},
}
},

digest = &mut queue.next() , if !queue.is_empty() => {
let digest = digest.unwrap();
if !self.state._database.effects_exists(&digest)? {
// We still do not have a transaction others have after some time

// Download the certificate
let response = self.client.handle_transaction_info_request(TransactionInfoRequest::from(digest)).await?;
if let Some(certificate) = response.certified_transaction {

// Process the certificate from one authority to ourselves
self.aggregator.sync_authority_source_to_destination(
ConfirmationTransaction { certificate },
self.peer_name,
self.state.name).await?;
}
else {
// The authority did not return the certificate, despite returning info
// But it should know the certificate!
return Err(SuiError::ByzantineAuthoritySuspicion { authority : self.peer_name });
}
}
},
};
}

Ok(())
}
}
Loading

0 comments on commit fc8852d

Please sign in to comment.