diff --git a/src/services/shell_tool.rs b/src/services/shell_tool.rs index 7a216bd..1c656d2 100644 --- a/src/services/shell_tool.rs +++ b/src/services/shell_tool.rs @@ -1116,6 +1116,10 @@ fn format_shell_call_output_item( /// before producing real output (boot failure, passthrough misconfig, /// exec error). Both items carry `status: "incomplete"`; the output /// item's stderr surfaces the error message so the model can react. +/// +/// The output item's `done` is sent before the call's `done` for the +/// same reason as the success path (see `emit_success_done`): the call resolving must reliably +/// mean its paired output is already on the wire. #[allow(clippy::too_many_arguments)] async fn emit_failure_done( event_tx: &mpsc::Sender, @@ -1127,6 +1131,21 @@ async fn emit_failure_done( working_directory: Option<&str>, stderr: &str, ) { + let _ = event_tx + .send(format_shell_call_output_item( + ItemLifecycle::Done, + id, + id, + 0, + -1, + "", + stderr, + &[], + true, + max_output_length, + Some("gateway"), + )) + .await; let _ = event_tx .send(format_shell_call_item( ItemLifecycle::Done, @@ -1143,21 +1162,71 @@ async fn emit_failure_done( Some("model"), )) .await; +} + +/// Emit the spec-canonical `output_item.done` events for a shell call +/// that produced real output (normal exit, non-zero exit, or timeout). +/// +/// Ordering invariant: the `shell_call_output` item's terminal `done` is +/// sent BEFORE the `shell_call`'s. Hadrian overloads `shell_call.status` +/// with the execution outcome (`completed` vs `incomplete`), so consumers +/// reasonably read the call's `done(completed)` as "execution finished — +/// output available". Emitting the output first makes that a reliable +/// signal: by the time the call resolves, its paired output (same +/// `call_id`) is already on the wire. (Events share one ordered channel, +/// so this is a strict wire ordering, not a hint.) 
+/// +/// `killed` drives both terminal statuses (`incomplete` on a timeout / +/// no-exit, `completed` otherwise) so the call and its output can never +/// disagree. +#[allow(clippy::too_many_arguments)] +async fn emit_success_done( + event_tx: &mpsc::Sender, + id: &str, + commands: &[String], + timeout_ms: Option, + max_output_length: Option, + env: Option<&HashMap>, + working_directory: Option<&str>, + exit_code: i32, + stdout: &str, + stderr: &str, + files: &[ContainerFileRef], + killed: bool, + environment: Option<&serde_json::Value>, +) { let _ = event_tx .send(format_shell_call_output_item( ItemLifecycle::Done, id, id, 0, - -1, - "", + exit_code, + stdout, stderr, - &[], - true, + files, + killed, max_output_length, Some("gateway"), )) .await; + let shell_call_status = if killed { "incomplete" } else { "completed" }; + let _ = event_tx + .send(format_shell_call_item( + ItemLifecycle::Done, + id, + id, + 0, + commands, + timeout_ms, + max_output_length, + env, + working_directory, + shell_call_status, + environment, + Some("model"), + )) + .await; } // ───────────────────────────────────────────────────────────────────────────── @@ -2251,45 +2320,28 @@ impl ServerExecutedTool for ShellExecutor { std::mem::replace(&mut stdout_buf, BoundedHeadTail::new(max_chars)).into_trimmed(); let stderr_render = std::mem::replace(&mut stderr_buf, BoundedHeadTail::new(max_chars)).into_trimmed(); - // Spec status: `incomplete` when we killed/timed out, - // `completed` otherwise (a non-zero exit code still counts - // as `completed` per spec). 
- let shell_call_status = if killed { "incomplete" } else { "completed" }; let environment_val = serde_json::json!({ "type": "container_reference", "container_id": container_id_for_items, }); - let _ = event_tx - .send(format_shell_call_item( - ItemLifecycle::Done, - &id_for_task, - &id_for_task, - 0, - &commands_for_task, - action_timeout_ms_for_task, - model_max_output_length, - action_env_for_task.as_ref(), - working_directory_for_task.as_deref(), - shell_call_status, - Some(&environment_val), - Some("model"), - )) - .await; - let _ = event_tx - .send(format_shell_call_output_item( - ItemLifecycle::Done, - &id_for_task, - &id_for_task, - 0, - exit_for_report, - &stdout_render, - &stderr_render, - &new_files, - killed, - model_max_output_length, - Some("gateway"), - )) - .await; + // Terminal `done` pair, output-before-call — see + // `emit_success_done` for the ordering invariant. + emit_success_done( + &event_tx, + &id_for_task, + &commands_for_task, + action_timeout_ms_for_task, + model_max_output_length, + action_env_for_task.as_ref(), + working_directory_for_task.as_deref(), + exit_for_report, + &stdout_render, + &stderr_render, + &new_files, + killed, + Some(&environment_val), + ) + .await; info!( stage = "shell_completed", call_id = %id_for_task, @@ -3331,4 +3383,85 @@ mod tests { assert_eq!(files[0]["file_id"], "cfile_abc"); assert_eq!(files[0]["container_id"], "cntr_test"); } + + /// Parse an SSE `data:` frame into the carried `response.output_item.*` + /// event JSON. + fn parse_sse_event(bytes: &Bytes) -> Value { + let s = std::str::from_utf8(bytes).unwrap(); + let json_str = s.trim().strip_prefix("data: ").unwrap(); + serde_json::from_str(json_str).unwrap() + } + + /// The terminal `done` events must arrive output-then-call so that a + /// consumer treating `shell_call → done` as "fully resolved" never + /// races the paired `shell_call_output`. Covers the failure path + /// directly; the success path is pinned by `success_done_emits_output_before_call`. 
+ #[tokio::test] + async fn failure_done_emits_output_before_call() { + let (tx, mut rx) = mpsc::channel::(8); + emit_failure_done( + &tx, + "call_1", + &["echo hi".to_string()], + None, + None, + None, + None, + "boot failed", + ) + .await; + drop(tx); + + let first = parse_sse_event(&rx.recv().await.unwrap()); + let second = parse_sse_event(&rx.recv().await.unwrap()); + assert!(rx.recv().await.is_none(), "exactly two events expected"); + + assert_eq!(first["type"], "response.output_item.done"); + assert_eq!(first["item"]["type"], "shell_call_output"); + assert_eq!(first["item"]["call_id"], "call_1"); + + assert_eq!(second["type"], "response.output_item.done"); + assert_eq!(second["item"]["type"], "shell_call"); + assert_eq!(second["item"]["call_id"], "call_1"); + } + + /// The production success path (every normal shell execution) must + /// emit the output item's `done` before the call's `done`, with both + /// carrying `completed` for a clean exit. Guards against a future + /// re-inversion of the `send` order in `emit_success_done` (called by `ShellExecutor`). + #[tokio::test] + async fn success_done_emits_output_before_call() { + let (tx, mut rx) = mpsc::channel::(8); + emit_success_done( + &tx, + "call_1", + &["echo hi".to_string()], + None, + None, + None, + None, + 0, + "hi\n", + "", + &[], + false, + None, + ) + .await; + drop(tx); + + let first = parse_sse_event(&rx.recv().await.unwrap()); + let second = parse_sse_event(&rx.recv().await.unwrap()); + assert!(rx.recv().await.is_none(), "exactly two events expected"); + + assert_eq!(first["type"], "response.output_item.done"); + assert_eq!(first["item"]["type"], "shell_call_output"); + assert_eq!(first["item"]["status"], "completed"); + assert_eq!(first["item"]["output"][0]["stdout"], "hi\n"); + + assert_eq!(second["type"], "response.output_item.done"); + assert_eq!(second["item"]["type"], "shell_call"); + assert_eq!(second["item"]["status"], "completed"); + assert_eq!(second["item"]["call_id"], "call_1"); + } }