Skip to content

Commit

Permalink
Merge branch 'stschnei/fix-deadlock-cherrypick' into 'rc--2022-03-24_18-31'
Browse files Browse the repository at this point in the history

Revert "Parallelize checkpoint loading in more code paths"

This reverts commit 703ef9c. 

See merge request dfinity-lab/public/ic!4017
  • Loading branch information
sasa-tomic committed Mar 28, 2022
2 parents 8cae490 + 2fb05d4 commit 9817eb2
Showing 1 changed file with 18 additions and 42 deletions.
60 changes: 18 additions & 42 deletions rs/state_manager/src/lib.rs
Expand Up @@ -626,10 +626,7 @@ fn load_checkpoint(
height: Height,
metrics: &StateManagerMetrics,
own_subnet_type: SubnetType,
thread_pool: Option<&Mutex<scoped_threadpool::Pool>>,
) -> Result<ReplicatedState, CheckpointError> {
let mut lock = thread_pool.map(|x| x.lock().unwrap());

state_layout
.checkpoint(height)
.map_err(|e| e.into())
Expand All @@ -638,7 +635,7 @@ fn load_checkpoint(
.checkpoint_op_duration
.with_label_values(&["recover"])
.start_timer();
checkpoint::load_checkpoint(&layout, own_subnet_type, lock.as_deref_mut())
checkpoint::load_checkpoint(&layout, own_subnet_type, None)
})
}

Expand All @@ -652,7 +649,6 @@ fn load_checkpoint_as_tip(
state_layout: &StateLayout,
height: Height,
own_subnet_type: SubnetType,
thread_pool: Option<&Mutex<scoped_threadpool::Pool>>,
) -> ReplicatedState {
info!(log, "Recovering checkpoint @{} as tip", height);

Expand All @@ -671,13 +667,8 @@ fn load_checkpoint_as_tip(
.checkpoint(height)
.unwrap_or_else(|err| fatal!(log, "Failed to retrieve checkpoint {:?}", err));

let mut lock = thread_pool.map(|x| x.lock().unwrap());

let checkpointed_state =
checkpoint::load_checkpoint(&checkpoint_layout, own_subnet_type, lock.as_deref_mut())
.unwrap_or_else(|err| fatal!(log, "Failed to load checkpoint {:?}", err));

drop(lock);
let checkpointed_state = checkpoint::load_checkpoint(&checkpoint_layout, own_subnet_type, None)
.unwrap_or_else(|err| fatal!(log, "Failed to load checkpoint {:?}", err));

// Ensure that the `PageMap`s of the tip use the clean read-only checkpoint
// files similar to how this is done in `commit_and_certify()` after a full
Expand Down Expand Up @@ -1073,24 +1064,14 @@ impl StateManagerImpl {
let latest_certified_height = AtomicU64::new(0);
let last_checkpoint = checkpoint_heights.last();

let checkpoint_thread_pool = Arc::new(Mutex::new(scoped_threadpool::Pool::new(
NUMBER_OF_CHECKPOINT_THREADS,
)));

let height_and_state = match last_checkpoint {
Some(height) => {
// Set latest state height in metadata to be last checkpoint height
latest_state_height.store(height.get(), Ordering::Relaxed);

(
*height,
load_checkpoint_as_tip(
&log,
&state_layout,
*height,
own_subnet_type,
Some(&checkpoint_thread_pool),
),
load_checkpoint_as_tip(&log, &state_layout, *height, own_subnet_type),
)
}
None => (
Expand All @@ -1109,21 +1090,15 @@ impl StateManagerImpl {
};

let maybe_last_snapshot = last_checkpoint.map(|height| {
let state = load_checkpoint(
&state_layout,
*height,
&metrics,
own_subnet_type,
Some(&checkpoint_thread_pool),
)
.unwrap_or_else(|err| {
fatal!(
log,
"Failed to load the latest checkpoint {} on start: {}",
height,
err
)
});
let state = load_checkpoint(&state_layout, *height, &metrics, own_subnet_type)
.unwrap_or_else(|err| {
fatal!(
log,
"Failed to load the latest checkpoint {} on start: {}",
height,
err
)
});
Snapshot {
height: *height,
state: Arc::new(state),
Expand All @@ -1150,6 +1125,10 @@ impl StateManagerImpl {
tip: Some(height_and_state),
}));

let checkpoint_thread_pool = Arc::new(Mutex::new(scoped_threadpool::Pool::new(
NUMBER_OF_CHECKPOINT_THREADS,
)));

let (compute_manifest_request_sender, compute_manifest_request_receiver) = unbounded();

let requested_to_remove_states_below = Arc::new(AtomicU64::new(
Expand Down Expand Up @@ -1880,7 +1859,6 @@ impl StateManager for StateManagerImpl {
&self.state_layout,
*checkpoint_height,
self.own_subnet_type,
Some(&self.checkpoint_thread_pool),
);
return (*checkpoint_height, new_tip);
}
Expand Down Expand Up @@ -1959,7 +1937,7 @@ impl StateManager for StateManagerImpl {

match self.clone_checkpoint(checkpoint_height, height) {
Ok(_) => {
let state = load_checkpoint(&self.state_layout, height, &self.metrics, self.own_subnet_type, Some(&self.checkpoint_thread_pool))
let state = load_checkpoint(&self.state_layout, height, &self.metrics, self.own_subnet_type)
.expect("failed to load checkpoint");
self.on_synced_checkpoint(state, height, manifest, root_hash);
return;
Expand Down Expand Up @@ -2495,7 +2473,6 @@ impl StateManager for StateManagerImpl {
&self.state_layout,
latest_snapshot.height,
self.own_subnet_type,
Some(&self.checkpoint_thread_pool),
),
)
}
Expand Down Expand Up @@ -2657,7 +2634,6 @@ impl StateReader for StateManagerImpl {
height,
&self.metrics,
self.own_subnet_type,
Some(&self.checkpoint_thread_pool),
) {
Ok(state) => Ok(Labeled::new(height, Arc::new(state))),
Err(CheckpointError::NotFound(_)) => Err(StateManagerError::StateRemoved(height)),
Expand Down

0 comments on commit 9817eb2

Please sign in to comment.