Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions crates/jp_cli/src/cmd/conversation/print_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,88 @@ fn prints_structured_data() {
assert!(output.contains("```json"), "got: {output}");
}

/// Regression: replay must close the structured `json` fence. Before this
/// fix, `TurnRenderer::flush()` only flushed the chat sub-renderer, so a
/// conversation ending on a `ChatResponse::Structured` printed an opening
/// ```json with no matching close.
#[test]
fn structured_fence_is_closed_at_end_of_replay() {
let data = json!({"name": "Alice"});
let (mut ctx, id, out, _err, _rt) = setup_ctx(vec![ConversationEvent::new(
ChatResponse::structured(data),
ts(0, 0, 0),
)]);

let print = Print {
target: PositionalIds::from_targets(vec![ConversationTarget::Id(id)]),
last: None,
turn: None,
current_config: false,
style: None,
};
let h = ctx.workspace.acquire_conversation(&id).unwrap();
print.run(&mut ctx, &[h]).unwrap();
ctx.printer.flush();

let output = strip_ansi(&out.lock());
let opens = output.matches("```json").count();
// Closing fence is a backtick triple at line start with nothing after.
let closes = output.lines().filter(|l| l.trim_end() == "```").count();
assert_eq!(
opens, closes,
"opening and closing ``` fences must match, got opens={opens} closes={closes}, output: \
{output:?}"
);
}

/// Regression: a message following a structured response in the same
/// conversation must render OUTSIDE the `json` fence — the fence is
/// closed at the role/content boundary, not left open until end-of-stream.
#[test]
fn structured_response_followed_by_message_closes_fence_first() {
let (mut ctx, id, out, _err, _rt) = setup_ctx(vec![
ConversationEvent::new(TurnStart, ts(0, 0, 0)),
ConversationEvent::new(ChatRequest::from("Extract"), ts(0, 0, 1)),
ConversationEvent::new(
ChatResponse::structured(json!({"name": "Alice"})),
ts(0, 0, 2),
),
ConversationEvent::new(TurnStart, ts(0, 1, 0)),
ConversationEvent::new(ChatRequest::from("Thanks"), ts(0, 1, 1)),
ConversationEvent::new(ChatResponse::message("You're welcome.\n\n"), ts(0, 1, 2)),
]);

let print = Print {
target: PositionalIds::from_targets(vec![ConversationTarget::Id(id)]),
last: None,
turn: None,
current_config: false,
style: None,
};
let h = ctx.workspace.acquire_conversation(&id).unwrap();
print.run(&mut ctx, &[h]).unwrap();
ctx.printer.flush();

let output = strip_ansi(&out.lock());

// The trailing message text must not appear before the JSON fence has
// been closed. Find the index of the JSON close (a `\`\`\`` line
// standing alone) that follows the opening `\`\`\`json`, and assert
// the message body shows up only after it.
let open_idx = output.find("```json").expect("expected an opening fence");
let close_idx = output[open_idx..]
.find("\n```")
.map(|i| open_idx + i)
.expect("expected a closing fence after the opening one");
let welcome_idx = output
.find("You're welcome.")
.expect("expected the trailing message text");
assert!(
welcome_idx > close_idx,
"trailing message must render after the JSON fence is closed; output: {output:?}"
);
}

#[test]
fn turn_separators_between_turns() {
let (mut ctx, id, out, _err, _rt) = setup_ctx(vec![
Expand Down
15 changes: 11 additions & 4 deletions crates/jp_cli/src/cmd/query/turn/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl TurnCoordinator {
for event in self.event_builder.drain() {
self.push_event(stream, event);
}
self.view.flush_all();
self.view.flush();
self.transition_from_streaming(stream, reason)
}

Expand Down Expand Up @@ -367,12 +367,19 @@ impl TurnCoordinator {
self.push_event(conversation_stream, ChatResponse::message(&partial));
}

// Add user's reply as a new request
self.push_event(conversation_stream, ChatRequest {
// Add user's reply as a new request, then render it through
// the view so the live terminal gets the same labeled
// user header replay would emit for this `ChatRequest`.
// `render_user_request` also resets the assistant-header
// gate, so the next assistant chunk will print a fresh
// `── jp …` header.
let request = ChatRequest {
content,
schema: None,
author: self.author.clone(),
});
};
self.view.render_user_request(&request);
self.push_event(conversation_stream, request);
self.prepare_continuation();
}

Expand Down
93 changes: 93 additions & 0 deletions crates/jp_cli/src/cmd/query/turn/coordinator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ use jp_printer::{OutputFormat, Printer};
use serde_json::{Map, json};

use super::{super::state::TurnState, *};
use crate::cmd::query::interrupt::InterruptAction;

/// Strip ANSI escape codes for readable assertions on captured stdout.
fn strip_ansi(s: &str) -> String {
let bytes = strip_ansi_escapes::strip(s);
String::from_utf8(bytes).expect("valid utf-8 after stripping ANSI")
}

#[test]
fn test_transitions_to_executing_on_tool_call() {
Expand Down Expand Up @@ -418,3 +425,89 @@ fn test_structured_not_routed_to_chat_renderer() {
"Expected code fence, got: {output:?}"
);
}

/// Regression: if the user interrupts before any chunk has arrived and
/// chooses Continue, the next assistant event MUST emit a fresh role
/// header. The previous behaviour forced `assistant_header_rendered =
/// true` in `reset_for_continuation`, which suppressed the header
/// unconditionally and left the resumed output without a `── jp …`
/// boundary.
#[test]
fn interrupt_continue_before_first_chunk_emits_assistant_header_on_resume() {
let mut stream = ConversationStream::new_test();
let (printer, out, _) = Printer::memory(OutputFormat::Text);
let printer = Arc::new(printer);
let mut coordinator = TurnCoordinator::new(
Arc::clone(&printer),
AppConfig::new_test().style,
None,
None,
Some("anthropic/test".into()),
);

coordinator.start_turn(&mut stream, ChatRequest::from("hello"));

// No chunks have arrived yet — the assistant header has NOT been emitted.
coordinator.handle_streaming_interrupt(InterruptAction::Continue, &mut stream);

// Resumed cycle delivers its first chunk.
coordinator.handle_event(&mut stream, Event::message(0, "hi there"));
coordinator.handle_event(&mut stream, Event::flush(0));
coordinator.handle_event(&mut stream, Event::Finished(FinishReason::Completed));

printer.flush();
let output = strip_ansi(&out.lock());
assert!(
output.contains("\u{2500}\u{2500} jp "),
"resumed assistant content must be preceded by a `── jp` header, got: {output:?}"
);
}

/// Regression: a Reply interrupt inserts a new `ChatRequest` boundary,
/// which in replay would render a labeled user header AND a fresh
/// assistant header for the following content. Live mode must match.
#[test]
fn interrupt_reply_renders_user_header_for_new_request() {
let mut stream = ConversationStream::new_test();
let (printer, out, _) = Printer::memory(OutputFormat::Text);
let printer = Arc::new(printer);
let mut coordinator = TurnCoordinator::new(
Arc::clone(&printer),
AppConfig::new_test().style,
Some("alice".into()),
None,
Some("anthropic/test".into()),
);

coordinator.start_turn(&mut stream, ChatRequest::from("first question"));

// Some assistant content arrives so the assistant header is emitted.
coordinator.handle_event(&mut stream, Event::message(0, "partial answer"));

// User interrupts with a follow-up reply.
coordinator.handle_streaming_interrupt(
InterruptAction::Reply("actually, ignore that".into()),
&mut stream,
);

// Resumed cycle delivers the new assistant content.
coordinator.handle_event(&mut stream, Event::message(1, "new answer"));
coordinator.handle_event(&mut stream, Event::flush(1));
coordinator.handle_event(&mut stream, Event::Finished(FinishReason::Completed));

printer.flush();
let output = strip_ansi(&out.lock());

// The labeled user header for the Reply must show up between the
// partial answer and the resumed assistant content.
let alice_idx = output
.find("\u{2500}\u{2500} alice ")
.unwrap_or_else(|| panic!("expected a `── alice` header for the Reply, got: {output:?}"));

// And a fresh assistant header must follow the user boundary.
let after_alice = &output[alice_idx..];
assert!(
after_alice.contains("\u{2500}\u{2500} jp "),
"expected a fresh `── jp` header after the Reply, got: {output:?}"
);
}
49 changes: 33 additions & 16 deletions crates/jp_cli/src/render/turn_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,21 @@ impl TurnView {
}

/// Mark the start of a new turn. The next assistant event will emit a
/// fresh role header.
/// fresh role header. Closes any open structured fence so a turn that
/// ended on a `ChatResponse::Structured` doesn't bleed into the next
/// turn's content.
pub fn begin_turn(&mut self) {
self.structured.flush();
self.assistant_header_rendered = false;
}

/// Render a user request: a labeled role header followed by the request
/// body. Resets assistant-header gating so the next assistant event
/// emits a fresh header.
pub fn render_user_request(&mut self, req: &ChatRequest) {
// Close any open structured fence before the user header so the
// boundary marker isn't rendered inside a `json` block.
self.structured.flush();
let label = req.author.as_deref().unwrap_or(DEFAULT_USER_LABEL);
self.chat.render_role_header(label, None);
self.chat.render_request(&req.content);
Expand All @@ -86,13 +92,17 @@ impl TurnView {
/// role header first if it hasn't been emitted yet for this turn.
///
/// Dispatches structured responses to the structured renderer and
/// everything else (messages, reasoning) to the chat renderer.
/// everything else (messages, reasoning) to the chat renderer. A
/// non-structured response after structured content closes the open
/// `json` fence first; a structured response after non-structured
/// content flushes the chat buffer first.
pub fn render_chat_response(&mut self, resp: &ChatResponse) {
self.ensure_assistant_header();
if resp.is_structured() {
self.chat.flush();
self.structured.render_chunk(resp);
} else {
self.structured.flush();
self.chat.render_response(resp);
}
}
Expand All @@ -101,58 +111,65 @@ impl TurnView {
///
/// Emits the assistant header if not already shown, then flushes the
/// chat buffer so surrounding messages render as distinct paragraphs.
/// Also closes any open structured fence — a tool call after
/// structured output is a content boundary that must not stay inside
/// the `json` block.
///
/// `hidden` controls whether the chat renderer transitions into the
/// `ToolCall` content kind: passing `true` keeps the boundary
/// invisible (suitable for hidden tool calls so the next message
/// doesn't pick up an extra blank line); `false` is the normal case
/// where tool UI follows.
pub fn enter_tool_call(&mut self, hidden: bool) {
self.ensure_assistant_header();
self.structured.flush();
self.chat.flush();
if !hidden {
self.chat.transition_to_tool_call();
}
}

/// Flush pending chat output.
pub fn flush(&mut self) {
self.chat.flush();
}

/// Flush pending output for both chat and structured renderers.
/// Flush pending output across both chat and structured renderers.
///
/// Used at the end of a streaming cycle so any unclosed JSON code
/// fence gets its closing fence before the next phase starts.
pub fn flush_all(&mut self) {
/// Closes any open `json` fence and drains any buffered chat content.
/// Safe to call at any boundary; in particular, replay's final flush
/// after the last turn relies on this to terminate a trailing
/// structured response.
pub fn flush(&mut self) {
self.chat.flush();
self.structured.flush();
}

/// Reset internal renderer state, discarding partial buffers.
///
/// Used when a streaming cycle is interrupted and a new one begins
/// (e.g. interrupt-with-prefill). Crucially, this leaves the
/// assistant-header flag set: the continuation is part of the same
/// turn, so re-emitting the header would be a duplicate.
/// (e.g. interrupt-with-prefill). Preserves the existing
/// `assistant_header_rendered` flag: if a header was already on the
/// terminal before the interrupt, the continuation is part of the
/// same assistant turn and must not re-emit it; if no header had been
/// rendered yet (the user interrupted before the first chunk), the
/// flag stays `false` and the next assistant event will emit one.
pub fn reset_for_continuation(&mut self) {
self.chat.reset();
self.structured.reset();
self.assistant_header_rendered = true;
}

/// Replace the underlying renderers and identity.
///
/// Used by replay's per-turn config rebuild when the conversation's
/// historical config differs from the workspace's current config.
/// Header gating state is preserved (the rebuild itself doesn't open
/// a new turn).
/// a new turn). Flushes both sub-renderers before swapping them out so
/// any open `json` fence or buffered chat output is committed before
/// the new instances take over.
pub fn reconfigure(
&mut self,
printer: Arc<Printer>,
style: StyleConfig,
assistant_name: Option<String>,
model_id: Option<String>,
) {
self.flush();
self.chat = ChatRenderer::new(printer.clone(), style);
self.structured = StructuredRenderer::new(printer);
self.assistant_name = assistant_name;
Expand Down
Loading