forked from facebookresearch/narwhal
-
Notifications
You must be signed in to change notification settings - Fork 32
/
lib.rs
304 lines (266 loc) · 11.8 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
// Copyright(C) Facebook, Inc. and its affiliates.
use config::{Committee, Stake};
use crypto::Hash as _;
use crypto::{Digest, PublicKey};
use log::{debug, info, log_enabled, warn};
use primary::{Certificate, Round};
use std::cmp::max;
use std::collections::{HashMap, HashSet};
use tokio::sync::mpsc::{Receiver, Sender};
// Unit tests for this module. They live in `tests/consensus_tests.rs` and are only compiled
// when building with `cfg(test)`.
#[cfg(test)]
#[path = "tests/consensus_tests.rs"]
pub mod consensus_tests;
/// The representation of the DAG in memory.
///
/// The outer map is indexed by round. For each round, the inner map associates the public key
/// of a certificate's origin with the certificate itself and its digest.
type Dag = HashMap<Round, HashMap<PublicKey, (Digest, Certificate)>>;
/// The state that needs to be persisted for crash-recovery.
struct State {
/// The last committed round (the highest value found in `last_committed`).
last_committed_round: Round,
/// Keeps the last committed round for each authority. This map is used to clean up the dag and
/// to ensure we don't commit the same certificate twice.
last_committed: HashMap<PublicKey, Round>,
/// Keeps the latest committed certificate (and its parents) for every authority. Anything older
/// must be regularly cleaned up through the function `update`.
dag: Dag,
}
impl State {
fn new(genesis: Vec<Certificate>) -> Self {
let genesis = genesis
.into_iter()
.map(|x| (x.origin(), (x.digest(), x)))
.collect::<HashMap<_, _>>();
Self {
last_committed_round: 0,
last_committed: genesis.iter().map(|(x, (_, y))| (*x, y.round())).collect(),
dag: [(0, genesis)].iter().cloned().collect(),
}
}
/// Update and clean up internal state base on committed certificates.
fn update(&mut self, certificate: &Certificate, gc_depth: Round) {
self.last_committed
.entry(certificate.origin())
.and_modify(|r| *r = max(*r, certificate.round()))
.or_insert_with(|| certificate.round());
let last_committed_round = *self.last_committed.values().max().unwrap();
self.last_committed_round = last_committed_round;
// TODO: This cleanup is dangerous: we need to ensure consensus can receive idempotent replies
// from the primary. Here we risk cleaning up a certificate and receiving it again later.
for (name, round) in &self.last_committed {
self.dag.retain(|r, authorities| {
authorities.retain(|n, _| n != name || r >= round);
!authorities.is_empty() && r + gc_depth >= last_committed_round
});
}
}
}
/// The consensus core. It receives certificates from the primary, orders them, and outputs the
/// resulting sequence both back to the primary and to the application layer.
pub struct Consensus {
/// The committee information.
committee: Committee,
/// The depth of the garbage collector.
gc_depth: Round,
/// Receives new certificates from the primary. The primary should send us new certificates only
/// if it already sent us its whole history.
rx_primary: Receiver<Certificate>,
/// Outputs the sequence of ordered certificates to the primary (for cleanup and feedback).
tx_primary: Sender<Certificate>,
/// Outputs the sequence of ordered certificates to the application layer.
tx_output: Sender<Certificate>,
/// The genesis certificates.
genesis: Vec<Certificate>,
}
impl Consensus {
/// Spawns the consensus core in a new tokio task and returns immediately.
///
/// * `committee` - the committee information, also used to derive the genesis certificates.
/// * `gc_depth` - the depth of the garbage collector.
/// * `rx_primary` - channel on which the primary delivers new certificates.
/// * `tx_primary` - channel on which ordered certificates are sent back to the primary.
/// * `tx_output` - channel on which ordered certificates are sent to the application layer.
///
/// The task runs until `rx_primary` is closed. Its handle is not returned, so the task is
/// detached.
pub fn spawn(
    committee: Committee,
    gc_depth: Round,
    rx_primary: Receiver<Certificate>,
    tx_primary: Sender<Certificate>,
    tx_output: Sender<Certificate>,
) {
    tokio::spawn(async move {
        // Derive the genesis certificates first so that `committee` can then be moved into the
        // struct without cloning it.
        let genesis = Certificate::genesis(&committee);
        Self {
            committee,
            gc_depth,
            rx_primary,
            tx_primary,
            tx_output,
            genesis,
        }
        .run()
        .await;
    });
}
async fn run(&mut self) {
// The consensus state (everything else is immutable).
let mut state = State::new(self.genesis.clone());
// Listen to incoming certificates.
while let Some(certificate) = self.rx_primary.recv().await {
debug!("Processing {:?}", certificate);
let round = certificate.round();
// Add the new certificate to the local storage.
state
.dag
.entry(round)
.or_insert_with(HashMap::new)
.insert(certificate.origin(), (certificate.digest(), certificate));
// Try to order the dag to commit. Start from the highest round for which we have at least
// 2f+1 certificates. This is because we need them to reveal the common coin.
let r = round - 1;
// We only elect leaders for even round numbers.
if r % 2 != 0 || r < 4 {
continue;
}
// Get the certificate's digest of the leader of round r-2. If we already ordered this leader,
// there is nothing to do.
let leader_round = r - 2;
if leader_round <= state.last_committed_round {
continue;
}
let (leader_digest, leader) = match self.leader(leader_round, &state.dag) {
Some(x) => x,
None => continue,
};
// Check if the leader has f+1 support from its children (ie. round r-1).
let stake: Stake = state
.dag
.get(&(r - 1))
.expect("We should have the whole history by now")
.values()
.filter(|(_, x)| x.header.parents.contains(&leader_digest))
.map(|(_, x)| self.committee.stake(&x.origin()))
.sum();
// If it is the case, we can commit the leader. But first, we need to recursively go back to
// the last committed leader, and commit all preceding leaders in the right order. Committing
// a leader block means committing all its dependencies.
if stake < self.committee.validity_threshold() {
debug!("Leader {:?} does not have enough support", leader);
continue;
}
// Get an ordered list of past leaders that are linked to the current leader.
debug!("Leader {:?} has enough support", leader);
let mut sequence = Vec::new();
for leader in self.order_leaders(leader, &state).iter().rev() {
// Starting from the oldest leader, flatten the sub-dag referenced by the leader.
for x in self.order_dag(leader, &state) {
// Update and clean up internal state.
state.update(&x, self.gc_depth);
// Add the certificate to the sequence.
sequence.push(x);
}
}
// Log the latest committed round of every authority (for debug).
if log_enabled!(log::Level::Debug) {
for (name, round) in &state.last_committed {
debug!("Latest commit of {}: Round {}", name, round);
}
}
// Output the sequence in the right order.
for certificate in sequence {
#[cfg(not(feature = "benchmark"))]
info!("Committed {}", certificate.header);
#[cfg(feature = "benchmark")]
for digest in certificate.header.payload.keys() {
// NOTE: This log entry is used to compute performance.
info!("Committed {} -> {:?}", certificate.header, digest);
}
self.tx_primary
.send(certificate.clone())
.await
.expect("Failed to send certificate to primary");
if let Err(e) = self.tx_output.send(certificate).await {
warn!("Failed to output certificate: {}", e);
}
}
}
}
/// Returns the certificate (and the certificate's digest) originated by the leader of the
/// specified round (if any).
///
/// The leader is picked among the committee's authorities, sorted by public key, at index
/// `coin % committee.size()`. Returns `None` when `dag` has no entry for `round` or when the
/// elected leader has no certificate in that round.
///
/// # Panics
///
/// Panics if `committee.size()` is zero (remainder by zero) or if the computed index is out
/// of bounds for the list of authorities.
fn leader<'a>(&self, round: Round, dag: &'a Dag) -> Option<&'a (Digest, Certificate)> {
    // TODO: We should elect the leader of round r-2 using the common coin revealed at round r.
    // At this stage, we are guaranteed to have 2f+1 certificates from round r (which is enough to
    // compute the coin). We currently just use round-robin.
    #[cfg(test)]
    let coin = 0;
    #[cfg(not(test))]
    let coin = round;

    // Elect the leader. The keys come from a map and are therefore unique, so an unstable sort
    // yields the same order as a stable one.
    let mut keys: Vec<_> = self.committee.authorities.keys().cloned().collect();
    keys.sort_unstable();
    let leader = keys[coin as usize % self.committee.size()];

    // Return its certificate and the certificate's digest.
    dag.get(&round).and_then(|x| x.get(&leader))
}
/// Collects the leaders that still have to be committed, newest first.
///
/// The returned vector always starts with `leader`. Walking back two rounds at a time, down
/// to two rounds after the last committed round, each earlier leader that is linked to the
/// most recently collected one is appended.
fn order_leaders(&self, leader: &Certificate, state: &State) -> Vec<Certificate> {
    let oldest = state.last_committed_round + 2;
    let newest = leader.round() - 2;

    let mut chain = vec![leader.clone()];
    // The most recent leader added to `chain`; candidates must be reachable from it.
    let mut anchor = leader;
    for round in (oldest..=newest).rev().step_by(2) {
        // Skip rounds for which we do not hold the leader's certificate.
        if let Some((_, candidate)) = self.leader(round, &state.dag) {
            // Only keep the candidate if there is a path from the anchor down to it.
            if self.linked(anchor, candidate, &state.dag) {
                chain.push(candidate.clone());
                anchor = candidate;
            }
        }
    }
    chain
}
/// Tells whether `prev_leader` can be reached from `leader` by following parent links.
///
/// Starting from `leader`, the search moves down one round at a time, keeping only the
/// certificates referenced as parents by the certificates kept in the round above, until the
/// round of `prev_leader` is reached.
///
/// # Panics
///
/// Panics if one of the traversed rounds is missing from `dag`.
fn linked(&self, leader: &Certificate, prev_leader: &Certificate, dag: &Dag) -> bool {
    let target_round = prev_leader.round();
    let mut round = leader.round();
    let mut frontier = vec![leader];

    while round > target_round {
        round -= 1;
        let certificates = dag
            .get(&round)
            .expect("We should have the whole history by now");

        let mut reachable = Vec::new();
        for (digest, certificate) in certificates.values() {
            let referenced = frontier
                .iter()
                .any(|child| child.header.parents.contains(digest));
            if referenced {
                reachable.push(certificate);
            }
        }
        frontier = reachable;
    }

    frontier.iter().any(|certificate| *certificate == prev_leader)
}
/// Flatten the dag referenced by the input certificate. This is a classic depth-first search (pre-order):
/// https://en.wikipedia.org/wiki/Tree_traversal#Pre-order
///
/// Returns `leader` together with every certificate reachable from it through parent links
/// that is still present in the DAG and was not committed yet, sorted by round. Certificates
/// that are more than `gc_depth` rounds behind the last committed round are left out.
fn order_dag(&self, leader: &Certificate, state: &State) -> Vec<Certificate> {
    debug!("Processing sub-dag of {:?}", leader);
    let mut ordered = Vec::new();
    let mut already_ordered = HashSet::new();

    let mut buffer = vec![leader];
    while let Some(x) = buffer.pop() {
        debug!("Sequencing {:?}", x);
        ordered.push(x.clone());

        // All the parents live in the previous round: look it up once per certificate. A
        // certificate of round 0 has no previous round, so none of its parents can be found.
        let previous_round = x.round().checked_sub(1).and_then(|r| state.dag.get(&r));
        for parent in &x.header.parents {
            let (digest, certificate) = match previous_round
                .and_then(|certificates| certificates.values().find(|(d, _)| d == parent))
            {
                Some(x) => x,
                None => continue, // We already ordered or GC up to here.
            };

            // We skip the certificate if we (1) already processed it or (2) we reached a round that we already
            // committed for this authority.
            let mut skip = already_ordered.contains(&digest);
            skip |= state
                .last_committed
                .get(&certificate.origin())
                .map_or(false, |r| r == &certificate.round());
            if !skip {
                buffer.push(certificate);
                already_ordered.insert(digest);
            }
        }
    }

    // Ensure we do not commit garbage collected certificates.
    ordered.retain(|x| x.round() + self.gc_depth >= state.last_committed_round);

    // Ordering the output by round is not really necessary but it makes the commit sequence prettier.
    // The sort is stable, so certificates of the same round keep their traversal order.
    ordered.sort_by_key(|x| x.round());
    ordered
}
}