@@ -6,11 +6,10 @@ pub mod test_utils;
6
6
7
7
use bit_vec:: BitVec ;
8
8
pub use checkpoint:: { CheckpointSerialization , MappingSerialization } ;
9
- use ic_config:: flag_status:: FlagStatus ;
10
9
use ic_config:: state_manager:: LsmtConfig ;
11
10
use ic_metrics:: buckets:: { decimal_buckets, linear_buckets} ;
12
11
use ic_metrics:: MetricsRegistry ;
13
- use ic_sys:: { fs :: write_all_vectored , PageBytes } ;
12
+ use ic_sys:: PageBytes ;
14
13
pub use ic_sys:: { PageIndex , PAGE_SIZE } ;
15
14
use ic_utils:: deterministic_operations:: deterministic_copy_from_slice;
16
15
pub use page_allocator:: {
@@ -32,15 +31,10 @@ use page_allocator::Page;
32
31
use prometheus:: { Histogram , HistogramVec , IntCounter , IntCounterVec } ;
33
32
use serde:: { Deserialize , Serialize } ;
34
33
use std:: collections:: HashMap ;
35
- use std:: fs:: { File , OpenOptions } ;
36
34
use std:: ops:: Range ;
37
35
use std:: os:: unix:: io:: RawFd ;
38
- use std:: path:: Path ;
39
36
use std:: sync:: Arc ;
40
37
41
- // When persisting data we expand dirty pages to an aligned bucket of given size.
42
- const WRITE_BUCKET_PAGES : u64 = 16 ;
43
-
44
38
const LABEL_OP : & str = "op" ;
45
39
const LABEL_TYPE : & str = "type" ;
46
40
const LABEL_OP_FLUSH : & str = "flush" ;
@@ -128,39 +122,6 @@ impl StorageMetrics {
128
122
}
129
123
}
130
124
131
- struct WriteBuffer < ' a > {
132
- content : Vec < & ' a [ u8 ] > ,
133
- start_index : PageIndex ,
134
- }
135
-
136
- impl WriteBuffer < ' _ > {
137
- fn apply_to_file ( & mut self , file : & mut File , path : & Path ) -> Result < ( ) , PersistenceError > {
138
- use std:: io:: { Seek , SeekFrom } ;
139
-
140
- let offset = self . start_index . get ( ) * PAGE_SIZE as u64 ;
141
- file. seek ( SeekFrom :: Start ( offset) )
142
- . map_err ( |err| PersistenceError :: FileSystemError {
143
- path : path. display ( ) . to_string ( ) ,
144
- context : format ! ( "Failed to seek to {}" , offset) ,
145
- internal_error : err. to_string ( ) ,
146
- } ) ?;
147
-
148
- write_all_vectored ( file, & self . content ) . map_err ( |err| {
149
- PersistenceError :: FileSystemError {
150
- path : path. display ( ) . to_string ( ) ,
151
- context : format ! (
152
- "Failed to copy page range #{}..{}" ,
153
- self . start_index,
154
- self . start_index. get( ) + self . content. len( ) as u64
155
- ) ,
156
- internal_error : err. to_string ( ) ,
157
- }
158
- } ) ?;
159
-
160
- Ok ( ( ) )
161
- }
162
- }
163
-
164
125
/// `PageDelta` represents a changeset of the module heap.
165
126
///
166
127
/// NOTE: We use a persistent map to make snapshotting of a PageMap a cheap
@@ -590,16 +551,13 @@ impl PageMap {
590
551
lsmt_config : & LsmtConfig ,
591
552
metrics : & StorageMetrics ,
592
553
) -> Result < ( ) , PersistenceError > {
593
- match lsmt_config. lsmt_status {
594
- FlagStatus :: Disabled => self . persist_to_file ( & self . page_delta , & storage_layout. base ( ) ) ,
595
- FlagStatus :: Enabled => self . persist_to_overlay (
596
- & self . page_delta ,
597
- storage_layout,
598
- height,
599
- lsmt_config,
600
- metrics,
601
- ) ,
602
- }
554
+ self . persist_to_overlay (
555
+ & self . page_delta ,
556
+ storage_layout,
557
+ height,
558
+ lsmt_config,
559
+ metrics,
560
+ )
603
561
}
604
562
605
563
/// Persists the unflushed delta contained in this page map to the specified
@@ -611,18 +569,13 @@ impl PageMap {
611
569
lsmt_config : & LsmtConfig ,
612
570
metrics : & StorageMetrics ,
613
571
) -> Result < ( ) , PersistenceError > {
614
- match lsmt_config. lsmt_status {
615
- FlagStatus :: Disabled => {
616
- self . persist_to_file ( & self . unflushed_delta , & storage_layout. base ( ) )
617
- }
618
- FlagStatus :: Enabled => self . persist_to_overlay (
619
- & self . unflushed_delta ,
620
- storage_layout,
621
- height,
622
- lsmt_config,
623
- metrics,
624
- ) ,
625
- }
572
+ self . persist_to_overlay (
573
+ & self . unflushed_delta ,
574
+ storage_layout,
575
+ height,
576
+ lsmt_config,
577
+ metrics,
578
+ )
626
579
}
627
580
628
581
fn persist_to_overlay (
@@ -903,66 +856,6 @@ impl PageMap {
903
856
self . unflushed_delta . update ( delta)
904
857
}
905
858
906
- /// Persists the given delta to the specified destination.
907
- fn persist_to_file ( & self , page_delta : & PageDelta , dst : & Path ) -> Result < ( ) , PersistenceError > {
908
- let mut file = OpenOptions :: new ( )
909
- . write ( true )
910
- . create ( true )
911
- . truncate ( false )
912
- . open ( dst)
913
- . map_err ( |err| PersistenceError :: FileSystemError {
914
- path : dst. display ( ) . to_string ( ) ,
915
- context : "Failed to open file" . to_string ( ) ,
916
- internal_error : err. to_string ( ) ,
917
- } ) ?;
918
- self . apply_delta_to_file ( & mut file, page_delta, dst) ?;
919
- Ok ( ( ) )
920
- }
921
-
922
- /// Applies the given delta to the specified file.
923
- /// Precondition: `file` is seekable and writeable.
924
- fn apply_delta_to_file (
925
- & self ,
926
- file : & mut File ,
927
- page_delta : & PageDelta ,
928
- path : & Path ,
929
- ) -> Result < ( ) , PersistenceError > {
930
- // Empty delta
931
- if page_delta. max_page_index ( ) . is_none ( ) {
932
- return Ok ( ( ) ) ;
933
- }
934
-
935
- let mut last_applied_index: Option < PageIndex > = None ;
936
- let num_host_pages = self . num_host_pages ( ) as u64 ;
937
- for ( index, _) in page_delta. iter ( ) {
938
- debug_assert ! ( self . page_delta. 0 . get( index) . is_some( ) ) ;
939
- assert ! ( * index < num_host_pages. into( ) ) ;
940
-
941
- if last_applied_index. is_some ( ) && last_applied_index. unwrap ( ) >= * index {
942
- continue ;
943
- }
944
-
945
- let bucket_start_index =
946
- PageIndex :: from ( ( index. get ( ) / WRITE_BUCKET_PAGES ) * WRITE_BUCKET_PAGES ) ;
947
- let mut buffer = WriteBuffer {
948
- content : vec ! [ ] ,
949
- start_index : bucket_start_index,
950
- } ;
951
- for i in 0 ..WRITE_BUCKET_PAGES {
952
- let index_to_apply = PageIndex :: from ( bucket_start_index. get ( ) + i) ;
953
- // We don't expand past the end of file to make bucketing transparent.
954
- if index_to_apply. get ( ) < num_host_pages {
955
- let content = self . get_page ( index_to_apply) ;
956
- buffer. content . push ( content) ;
957
- last_applied_index = Some ( index_to_apply) ;
958
- }
959
- }
960
- buffer. apply_to_file ( file, path) ?;
961
- }
962
-
963
- Ok ( ( ) )
964
- }
965
-
966
859
/// Returns the number of delta pages included in this PageMap.
967
860
pub fn num_delta_pages ( & self ) -> usize {
968
861
self . page_delta . len ( )
0 commit comments