Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/jp_attachment_bear_note/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ Fetch a single note by its unique identifier:
```sh
# You can copy the note ID using <kbd>⌥⇧⌘I</kbd>
jp attachment add "bear://get/2356A6D7-49D7-4818-8E37-3E02D1B95146"

# Shorthand for the same:
jp attachment add "bear:2356A6D7-49D7-4818-8E37-3E02D1B95146"
```

Fetch a list of notes based on a search query:
Expand Down
3 changes: 3 additions & 0 deletions crates/jp_attachment_bear_note/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ fn uri_to_query(uri: &Url) -> Result<Query, Box<dyn Error + Send + Sync>> {

Query::Search { query: path, tags }
}
// Shorthand: `bear:NOTE_ID`. Opaque form (no `//`) has no host;
// the path holds the note id directly. Mirrors the gh handler.
None if !path.is_empty() => Query::Get(path),
_ => return Err("Invalid bear note query".into()),
};

Expand Down
7 changes: 7 additions & 0 deletions crates/jp_attachment_bear_note/src/lib_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ fn test_uri_to_query() {
Ok(Query::Get("123-456".to_string())),
),
("bear://get/1", Ok(Query::Get("1".to_string()))),
(
"bear:F70FD86D-752F-403D-A517-BF020591F530",
Ok(Query::Get(
"F70FD86D-752F-403D-A517-BF020591F530".to_string(),
)),
),
("bear:", Err("Invalid bear note query".to_string())),
(
"bear://get/tag%20%231",
Ok(Query::Get("tag #1".to_string())),
Expand Down
82 changes: 82 additions & 0 deletions crates/jp_cli/src/cmd/conversation/print_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,88 @@ fn prints_structured_data() {
assert!(output.contains("```json"), "got: {output}");
}

/// Regression: replay must close the structured `json` fence. Before this
/// fix, `TurnRenderer::flush()` only flushed the chat sub-renderer, so a
/// conversation ending on a `ChatResponse::Structured` printed an opening
/// ```json with no matching close.
#[test]
fn structured_fence_is_closed_at_end_of_replay() {
let data = json!({"name": "Alice"});
let (mut ctx, id, out, _err, _rt) = setup_ctx(vec![ConversationEvent::new(
ChatResponse::structured(data),
ts(0, 0, 0),
)]);

let print = Print {
target: PositionalIds::from_targets(vec![ConversationTarget::Id(id)]),
last: None,
turn: None,
current_config: false,
style: None,
};
let h = ctx.workspace.acquire_conversation(&id).unwrap();
print.run(&mut ctx, &[h]).unwrap();
ctx.printer.flush();

let output = strip_ansi(&out.lock());
let opens = output.matches("```json").count();
// Closing fence is a backtick triple at line start with nothing after.
let closes = output.lines().filter(|l| l.trim_end() == "```").count();
assert_eq!(
opens, closes,
"opening and closing ``` fences must match, got opens={opens} closes={closes}, output: \
{output:?}"
);
}

/// Regression: a message following a structured response in the same
/// conversation must render OUTSIDE the `json` fence — the fence is
/// closed at the role/content boundary, not left open until end-of-stream.
#[test]
fn structured_response_followed_by_message_closes_fence_first() {
let (mut ctx, id, out, _err, _rt) = setup_ctx(vec![
ConversationEvent::new(TurnStart, ts(0, 0, 0)),
ConversationEvent::new(ChatRequest::from("Extract"), ts(0, 0, 1)),
ConversationEvent::new(
ChatResponse::structured(json!({"name": "Alice"})),
ts(0, 0, 2),
),
ConversationEvent::new(TurnStart, ts(0, 1, 0)),
ConversationEvent::new(ChatRequest::from("Thanks"), ts(0, 1, 1)),
ConversationEvent::new(ChatResponse::message("You're welcome.\n\n"), ts(0, 1, 2)),
]);

let print = Print {
target: PositionalIds::from_targets(vec![ConversationTarget::Id(id)]),
last: None,
turn: None,
current_config: false,
style: None,
};
let h = ctx.workspace.acquire_conversation(&id).unwrap();
print.run(&mut ctx, &[h]).unwrap();
ctx.printer.flush();

let output = strip_ansi(&out.lock());

// The trailing message text must not appear before the JSON fence has
// been closed. Find the index of the JSON close (a `\`\`\`` line
// standing alone) that follows the opening `\`\`\`json`, and assert
// the message body shows up only after it.
let open_idx = output.find("```json").expect("expected an opening fence");
let close_idx = output[open_idx..]
.find("\n```")
.map(|i| open_idx + i)
.expect("expected a closing fence after the opening one");
let welcome_idx = output
.find("You're welcome.")
.expect("expected the trailing message text");
assert!(
welcome_idx > close_idx,
"trailing message must render after the JSON fence is closed; output: {output:?}"
);
}

#[test]
fn turn_separators_between_turns() {
let (mut ctx, id, out, _err, _rt) = setup_ctx(vec![
Expand Down
22 changes: 11 additions & 11 deletions crates/jp_cli/src/cmd/query/tool/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
//! the stream first or add a new public API method, which is the visible
//! smell that earlier conventions lacked.
//!
//! See also: the `JP refactor` Bear note for the prep-flow unification
//! follow-up.
//!
//! [`ToolCallResponse`]: jp_conversation::event::ToolCallResponse
use std::collections::HashMap;

Expand Down Expand Up @@ -110,18 +107,21 @@ pub(crate) struct ExecutionPlan {
items: Vec<PlanItem>,

/// Tool-call requests that appear in the stream's current turn without a
/// matching response AND without a matching entry in `PendingTools`.
/// Should be empty in correct operation; a non-empty vec signals a
/// contract violation (some path added a `ToolCallRequest` to the stream
/// without going through the streaming-phase preparation flow). The
/// caller decides what to do — synthesize an error response, log, etc.
orphaned: Vec<ToolCallRequest>,
/// matching response AND without a matching entry in `PendingTools`,
/// paired with the plan index they would have occupied in document
/// order. Should be empty in correct operation; a non-empty vec signals
/// a contract violation (some path added a `ToolCallRequest` to the
/// stream without going through the streaming-phase preparation flow).
/// The caller decides what to do — synthesize an error response, log,
/// etc. — but MUST preserve the carried index when committing a
/// response so stream order is retained.
orphaned: Vec<(usize, ToolCallRequest)>,
}

impl ExecutionPlan {
/// Decompose the plan into its parts. Consumes the plan; there's no
/// way to read it twice.
pub(crate) fn into_parts(self) -> (Vec<PlanItem>, Vec<ToolCallRequest>) {
pub(crate) fn into_parts(self) -> (Vec<PlanItem>, Vec<(usize, ToolCallRequest)>) {
(self.items, self.orphaned)
}

Expand Down Expand Up @@ -177,7 +177,7 @@ pub(crate) fn build_execution_plan(
request: request.clone(),
work,
}),
None => orphaned.push(request.clone()),
None => orphaned.push((index, request.clone())),
}
}

Expand Down
41 changes: 40 additions & 1 deletion crates/jp_cli/src/cmd/query/tool/pending_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,44 @@ fn responded_requests_are_skipped() {
assert_eq!(items[0].index, 0);
}

/// Orphans interleaved with non-orphan items must keep the plan index
/// they would have occupied in document order. The downstream
/// `commit_tool_responses` sorts by that index, so any other choice would
/// reorder responses relative to their requests in the persisted stream.
#[test]
fn orphan_index_matches_stream_position() {
let mut stream = ConversationStream::new_test();
stream.start_turn(ChatRequest::from("user"));
stream
.current_turn_mut()
.add_tool_call_request(req("a", "tool_a"))
.add_tool_call_request(req("orphan", "tool_orphan"))
.add_tool_call_request(req("b", "tool_b"))
.build()
.unwrap();

let mut pending = PendingTools::new();
pending.insert_approved("a".into(), approved_executor("a", "tool_a"));
pending.insert_approved("b".into(), approved_executor("b", "tool_b"));
// No entry for `orphan` — it falls through to the orphaned vec.

let plan = build_execution_plan(&stream, &mut pending);
let (items, orphaned) = plan.into_parts();

assert_eq!(items.len(), 2);
assert_eq!(items[0].request.id, "a");
assert_eq!(items[0].index, 0);
assert_eq!(items[1].request.id, "b");
assert_eq!(items[1].index, 2);

assert_eq!(orphaned.len(), 1);
assert_eq!(
orphaned[0].0, 1,
"orphan must keep its document-order index"
);
assert_eq!(orphaned[0].1.id, "orphan");
}

/// A request in the stream without a matching pending entry is reported as
/// orphaned. In normal operation this should never happen — the streaming
/// phase always populates pending for every request it adds. Treating it
Expand All @@ -121,7 +159,8 @@ fn unmatched_request_is_reported_as_orphaned() {

assert!(items.is_empty());
assert_eq!(orphaned.len(), 1);
assert_eq!(orphaned[0].id, "ghost");
assert_eq!(orphaned[0].0, 0);
assert_eq!(orphaned[0].1.id, "ghost");
}

/// The plan walks only the current (most recent) turn. Tool calls from a
Expand Down
15 changes: 11 additions & 4 deletions crates/jp_cli/src/cmd/query/turn/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl TurnCoordinator {
for event in self.event_builder.drain() {
self.push_event(stream, event);
}
self.view.flush_all();
self.view.flush();
self.transition_from_streaming(stream, reason)
}

Expand Down Expand Up @@ -367,12 +367,19 @@ impl TurnCoordinator {
self.push_event(conversation_stream, ChatResponse::message(&partial));
}

// Add user's reply as a new request
self.push_event(conversation_stream, ChatRequest {
// Add user's reply as a new request, then render it through
// the view so the live terminal gets the same labeled
// user header replay would emit for this `ChatRequest`.
// `render_user_request` also resets the assistant-header
// gate, so the next assistant chunk will print a fresh
// `── jp …` header.
let request = ChatRequest {
content,
schema: None,
author: self.author.clone(),
});
};
self.view.render_user_request(&request);
self.push_event(conversation_stream, request);
self.prepare_continuation();
}

Expand Down
93 changes: 93 additions & 0 deletions crates/jp_cli/src/cmd/query/turn/coordinator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ use jp_printer::{OutputFormat, Printer};
use serde_json::{Map, json};

use super::{super::state::TurnState, *};
use crate::cmd::query::interrupt::InterruptAction;

/// Strip ANSI escape codes for readable assertions on captured stdout.
fn strip_ansi(s: &str) -> String {
let bytes = strip_ansi_escapes::strip(s);
String::from_utf8(bytes).expect("valid utf-8 after stripping ANSI")
}

#[test]
fn test_transitions_to_executing_on_tool_call() {
Expand Down Expand Up @@ -418,3 +425,89 @@ fn test_structured_not_routed_to_chat_renderer() {
"Expected code fence, got: {output:?}"
);
}

/// Regression: if the user interrupts before any chunk has arrived and
/// chooses Continue, the next assistant event MUST emit a fresh role
/// header. The previous behaviour forced `assistant_header_rendered =
/// true` in `reset_for_continuation`, which suppressed the header
/// unconditionally and left the resumed output without a `── jp …`
/// boundary.
#[test]
fn interrupt_continue_before_first_chunk_emits_assistant_header_on_resume() {
let mut stream = ConversationStream::new_test();
let (printer, out, _) = Printer::memory(OutputFormat::Text);
let printer = Arc::new(printer);
let mut coordinator = TurnCoordinator::new(
Arc::clone(&printer),
AppConfig::new_test().style,
None,
None,
Some("anthropic/test".into()),
);

coordinator.start_turn(&mut stream, ChatRequest::from("hello"));

// No chunks have arrived yet — the assistant header has NOT been emitted.
coordinator.handle_streaming_interrupt(InterruptAction::Continue, &mut stream);

// Resumed cycle delivers its first chunk.
coordinator.handle_event(&mut stream, Event::message(0, "hi there"));
coordinator.handle_event(&mut stream, Event::flush(0));
coordinator.handle_event(&mut stream, Event::Finished(FinishReason::Completed));

printer.flush();
let output = strip_ansi(&out.lock());
assert!(
output.contains("\u{2500}\u{2500} jp "),
"resumed assistant content must be preceded by a `── jp` header, got: {output:?}"
);
}

/// Regression: a Reply interrupt inserts a new `ChatRequest` boundary,
/// which in replay would render a labeled user header AND a fresh
/// assistant header for the following content. Live mode must match.
#[test]
fn interrupt_reply_renders_user_header_for_new_request() {
let mut stream = ConversationStream::new_test();
let (printer, out, _) = Printer::memory(OutputFormat::Text);
let printer = Arc::new(printer);
let mut coordinator = TurnCoordinator::new(
Arc::clone(&printer),
AppConfig::new_test().style,
Some("alice".into()),
None,
Some("anthropic/test".into()),
);

coordinator.start_turn(&mut stream, ChatRequest::from("first question"));

// Some assistant content arrives so the assistant header is emitted.
coordinator.handle_event(&mut stream, Event::message(0, "partial answer"));

// User interrupts with a follow-up reply.
coordinator.handle_streaming_interrupt(
InterruptAction::Reply("actually, ignore that".into()),
&mut stream,
);

// Resumed cycle delivers the new assistant content.
coordinator.handle_event(&mut stream, Event::message(1, "new answer"));
coordinator.handle_event(&mut stream, Event::flush(1));
coordinator.handle_event(&mut stream, Event::Finished(FinishReason::Completed));

printer.flush();
let output = strip_ansi(&out.lock());

// The labeled user header for the Reply must show up between the
// partial answer and the resumed assistant content.
let alice_idx = output
.find("\u{2500}\u{2500} alice ")
.unwrap_or_else(|| panic!("expected a `── alice` header for the Reply, got: {output:?}"));

// And a fresh assistant header must follow the user boundary.
let after_alice = &output[alice_idx..];
assert!(
after_alice.contains("\u{2500}\u{2500} jp "),
"expected a fresh `── jp` header after the Reply, got: {output:?}"
);
}
12 changes: 5 additions & 7 deletions crates/jp_cli/src/cmd/query/turn_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,9 @@ pub(super) async fn run_turn_loop(
// the unified executing path below picks up.
//
// The streaming and restart prep flows are still two
// separate codepaths today (see Bear note: "unify the
// streaming and restart tool-prep codepaths"). Both
// converge on `pending_tools` and `build_execution_plan`,
// which is the load-bearing invariant for this refactor.
// separate codepaths today; both converge on
// `pending_tools` and `build_execution_plan`, which is the
// load-bearing invariant for this refactor.
if restart_requested {
restart_requested = false;

Expand Down Expand Up @@ -572,15 +571,14 @@ pub(super) async fn run_turn_loop(
// an error response so the conversation stays valid (every
// request must have a response before the next provider
// call) and surface the inconsistency.
for req in orphaned {
for (idx, req) in orphaned {
warn!(
id = %req.id,
name = %req.name,
"ToolCallRequest in stream without a pending entry; synthesizing error \
response.",
);
let next_idx = approved.len() + pre_resolved.len();
pre_resolved.push((next_idx, ToolCallResponse {
pre_resolved.push((idx, ToolCallResponse {
id: req.id,
result: Err(
"Tool call had no prepared executor (internal inconsistency).".into(),
Expand Down
Loading
Loading