Skip to content

Commit

Permalink
Fix bug in scheduler of not removing actions after execution
Browse files Browse the repository at this point in the history
The scheduler had a bug where it was not removing actions from the
HashSet when an action completed. This caused an error to be returned
when the same action was scheduled for execution again.

Also adds test.
  • Loading branch information
allada committed Jun 13, 2022
1 parent 1c02c09 commit f2b825b
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 9 deletions.
27 changes: 18 additions & 9 deletions cas/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,16 @@ impl Workers {
type ShouldRunAgain = bool;

struct SchedulerImpl {
// We cannot use the special hash function we use for ActionInfo with BTreeMap because
// we need to be able to get an exact match when we look for `ActionInfo` structs that
// have `digest` and `salt` matches and nothing else. This is because we want to sort
// the `queued_actions` entries in order of `priority`, but if a new entry matches
// we want to ignore the priority and join them together then use the higher priority
// of the two, so we use a `HashSet` to find the original `ActionInfo` when trying to
// merge actions together.
// BTreeMap uses `cmp` to do its comparisons; this is a problem because we want to sort our
// actions based on priority and insert timestamp but also want to find and join new actions
// onto already executing (or queued) actions. We don't know the insert timestamp of queued
// actions, so we won't be able to find it in a BTreeMap without iterating the entire map. To
// get around this issue, we use two containers, one that will search using `Eq` which will
// only match on the `unique_qualifier` field, which ignores fields that would prevent
// multiplexing, and another which uses `Ord` for sorting.
//
// Important: These two fields must be kept in-sync, so if you modify one, you likely need to
// modify the other.
queued_actions_set: HashSet<Arc<ActionInfo>>,
queued_actions: BTreeMap<Arc<ActionInfo>, AwaitedAction>,
workers: Workers,
Expand Down Expand Up @@ -150,10 +153,12 @@ impl SchedulerImpl {
.remove_entry(&arc_action_info)
.err_tip(|| "Internal error queued_actions and queued_actions_set should match")?;

let new_priority = cmp::max(original_action_info.priority, action_info.priority);
drop(original_action_info); // This increases the chance Arc::make_mut won't copy.

// In the event our task is higher priority than the one already scheduled, increase
// the priority of the scheduled one.
Arc::make_mut(&mut arc_action_info).priority =
cmp::max(original_action_info.priority, action_info.priority);
Arc::make_mut(&mut arc_action_info).priority = new_priority;

let rx = queued_action.notify_channel.subscribe();
queued_action
Expand Down Expand Up @@ -270,6 +275,10 @@ impl SchedulerImpl {

// At this point everything looks good, so remove it from the queue and add it to active actions.
let (action_info, mut awaited_action) = self.queued_actions.remove_entry(action_info.as_ref()).unwrap();
assert!(
self.queued_actions_set.remove(&action_info),
"queued_actions_set should always have same keys as queued_actions"
);
Arc::make_mut(&mut awaited_action.current_state).stage = ActionStage::Executing;
let send_result = awaited_action.notify_channel.send(awaited_action.current_state.clone());
if send_result.is_err() {
Expand Down
97 changes: 97 additions & 0 deletions cas/scheduler/tests/scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,4 +612,101 @@ mod scheduler_tests {

Ok(())
}

#[tokio::test]
async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Error> {
    // Regression test: after an action completes, scheduling the same action
    // (same digest + salt) a second time must succeed. Previously the scheduler
    // left the completed action in `queued_actions_set`, so the second
    // submission returned an error.
    const WORKER_ID: WorkerId = WorkerId(0x10000f);

    let scheduler = Scheduler::new(&SchedulerConfig::default());
    let action_digest = DigestInfo::new([99u8; 32], 512);

    // Template for the state we expect the client to observe at each step.
    let mut expected_action_state = ActionState {
        name: "".to_string(), // Will be filled later.
        action_digest: action_digest.clone(),
        stage: ActionStage::Executing,
    };

    // Queue the action, then register a worker so the scheduler dispatches it.
    let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
    let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;

    {
        // Worker should have been sent an execute command.
        let expected_msg_for_worker = UpdateForWorker {
            update: Some(update_for_worker::Update::StartAction(StartExecute {
                execute_request: Some(ExecuteRequest {
                    instance_name: INSTANCE_NAME.to_string(),
                    skip_cache_lookup: true,
                    action_digest: Some(action_digest.clone().into()),
                    ..Default::default()
                }),
                salt: 0,
            })),
        };
        let msg_for_worker = rx_from_worker.recv().await.unwrap();
        assert_eq!(msg_for_worker, expected_msg_for_worker);
    }

    {
        // Client should get notification saying it's being executed.
        let action_state = client_rx.borrow_and_update();
        // We now know the name of the action so populate it.
        expected_action_state.name = action_state.name.clone();
        assert_eq!(action_state.as_ref(), &expected_action_state);
    }

    // Placeholder result (all defaults / epoch timestamps) — its contents are
    // irrelevant here; it only exists to drive the action to `Completed`.
    let action_result = ActionResult {
        output_files: Default::default(),
        output_folders: Default::default(),
        output_directory_symlinks: Default::default(),
        output_file_symlinks: Default::default(),
        exit_code: Default::default(),
        stdout_digest: DigestInfo::new([1u8; 32], 512),
        stderr_digest: DigestInfo::new([2u8; 32], 512),
        execution_metadata: ExecutionMetadata {
            worker: "".to_string(),
            queued_timestamp: SystemTime::UNIX_EPOCH,
            worker_start_timestamp: SystemTime::UNIX_EPOCH,
            worker_completed_timestamp: SystemTime::UNIX_EPOCH,
            input_fetch_start_timestamp: SystemTime::UNIX_EPOCH,
            input_fetch_completed_timestamp: SystemTime::UNIX_EPOCH,
            execution_start_timestamp: SystemTime::UNIX_EPOCH,
            execution_completed_timestamp: SystemTime::UNIX_EPOCH,
            output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
            output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
        },
        server_logs: Default::default(),
    };

    // Report the action as completed on behalf of the worker.
    scheduler
        .update_action(
            &WORKER_ID,
            &ActionInfoHashKey {
                digest: action_digest.clone(),
                salt: 0,
            },
            ActionStage::Completed(action_result.clone()),
        )
        .await?;

    {
        // Action should now be completed.
        expected_action_state.stage = ActionStage::Completed(action_result.clone());
        assert_eq!(client_rx.borrow_and_update().as_ref(), &expected_action_state);
    }

    // Now we need to ensure that if we schedule another execution of the same job it doesn't
    // fail.

    {
        let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
        // We didn't disconnect our worker, so it will have scheduled it to the worker.
        expected_action_state.stage = ActionStage::Executing;
        let action_state = client_rx.borrow_and_update();
        // The name of the action changed (since it's a new action), so update it.
        expected_action_state.name = action_state.name.clone();
        assert_eq!(action_state.as_ref(), &expected_action_state);
    }

    Ok(())
}
}

0 comments on commit f2b825b

Please sign in to comment.