Skip to content

Commit

Permalink
Merge branch 'pakhomov/lsmt-statemanager-order2' into 'master'
Browse files Browse the repository at this point in the history
feat: Split ResetTipTo functionality from TipToCheckpoint

When creating checkpoint from tip (`TipToCheckpoint`), we make all tip files read only, rename tip dir into checkpoint, then reflink/copy/hardlink files from checkpoint back into tip.

The second part is the same as `ResetTipTo`. With LSMT we postpone this till after the manifest computation, so it makes sense to split it from `TipToCheckpoint`, and without LSMT just replace `TipToCheckpoint` with `TipToCheckpoint` + `ResetTipTo`. 

See merge request dfinity-lab/public/ic!16665
  • Loading branch information
pakhomov-dfinity committed Dec 12, 2023
2 parents ced121d + 0da6a11 commit d8fb391
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 57 deletions.
15 changes: 13 additions & 2 deletions rs/state_manager/src/checkpoint.rs
Expand Up @@ -4,6 +4,7 @@ use crate::{
};
use crossbeam_channel::{unbounded, Sender};
use ic_base_types::{subnet_id_try_from_protobuf, CanisterId};
use ic_config::flag_status::FlagStatus;
use ic_logger::error;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
Expand Down Expand Up @@ -54,6 +55,7 @@ pub(crate) fn make_checkpoint(
metrics: &CheckpointMetrics,
thread_pool: &mut scoped_threadpool::Pool,
fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
lsmt_storage: FlagStatus,
) -> Result<(CheckpointLayout<ReadOnly>, ReplicatedState), CheckpointError> {
{
let _timer = metrics
Expand Down Expand Up @@ -87,10 +89,19 @@ pub(crate) fn make_checkpoint(
sender: send,
})
.unwrap();
recv.recv().unwrap()?
let cp = recv.recv().unwrap()?;
// With lsmt storage, this happens later (after manifest)
if lsmt_storage == FlagStatus::Disabled {
tip_channel
.send(TipRequest::ResetTipTo {
checkpoint_layout: cp.clone(),
})
.unwrap();
}
cp
};

{
if lsmt_storage == FlagStatus::Disabled {
// Wait for reset_tip_to so that we don't reflink in parallel with other operations.
let _timer = metrics
.make_checkpoint_step_duration
Expand Down
2 changes: 2 additions & 0 deletions rs/state_manager/src/checkpoint/tests.rs
Expand Up @@ -78,6 +78,7 @@ fn make_checkpoint_and_get_state_impl(
&state_manager_metrics(log).checkpoint_metrics,
&mut thread_pool(),
Arc::new(TestPageAllocatorFileDescriptorImpl::new()),
ic_config::state_manager::lsmt_storage_default(),
)
.unwrap_or_else(|err| panic!("Expected make_checkpoint to succeed, got {:?}", err))
.1
Expand Down Expand Up @@ -196,6 +197,7 @@ fn scratchpad_dir_is_deleted_if_checkpointing_failed() {
&state_manager_metrics.checkpoint_metrics,
&mut thread_pool(),
Arc::new(TestPageAllocatorFileDescriptorImpl::new()),
ic_config::state_manager::lsmt_storage_default(),
);

match replicated_state {
Expand Down
26 changes: 19 additions & 7 deletions rs/state_manager/src/lib.rs
Expand Up @@ -1047,7 +1047,7 @@ impl PageMapType {
}
}

/// The path of an overlay file written during round `height`
/// The path of an overlay file written during round `height`.
fn overlay<Access>(
&self,
layout: &CheckpointLayout<Access>,
Expand All @@ -1065,7 +1065,7 @@ impl PageMapType {
}
}

/// List all existing overlay files of a this PageMapType inside `layout`
/// List all existing overlay files of a this PageMapType inside `layout`.
fn overlays<Access>(
&self,
layout: &CheckpointLayout<Access>,
Expand Down Expand Up @@ -2333,6 +2333,7 @@ impl StateManagerImpl {
&self.metrics.checkpoint_metrics,
&mut scoped_threadpool::Pool::new(NUMBER_OF_CHECKPOINT_THREADS),
self.get_fd_factory(),
self.lsmt_storage,
)
};
let (cp_layout, checkpointed_state) = match result {
Expand Down Expand Up @@ -2429,7 +2430,22 @@ impl StateManagerImpl {
.make_checkpoint_step_duration
.with_label_values(&["create_checkpoint_result"])
.start_timer();
let checkpoint_layout = self.state_layout.checkpoint(height).unwrap();
// With lsmt, we do not need the defrag.
// Without lsmt, the ResetTipTo happens earlier in make_checkpoint.
let tip_requests = if self.lsmt_storage == FlagStatus::Enabled {
vec![TipRequest::ResetTipTo {
checkpoint_layout: checkpoint_layout.clone(),
}]
} else {
vec![TipRequest::DefragTip {
height,
page_map_types: PageMapType::list_all(state),
}]
};

CreateCheckpointResult {
tip_requests,
checkpointed_state,
state_metadata: StateMetadata {
checkpoint_layout: Some(self.state_layout.checkpoint(height).unwrap()),
Expand All @@ -2438,14 +2454,10 @@ impl StateManagerImpl {
},
compute_manifest_request: TipRequest::ComputeManifest {
checkpoint_layout: cp_layout,
manifest_delta: if is_nns { None } else { manifest_delta },
manifest_delta,
states: self.states.clone(),
persist_metadata_guard: self.persist_metadata_guard.clone(),
},
tip_requests: vec![TipRequest::DefragTip {
height,
page_map_types: PageMapType::list_all(state),
}],
}
};

Expand Down
1 change: 1 addition & 0 deletions rs/state_manager/src/split.rs
Expand Up @@ -206,6 +206,7 @@ fn write_checkpoint(
&metrics.checkpoint_metrics,
thread_pool,
fd_factory,
config.lsmt_storage,
)
.map_err(|e| format!("Failed to write checkpoint: {}", e))?;

Expand Down
1 change: 1 addition & 0 deletions rs/state_manager/src/split/tests.rs
Expand Up @@ -388,6 +388,7 @@ fn new_state_layout(log: ReplicaLogger) -> (TempDir, Time) {
&state_manager_metrics.checkpoint_metrics,
&mut thread_pool(),
Arc::new(TestPageAllocatorFileDescriptorImpl::new()),
lsmt_storage_default(),
)
.unwrap_or_else(|err| panic!("Expected make_checkpoint to succeed, got {:?}", err))
.1;
Expand Down
77 changes: 29 additions & 48 deletions rs/state_manager/src/tip.rs
Expand Up @@ -189,58 +189,39 @@ pub(crate) fn spawn_tip_thread(
debug_assert_eq!(tip_state, TipState::Serialized(height));
debug_assert!(have_latest_manifest);
have_latest_manifest = false;
let cp = {
let _timer =
request_timer(&metrics, "tip_to_checkpoint_send_checkpoint");
let tip = tip_handler.tip(height);
match tip {
Err(err) => {
sender
.send(Err(err))
.expect("Failed to return TipToCheckpoint error");
continue;
}
Ok(tip) => {
let cp_or_err = state_layout.scratchpad_to_checkpoint(
tip,
height,
Some(&mut thread_pool),
);
match cp_or_err {
Err(err) => {
sender.send(Err(err)).expect(
"Failed to return TipToCheckpoint error",
);
continue;
}
Ok(cp) => {
sender.send(Ok(cp.clone())).expect(
"Failed to return TipToCheckpoint result",
);
cp
}
}
}
let _timer =
request_timer(&metrics, "tip_to_checkpoint_send_checkpoint");
let tip = tip_handler.tip(height);
match tip {
Err(err) => {
sender
.send(Err(err))
.expect("Failed to return TipToCheckpoint error");
continue;
}
};

let _timer = request_timer(&metrics, "tip_to_checkpoint_reset_tip_to");
tip_handler
.reset_tip_to(
&state_layout,
&cp,
lsmt_storage,
Some(&mut thread_pool),
)
.unwrap_or_else(|err| {
fatal!(
log,
"Failed to reset tip to checkpoint @{}: {}",
Ok(tip) => {
let cp_or_err = state_layout.scratchpad_to_checkpoint(
tip,
height,
err
Some(&mut thread_pool),
);
});
match cp_or_err {
Err(err) => {
sender
.send(Err(err))
.expect("Failed to return TipToCheckpoint error");
continue;
}
Ok(cp) => {
sender
.send(Ok(cp.clone()))
.expect("Failed to return TipToCheckpoint result");
}
}
}
}
}

TipRequest::FlushPageMapDelta { height, pagemaps } => {
let _timer = request_timer(&metrics, "flush_unflushed_delta");
#[cfg(debug_assertions)]
Expand Down

0 comments on commit d8fb391

Please sign in to comment.