Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[node] Follow latest & Safe client integration test #1670

Merged
merged 3 commits into from
May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sui/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ async fn run_follower(network_client: NetworkClient) {
loop {
let receiver = authority_client
.handle_batch_stream(BatchInfoRequest {
start,
end: start + FOLLOWER_BATCH_SIZE,
start: Some(start),
length: FOLLOWER_BATCH_SIZE,
})
.await;

Expand Down
35 changes: 26 additions & 9 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use sui_adapter::adapter;
use sui_types::serialize::serialize_transaction_info;
use sui_types::{
base_types::*,
batch::UpdateItem,
batch::{TxSequenceNumber, UpdateItem},
committee::Committee,
crypto::AuthoritySignature,
error::{SuiError, SuiResult},
Expand Down Expand Up @@ -535,20 +535,37 @@ impl AuthorityState {
pub async fn handle_batch_info_request(
&self,
request: BatchInfoRequest,
) -> Result<(VecDeque<UpdateItem>, bool), SuiError> {
) -> Result<
(
VecDeque<UpdateItem>,
// Should subscribe, computer start, computed end
(bool, TxSequenceNumber, TxSequenceNumber),
),
SuiError,
> {
// Ensure the range contains some elements and end > start
if request.end <= request.start {
if request.length == 0 {
return Err(SuiError::InvalidSequenceRangeError);
};

// Ensure we are not doing too much work per request
if request.end - request.start > MAX_ITEMS_LIMIT {
if request.length > MAX_ITEMS_LIMIT {
return Err(SuiError::TooManyItemsError(MAX_ITEMS_LIMIT));
}

let (batches, transactions) = self
._database
.batches_and_transactions(request.start, request.end)?;
// If we do not have a start, pick the low watermark from the notifier.
let start = match request.start {
Some(start) => start,
None => {
self.last_batch()?
.expect("Authority is always initialized with a batch")
.batch
.next_sequence_number
}
};
let end = start + request.length;

let (batches, transactions) = self._database.batches_and_transactions(start, end)?;

let mut dq_batches = std::collections::VecDeque::from(batches);
let mut dq_transactions = std::collections::VecDeque::from(transactions);
Expand Down Expand Up @@ -577,15 +594,15 @@ impl AuthorityState {

// whether we have sent everything requested, or need to start
// live notifications.
let should_subscribe = request.end > last_batch_next_seq;
let should_subscribe = end > last_batch_next_seq;

// If any transactions are left they must be outside a batch
while let Some(current_transaction) = dq_transactions.pop_front() {
// Remember the last sequence sent
items.push_back(UpdateItem::Transaction(current_transaction));
}

Ok((items, should_subscribe))
Ok((items, (should_subscribe, start, end)))
}

pub async fn new(
Expand Down
4 changes: 4 additions & 0 deletions sui_core/src/authority/authority_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ impl TransactionNotifier {
})
}

pub fn low_watermark(&self) -> TxSequenceNumber {
self.low_watermark.load(std::sync::atomic::Ordering::SeqCst)
}

/// Get a ticket with a sequence number
pub fn ticket(self: &Arc<Self>) -> SuiResult<TransactionNotifierTicket> {
if self.is_closed.load(std::sync::atomic::Ordering::SeqCst) {
Expand Down
19 changes: 13 additions & 6 deletions sui_core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@
*/

use std::{collections::BTreeMap, sync::Arc};

use sui_types::{base_types::AuthorityName, error::SuiResult};

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI,
};

pub mod gossip;
use gossip::gossip_process;

pub struct ActiveAuthority<A> {
// The local authority state
pub authority: Arc<AuthorityState>,
pub state: Arc<AuthorityState>,
// The network interfaces to other authorities
pub net: AuthorityAggregator<A>,
pub net: Arc<AuthorityAggregator<A>>,
}

impl<A> ActiveAuthority<A> {
Expand All @@ -53,8 +55,8 @@ impl<A> ActiveAuthority<A> {
let committee = authority.committee.clone();

Ok(ActiveAuthority {
authority,
net: AuthorityAggregator::new(committee, authority_clients),
state: authority,
net: Arc::new(AuthorityAggregator::new(committee, authority_clients)),
})
}
}
Expand All @@ -65,6 +67,11 @@ where
{
// TODO: Active tasks go here + logic to spawn them all
pub async fn spawn_all_active_processes(self) -> Option<()> {
None
// Spawn a task to take care of gossip
let _gossip_join = tokio::task::spawn(async move {
gossip_process(&self, 4).await;
});

Some(())
}
}
199 changes: 199 additions & 0 deletions sui_core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use futures::{stream::FuturesUnordered, StreamExt};
use std::{collections::HashSet, sync::Arc, time::Duration};
use sui_types::{
base_types::AuthorityName,
batch::{TxSequenceNumber, UpdateItem},
error::SuiError,
messages::{
BatchInfoRequest, BatchInfoResponseItem, ConfirmationTransaction, TransactionInfoRequest,
},
};

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI, safe_client::SafeClient,
};

use futures::stream::FuturesOrdered;
use tracing::{error, info};

#[cfg(test)]
mod tests;

struct PeerGossip<A> {
peer_name: AuthorityName,
client: SafeClient<A>,
state: Arc<AuthorityState>,
max_seq: Option<TxSequenceNumber>,
aggregator: Arc<AuthorityAggregator<A>>,
}

const EACH_ITEM_DELAY_MS: u64 = 1_000;
const REQUEST_FOLLOW_NUM_DIGESTS: u64 = 100_000;
const REFRESH_FOLLOWER_PERIOD_SECS: u64 = 60;

use super::ActiveAuthority;

pub async fn gossip_process<A>(active_authority: &ActiveAuthority<A>, degree: usize)
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Number of tasks at most "degree" and no more than committee - 1
let target_num_tasks: usize = usize::min(
active_authority.state.committee.voting_rights.len() - 1,
degree,
);

// Keep track of names of active peers
let mut peer_names = HashSet::new();
let mut gossip_tasks = FuturesUnordered::new();

// TODO: provide a clean way to get out of the loop.
loop {
let mut k = 0;
while gossip_tasks.len() < target_num_tasks {
let name = active_authority.state.committee.sample();
if peer_names.contains(name) || *name == active_authority.state.name {
continue;
}
peer_names.insert(*name);
gossip_tasks.push(async move {
let peer_gossip = PeerGossip::new(*name, active_authority);
// Add more duration if we make more than 1 to ensure overlap
info!("Gossip: Start gossip from peer {:?}", *name);
peer_gossip
.spawn(Duration::from_secs(REFRESH_FOLLOWER_PERIOD_SECS + k * 15))
.await
});
k += 1;
}

// Let the peer gossip task finish
debug_assert!(!gossip_tasks.is_empty());
let (finished_name, _result) = gossip_tasks.select_next_some().await;
if let Err(err) = _result {
error!(
"Gossip: Peer {:?} finished with error: {}",
finished_name, err
);
} else {
info!("Gossip: End gossip from peer {:?}", finished_name);
}
peer_names.remove(&finished_name);
}
}

impl<A> PeerGossip<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub fn new(peer_name: AuthorityName, active_authority: &ActiveAuthority<A>) -> PeerGossip<A> {
PeerGossip {
peer_name,
client: active_authority.net.authority_clients[&peer_name].clone(),
state: active_authority.state.clone(),
max_seq: None,
aggregator: active_authority.net.clone(),
}
}

pub async fn spawn(mut self, duration: Duration) -> (AuthorityName, Result<(), SuiError>) {
let peer_name = self.peer_name;
let result = tokio::task::spawn(async move { self.gossip_timeout(duration).await })
.await
.map(|_| ())
.map_err(|_err| SuiError::GenericAuthorityError {
error: "Gossip Join Error".to_string(),
});

(peer_name, result)
}

async fn gossip_timeout(&mut self, duration: Duration) -> Result<(), SuiError> {
// Global timeout, we do not exceed this time in this task.
let mut timeout = Box::pin(tokio::time::sleep(duration));
let mut queue = FuturesOrdered::new();

let req = BatchInfoRequest {
start: self.max_seq,
length: REQUEST_FOLLOW_NUM_DIGESTS,
};

// Get a client
let mut streamx = Box::pin(self.client.handle_batch_stream(req).await?);

loop {
tokio::select! {
_ = &mut timeout => {
// No matter what happens we do not spend too much time
// for any peer.

break },

items = &mut streamx.next() => {
match items {
// Upon receiving a batch
Some(Ok(BatchInfoResponseItem(UpdateItem::Batch(_signed_batch)) )) => {
// Update the longer term seqeunce_number only after a batch that is signed
self.max_seq = Some(_signed_batch.batch.next_sequence_number);
},
// Upon receiving a trasnaction digest we store it, if it is not processed already.
Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((_seq, _digest))))) => {
if !self.state._database.effects_exists(&_digest)? {
queue.push(async move {
tokio::time::sleep(Duration::from_millis(EACH_ITEM_DELAY_MS)).await;
_digest
});

}

},
// When an error occurs we simply send back the error
Some(Err( err )) => {
return Err(err);
},
// The stream has closed, re-request:
None => {

let req = BatchInfoRequest {
start: self.max_seq,
length: REQUEST_FOLLOW_NUM_DIGESTS,
};

// Get a client
streamx = Box::pin(self.client.handle_batch_stream(req).await?);
},
}
},

digest = &mut queue.next() , if !queue.is_empty() => {
let digest = digest.unwrap();
if !self.state._database.effects_exists(&digest)? {
// We still do not have a transaction others have after some time

// Download the certificate
let response = self.client.handle_transaction_info_request(TransactionInfoRequest::from(digest)).await?;
if let Some(certificate) = response.certified_transaction {

// Process the certificate from one authority to ourselves
self.aggregator.sync_authority_source_to_destination(
ConfirmationTransaction { certificate },
self.peer_name,
self.state.name).await?;
}
else {
// The authority did not return the certificate, despite returning info
// But it should know the certificate!
return Err(SuiError::ByzantineAuthoritySuspicion { authority : self.peer_name });
}
}
},
};
}

Ok(())
}
}
Loading