-
Notifications
You must be signed in to change notification settings - Fork 296
/
lib.rs
3760 lines (3379 loc) · 139 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Needs to be `pub` so that the benchmarking code in `state_benches`
// can access it.
pub mod checkpoint;
pub mod labeled_tree_visitor;
pub mod manifest;
pub mod split;
pub mod state_sync;
pub mod stream_encoding;
pub mod tip;
pub mod tree_diff;
pub mod tree_hash;
use crate::{
manifest::compute_bundled_manifest,
state_sync::chunkable::cache::StateSyncCache,
tip::{spawn_tip_thread, PageMapToFlush, TipRequest},
};
use crossbeam_channel::{unbounded, Sender};
use ic_base_types::CanisterId;
use ic_canonical_state::lazy_tree_conversion::replicated_state_as_lazy_tree;
use ic_canonical_state_tree_hash::{
hash_tree::{hash_lazy_tree, HashTree, HashTreeError},
lazy_tree::materialize::materialize_partial,
};
use ic_config::flag_status::FlagStatus;
use ic_config::state_manager::Config;
use ic_crypto_tree_hash::{recompute_digest, Digest, LabeledTree, MixedHashTree, Witness};
use ic_interfaces::certification::Verifier;
use ic_interfaces_certified_stream_store::{
CertifiedStreamStore, DecodeStreamError, EncodeStreamError,
};
use ic_interfaces_state_manager::{
CertificationMask, CertificationScope, CertifiedStateSnapshot, Labeled,
PermanentStateHashError::*, StateHashError, StateManager, StateManagerError,
StateManagerResult, StateReader, TransientStateHashError::*, CERT_CERTIFIED, CERT_UNCERTIFIED,
};
use ic_logger::{debug, error, fatal, info, warn, ReplicaLogger};
use ic_metrics::{buckets::decimal_buckets, MetricsRegistry};
use ic_protobuf::proxy::{ProtoProxy, ProxyDecodeError};
use ic_protobuf::{messaging::xnet::v1, state::v1 as pb};
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::{
canister_state::execution_state::SandboxMemory,
page_map::{PersistenceError, StorageMetrics},
PageIndex, PageMap, ReplicatedState,
};
use ic_state_layout::{error::LayoutError, AccessPolicy, CheckpointLayout, ReadOnly, StateLayout};
use ic_types::{
consensus::certification::Certification,
crypto::CryptoHash,
malicious_flags::MaliciousFlags,
state_sync::{FileGroupChunks, Manifest, MetaManifest, CURRENT_STATE_SYNC_VERSION},
xnet::{CertifiedStreamSlice, StreamIndex, StreamSlice},
CryptoHashOfPartialState, CryptoHashOfState, Height, RegistryVersion, SubnetId,
};
use ic_utils::thread::JoinOnDrop;
use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge};
use prost::Message;
use std::convert::{From, TryFrom};
use std::fs::File;
use std::fs::OpenOptions;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::time::{Duration, Instant, SystemTime};
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
sync::Mutex,
};
use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
use std::os::unix::io::RawFd;
use std::os::unix::prelude::IntoRawFd;
use tempfile::tempfile;
use uuid::Uuid;
/// The number of threads that state manager starts to construct checkpoints.
/// It is exported as public for use in tests and benchmarks.
pub const NUMBER_OF_CHECKPOINT_THREADS: u32 = 16;
/// Critical error tracking mismatches between reused and recomputed chunk
/// hashes during manifest computation.
const CRITICAL_ERROR_REUSED_CHUNK_HASH: &str =
    "state_manager_manifest_reused_chunk_hash_error_count";
/// Critical error tracking unexpectedly corrupted chunks.
const CRITICAL_ERROR_STATE_SYNC_CORRUPTED_CHUNKS: &str = "state_sync_corrupted_chunks";
/// Critical error tracking that chunk ID space usage of any state sync chunk type is nearing the limit.
const CRITICAL_ERROR_CHUNK_ID_USAGE_NEARING_LIMITS: &str =
    "state_sync_chunk_id_usage_nearing_limits";
/// Critical error tracking broken soft invariants encountered upon checkpoint loading.
/// See note [Replicated State Invariants].
pub(crate) const CRITICAL_ERROR_CHECKPOINT_SOFT_INVARIANT_BROKEN: &str =
    "state_manager_checkpoint_soft_invariant_broken";
/// How long to keep archived and diverged states.
const ARCHIVED_DIVERGED_CHECKPOINT_MAX_AGE: Duration = Duration::from_secs(30 * 24 * 60 * 60); // 30 days
/// Write an overlay file this many rounds before each checkpoint.
const NUM_ROUNDS_BEFORE_CHECKPOINT_TO_WRITE_OVERLAY: u64 = 50;
/// Labels for manifest metrics.
/// `LABEL_TYPE` is the label name; the `LABEL_VALUE_*` constants are the hash
/// types a chunk can have during manifest computation (see `ManifestMetrics`).
const LABEL_TYPE: &str = "type";
const LABEL_VALUE_HASHED: &str = "hashed";
const LABEL_VALUE_HASHED_AND_COMPARED: &str = "hashed_and_compared";
const LABEL_VALUE_REUSED: &str = "reused";
/// Labels for state sync metrics: the operations/steps a chunk can go through
/// during state sync (see `StateSyncMetrics`).
const LABEL_FETCH: &str = "fetch";
const LABEL_COPY_FILES: &str = "copy_files";
const LABEL_COPY_CHUNKS: &str = "copy_chunks";
const LABEL_PREALLOCATE: &str = "preallocate";
const LABEL_STATE_SYNC_MAKE_CHECKPOINT: &str = "state_sync_make_checkpoint";
/// Labels for slice validation metrics: the two decode-slice checks and their
/// outcome values (see `StateManagerMetrics::observe_decode_slice`).
const LABEL_VERIFY_SIG: &str = "verify";
const LABEL_CMP_HASH: &str = "compare";
const LABEL_VALUE_SUCCESS: &str = "success";
const LABEL_VALUE_FAILURE: &str = "failure";
/// All Prometheus metrics owned by the state manager. Each field is registered
/// in `StateManagerMetrics::new`; the field docs below mirror the help strings
/// used at registration time.
#[derive(Clone)]
pub struct StateManagerMetrics {
    /// Total number of errors encountered in the state manager, by source.
    state_manager_error_count: IntCounterVec,
    /// Duration of checkpoint operations in seconds, by op.
    checkpoint_op_duration: HistogramVec,
    /// Duration of a StateManager API call in seconds, by op.
    api_call_duration: HistogramVec,
    /// The (UTC) timestamp of the last diverged state report.
    last_diverged_state_timestamp: IntGauge,
    /// Height of the latest certified state.
    latest_certified_height: IntGauge,
    /// Height of the latest state resident in memory.
    max_resident_height: IntGauge,
    /// Height of the oldest state resident in memory.
    min_resident_height: IntGauge,
    /// Height of the last checkpoint we computed manifest for.
    last_computed_manifest_height: IntGauge,
    /// Total count of states loaded to memory by the state manager.
    resident_state_count: IntGauge,
    /// Number of checkpoints on disk, independent of if they are loaded or not.
    checkpoints_on_disk_count: IntGauge,
    /// Nested state-sync metrics (see `StateSyncMetrics`).
    state_sync_metrics: StateSyncMetrics,
    /// Total size of the state on disk in bytes.
    state_size: IntGauge,
    /// Size of states_metadata.pbuf in bytes.
    states_metadata_pbuf_size: IntGauge,
    /// Nested checkpoint metrics (see `CheckpointMetrics`).
    checkpoint_metrics: CheckpointMetrics,
    /// Nested manifest-computation metrics (see `ManifestMetrics`).
    manifest_metrics: ManifestMetrics,
    /// Length of TipChannel queue.
    tip_handler_queue_length: IntGauge,
    /// Statuses of slice decoding, by op and status.
    decode_slice_status: IntCounterVec,
    /// Time between invocations of commit_and_certify that update height.
    height_update_time_seconds: Histogram,
    /// Storage-layer metrics, registered by `StorageMetrics::new`.
    storage_metrics: StorageMetrics,
    /// Number of digests in the latest hash tree.
    latest_hash_tree_size: IntGauge,
    /// Largest index in the latest hash tree.
    latest_hash_tree_max_index: IntGauge,
}
/// Metrics for manifest computation; registered in `ManifestMetrics::new`.
#[derive(Clone)]
pub struct ManifestMetrics {
    /// Size of chunks in manifest by hash type ('reused', 'hashed',
    /// 'hashed_and_compared') during all manifest computations, in bytes.
    chunk_bytes: IntCounterVec,
    /// Critical error counter: chunks whose recomputed hash differs from the
    /// reused one (registered under `CRITICAL_ERROR_REUSED_CHUNK_HASH`).
    reused_chunk_hash_error_count: IntCounter,
    /// Size of the encoded manifest in bytes.
    manifest_size: IntGauge,
    /// Number of chunks in the manifest chunk table.
    chunk_table_length: IntGauge,
    /// Number of virtual chunks containing the grouped small files.
    file_group_chunks: IntGauge,
    /// Number of chunks of the manifest after it is encoded and split into
    /// sub-manifests.
    sub_manifest_chunks: IntGauge,
    /// Critical error counter: chunk ID space usage of some state sync chunk
    /// type nearing its limit (`CRITICAL_ERROR_CHUNK_ID_USAGE_NEARING_LIMITS`).
    chunk_id_usage_nearing_limits_critical: IntCounter,
}
/// Metrics for state sync; registered in `StateSyncMetrics::new`.
#[derive(Clone)]
pub struct StateSyncMetrics {
    /// Size of chunks synchronized, in bytes, by op
    /// ('fetch', 'copy_files', 'copy_chunks', 'preallocate').
    size: IntCounterVec,
    /// Duration of state sync in seconds, by completion status.
    duration: HistogramVec,
    /// Duration of state sync sub-steps in seconds, by step.
    step_duration: HistogramVec,
    /// Number of chunks not synchronized yet of all active state syncs.
    remaining: IntGauge,
    /// Critical error counter for unexpectedly corrupted chunks
    /// (registered under `CRITICAL_ERROR_STATE_SYNC_CORRUPTED_CHUNKS`).
    corrupted_chunks_critical: IntCounter,
    /// Chunks not copied during state sync due to hash mismatch, by source.
    corrupted_chunks: IntCounterVec,
}
/// Metrics for checkpoint handling; registered in `CheckpointMetrics::new`.
#[derive(Clone)]
pub struct CheckpointMetrics {
    /// Duration of make_checkpoint steps in seconds, by step.
    make_checkpoint_step_duration: HistogramVec,
    /// Duration of load_checkpoint steps in seconds, by step.
    load_checkpoint_step_duration: HistogramVec,
    /// Duration of load_canister_state steps in seconds, by step.
    load_canister_step_duration: HistogramVec,
    /// Critical error counter for broken soft invariants seen when loading a
    /// checkpoint (`CRITICAL_ERROR_CHECKPOINT_SOFT_INVARIANT_BROKEN`).
    load_checkpoint_soft_invariant_broken: IntCounter,
    /// Duration to execute requests to Tip handling thread in seconds, by request.
    tip_handler_request_duration: HistogramVec,
    /// Count of sent FlushPageMap requests.
    page_map_flushes: IntCounter,
    /// Count of FlushPageMap requests that were skipped.
    page_map_flush_skips: IntCounter,
    /// Logger carried alongside the metrics.
    log: ReplicaLogger,
}
impl CheckpointMetrics {
    /// Registers all checkpoint-related metrics in `metrics_registry` and
    /// stores `replica_logger` alongside them.
    pub fn new(metrics_registry: &MetricsRegistry, replica_logger: ReplicaLogger) -> Self {
        // All duration histograms below share the same bucket layout:
        // 1ms, 2ms, 5ms, 10ms, 20ms, 50ms, …, 10s, 20s, 50s
        let duration_histogram = |name: &str, help: &str, label: &str| {
            metrics_registry.histogram_vec(name, help, decimal_buckets(-3, 1), &[label])
        };
        Self {
            make_checkpoint_step_duration: duration_histogram(
                "state_manager_checkpoint_steps_duration_seconds",
                "Duration of make_checkpoint steps in seconds.",
                "step",
            ),
            load_checkpoint_step_duration: duration_histogram(
                "state_manager_load_checkpoint_steps_duration_seconds",
                "Duration of load_checkpoint steps in seconds.",
                "step",
            ),
            load_canister_step_duration: duration_histogram(
                "state_manager_load_canister_steps_duration_seconds",
                "Duration of load_canister_state steps in seconds.",
                "step",
            ),
            load_checkpoint_soft_invariant_broken: metrics_registry
                .error_counter(CRITICAL_ERROR_CHECKPOINT_SOFT_INVARIANT_BROKEN),
            tip_handler_request_duration: duration_histogram(
                "state_manager_tip_handler_request_duration_seconds",
                "Duration to execute requests to Tip handling thread in seconds.",
                "request",
            ),
            page_map_flushes: metrics_registry.int_counter(
                "state_manager_page_map_flushes",
                "Amount of sent FlushPageMap requests.",
            ),
            page_map_flush_skips: metrics_registry.int_counter(
                "state_manager_page_map_flush_skips",
                "Amount of FlushPageMap requests that were skipped.",
            ),
            log: replica_logger,
        }
    }
}
// Note [Metrics preallocation]
// ============================
//
// If vectorized metrics are used for events that happen rarely (like state
// sync), it becomes a challenge to visualize them. As Prometheus doesn't know
// which label values are going to be used in advance, the values are simply
// missing until they are set for the first time. This leads to
// rate(metric[period]) returning 0 because the value switched from NONE to,
// say, 1, not from 0 to 1. So only the next update of the metric will result
// in a meaningful rate, which in the case of state sync might never appear.
//
// In order to solve this, we "preallocate" metrics with values of labels we
// expect to see. This makes initial vectorized metric values equal to 0, so the
// very first metric update should be visible to Prometheus.
impl StateManagerMetrics {
    /// Registers all state-manager metrics in `metrics_registry`; `log` is
    /// handed on to the nested `CheckpointMetrics`.
    fn new(metrics_registry: &MetricsRegistry, log: ReplicaLogger) -> Self {
        let checkpoint_op_duration = metrics_registry.histogram_vec(
            "state_manager_checkpoint_op_duration_seconds",
            "Duration of checkpoint operations in seconds.",
            // 1ms, 2ms, 5ms, 10ms, 20ms, 50ms, …, 10s, 20s, 50s
            decimal_buckets(-3, 1),
            &["op"],
        );
        // Note [Metrics preallocation]
        for op in &["compute_manifest", "create"] {
            checkpoint_op_duration.with_label_values(&[*op]);
        }
        let api_call_duration = metrics_registry.histogram_vec(
            "state_manager_api_call_duration_seconds",
            "Duration of a StateManager API call in seconds.",
            // 1ms, 2ms, 5ms, 10ms, 20ms, 50ms, …, 10s, 20s, 50s
            decimal_buckets(-3, 1),
            &["op"],
        );
        let state_manager_error_count = metrics_registry.int_counter_vec(
            "state_manager_error_count",
            "Total number of errors encountered in the state manager.",
            &["source"],
        );
        let last_diverged_state_timestamp = metrics_registry.int_gauge(
            "state_manager_last_diverged_state_timestamp_seconds",
            "The (UTC) timestamp of the last diverged state report.",
        );
        let latest_certified_height = metrics_registry.int_gauge(
            "state_manager_latest_certified_height",
            "Height of the latest certified state.",
        );
        let min_resident_height = metrics_registry.int_gauge(
            "state_manager_min_resident_height",
            "Height of the oldest state resident in memory.",
        );
        let max_resident_height = metrics_registry.int_gauge(
            "state_manager_max_resident_height",
            "Height of the latest state resident in memory.",
        );
        let resident_state_count = metrics_registry.int_gauge(
            "state_manager_resident_state_count",
            "Total count of states loaded to memory by the state manager.",
        );
        let checkpoints_on_disk_count = metrics_registry.int_gauge(
            "state_manager_checkpoints_on_disk_count",
            "Number of checkpoints on disk, independent of if they are loaded or not.",
        );
        let last_computed_manifest_height = metrics_registry.int_gauge(
            "state_manager_last_computed_manifest_height",
            "Height of the last checkpoint we computed manifest for.",
        );
        let state_size = metrics_registry.int_gauge(
            "state_manager_state_size_bytes",
            "Total size of the state on disk in bytes.",
        );
        let states_metadata_pbuf_size = metrics_registry.int_gauge(
            "state_manager_states_metadata_pbuf_size_bytes",
            "Size of states_metadata.pbuf in bytes.",
        );
        let tip_handler_queue_length = metrics_registry.int_gauge(
            "state_manager_tip_handler_queue_length",
            "Length of TipChannel queue.",
        );
        let decode_slice_status = metrics_registry.int_counter_vec(
            "state_manager_decode_slice",
            "Statuses of slice decoding.",
            &["op", "status"],
        );
        // Initialize all `decode_slice_status` counters with zero, so they are all
        // exported from process start (`IntCounterVec` is really a map).
        for op in &[LABEL_VERIFY_SIG, LABEL_CMP_HASH] {
            for status in &[LABEL_VALUE_SUCCESS, LABEL_VALUE_FAILURE] {
                decode_slice_status.with_label_values(&[op, status]);
            }
        }
        let height_update_time_seconds = metrics_registry.histogram(
            "state_manager_height_update_time_seconds",
            "Time between invocations of commit_and_certify that update height.",
            // 1s, 2s, 5s, 10s, …, 100s, 200s, 500s
            decimal_buckets(0, 2),
        );
        let latest_hash_tree_size = metrics_registry.int_gauge(
            "state_manager_latest_hash_tree_size",
            "Number of digests in the latest hash tree.",
        );
        let latest_hash_tree_max_index = metrics_registry.int_gauge(
            "state_manager_latest_hash_tree_max_index",
            "Largest index in the latest hash tree.",
        );
        Self {
            state_manager_error_count,
            checkpoint_op_duration,
            api_call_duration,
            last_diverged_state_timestamp,
            latest_certified_height,
            max_resident_height,
            min_resident_height,
            last_computed_manifest_height,
            resident_state_count,
            checkpoints_on_disk_count,
            // Nested metric groups register themselves in the same registry.
            state_sync_metrics: StateSyncMetrics::new(metrics_registry),
            state_size,
            states_metadata_pbuf_size,
            checkpoint_metrics: CheckpointMetrics::new(metrics_registry, log),
            manifest_metrics: ManifestMetrics::new(metrics_registry),
            tip_handler_queue_length,
            decode_slice_status,
            height_update_time_seconds,
            storage_metrics: StorageMetrics::new(metrics_registry),
            latest_hash_tree_size,
            latest_hash_tree_max_index,
        }
    }
    /// Records a decode slice status for `label`.
    fn observe_decode_slice(&self, operation: &str, success: bool) {
        let status = if success {
            LABEL_VALUE_SUCCESS
        } else {
            LABEL_VALUE_FAILURE
        };
        self.decode_slice_status
            .with_label_values(&[operation, status])
            .inc();
    }
    /// Records a decode slice status for a signature verification.
    fn observe_decode_slice_signature_verification(&self, success: bool) {
        self.observe_decode_slice(LABEL_VERIFY_SIG, success);
    }
    /// Records a decode slice status for a comparison of a tree's root
    /// hash and the hash in the corresponding certification.
    fn observe_decode_slice_hash_comparison(&self, success: bool) {
        self.observe_decode_slice(LABEL_CMP_HASH, success);
    }
}
impl ManifestMetrics {
    /// Registers all manifest-computation metrics in `metrics_registry`.
    pub fn new(metrics_registry: &MetricsRegistry) -> Self {
        // Bytes that are either reused, hashed, or hashed-and-compared during
        // the manifest computation, indexed by hash type.
        let chunk_bytes = metrics_registry.int_counter_vec(
            "state_manager_manifest_chunk_bytes",
            "Size of chunks in manifest by hash type ('reused', 'hashed', 'hashed_and_compared') during all manifest computations in bytes.",
            &[LABEL_TYPE],
        );
        // Note [Metrics preallocation]
        for hash_type in &[
            LABEL_VALUE_REUSED,
            LABEL_VALUE_HASHED,
            LABEL_VALUE_HASHED_AND_COMPARED,
        ] {
            chunk_bytes.with_label_values(&[*hash_type]);
        }
        let manifest_size = metrics_registry.int_gauge(
            "state_manager_manifest_state_size_bytes",
            "Size of the encoded manifest in bytes.",
        );
        let chunk_table_length = metrics_registry.int_gauge(
            "state_manager_manifest_chunk_table_length",
            "Number of chunks in the manifest chunk table.",
        );
        let file_group_chunks = metrics_registry.int_gauge(
            "state_manager_file_group_chunks",
            "Number of virtual chunks containing the grouped small files.",
        );
        let sub_manifest_chunks = metrics_registry.int_gauge(
            "state_manager_sub_manifest_chunks",
            "Number of chunks of the manifest after it is encoded and split into sub-manifests.",
        );
        // Count of the chunks which have a mismatch between the recomputed
        // hash and the reused one (critical error).
        let reused_chunk_hash_error_count =
            metrics_registry.error_counter(CRITICAL_ERROR_REUSED_CHUNK_HASH);
        // Chunk ID space usage of some chunk type nearing its limit (critical error).
        let chunk_id_usage_nearing_limits_critical =
            metrics_registry.error_counter(CRITICAL_ERROR_CHUNK_ID_USAGE_NEARING_LIMITS);
        Self {
            chunk_bytes,
            reused_chunk_hash_error_count,
            manifest_size,
            chunk_table_length,
            file_group_chunks,
            sub_manifest_chunks,
            chunk_id_usage_nearing_limits_critical,
        }
    }
}
impl StateSyncMetrics {
    /// Registers all state-sync metrics in `metrics_registry`, preallocating
    /// the label values we expect to see (see Note [Metrics preallocation]).
    pub fn new(metrics_registry: &MetricsRegistry) -> Self {
        let size = metrics_registry.int_counter_vec(
            "state_sync_size_bytes_total",
            "Size of chunks synchronized by different operations ('fetch', 'copy_files', 'copy_chunks', 'preallocate') during all the state sync in bytes.",
            &["op"],
        );
        // Note [Metrics preallocation]
        for op in &[
            LABEL_FETCH,
            LABEL_COPY_FILES,
            LABEL_COPY_CHUNKS,
            LABEL_PREALLOCATE,
        ] {
            size.with_label_values(&[*op]);
        }
        let remaining = metrics_registry.int_gauge(
            "state_sync_remaining_chunks",
            "Number of chunks not synchronized yet of all active state syncs",
        );
        let duration = metrics_registry.histogram_vec(
            "state_sync_duration_seconds",
            "Duration of state sync in seconds indexed by status ('ok', 'already_exists', 'unrecoverable', 'io_err', 'aborted', 'aborted_blank').",
            // 1s, 2s, 5s, 10s, 20s, 50s, …, 1000s, 2000s, 5000s
            decimal_buckets(0, 3),
            &["status"],
        );
        // Note [Metrics preallocation]
        for status in &[
            "ok",
            "already_exists",
            "unrecoverable",
            "io_err",
            "aborted",
            "aborted_blank",
        ] {
            duration.with_label_values(&[*status]);
        }
        let step_duration = metrics_registry.histogram_vec(
            "state_sync_step_duration_seconds",
            "Duration of state sync sub-steps in seconds indexed by step ('copy_files', 'copy_chunks', 'fetch', 'state_sync_make_checkpoint')",
            // 0.1s, 0.2s, 0.5s, 1s, 2s, 5s, …, 1000s, 2000s, 5000s
            decimal_buckets(-1, 3),
            &["step"],
        );
        // Note [Metrics preallocation]
        for step in &[
            LABEL_COPY_FILES,
            LABEL_COPY_CHUNKS,
            LABEL_FETCH,
            LABEL_STATE_SYNC_MAKE_CHECKPOINT,
        ] {
            step_duration.with_label_values(&[*step]);
        }
        let corrupted_chunks_critical =
            metrics_registry.error_counter(CRITICAL_ERROR_STATE_SYNC_CORRUPTED_CHUNKS);
        let corrupted_chunks = metrics_registry.int_counter_vec(
            "state_sync_corrupted_chunks",
            "Number of chunks not copied during state sync due to hash mismatch by source ('fetch', copy_files', 'copy_chunks')",
            &["source"],
        );
        // Note [Metrics preallocation]
        for source in &[LABEL_FETCH, LABEL_COPY_FILES, LABEL_COPY_CHUNKS] {
            corrupted_chunks.with_label_values(&[*source]);
        }
        Self {
            size,
            duration,
            step_duration,
            remaining,
            corrupted_chunks_critical,
            corrupted_chunks,
        }
    }
}
/// Map from checkpoint height to its `StateMetadata`.
type StatesMetadata = BTreeMap<Height, StateMetadata>;
/// Map from height to the certification-related metadata kept for that state.
type CertificationsMetadata = BTreeMap<Height, CertificationMetadata>;
/// This struct bundles the root hash, manifest and meta-manifest.
#[derive(Debug, Clone)]
pub(crate) struct BundledManifest {
    /// Root hash of the state (computed together with the manifest, see
    /// `compute_bundled_manifest`).
    root_hash: CryptoHashOfState,
    /// The checkpoint's manifest.
    manifest: Manifest,
    // `meta_manifest` will be used during state sync in future replica versions.
    #[allow(dead_code)]
    meta_manifest: Arc<MetaManifest>,
}
/// Per-checkpoint metadata. Only the manifest survives serialization (see the
/// `From`/`TryFrom` conversions to/from `pb::StateMetadata`); the other fields
/// are recomputed at runtime.
#[derive(Debug, Default, Clone)]
struct StateMetadata {
    /// We don't persist the checkpoint layout because we re-create it every
    /// time we discover a checkpoint on disk.
    checkpoint_layout: Option<CheckpointLayout<ReadOnly>>,
    /// Manifest and root hash are computed asynchronously, so the bundle is set to
    /// None before the values are computed.
    bundled_manifest: Option<BundledManifest>,
    /// The field is set as `None` until we serve a state sync for the first time.
    state_sync_file_group: Option<Arc<FileGroupChunks>>,
}
impl StateMetadata {
    /// Root hash of the state, once the bundled manifest has been computed.
    pub fn root_hash(&self) -> Option<&CryptoHashOfState> {
        Some(&self.bundled_manifest.as_ref()?.root_hash)
    }
    /// Manifest of the state, once the bundled manifest has been computed.
    pub fn manifest(&self) -> Option<&Manifest> {
        Some(&self.bundled_manifest.as_ref()?.manifest)
    }
    // `meta_manifest` will be used during state sync in future replica versions.
    #[allow(dead_code)]
    pub fn meta_manifest(&self) -> Option<Arc<MetaManifest>> {
        Some(Arc::clone(&self.bundled_manifest.as_ref()?.meta_manifest))
    }
}
impl From<&StateMetadata> for pb::StateMetadata {
    /// Serializes the metadata; only the manifest is persisted.
    fn from(metadata: &StateMetadata) -> Self {
        let manifest = match metadata.manifest() {
            Some(manifest) => Some(manifest.clone().into()),
            None => None,
        };
        Self { manifest }
    }
}
impl TryFrom<pb::StateMetadata> for StateMetadata {
    type Error = ProxyDecodeError;
    /// Deserializes the metadata. If a manifest was persisted, the root hash
    /// and meta-manifest are re-derived from it; runtime-only fields start empty.
    fn try_from(proto: pb::StateMetadata) -> Result<Self, ProxyDecodeError> {
        match proto.manifest {
            // Nothing persisted: start from an empty metadata record.
            None => Ok(Default::default()),
            Some(manifest_proto) => {
                let manifest = Manifest::try_from(manifest_proto)?;
                Ok(Self {
                    checkpoint_layout: None,
                    bundled_manifest: Some(compute_bundled_manifest(manifest)),
                    state_sync_file_group: None,
                })
            }
        }
    }
}
/// This type holds per-height metadata related to certification.
#[derive(Debug)]
struct CertificationMetadata {
    /// Fully materialized hash tree built from the part of the state that is
    /// certified every round. Dropped as soon as a higher state is certified.
    /// Kept behind an `Arc` so it can be shared cheaply.
    hash_tree: Option<Arc<HashTree>>,
    /// Root hash of the tree above. It's stored even if the hash tree is
    /// dropped.
    certified_state_hash: CryptoHash,
    /// Certification of the root hash delivered by consensus via
    /// `deliver_state_certification()`.
    certification: Option<Certification>,
}
/// Wraps a tree-hash `Digest` into a `CryptoHashOfPartialState` by copying
/// its raw bytes.
fn crypto_hash_of_partial_state(digest: &Digest) -> CryptoHashOfPartialState {
    let bytes = digest.0.to_vec();
    CryptoHashOfPartialState::from(CryptoHash(bytes))
}
/// An in-memory state together with the height it was committed at.
#[derive(Clone)]
pub struct Snapshot {
    /// The height this state corresponds to.
    pub height: Height,
    /// The replicated state, shared immutably via `Arc`.
    pub state: Arc<ReplicatedState>,
}
/// StateSyncRefs keeps track of the ongoing and aborted state syncs.
#[derive(Clone)]
pub struct StateSyncRefs {
    /// IncompleteState adds the corresponding height to StateSyncRefs when
    /// it's constructed and removes the height from active syncs when it's
    /// dropped.
    /// The priority function for state sync artifacts uses this information
    /// to prioritize state fetches.
    active: Arc<parking_lot::RwLock<BTreeMap<Height, CryptoHashOfState>>>,
    /// A cache of chunks from a previously aborted IncompleteState. State syncs
    /// can take chunks from the cache instead of fetching them from other nodes
    /// when possible.
    cache: Arc<parking_lot::RwLock<StateSyncCache>>,
}
impl StateSyncRefs {
    /// Creates an empty tracker; `log` is handed to the chunk cache.
    fn new(log: ReplicaLogger) -> Self {
        Self {
            active: Arc::new(parking_lot::RwLock::new(BTreeMap::new())),
            cache: Arc::new(parking_lot::RwLock::new(StateSyncCache::new(log))),
        }
    }
    /// Returns the root hash of the active sync at `height`, if any.
    fn get(&self, height: &Height) -> Option<CryptoHashOfState> {
        self.active.read().get(height).cloned()
    }
    /// Records an active sync; returns the hash previously stored at
    /// `height`, if any.
    fn insert(&self, height: Height, root_hash: CryptoHashOfState) -> Option<CryptoHashOfState> {
        self.active.write().insert(height, root_hash)
    }
    /// Forgets the active sync at `height`; returns its hash, if any.
    fn remove(&self, height: &Height) -> Option<CryptoHashOfState> {
        self.active.write().remove(height)
    }
    /// True if no state sync is currently active.
    fn is_empty(&self) -> bool {
        self.active.read().is_empty()
    }
}
/// SharedState is mutable state that can be accessed from multiple threads.
struct SharedState {
    /// Certifications metadata kept for all states
    certifications_metadata: CertificationsMetadata,
    /// Metadata for each checkpoint
    states_metadata: StatesMetadata,
    /// A list of states present in the memory. This list is guaranteed to not be
    /// empty as it should always contain the state at height=0.
    snapshots: VecDeque<Snapshot>,
    /// The last checkpoint that was advertised.
    last_advertised: Height,
    /// The state we are trying to fetch, as
    /// `(target height, expected root hash, CUP interval length)`;
    /// cleared by `disable_state_fetch_below`.
    fetch_state: Option<(Height, CryptoHashOfState, Height)>,
    /// State representing the on disk mutable state
    tip: Option<(Height, ReplicatedState)>,
}
impl SharedState {
    /// Clears `fetch_state` when the sync target is at or below `height`,
    /// i.e. the fetch is no longer useful.
    fn disable_state_fetch_below(&mut self, height: Height) {
        let obsolete = matches!(
            &self.fetch_state,
            Some((sync_height, _hash, _cup_interval_length)) if *sync_height <= height
        );
        if obsolete {
            self.fetch_state = None;
        }
    }
}
// We send complex objects to a different thread to free them. This will spread
// the cost of deallocation over a longer period of time, and avoid long pauses.
// `Box<dyn Any + Send>` erases the concrete type; the receiving thread only
// ever drops the box.
type Deallocation = Box<dyn std::any::Any + Send + 'static>;
// We will not use the deallocation thread when the number of pending
// deallocation objects goes above the threshold.
const DEALLOCATION_BACKLOG_THRESHOLD: usize = 500;
/// The number of archived and diverged states to keep before we start deleting the old ones.
const MAX_ARCHIVED_DIVERGED_CHECKPOINTS_TO_KEEP: usize = 1;
/// The number of diverged state markers to keep.
const MAX_DIVERGED_STATE_MARKERS_TO_KEEP: usize = 100;
/// The number of extra checkpoints to keep for state sync.
const EXTRA_CHECKPOINTS_TO_KEEP: usize = 0;
/// The state manager implementation; holds all in-memory and on-disk state
/// bookkeeping plus the background threads that maintain it.
pub struct StateManagerImpl {
    log: ReplicaLogger,
    metrics: StateManagerMetrics,
    /// On-disk layout of states (checkpoints, diverged states, backups, ...).
    state_layout: StateLayout,
    /// The main metadata. Different threads will need to access this field.
    ///
    /// To avoid the risk of deadlocks, this lock should be held as short a time
    /// as possible.
    states: Arc<parking_lot::RwLock<SharedState>>,
    /// Verifier for state certifications (injected dependency).
    verifier: Arc<dyn Verifier>,
    own_subnet_id: SubnetId,
    own_subnet_type: SubnetType,
    /// Channel used to off-load dropping of complex objects to a dedicated
    /// thread (see `Deallocation`).
    deallocation_sender: Sender<Deallocation>,
    // Cached latest state height. We cache it separately because it's
    // requested quite often and this causes high contention on the lock.
    latest_state_height: AtomicU64,
    /// Cached latest certified height (same rationale as `latest_state_height`).
    latest_certified_height: AtomicU64,
    /// Keeps the deallocation thread joined on drop.
    _deallocation_handle: JoinOnDrop<()>,
    // NOTE(review): presumably serializes persisting states metadata to disk —
    // confirm against its usages.
    persist_metadata_guard: Arc<Mutex<()>>,
    /// Channel of requests to the tip thread (see `tip::spawn_tip_thread`).
    tip_channel: Sender<TipRequest>,
    /// Keeps the tip thread joined on drop.
    _tip_thread_handle: JoinOnDrop<()>,
    /// Factory producing file descriptors that back page allocators.
    fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
    malicious_flags: MaliciousFlags,
    // NOTE(review): appears to record when the height last advanced, feeding
    // the `height_update_time_seconds` histogram — confirm at the update site.
    latest_height_update_time: Arc<Mutex<Instant>>,
    /// Whether LSMT storage is enabled (from config).
    lsmt_storage: FlagStatus,
}
// Debug-builds-only drop check so that a tip-thread panic cannot go unnoticed
// in tests.
#[cfg(debug_assertions)]
impl Drop for StateManagerImpl {
    fn drop(&mut self) {
        // Make sure the tip thread didn't panic. Otherwise we may be blind to it in tests.
        // If the tip thread panics after the latest communication with tip_channel the test returns
        // success.
        self.flush_tip_channel();
    }
}
/// Loads the checkpoint at `height` from `state_layout` into a
/// `ReplicatedState`, using a pool of `NUMBER_OF_CHECKPOINT_THREADS` workers.
/// The load duration is recorded under the "recover" checkpoint-op label.
fn load_checkpoint(
    state_layout: &StateLayout,
    height: Height,
    metrics: &StateManagerMetrics,
    own_subnet_type: SubnetType,
    fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
) -> Result<ReplicatedState, CheckpointError> {
    let mut thread_pool = scoped_threadpool::Pool::new(NUMBER_OF_CHECKPOINT_THREADS);
    // Layout errors are converted into `CheckpointError` via `From` by `?`.
    let layout = state_layout.checkpoint(height)?;
    // Time only the actual load, not the layout lookup above.
    let _timer = metrics
        .checkpoint_op_duration
        .with_label_values(&["recover"])
        .start_timer();
    checkpoint::load_checkpoint(
        &layout,
        own_subnet_type,
        &metrics.checkpoint_metrics,
        Some(&mut thread_pool),
        Arc::clone(&fd_factory),
    )
}
/// Debug-only consistency check: the set of heights with certification
/// metadata must match the non-zero snapshot heights, and every height with
/// state metadata must also have certification metadata.
#[cfg(debug_assertions)]
fn check_certifications_metadata_snapshots_and_states_metadata_are_consistent(
    states: &SharedState,
) {
    let from_certifications: Vec<_> = states
        .certifications_metadata
        .keys()
        .copied()
        .collect::<Vec<_>>();
    let from_snapshots: Vec<_> = states
        .snapshots
        .iter()
        .map(|snapshot| snapshot.height)
        .filter(|height| height.get() != 0)
        .collect::<Vec<_>>();
    debug_assert_eq!(from_certifications, from_snapshots);
    debug_assert!(states
        .states_metadata
        .keys()
        .all(|height| states.certifications_metadata.contains_key(height)));
}
/// Prepares the in-memory tip from `snapshot` (which must be at the same
/// height as `checkpoint_layout`) and asks the tip thread to reset the
/// on-disk tip to that checkpoint. Returns a clone of the snapshot's state.
fn initialize_tip(
    log: &ReplicaLogger,
    tip_channel: &Sender<TipRequest>,
    snapshot: &Snapshot,
    checkpoint_layout: CheckpointLayout<ReadOnly>,
) -> ReplicatedState {
    debug_assert_eq!(snapshot.height, checkpoint_layout.height());
    // Since we initialize tip from checkpoint states, we expect a clean sandbox slate
    #[cfg(debug_assertions)]
    for canister in snapshot.state.canisters_iter() {
        if let Some(canister_state) = &canister.execution_state {
            // Panic (debug builds) if either wasm or stable memory is already
            // in the `Synced` sandbox state.
            if let SandboxMemory::Synced(_) =
                *canister_state.wasm_memory.sandbox_memory.lock().unwrap()
            {
                panic!(
                    "Unexpected sandbox state for canister {}",
                    canister.canister_id()
                );
            }
            if let SandboxMemory::Synced(_) =
                *canister_state.stable_memory.sandbox_memory.lock().unwrap()
            {
                panic!(
                    "Unexpected sandbox state for canister {}",
                    canister.canister_id()
                );
            }
        }
    }
    info!(log, "Recovering checkpoint @{} as tip", snapshot.height);
    // If the send fails the tip thread is gone; that is unrecoverable here.
    tip_channel
        .send(TipRequest::ResetTipTo { checkpoint_layout })
        .unwrap();
    ReplicatedState::clone(&snapshot.state)
}
/// Returns the duration since the path's modification time.
/// Returns a zero duration (and logs a warning) if the metadata cannot be
/// read, or silently if the mtime lies in the future.
fn path_age(log: &ReplicaLogger, path: &Path) -> Duration {
    let now = SystemTime::now();
    let mtime = match path.metadata().and_then(|meta| meta.modified()) {
        Ok(mtime) => mtime,
        Err(err) => {
            warn!(
                log,
                "Could not determine age for the path {}; error: {:?}",
                path.display(),
                err
            );
            return Duration::from_secs(0);
        }
    };
    // `duration_since` fails only when mtime is in the future; age 0 is fine.
    now.duration_since(mtime).unwrap_or(Duration::from_secs(0))
}
/// Deletes obsolete diverged states and state backups, keeping at most
/// MAX_ARCHIVED_DIVERGED_CHECKPOINTS_TO_KEEP archived checkpoints and backups no older than
/// ARCHIVED_DIVERGED_CHECKPOINT_MAX_AGE. The same for diverged checkpoints.
/// On top of that we keep maximum MAX_DIVERGED_STATE_MARKERS_TO_KEEP of diverged markers
/// no older than ARCHIVED_DIVERGED_CHECKPOINT_MAX_AGE
fn cleanup_diverged_states(log: &ReplicaLogger, layout: &StateLayout) {
    // Height of the newest checkpoint; the age-based removal below only
    // applies to diverged data below this height.
    let last_checkpoint: Height = match layout.checkpoint_heights() {
        Err(err) => {
            fatal!(log, "Failed to get list of checkpoints: {}", err);
        }
        Ok(v) => v
            .last()
            .copied()
            .unwrap_or(StateManagerImpl::INITIAL_STATE_HEIGHT),
    };
    if let Ok(diverged_heights) = layout.diverged_checkpoint_heights() {
        // Remove the oldest entries over the keep limit, plus anything older
        // than the max age (only when below the last checkpoint).
        let to_remove = diverged_heights
            .len()
            .saturating_sub(MAX_ARCHIVED_DIVERGED_CHECKPOINTS_TO_KEEP);
        for (i, h) in diverged_heights.iter().enumerate() {
            if i < to_remove
                || (last_checkpoint > *h
                    && path_age(log, &layout.diverged_checkpoint_path(*h))
                        > ARCHIVED_DIVERGED_CHECKPOINT_MAX_AGE)
            {
                match layout.remove_diverged_checkpoint(*h) {
                    Ok(()) => info!(log, "Successfully removed diverged state {}", *h),
                    Err(err) => info!(log, "{}", err),
                }
            }
        }
    }
    if let Ok(backup_heights) = layout.backup_heights() {
        // Same policy for backups.
        let to_remove = backup_heights
            .len()
            .saturating_sub(MAX_ARCHIVED_DIVERGED_CHECKPOINTS_TO_KEEP);
        for (i, h) in backup_heights.iter().enumerate() {
            if i < to_remove
                || (last_checkpoint > *h
                    && path_age(log, &layout.backup_checkpoint_path(*h))
                        > ARCHIVED_DIVERGED_CHECKPOINT_MAX_AGE)
            {
                match layout.remove_backup(*h) {
                    Ok(()) => info!(log, "Successfully removed backup {}", *h),
                    Err(err) => info!(log, "Failed to remove backup {}", err),
                }
            }
        }
    }
    if let Ok(state_heights) = layout.diverged_state_heights() {
        // Diverged state markers: higher keep limit, and age-based removal is
        // not gated on the last checkpoint height.
        let to_remove = state_heights
            .len()
            .saturating_sub(MAX_DIVERGED_STATE_MARKERS_TO_KEEP);
        for (i, h) in state_heights.iter().enumerate() {
            if i < to_remove
                || path_age(log, &layout.diverged_state_marker_path(*h))
                    > ARCHIVED_DIVERGED_CHECKPOINT_MAX_AGE
            {
                match layout.remove_diverged_state_marker(*h) {
                    Ok(()) => info!(log, "Successfully removed diverged state marker {}", h),
                    Err(err) => info!(log, "{}", err),
                }
            }
        }
    }
}
fn report_last_diverged_state(
log: &ReplicaLogger,
metrics: &StateManagerMetrics,
state_layout: &StateLayout,
) {
let mut diverged_paths = std::vec::Vec::new();
let mut last_time = SystemTime::UNIX_EPOCH;
match state_layout.diverged_checkpoint_heights() {
Err(e) => warn!(log, "failed to enumerate diverged checkpoints: {}", e),
Ok(heights) => {
for h in heights {
diverged_paths.push(state_layout.diverged_checkpoint_path(h));
}
}
}
match state_layout.diverged_state_heights() {
Err(e) => warn!(log, "failed to enumerate diverged states: {}", e),
Ok(heights) => {
for h in heights {
diverged_paths.push(state_layout.diverged_state_marker_path(h));
}
}
}
for p in diverged_paths {
match p.metadata().and_then(|m| m.modified()) {
Ok(mtime) => {
last_time = last_time.max(mtime);
}
Err(e) => info!(
log,
"Failed to stat diverged checkpoint directory {}: {}",
p.display(),
e
),
}
}
metrics.last_diverged_state_timestamp.set(
last_time
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
)