Add timestamps to executor jobs.
chrisstaite-menlo authored and allada committed Jun 29, 2023
1 parent eebd6be commit fa97b28
Showing 10 changed files with 244 additions and 75 deletions.
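In short: `ActionInfo` gains a `load_timestamp` (when the action started to be loaded from the CAS) alongside the existing `insert_timestamp` (when it was queued), and the scheduler now forwards the queue time to workers as `queued_timestamp` on the `StartExecute` message. A trimmed, hypothetical mirror of that flow (the struct and function here are illustrative stand-ins, not the real API):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Illustrative stand-in for the two timestamps this commit threads
// through ActionInfo (cas/scheduler/action_messages.rs).
struct JobTimestamps {
    /// When the action started to be loaded from the CAS.
    load_timestamp: SystemTime,
    /// When the action was created (queued).
    insert_timestamp: SystemTime,
}

fn main() {
    // The execution server seeds load_timestamp with UNIX_EPOCH as a
    // placeholder and stamps insert_timestamp at submission time
    // (cas/grpc_service/execution_server.rs below).
    let ts = JobTimestamps {
        load_timestamp: UNIX_EPOCH,
        insert_timestamp: SystemTime::now(),
    };
    // Workers later receive the queue time as `queued_timestamp`
    // (cas/scheduler/worker.rs below).
    println!("queued at {:?}", ts.insert_timestamp);
}
```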
1 change: 1 addition & 0 deletions cas/grpc_service/execution_server.rs
@@ -96,6 +96,7 @@ impl InstanceInfo {
timeout,
platform_properties: PlatformProperties::new(platform_properties),
priority,
load_timestamp: UNIX_EPOCH,
insert_timestamp: SystemTime::now(),
unique_qualifier: ActionInfoHashKey {
digest: action_digest,
1 change: 1 addition & 0 deletions cas/grpc_service/tests/worker_api_server_test.rs
@@ -282,6 +282,7 @@ pub mod execution_response_tests {
properties: HashMap::new(),
},
priority: 0,
load_timestamp: make_system_time(0),
insert_timestamp: make_system_time(0),
unique_qualifier: ActionInfoHashKey {
digest: action_digest.clone(),
7 changes: 6 additions & 1 deletion cas/scheduler/action_messages.rs
@@ -80,6 +80,8 @@ pub struct ActionInfo {
pub platform_properties: PlatformProperties,
/// The priority of the action. Higher value means it should execute faster.
pub priority: i32,
/// When this action started to be loaded from the CAS.
pub load_timestamp: SystemTime,
/// When this action was created.
pub insert_timestamp: SystemTime,

@@ -112,6 +114,8 @@ impl ActionInfo {
execute_request: ExecuteRequest,
action: Action,
salt: u64,
load_timestamp: SystemTime,
queued_timestamp: SystemTime,
) -> Result<Self, Error> {
Ok(Self {
instance_name: execute_request.instance_name,
@@ -133,7 +137,8 @@
.execution_policy
.unwrap_or(ExecutionPolicy::default())
.priority,
insert_timestamp: SystemTime::UNIX_EPOCH, // We can't know it at this point.
load_timestamp,
insert_timestamp: queued_timestamp,
unique_qualifier: ActionInfoHashKey {
digest: execute_request
.action_digest
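With the constructor now taking both timestamps (the old `UNIX_EPOCH` placeholder is gone), call sites must supply them. A minimal sketch of a call site, assuming a constructor named `try_from_action_and_execute_request` — the real name sits outside the visible hunk, and the repo types are assumed to be in scope:

```rust
use std::time::SystemTime;

// Hypothetical call site: ExecuteRequest, Action, ActionInfo, and Error
// are the repo's types; the constructor name is illustrative only.
fn build_action_info(
    execute_request: ExecuteRequest,
    action: Action,
    salt: u64,
) -> Result<ActionInfo, Error> {
    let load_timestamp = SystemTime::now(); // just before the CAS fetch
    let queued_timestamp = SystemTime::now(); // when the job is enqueued
    ActionInfo::try_from_action_and_execute_request(
        execute_request,
        action,
        salt,
        load_timestamp,
        queued_timestamp,
    )
}
```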
89 changes: 71 additions & 18 deletions cas/scheduler/tests/scheduler_test.rs
@@ -35,7 +35,7 @@ use worker::{Worker, WorkerId};

const INSTANCE_NAME: &str = "foobar_instance_name";

fn make_base_action_info() -> ActionInfo {
fn make_base_action_info(insert_timestamp: SystemTime) -> ActionInfo {
ActionInfo {
instance_name: INSTANCE_NAME.to_string(),
command_digest: DigestInfo::new([0u8; 32], 0),
@@ -45,7 +45,8 @@ fn make_base_action_info() -> ActionInfo {
properties: HashMap::new(),
},
priority: 0,
insert_timestamp: SystemTime::now(),
load_timestamp: UNIX_EPOCH,
insert_timestamp,
unique_qualifier: ActionInfoHashKey {
digest: DigestInfo::new([0u8; 32], 0),
salt: 0,
@@ -89,8 +90,9 @@ async fn setup_action(
scheduler: &Scheduler,
action_digest: DigestInfo,
platform_properties: PlatformProperties,
insert_timestamp: SystemTime,
) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
let mut action_info = make_base_action_info();
let mut action_info = make_base_action_info(insert_timestamp);
action_info.platform_properties = platform_properties;
action_info.unique_qualifier.digest = action_digest;
scheduler.add_action(action_info).await
@@ -111,7 +113,9 @@ mod scheduler_tests {
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp = make_system_time(1);
let mut client_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;

{
// Worker should have been sent an execute command.
@@ -124,6 +128,7 @@
..Default::default()
}),
salt: 0,
queued_timestamp: Some(insert_timestamp.into()),
})),
};
let msg_for_worker = rx_from_worker.recv().await.unwrap();
@@ -155,7 +160,9 @@
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut rx_from_worker1 = setup_new_worker(&scheduler, WORKER_ID1, Default::default()).await?;
let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp = make_system_time(1);
let mut client_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
let mut rx_from_worker2 = setup_new_worker(&scheduler, WORKER_ID2, Default::default()).await?;

let mut expected_action_state = ActionState {
@@ -174,6 +181,7 @@
..Default::default()
}),
salt: 0,
queued_timestamp: Some(insert_timestamp.into()),
})),
};
{
@@ -234,7 +242,14 @@
const WORKER_ID1: WorkerId = WorkerId(0x100001);
const WORKER_ID2: WorkerId = WorkerId(0x100002);
let mut rx_from_worker1 = setup_new_worker(&scheduler, WORKER_ID1, platform_properties.clone()).await?;
let mut client_rx = setup_action(&scheduler, action_digest.clone(), worker_properties.clone()).await?;
let insert_timestamp = make_system_time(1);
let mut client_rx = setup_action(
&scheduler,
action_digest.clone(),
worker_properties.clone(),
insert_timestamp,
)
.await?;

{
// Client should get notification saying it's been queued.
@@ -260,6 +275,7 @@
..Default::default()
}),
salt: 0,
queued_timestamp: Some(insert_timestamp.into()),
})),
};
let msg_for_worker = rx_from_worker2.recv().await.unwrap();
@@ -296,8 +312,12 @@
stage: ActionStage::Queued,
};

let mut client1_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let mut client2_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp1 = make_system_time(1);
let insert_timestamp2 = make_system_time(2);
let mut client1_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp1).await?;
let mut client2_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp2).await?;

{
// Clients should get notification saying it's been queued.
@@ -322,6 +342,7 @@
..Default::default()
}),
salt: 0,
queued_timestamp: Some(insert_timestamp1.into()),
})),
};
let msg_for_worker = rx_from_worker.recv().await.unwrap();
Expand All @@ -339,7 +360,9 @@ mod scheduler_tests {

{
// Now if another action is requested it should also join with executing action.
let mut client3_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp3 = make_system_time(2);
let mut client3_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp3).await?;
assert_eq!(client3_rx.borrow_and_update().as_ref(), &expected_action_state);
}

@@ -357,7 +380,9 @@
// Now act like the worker disconnected.
drop(rx_from_worker);

let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp = make_system_time(1);
let mut client_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
{
// Client should get notification saying it's being queued not executed.
let action_state = client_rx.borrow_and_update();
@@ -385,7 +410,9 @@

// Note: This needs to stay in scope or a disconnect will trigger.
let mut rx_from_worker1 = setup_new_worker(&scheduler, WORKER_ID1, Default::default()).await?;
let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp = make_system_time(1);
let mut client_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;

// Note: This needs to stay in scope or a disconnect will trigger.
let mut rx_from_worker2 = setup_new_worker(&scheduler, WORKER_ID2, Default::default()).await?;
@@ -406,6 +433,7 @@
..Default::default()
}),
salt: 0,
queued_timestamp: Some(insert_timestamp.into()),
})),
};

@@ -463,7 +491,9 @@
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp = make_system_time(1);
let mut client_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;

{
// Other tests check full data. We only care if we got StartAction.
@@ -551,7 +581,9 @@
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut rx_from_worker = setup_new_worker(&scheduler, GOOD_WORKER_ID, Default::default()).await?;
let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp = make_system_time(1);
let mut client_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;

{
// Other tests check full data. We only care if we got StartAction.
@@ -638,7 +670,9 @@
stage: ActionStage::Executing,
};

let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp = make_system_time(1);
let mut client_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;

{
@@ -652,6 +686,7 @@
..Default::default()
}),
salt: 0,
queued_timestamp: Some(insert_timestamp.into()),
})),
};
let msg_for_worker = rx_from_worker.recv().await.unwrap();
@@ -710,7 +745,9 @@
// fail.

{
let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp = make_system_time(1);
let mut client_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;
// We didn't disconnect our worker, so it will have scheduled it to the worker.
expected_action_state.stage = ActionStage::Executing;
let action_state = client_rx.borrow_and_update();
@@ -736,8 +773,22 @@
properties.insert("prop1".to_string(), PlatformPropertyValue::Minimum(1));
let platform_properties = PlatformProperties { properties };
let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, platform_properties.clone()).await?;
let mut client1_rx = setup_action(&scheduler, action_digest1.clone(), platform_properties.clone()).await?;
let mut client2_rx = setup_action(&scheduler, action_digest2.clone(), platform_properties).await?;
let insert_timestamp1 = make_system_time(1);
let mut client1_rx = setup_action(
&scheduler,
action_digest1.clone(),
platform_properties.clone(),
insert_timestamp1,
)
.await?;
let insert_timestamp2 = make_system_time(1);
let mut client2_rx = setup_action(
&scheduler,
action_digest2.clone(),
platform_properties,
insert_timestamp2,
)
.await?;

match rx_from_worker.recv().await.unwrap().update {
Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
@@ -858,7 +909,9 @@
let action_digest = DigestInfo::new([99u8; 32], 512);

let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, Default::default()).await?;
let mut client_rx = setup_action(&scheduler, action_digest.clone(), Default::default()).await?;
let insert_timestamp = make_system_time(1);
let mut client_rx =
setup_action(&scheduler, action_digest.clone(), Default::default(), insert_timestamp).await?;

{
// Other tests check full data. We only care if we got StartAction.
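The tests above lean on a `make_system_time` helper that sits outside the visible hunks; a plausible definition (an assumption — the real one is not shown in this diff) is:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Assumed test helper: a deterministic SystemTime `secs` seconds past the epoch.
fn make_system_time(secs: u64) -> SystemTime {
    UNIX_EPOCH + Duration::from_secs(secs)
}
```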
1 change: 1 addition & 0 deletions cas/scheduler/worker.rs
@@ -141,6 +141,7 @@ impl Worker {
self.send_msg_to_worker(update_for_worker::Update::StartAction(StartExecute {
execute_request: Some(action_info_clone.into()),
salt: *action_info.salt(),
queued_timestamp: Some(action_info.insert_timestamp.into()),
}))
}

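The `.into()` calls on `insert_timestamp` convert a `SystemTime` into the protobuf `Timestamp` carried by `StartExecute`. A hand-rolled equivalent of that conversion, assuming `prost_types::Timestamp` and a post-epoch time, might look like:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Minimal sketch: SystemTime -> prost_types::Timestamp. Pre-epoch times
// would need signed handling; this sketch simply panics on them.
fn to_proto_timestamp(t: SystemTime) -> prost_types::Timestamp {
    let d = t.duration_since(UNIX_EPOCH).expect("time before UNIX epoch");
    prost_types::Timestamp {
        seconds: d.as_secs() as i64,
        nanos: d.subsec_nanos() as i32,
    }
}
```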
