Skip to content

Commit

Permalink
Add safe request timeout for running actions manager (#743)
Browse files Browse the repository at this point in the history
Changes running actions manager to use the action timeout if set to a
nonzero value for deciding when to kill a process, otherwise defaults to
its configured max_action_timeout

Co-Authored-By: Zach Birenbaum <zach@tracemachina.com>
  • Loading branch information
zbirenbaum and Zach Birenbaum committed Apr 8, 2024
1 parent 6c4fb7e commit 33db963
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 3 deletions.
12 changes: 9 additions & 3 deletions nativelink-worker/src/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,12 @@ impl RunningActionImpl {
))
.env_clear();

let requested_timeout = if self.action_info.timeout.is_zero() {
self.running_actions_manager.max_action_timeout
} else {
self.action_info.timeout
};

let mut maybe_side_channel_file: Option<Cow<'_, OsStr>> = None;
if let Some(additional_environment) = &self
.running_actions_manager
Expand All @@ -767,7 +773,7 @@ impl RunningActionImpl {
.map_or_else(|| Cow::Borrowed(""), |v| v.as_str()),
EnvironmentSource::value(value) => Cow::Borrowed(value.as_str()),
EnvironmentSource::timeout_millis => {
Cow::Owned(self.timeout.as_millis().to_string())
Cow::Owned(requested_timeout.as_millis().to_string())
}
EnvironmentSource::side_channel_file => {
let file_cow =
Expand Down Expand Up @@ -912,7 +918,7 @@ impl RunningActionImpl {
};

let maybe_error_override = if let Some(side_channel_file) = maybe_side_channel_file {
process_side_channel_file(side_channel_file.clone(), &args, self.timeout).await
process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await
.err_tip(|| format!("Error processing side channel file: {side_channel_file:?}"))?
} else {
None
Expand Down Expand Up @@ -1739,7 +1745,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
};
let timeout = if action_info.timeout == Duration::ZERO || self.timeout_handled_externally {
let timeout = if action_info.timeout.is_zero() || self.timeout_handled_externally {
self.max_action_timeout
} else {
action_info.timeout
Expand Down
115 changes: 115 additions & 0 deletions nativelink-worker/tests/running_actions_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3208,4 +3208,119 @@ exit 1
);
Ok(())
}

#[tokio::test]
async fn running_actions_manager_respects_action_timeout(
) -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
const SALT: u64 = 66;

let (_, _, cas_store, ac_store) = setup_stores().await?;
let root_action_directory = make_temp_path("root_work_directory");
fs::create_dir_all(&root_action_directory).await?;

// Ignore the sleep and immediately timeout.
static ACTION_TIMEOUT: i64 = 1;
fn test_monotonic_clock() -> SystemTime {
static CLOCK: AtomicU64 = AtomicU64::new(0);
monotonic_clock(&CLOCK)
}

let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks(
RunningActionsManagerArgs {
root_action_directory,
execution_configuration: Default::default(),
cas_store: Pin::into_inner(cas_store.clone()),
ac_store: Some(Pin::into_inner(ac_store.clone())),
historical_store: Pin::into_inner(cas_store.clone()),
upload_action_result_config:
&nativelink_config::cas_server::UploadActionResultConfig {
upload_ac_results_strategy:
nativelink_config::cas_server::UploadCacheResultsStrategy::never,
..Default::default()
},
max_action_timeout: Duration::MAX,
timeout_handled_externally: false,
},
Callbacks {
now_fn: test_monotonic_clock,
// If action_timeout is the passed duration then return immeidately,
// which will cause the action to be killed and pass the test,
// otherwise return pending and fail the test.
sleep_fn: |duration| {
assert_eq!(duration.as_secs(), ACTION_TIMEOUT as u64);
Box::pin(futures::future::ready(()))
},
},
)?);
#[cfg(target_family = "unix")]
let arguments = vec!["sh".to_string(), "-c".to_string(), "sleep 2".to_string()];
#[cfg(target_family = "windows")]
let arguments = vec![
"cmd".to_string(),
"/C".to_string(),
"ping -n 99999 127.0.0.1".to_string(),
];
let command = Command {
arguments,
working_directory: ".".to_string(),
..Default::default()
};
let command_digest = serialize_and_upload_message(
&command,
cas_store.as_ref(),
&mut DigestHasherFunc::Sha256.hasher(),
)
.await?;
let input_root_digest = serialize_and_upload_message(
&Directory::default(),
cas_store.as_ref(),
&mut DigestHasherFunc::Sha256.hasher(),
)
.await?;
let action = Action {
command_digest: Some(command_digest.into()),
input_root_digest: Some(input_root_digest.into()),
platform: Some(Platform {
properties: vec![Property {
name: "property_name".into(),
value: "property_value".into(),
}],
}),
timeout: Some(prost_types::Duration {
seconds: ACTION_TIMEOUT,
nanos: 0,
}),
..Default::default()
};
let action_digest = serialize_and_upload_message(
&action,
cas_store.as_ref(),
&mut DigestHasherFunc::Sha256.hasher(),
)
.await?;

let running_action_impl = running_actions_manager
.clone()
.create_and_add_action(
WORKER_ID.to_string(),
StartExecute {
execute_request: Some(ExecuteRequest {
action_digest: Some(action_digest.into()),
..Default::default()
}),
salt: SALT,
queued_timestamp: Some(make_system_time(1000).into()),
},
)
.await?;

let result = run_action(running_action_impl).await?;

#[cfg(target_family = "unix")]
assert_eq!(result.exit_code, 9, "Action process should be been killed");
#[cfg(target_family = "windows")]
assert_eq!(result.exit_code, 1, "Action process should be been killed");
Ok(())
}
}

0 comments on commit 33db963

Please sign in to comment.