-
Notifications
You must be signed in to change notification settings - Fork 296
/
driver.rs
180 lines (176 loc) · 6.86 KB
/
driver.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
pub use super::types::*;
use ic_artifact_pool::{
certification_pool::CertificationPoolImpl, consensus_pool::ConsensusPoolImpl,
dkg_pool::DkgPoolImpl, ecdsa_pool::EcdsaPoolImpl,
};
use ic_config::artifact_pool::ArtifactPoolConfig;
use ic_consensus::consensus::ConsensusGossipImpl;
use ic_interfaces::{
certification,
consensus_pool::{ChangeAction, ChangeSet as ConsensusChangeSet},
dkg::ChangeAction as DkgChangeAction,
ecdsa::{EcdsaChangeAction, EcdsaChangeSet},
p2p::consensus::{ChangeSetProducer, MutablePool},
};
use ic_logger::{debug, ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_test_artifact_pool::ingress_pool::TestIngressPool;
use ic_types::{consensus::ConsensusMessage, NodeId};
use std::cell::RefCell;
use std::sync::{Arc, RwLock};
/// A helper that drives consensus using a separate consensus artifact pool.
impl<'a> ConsensusDriver<'a> {
/// Create a new ConsensusDriver with the given node id and consensus
/// component.
#[allow(clippy::too_many_arguments)]
pub fn new(
node_id: NodeId,
pool_config: ArtifactPoolConfig,
consensus: Box<dyn ChangeSetProducer<ConsensusPoolImpl, ChangeSet = ConsensusChangeSet>>,
consensus_gossip: ConsensusGossipImpl,
dkg: ic_consensus::dkg::DkgImpl,
ecdsa: Box<dyn ChangeSetProducer<EcdsaPoolImpl, ChangeSet = EcdsaChangeSet>>,
certifier: Box<
dyn ChangeSetProducer<CertificationPoolImpl, ChangeSet = certification::ChangeSet> + 'a,
>,
consensus_pool: Arc<RwLock<ConsensusPoolImpl>>,
dkg_pool: Arc<RwLock<DkgPoolImpl>>,
ecdsa_pool: Arc<RwLock<EcdsaPoolImpl>>,
logger: ReplicaLogger,
metrics_registry: MetricsRegistry,
) -> ConsensusDriver<'a> {
let ingress_pool = RefCell::new(TestIngressPool::new(node_id, pool_config.clone()));
let certification_pool = Arc::new(RwLock::new(CertificationPoolImpl::new(
node_id,
pool_config,
logger.clone(),
metrics_registry,
)));
let consensus_priority =
PriorityFnState::new(&consensus_gossip, &*consensus_pool.read().unwrap());
ConsensusDriver {
consensus,
consensus_gossip,
dkg,
ecdsa,
certifier,
logger,
consensus_pool,
certification_pool,
ingress_pool,
dkg_pool,
ecdsa_pool,
consensus_priority,
}
}
/// Run a single step of consensus, dkg, certification, and ecdsa by repeatedly
/// calling on_state_change and apply the changes until no more changes
/// occur.
///
/// Return a list of output messages produced in the process.
pub fn step(&self) -> Vec<InputMessage> {
let mut to_deliver = Vec::new();
loop {
let changeset = self
.consensus
.on_state_change(&*self.consensus_pool.read().unwrap());
if changeset.is_empty() {
break;
}
for change_action in &changeset {
match change_action {
// MoveToValidated are what we have received and verified.
// But we don't deliver them to peers.
ChangeAction::MoveToValidated(to_move) => {
debug_print_msg(&self.logger, "Receive", to_move);
}
// AddToValidated are what we have produced.
// We will deliver them to peers.
ChangeAction::AddToValidated(to_add) => {
debug_print_msg(&self.logger, "Deliver", &to_add.msg);
to_deliver.push(InputMessage::Consensus(to_add.msg.clone()));
}
_ => (),
}
}
self.consensus_pool
.write()
.unwrap()
.apply_changes(changeset);
}
loop {
let changeset = self.dkg.on_state_change(&*self.dkg_pool.read().unwrap());
if changeset.is_empty() {
break;
}
{
for change_action in &changeset {
if let DkgChangeAction::AddToValidated(to_add) = change_action {
debug!(self.logger, "Deliver {:?}", to_add);
to_deliver.push(InputMessage::Dkg(Box::new(to_add.clone())));
}
}
let dkg_pool = &mut self.dkg_pool.write().unwrap();
dkg_pool.apply_changes(changeset);
}
}
loop {
let changeset = self
.certifier
.on_state_change(&*self.certification_pool.read().unwrap());
if changeset.is_empty() {
break;
}
{
for change_action in &changeset {
if let certification::ChangeAction::AddToValidated(msg) = change_action {
debug!(self.logger, "Certification Message Deliver {:?}", msg);
to_deliver.push(InputMessage::Certification(msg.clone()));
}
if let certification::ChangeAction::MoveToValidated(msg) = change_action {
debug!(self.logger, "Certification Message Validated {:?}", msg);
}
}
let mut certification_pool = self.certification_pool.write().unwrap();
certification_pool.apply_changes(changeset);
}
}
loop {
let changeset = self
.ecdsa
.on_state_change(&*self.ecdsa_pool.read().unwrap());
if changeset.is_empty() {
break;
}
{
for change_action in &changeset {
match change_action {
EcdsaChangeAction::AddToValidated(msg) => {
debug!(self.logger, "Ecdsa Message Deliver {:?}", msg);
to_deliver.push(InputMessage::Ecdsa(msg.clone()));
}
EcdsaChangeAction::MoveToValidated(msg) => {
debug!(self.logger, "Ecdsa Message Validated {:?}", msg);
}
_ => {}
}
}
let mut ecdsa_pool = self.ecdsa_pool.write().unwrap();
ecdsa_pool.apply_changes(changeset);
}
}
to_deliver
}
}
fn debug_print_msg(logger: &ReplicaLogger, prefix: &str, msg: &ConsensusMessage) {
match msg {
ConsensusMessage::BlockProposal(x) => debug!(
logger,
"{} {:?}, blockhash = {:?}",
prefix,
msg,
x.content.get_hash()
),
_ => debug!(logger, "{} {:?}", prefix, msg),
}
}