forked from facebookresearch/narwhal
/
main.rs
141 lines (127 loc) · 5.29 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright(C) Facebook, Inc. and its affiliates.
use anyhow::{Context, Result};
use clap::{crate_name, crate_version, App, AppSettings, ArgMatches, SubCommand};
use config::Export as _;
use config::Import as _;
use config::{Committee, KeyPair, Parameters, WorkerId};
use consensus::Consensus;
use env_logger::Env;
use primary::{Certificate, Primary};
use store::Store;
use tokio::sync::mpsc::{channel, Receiver};
use worker::Worker;
/// The default channel capacity.
pub const CHANNEL_CAPACITY: usize = 1_000;
#[tokio::main]
async fn main() -> Result<()> {
let matches = App::new(crate_name!())
.version(crate_version!())
.about("A research implementation of Narwhal and Tusk.")
.args_from_usage("-v... 'Sets the level of verbosity'")
.subcommand(
SubCommand::with_name("generate_keys")
.about("Print a fresh key pair to file")
.args_from_usage("--filename=<FILE> 'The file where to print the new key pair'"),
)
.subcommand(
SubCommand::with_name("run")
.about("Run a node")
.args_from_usage("--keys=<FILE> 'The file containing the node keys'")
.args_from_usage("--committee=<FILE> 'The file containing committee information'")
.args_from_usage("--parameters=[FILE] 'The file containing the node parameters'")
.args_from_usage("--store=<PATH> 'The path where to create the data store'")
.subcommand(SubCommand::with_name("primary").about("Run a single primary"))
.subcommand(
SubCommand::with_name("worker")
.about("Run a single worker")
.args_from_usage("--id=<INT> 'The worker id'"),
)
.setting(AppSettings::SubcommandRequiredElseHelp),
)
.setting(AppSettings::SubcommandRequiredElseHelp)
.get_matches();
let log_level = match matches.occurrences_of("v") {
0 => "error",
1 => "warn",
2 => "info",
3 => "debug",
_ => "trace",
};
let mut logger = env_logger::Builder::from_env(Env::default().default_filter_or(log_level));
#[cfg(feature = "benchmark")]
logger.format_timestamp_millis();
logger.init();
match matches.subcommand() {
("generate_keys", Some(sub_matches)) => KeyPair::new()
.export(sub_matches.value_of("filename").unwrap())
.context("Failed to generate key pair")?,
("run", Some(sub_matches)) => run(sub_matches).await?,
_ => unreachable!(),
}
Ok(())
}
// Runs either a worker or a primary.
async fn run(matches: &ArgMatches<'_>) -> Result<()> {
let key_file = matches.value_of("keys").unwrap();
let committee_file = matches.value_of("committee").unwrap();
let parameters_file = matches.value_of("parameters");
let store_path = matches.value_of("store").unwrap();
// Read the committee and node's keypair from file.
let keypair = KeyPair::import(key_file).context("Failed to load the node's keypair")?;
let committee =
Committee::import(committee_file).context("Failed to load the committee information")?;
// Load default parameters if none are specified.
let parameters = match parameters_file {
Some(filename) => {
Parameters::import(filename).context("Failed to load the node's parameters")?
}
None => Parameters::default(),
};
// Make the data store.
let store = Store::new(store_path).context("Failed to create a store")?;
// Channels the sequence of certificates.
let (tx_output, rx_output) = channel(CHANNEL_CAPACITY);
// Check whether to run a primary, a worker, or an entire authority.
match matches.subcommand() {
// Spawn the primary and consensus core.
("primary", _) => {
let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY);
let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);
Primary::spawn(
keypair,
committee.clone(),
parameters.clone(),
store,
/* tx_consensus */ tx_new_certificates,
/* rx_consensus */ rx_feedback,
);
Consensus::spawn(
committee,
parameters.gc_depth,
/* rx_primary */ rx_new_certificates,
/* tx_primary */ tx_feedback,
tx_output,
);
}
// Spawn a single worker.
("worker", Some(sub_matches)) => {
let id = sub_matches
.value_of("id")
.unwrap()
.parse::<WorkerId>()
.context("The worker id must be a positive integer")?;
Worker::spawn(keypair.name, id, committee, parameters, store);
}
_ => unreachable!(),
}
// Analyze the consensus' output.
analyze(rx_output).await;
// If this expression is reached, the program ends and all other tasks terminate.
unreachable!();
}
/// Receives an ordered list of certificates and apply any application-specific logic.
async fn analyze(mut rx_output: Receiver<Certificate>) {
while let Some(_certificate) = rx_output.recv().await {
// NOTE: Here goes the application logic.
}
}