@@ -18,7 +18,7 @@ use ic_interfaces::{
18
18
messaging:: { MessageRouting , MessageRoutingError } ,
19
19
} ;
20
20
use ic_interfaces_registry:: RegistryClient ;
21
- use ic_logger:: { debug, error, info, trace , warn, ReplicaLogger } ;
21
+ use ic_logger:: { debug, error, info, warn, ReplicaLogger } ;
22
22
use ic_management_canister_types:: SetupInitialDKGResponse ;
23
23
use ic_protobuf:: {
24
24
log:: consensus_log_entry:: v1:: ConsensusLogEntry ,
@@ -62,176 +62,176 @@ pub fn deliver_batches(
62
62
. unwrap_or ( finalized_height)
63
63
. min ( finalized_height) ;
64
64
65
- let mut h = message_routing. expected_batch_height ( ) ;
66
- if h == Height :: from ( 0 ) {
65
+ let mut height = message_routing. expected_batch_height ( ) ;
66
+ if height == Height :: from ( 0 ) {
67
67
return Ok ( Height :: from ( 0 ) ) ;
68
68
}
69
- let mut last_delivered_batch_height = h. decrement ( ) ;
70
- while h <= target_height {
71
- match ( pool. get_finalized_block ( h) , pool. get_random_tape ( h) ) {
72
- ( Some ( block) , Some ( tape) ) => {
73
- debug ! (
74
- every_n_seconds => 5 ,
75
- log,
76
- "Finalized height" ;
77
- consensus => ConsensusLogEntry {
78
- height: Some ( h. get( ) ) ,
79
- hash: Some ( get_block_hash_string( & block) ) ,
80
- replica_version: Some ( String :: from( current_replica_version. clone( ) ) )
81
- }
82
- ) ;
83
-
84
- if block. payload . is_summary ( ) {
85
- info ! ( log, "Delivering finalized batch at CUP height of {}" , h) ;
86
- }
87
- // When we are not delivering CUP block, we must check if the subnet is halted.
88
- else {
89
- match status:: get_status ( h, registry_client, subnet_id, pool, log) {
90
- Some ( Status :: Halting | Status :: Halted ) => {
91
- debug ! (
92
- every_n_seconds => 5 ,
93
- log,
94
- "Batch of height {} is not delivered because replica is halted" ,
95
- h,
96
- ) ;
97
- return Ok ( last_delivered_batch_height) ;
98
- }
99
- Some ( Status :: Running ) => { }
100
- None => {
101
- warn ! (
102
- log,
103
- "Skipping batch delivery because checking if replica is halted failed" ,
104
- ) ;
105
- return Ok ( last_delivered_batch_height) ;
106
- }
107
- }
108
- }
109
-
110
- let randomness = Randomness :: from ( crypto_hashable_to_seed ( & tape) ) ;
111
-
112
- let ecdsa_subnet_public_keys = match get_ecdsa_subnet_public_key ( & block, pool, log)
113
- {
114
- Ok ( keys) => keys,
115
- Err ( e) => {
116
- // Do not deliver batch if we can't find a previous summary block,
117
- // this means we should continue with the latest CUP.
118
- warn ! (
119
- every_n_seconds => 5 ,
120
- log,
121
- "Do not deliver height {:?}: {}" , h, e
122
- ) ;
123
- return Ok ( last_delivered_batch_height) ;
124
- }
125
- } ;
126
-
127
- let block_stats = BlockStats :: from ( & block) ;
128
- let mut batch_stats = BatchStats :: new ( h) ;
129
-
130
- // Compute consensus' responses to subnet calls.
131
- let consensus_responses =
132
- generate_responses_to_subnet_calls ( & block, & mut batch_stats, log) ;
133
-
134
- // This flag can only be true, if we've called deliver_batches with a height
135
- // limit. In this case we also want to have a checkpoint for that last height.
136
- let persist_batch = Some ( h) == max_batch_height_to_deliver;
137
- let requires_full_state_hash = block. payload . is_summary ( ) || persist_batch;
138
- let batch_messages = if block. payload . is_summary ( ) {
139
- BatchMessages :: default ( )
140
- } else {
141
- let batch_payload = & block. payload . as_ref ( ) . as_data ( ) . batch ;
142
- batch_stats. add_from_payload ( batch_payload) ;
143
- batch_payload
144
- . clone ( )
145
- . into_messages ( )
146
- . map_err ( |err| {
147
- error ! ( log, "batch payload deserialization failed: {:?}" , err) ;
148
- err
149
- } )
150
- . unwrap_or_default ( )
151
- } ;
69
+ let mut last_delivered_batch_height = height. decrement ( ) ;
70
+ while height <= target_height {
71
+ let Some ( block) = pool. get_finalized_block ( height) else {
72
+ warn ! (
73
+ every_n_seconds => 30 ,
74
+ log,
75
+ "Do not deliver height {} because no finalized block was found. \
76
+ This should indicate we are waiting for state sync. \
77
+ Finalized height: {}",
78
+ height,
79
+ finalized_height
80
+ ) ;
81
+ break ;
82
+ } ;
83
+ let Some ( tape) = pool. get_random_tape ( height) else {
84
+ // Do not deliver batch if we don't have random tape
85
+ warn ! (
86
+ every_n_seconds => 30 ,
87
+ log,
88
+ "Do not deliver height {} because RandomTape is not ready. Will re-try later" ,
89
+ height
90
+ ) ;
91
+ break ;
92
+ } ;
93
+ debug ! (
94
+ every_n_seconds => 5 ,
95
+ log,
96
+ "Finalized height" ;
97
+ consensus => ConsensusLogEntry {
98
+ height: Some ( height. get( ) ) ,
99
+ hash: Some ( get_block_hash_string( & block) ) ,
100
+ replica_version: Some ( String :: from( current_replica_version. clone( ) ) )
101
+ }
102
+ ) ;
152
103
153
- let Some ( previous_beacon) = pool. get_random_beacon ( last_delivered_batch_height)
154
- else {
155
- warn ! (
104
+ if block. payload . is_summary ( ) {
105
+ info ! (
106
+ log,
107
+ "Delivering finalized batch at CUP height of {}" , height
108
+ ) ;
109
+ }
110
+ // When we are not delivering CUP block, we must check if the subnet is halted.
111
+ else {
112
+ match status:: get_status ( height, registry_client, subnet_id, pool, log) {
113
+ Some ( Status :: Halting | Status :: Halted ) => {
114
+ debug ! (
156
115
every_n_seconds => 5 ,
157
116
log,
158
- "No batch delivery at height {}: no random beacon found. " ,
159
- h
117
+ "Batch of height {} is not delivered because replica is halted " ,
118
+ height ,
160
119
) ;
161
120
return Ok ( last_delivered_batch_height) ;
162
- } ;
163
- let blockmaker_ranking = match membership. get_shuffled_nodes (
164
- block. height ,
165
- & previous_beacon,
166
- & ic_crypto_prng:: RandomnessPurpose :: BlockmakerRanking ,
167
- ) {
168
- Ok ( nodes) => nodes,
169
- Err ( e) => {
170
- warn ! (
171
- every_n_seconds => 5 ,
172
- log,
173
- "No batch delivery at height {}: membership error: {:?}" ,
174
- h,
175
- e
176
- ) ;
177
- return Ok ( last_delivered_batch_height) ;
178
- }
179
- } ;
180
- let blockmaker_metrics = BlockmakerMetrics {
181
- blockmaker : blockmaker_ranking[ block. rank . 0 as usize ] ,
182
- failed_blockmakers : blockmaker_ranking[ 0 ..( block. rank . 0 as usize ) ] . to_vec ( ) ,
183
- } ;
184
-
185
- let batch = Batch {
186
- batch_number : h,
187
- requires_full_state_hash,
188
- messages : batch_messages,
189
- randomness,
190
- ecdsa_subnet_public_keys,
191
- ecdsa_quadruple_ids : get_quadruple_ids_to_deliver ( & block) ,
192
- registry_version : block. context . registry_version ,
193
- time : block. context . time ,
194
- consensus_responses,
195
- blockmaker_metrics,
196
- } ;
197
-
198
- debug ! (
199
- log,
200
- "replica {:?} delivered batch {:?} for block_hash {:?}" ,
201
- current_replica_version,
202
- batch_stats. batch_height,
203
- block_stats. block_hash
204
- ) ;
205
- let result = message_routing. deliver_batch ( batch) ;
206
- if let Some ( f) = result_processor {
207
- f ( & result, block_stats, batch_stats) ;
208
121
}
209
- if let Err ( err) = result {
210
- warn ! ( every_n_seconds => 5 , log, "Batch delivery failed: {:?}" , err) ;
211
- return Err ( err) ;
122
+ Some ( Status :: Running ) => { }
123
+ None => {
124
+ warn ! (
125
+ log,
126
+ "Skipping batch delivery because checking if replica is halted failed" ,
127
+ ) ;
128
+ return Ok ( last_delivered_batch_height) ;
212
129
}
213
- last_delivered_batch_height = h;
214
- h = h. increment ( ) ;
215
130
}
216
- ( None , _) => {
217
- trace ! (
131
+ }
132
+
133
+ let randomness = Randomness :: from ( crypto_hashable_to_seed ( & tape) ) ;
134
+
135
+ let ecdsa_subnet_public_keys = match get_ecdsa_subnet_public_key ( & block, pool, log) {
136
+ Ok ( keys) => keys,
137
+ Err ( e) => {
138
+ // Do not deliver batch if we can't find a previous summary block,
139
+ // this means we should continue with the latest CUP.
140
+ warn ! (
141
+ every_n_seconds => 5 ,
218
142
log,
219
- "Do not deliver height {:?} because no finalized block was found. \
220
- This should indicate we are waiting for state sync.",
221
- h
143
+ "Do not deliver height {:?}: {}" , height, e
222
144
) ;
223
- break ;
145
+ return Ok ( last_delivered_batch_height ) ;
224
146
}
225
- ( _, None ) => {
226
- // Do not deliver batch if we don't have random tape
227
- trace ! (
147
+ } ;
148
+
149
+ let block_stats = BlockStats :: from ( & block) ;
150
+ let mut batch_stats = BatchStats :: new ( height) ;
151
+
152
+ // Compute consensus' responses to subnet calls.
153
+ let consensus_responses = generate_responses_to_subnet_calls ( & block, & mut batch_stats, log) ;
154
+
155
+ // This flag can only be true, if we've called deliver_batches with a height
156
+ // limit. In this case we also want to have a checkpoint for that last height.
157
+ let persist_batch = Some ( height) == max_batch_height_to_deliver;
158
+ let requires_full_state_hash = block. payload . is_summary ( ) || persist_batch;
159
+ let batch_messages = if block. payload . is_summary ( ) {
160
+ BatchMessages :: default ( )
161
+ } else {
162
+ let batch_payload = & block. payload . as_ref ( ) . as_data ( ) . batch ;
163
+ batch_stats. add_from_payload ( batch_payload) ;
164
+ batch_payload
165
+ . clone ( )
166
+ . into_messages ( )
167
+ . map_err ( |err| {
168
+ error ! ( log, "batch payload deserialization failed: {:?}" , err) ;
169
+ err
170
+ } )
171
+ . unwrap_or_default ( )
172
+ } ;
173
+
174
+ let Some ( previous_beacon) = pool. get_random_beacon ( last_delivered_batch_height) else {
175
+ warn ! (
176
+ every_n_seconds => 5 ,
177
+ log,
178
+ "No batch delivery at height {}: no random beacon found." ,
179
+ height
180
+ ) ;
181
+ return Ok ( last_delivered_batch_height) ;
182
+ } ;
183
+ let blockmaker_ranking = match membership. get_shuffled_nodes (
184
+ block. height ,
185
+ & previous_beacon,
186
+ & ic_crypto_prng:: RandomnessPurpose :: BlockmakerRanking ,
187
+ ) {
188
+ Ok ( nodes) => nodes,
189
+ Err ( e) => {
190
+ warn ! (
191
+ every_n_seconds => 5 ,
228
192
log,
229
- "Do not deliver height {:?} because RandomTape is not ready. Will re-try later" ,
230
- h
193
+ "No batch delivery at height {}: membership error: {:?}" ,
194
+ height,
195
+ e
231
196
) ;
232
- break ;
197
+ return Ok ( last_delivered_batch_height ) ;
233
198
}
199
+ } ;
200
+ let blockmaker_metrics = BlockmakerMetrics {
201
+ blockmaker : blockmaker_ranking[ block. rank . 0 as usize ] ,
202
+ failed_blockmakers : blockmaker_ranking[ 0 ..( block. rank . 0 as usize ) ] . to_vec ( ) ,
203
+ } ;
204
+
205
+ let batch = Batch {
206
+ batch_number : height,
207
+ requires_full_state_hash,
208
+ messages : batch_messages,
209
+ randomness,
210
+ ecdsa_subnet_public_keys,
211
+ ecdsa_quadruple_ids : get_quadruple_ids_to_deliver ( & block) ,
212
+ registry_version : block. context . registry_version ,
213
+ time : block. context . time ,
214
+ consensus_responses,
215
+ blockmaker_metrics,
216
+ } ;
217
+
218
+ debug ! (
219
+ log,
220
+ "replica {:?} delivered batch {:?} for block_hash {:?}" ,
221
+ current_replica_version,
222
+ batch_stats. batch_height,
223
+ block_stats. block_hash
224
+ ) ;
225
+ let result = message_routing. deliver_batch ( batch) ;
226
+ if let Some ( f) = result_processor {
227
+ f ( & result, block_stats, batch_stats) ;
228
+ }
229
+ if let Err ( err) = result {
230
+ warn ! ( every_n_seconds => 5 , log, "Batch delivery failed: {:?}" , err) ;
231
+ return Err ( err) ;
234
232
}
233
+ last_delivered_batch_height = height;
234
+ height = height. increment ( ) ;
235
235
}
236
236
Ok ( last_delivered_batch_height)
237
237
}
0 commit comments