Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 172 additions & 39 deletions src/services/shell_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,10 @@ fn format_shell_call_output_item(
/// before producing real output (boot failure, passthrough misconfig,
/// exec error). Both items carry `status: "incomplete"`; the output
/// item's stderr surfaces the error message so the model can react.
///
/// The output item's `done` is sent before the call's `done` for the
/// same reason as the success path: the call resolving must reliably
/// mean its paired output is already on the wire.
#[allow(clippy::too_many_arguments)]
async fn emit_failure_done(
event_tx: &mpsc::Sender<Bytes>,
Expand All @@ -1127,6 +1131,21 @@ async fn emit_failure_done(
working_directory: Option<&str>,
stderr: &str,
) {
let _ = event_tx
.send(format_shell_call_output_item(
ItemLifecycle::Done,
id,
id,
0,
-1,
"",
stderr,
&[],
true,
max_output_length,
Some("gateway"),
))
.await;
let _ = event_tx
.send(format_shell_call_item(
ItemLifecycle::Done,
Expand All @@ -1143,21 +1162,71 @@ async fn emit_failure_done(
Some("model"),
))
.await;
}

/// Emit the spec-canonical `output_item.done` events for a shell call
/// that produced real output (normal exit, non-zero exit, or timeout).
///
/// Ordering invariant: the `shell_call_output` item's terminal `done` is
/// sent BEFORE the `shell_call`'s. Hadrian overloads `shell_call.status`
/// with the execution outcome (`completed` vs `incomplete`), so consumers
/// reasonably read the call's `done(completed)` as "execution finished —
/// output available". Emitting the output first makes that a reliable
/// signal: by the time the call resolves, its paired output (same
/// `call_id`) is already on the wire. (Events share one ordered channel,
/// so this is a strict wire ordering, not a hint.)
///
/// `killed` drives both terminal statuses (`incomplete` on a timeout /
/// no-exit, `completed` otherwise) so the call and its output can never
/// disagree.
#[allow(clippy::too_many_arguments)]
async fn emit_success_done(
event_tx: &mpsc::Sender<Bytes>,
id: &str,
commands: &[String],
timeout_ms: Option<u64>,
max_output_length: Option<usize>,
env: Option<&HashMap<String, String>>,
working_directory: Option<&str>,
exit_code: i32,
stdout: &str,
stderr: &str,
files: &[ContainerFileRef],
killed: bool,
environment: Option<&serde_json::Value>,
) {
let _ = event_tx
.send(format_shell_call_output_item(
ItemLifecycle::Done,
id,
id,
0,
-1,
"",
exit_code,
stdout,
stderr,
&[],
true,
files,
killed,
max_output_length,
Some("gateway"),
))
.await;
let shell_call_status = if killed { "incomplete" } else { "completed" };
let _ = event_tx
.send(format_shell_call_item(
ItemLifecycle::Done,
id,
id,
0,
commands,
timeout_ms,
max_output_length,
env,
working_directory,
shell_call_status,
environment,
Some("model"),
))
.await;
}

// ─────────────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -2251,45 +2320,28 @@ impl ServerExecutedTool for ShellExecutor {
std::mem::replace(&mut stdout_buf, BoundedHeadTail::new(max_chars)).into_trimmed();
let stderr_render =
std::mem::replace(&mut stderr_buf, BoundedHeadTail::new(max_chars)).into_trimmed();
// Spec status: `incomplete` when we killed/timed out,
// `completed` otherwise (a non-zero exit code still counts
// as `completed` per spec).
let shell_call_status = if killed { "incomplete" } else { "completed" };
let environment_val = serde_json::json!({
"type": "container_reference",
"container_id": container_id_for_items,
});
let _ = event_tx
.send(format_shell_call_item(
ItemLifecycle::Done,
&id_for_task,
&id_for_task,
0,
&commands_for_task,
action_timeout_ms_for_task,
model_max_output_length,
action_env_for_task.as_ref(),
working_directory_for_task.as_deref(),
shell_call_status,
Some(&environment_val),
Some("model"),
))
.await;
let _ = event_tx
.send(format_shell_call_output_item(
ItemLifecycle::Done,
&id_for_task,
&id_for_task,
0,
exit_for_report,
&stdout_render,
&stderr_render,
&new_files,
killed,
model_max_output_length,
Some("gateway"),
))
.await;
// Terminal `done` pair, output-before-call — see
// `emit_success_done` for the ordering invariant.
emit_success_done(
&event_tx,
&id_for_task,
&commands_for_task,
action_timeout_ms_for_task,
model_max_output_length,
action_env_for_task.as_ref(),
working_directory_for_task.as_deref(),
exit_for_report,
&stdout_render,
&stderr_render,
&new_files,
killed,
Some(&environment_val),
)
.await;
info!(
stage = "shell_completed",
call_id = %id_for_task,
Expand Down Expand Up @@ -3331,4 +3383,85 @@ mod tests {
assert_eq!(files[0]["file_id"], "cfile_abc");
assert_eq!(files[0]["container_id"], "cntr_test");
}

/// Parse an SSE `data:` frame into the carried `output_item.<verb>`
/// event JSON.
fn parse_sse_event(bytes: &Bytes) -> Value {
let s = std::str::from_utf8(bytes).unwrap();
let json_str = s.trim().strip_prefix("data: ").unwrap();
serde_json::from_str(json_str).unwrap()
}

/// The terminal `done` events must arrive output-then-call so that a
/// consumer treating `shell_call → done` as "fully resolved" never
/// races the paired `shell_call_output`. Covers the failure path
/// directly; the success path emits the same pair in the same order.
#[tokio::test]
async fn failure_done_emits_output_before_call() {
let (tx, mut rx) = mpsc::channel::<Bytes>(8);
emit_failure_done(
&tx,
"call_1",
&["echo hi".to_string()],
None,
None,
None,
None,
"boot failed",
)
.await;
drop(tx);

let first = parse_sse_event(&rx.recv().await.unwrap());
let second = parse_sse_event(&rx.recv().await.unwrap());
assert!(rx.recv().await.is_none(), "exactly two events expected");

assert_eq!(first["type"], "response.output_item.done");
assert_eq!(first["item"]["type"], "shell_call_output");
assert_eq!(first["item"]["call_id"], "call_1");

assert_eq!(second["type"], "response.output_item.done");
assert_eq!(second["item"]["type"], "shell_call");
assert_eq!(second["item"]["call_id"], "call_1");
}

/// The production success path (every normal shell execution) must
/// emit the output item's `done` before the call's `done`, with both
/// carrying `completed` for a clean exit. Guards against a future
/// re-inversion of the `send` order inside `ShellExecutor`.
#[tokio::test]
async fn success_done_emits_output_before_call() {
let (tx, mut rx) = mpsc::channel::<Bytes>(8);
emit_success_done(
&tx,
"call_1",
&["echo hi".to_string()],
None,
None,
None,
None,
0,
"hi\n",
"",
&[],
false,
None,
)
.await;
drop(tx);

let first = parse_sse_event(&rx.recv().await.unwrap());
let second = parse_sse_event(&rx.recv().await.unwrap());
assert!(rx.recv().await.is_none(), "exactly two events expected");

assert_eq!(first["type"], "response.output_item.done");
assert_eq!(first["item"]["type"], "shell_call_output");
assert_eq!(first["item"]["status"], "completed");
assert_eq!(first["item"]["output"][0]["stdout"], "hi\n");

assert_eq!(second["type"], "response.output_item.done");
assert_eq!(second["item"]["type"], "shell_call");
assert_eq!(second["item"]["status"], "completed");
assert_eq!(second["item"]["call_id"], "call_1");
}
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Loading