Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Remote mode: `ask_user` now reaches both the editor and connected SSE/web clients simultaneously, with the first answer from either winning, instead of only the web client when one is connected.

- Remote REST API: `GET /api/v1/chats/:id` `pendingToolCalls` now includes `ask_user` tool calls while waiting for an answer, with a `requestId` field for `POST /api/v1/answer`.

- Add OpenAI Responses API for GitHub Copilot models that require it (gpt-5.5, gpt-5.4-mini).
Expand Down
47 changes: 28 additions & 19 deletions src/eca/remote/messenger.clj
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,26 @@
(editor-diagnostics [_this uri]
(messenger/editor-diagnostics inner uri))
(ask-question [_this params]
;; No SSE clients: fall back to inner so JSON-RPC editor sessions work
;; unchanged. Otherwise reuse the caller-supplied :request-id (set by
;; ask_user to match the id in tool-call state) and route the answer via
;; SSE + /api/v1/answer.
;; No SSE clients: delegate to inner. Otherwise ask both the editor (inner)
;; and SSE clients; the first answer wins.
(if (empty? @sse-connections*)
(messenger/ask-question inner params)
(let [request-id (or (:request-id params) (str (random-uuid)))
p (promise)
wire-params (-> params (dissoc :request-id) (assoc :requestId request-id))]
(swap! pending-questions* assoc request-id p)
result (promise)
inner-result (messenger/ask-question inner params)
wire-params (-> params (dissoc :request-id) (assoc :requestId request-id))
;; Watch the editor's answer and forward it; deliver is idempotent
;; so the first answer (editor or SSE) wins.
watcher (future
(try
(deliver result (deref inner-result))
(catch Throwable _ nil)
(finally
(swap! pending-questions* dissoc request-id))))]
(swap! pending-questions* assoc request-id
{:promise result :inner-result inner-result :watcher watcher})
(sse/broadcast! sse-connections* "chat:ask-question" (->camel wire-params))
p))))
result))))

(defn make-broadcast-messenger
"Creates a BroadcastMessenger with a fresh pending-questions registry.
Expand All @@ -89,17 +97,18 @@
(->BroadcastMessenger inner sse-connections* (atom {})))

(defn answer-question!
"Resolves a pending question (previously registered by `ask-question`) by
request-id. Delivers `{:answer answer :cancelled (boolean cancelled)}` to
the registered promise and removes the entry from the registry.
Returns true if a pending question was found and delivered, nil otherwise
(e.g. unknown or already-answered request-id).

Uses `swap-vals!` so that claiming the entry is a single atomic op:
under concurrent answer-question! calls for the same request-id only the
caller that wins the swap observes the entry in `old` and delivers."
"Resolves a pending question by request-id: delivers
`{:answer answer :cancelled (boolean cancelled)}` to the registered promise,
cancels the editor's pending request (sending `$/cancelRequest`) and its
watcher, and removes the entry. Returns true when a pending question was
found and delivered, nil otherwise.

Uses `swap-vals!` so claiming the entry is a single atomic op: under
concurrent calls for the same request-id only the swap winner delivers."
[{:keys [pending-questions*]} request-id answer cancelled]
(let [[old _new] (swap-vals! pending-questions* dissoc request-id)]
(when-let [p (get old request-id)]
(deliver p {:answer answer :cancelled (boolean cancelled)})
(when-let [{:keys [promise inner-result watcher]} (get old request-id)]
(deliver promise {:answer answer :cancelled (boolean cancelled)})
(when (future? inner-result) (future-cancel inner-result))
(when watcher (future-cancel watcher))
true)))
2 changes: 2 additions & 0 deletions test/eca/remote/handlers_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@

(deftest handle-answer-question-test
(let [inner (h/messenger)
;; Editor doesn't answer; isolates the SSE path (inner is also asked).
_ (reset! (:ask-question-response* inner) :block)
sse-connections* (sse/create-connections)
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)
os (java.io.ByteArrayOutputStream.)
Expand Down
84 changes: 84 additions & 0 deletions test/eca/remote/messenger_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
(deftest ask-question-broadcasts-and-resolves-via-answer-test
(testing "ask-question registers a promise, broadcasts SSE, and answer-question! resolves it"
(let [inner (h/messenger)
;; Editor doesn't answer; isolates the SSE path (inner is also asked).
_ (reset! (:ask-question-response* inner) :block)
sse-connections* (sse/create-connections)
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)
os (java.io.ByteArrayOutputStream.)
Expand All @@ -89,6 +91,7 @@
(deftest ask-question-uses-caller-supplied-request-id-test
(testing "caller-supplied :request-id is used as the SSE requestId and pending-questions* key"
(let [inner (h/messenger)
_ (reset! (:ask-question-response* inner) :block)
sse-connections* (sse/create-connections)
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)
os (java.io.ByteArrayOutputStream.)
Expand Down Expand Up @@ -123,3 +126,84 @@
sse-connections* (sse/create-connections)
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)]
(is (nil? (remote.messenger/answer-question! broadcast-messenger "nonexistent" "x" false))))))

;;; ask-question dual-dispatch: with both an SSE client and the editor (inner)
;;; connected, the question reaches both and the first answer wins.

(deftest ask-question-reaches-editor-and-sse-when-both-connected-test
(testing "with an SSE client connected, inner (editor) still receives chat/askQuestion"
(let [inner-params* (atom nil)
inner (reify messenger/IMessenger
(ask-question [_ params]
(reset! inner-params* params)
(promise)))
sse-connections* (sse/create-connections)
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)
os (java.io.ByteArrayOutputStream.)
_client (sse/add-client! sse-connections* os)]
(messenger/ask-question broadcast-messenger
{:chat-id "c1" :question "Why?" :request-id "req-1"})
(Thread/sleep 100)
(is (some? @inner-params*)
"editor must receive the question even when an SSE client is connected")
(is (.contains (.toString os "UTF-8") "chat:ask-question")
"SSE clients must also receive the question"))))

(deftest ask-question-sse-answer-wins-test
(testing "an SSE answer resolves the call when both transports are connected"
(let [inner (reify messenger/IMessenger
;; editor never answers
(ask-question [_ _params] (promise)))
sse-connections* (sse/create-connections)
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)
os (java.io.ByteArrayOutputStream.)
_client (sse/add-client! sse-connections* os)
result (messenger/ask-question broadcast-messenger
{:chat-id "c1" :question "Q?" :request-id "req-1"})
watcher (:watcher (get @(:pending-questions* broadcast-messenger) "req-1"))]
(Thread/sleep 50)
(is (= :pending (deref result 1 :pending))
"result should block until someone answers")
(remote.messenger/answer-question! broadcast-messenger "req-1" "via-sse" false)
(is (= {:answer "via-sse" :cancelled false} (deref result 1000 :timeout)))
;; The editor watcher must be cancelled so it doesn't park forever on an
;; editor that may never answer.
(is (future-cancelled? watcher)))))

(deftest ask-question-sse-answer-retracts-editor-request-test
(testing "an SSE answer cancels the editor's outstanding request (→ $/cancelRequest)"
;; A CompletableFuture models the jsonrpc PendingRequest: future-cancellable,
;; and cancelling it is what fires $/cancelRequest in the real ServerMessenger.
(let [inner-result (java.util.concurrent.CompletableFuture.)
inner (reify messenger/IMessenger
(ask-question [_ _params] inner-result))
sse-connections* (sse/create-connections)
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)
os (java.io.ByteArrayOutputStream.)
_client (sse/add-client! sse-connections* os)
result (messenger/ask-question broadcast-messenger
{:chat-id "c1" :question "Q?" :request-id "req-1"})]
(Thread/sleep 50)
(remote.messenger/answer-question! broadcast-messenger "req-1" "via-sse" false)
(is (= {:answer "via-sse" :cancelled false} (deref result 1000 :timeout)))
(is (future-cancelled? inner-result)
"the editor's pending request must be cancelled so the server retracts it"))))

(deftest ask-question-editor-answer-wins-test
(testing "an editor answer resolves the call and cleans up the SSE pending entry"
(let [inner-promise (promise)
inner (reify messenger/IMessenger
(ask-question [_ _params] inner-promise))
sse-connections* (sse/create-connections)
broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)
os (java.io.ByteArrayOutputStream.)
_client (sse/add-client! sse-connections* os)
result (messenger/ask-question broadcast-messenger
{:chat-id "c1" :question "Q?" :request-id "req-1"})]
(Thread/sleep 50)
(deliver inner-promise {:answer "via-editor" :cancelled false})
(is (= {:answer "via-editor" :cancelled false} (deref result 1000 :timeout))
"editor's answer must resolve the call")
(Thread/sleep 50)
(is (empty? @(:pending-questions* broadcast-messenger))
"answering via the editor must clear the SSE pending entry so a late /answer is a no-op"))))
Loading