Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions pkg/runtime/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func (r *LocalRuntime) registerDefaultTools() {
})
}

// appendSteerAndEmit records a queued steer message on the session as a user
// message and then publishes the matching UserMessage event on events.
func (r *LocalRuntime) appendSteerAndEmit(sess *session.Session, sm QueuedMessage, events chan<- Event) {
	userMsg := session.UserMessage(sm.Content, sm.MultiContent...)
	sess.AddMessage(userMsg)

	// Read the length only after AddMessage so the index reflects the session
	// state that includes the newly added message.
	lastIdx := len(sess.Messages) - 1
	events <- UserMessage(sm.Content, sess.ID, sm.MultiContent, lastIdx)
}

// finalizeEventChannel performs cleanup at the end of a RunStream goroutine:
// restores the previous elicitation channel, emits the StreamStopped event,
// fires hooks, and closes the events channel.
Expand Down Expand Up @@ -294,6 +300,16 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
}
}

// Drain steer messages queued while idle or before the first model call
// (covers idle-window and first-turn-miss races).
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this one is needed here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 cases:

  • if a user sends a steering message while the turn is ending, that message is only delivered after the first tool call of the next turn instead of immediately
  • when used as a library, in an app that considers all messages as "interrupting" - a la Claude Code - this simplifies things a lot: you do not have to treat messages sent while the runner is idle differently from those sent while it is active: they all come from the same queue

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not talking about the case where the turn is ending; that's lower in the code. I'm asking why we are steering before the turn starts.

messageCountBeforeSteer := len(sess.GetAllMessages())
for _, sm := range steered {
r.appendSteerAndEmit(sess, sm, events)
}
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeSteer, events)
}

messages := sess.GetMessages(a)
slog.Debug("Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages))

Expand Down Expand Up @@ -418,19 +434,10 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
// Record per-toolset model override for the next LLM turn.
toolModelOverride = resolveToolCallModelOverride(res.Calls, agentTools)

// --- STEERING: mid-turn injection ---
// Drain ALL pending steer messages. These are urgent course-
// corrections that the model should see on the very next
// iteration, wrapped in <system-reminder> tags.
// Drain steer messages that arrived during tool calls.
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
for _, sm := range steered {
wrapped := fmt.Sprintf(
"<system-reminder>\nThe user sent the following message while you were working:\n%s\n\nPlease address this in your next response while continuing with your current tasks.\n</system-reminder>",
sm.Content,
)
userMsg := session.UserMessage(wrapped, sm.MultiContent...)
sess.AddMessage(userMsg)
events <- UserMessage(sm.Content, sess.ID, sm.MultiContent, len(sess.Messages)-1)
r.appendSteerAndEmit(sess, sm, events)
}

r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
Expand All @@ -441,6 +448,15 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
slog.Debug("Conversation stopped", "agent", a.Name())
r.executeStopHooks(ctx, sess, a, res.Content, events)

// Re-check steer queue: closes the race between the mid-loop drain and this stop.
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
for _, sm := range steered {
r.appendSteerAndEmit(sess, sm, events)
}
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
continue
}

// --- FOLLOW-UP: end-of-turn injection ---
// Pop exactly one follow-up message. Unlike steered
// messages, follow-ups are plain user messages that start
Expand Down
Loading
Loading