Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 15 additions & 31 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,9 +621,6 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
return
}
slog.Debug("Starting conversation loop iteration", "agent", a.Name())
// Looping, get the updated messages from the session
messages := sess.GetMessages(a)
slog.Debug("Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages))

streamCtx, streamSpan := r.startSpan(ctx, "runtime.stream", trace.WithAttributes(
attribute.String("agent", a.Name()),
Expand All @@ -639,6 +636,21 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
slog.Debug("Failed to get model definition", "error", err)
}

var contextLimit int64
if m != nil {
contextLimit = int64(m.Limit.Context)
}

if m != nil && r.sessionCompaction {
if sess.InputTokens+sess.OutputTokens > int64(float64(contextLimit)*0.9) {
r.Summarize(ctx, sess, events)
events <- TokenUsage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost)
}
}

messages := sess.GetMessages(a)
slog.Debug("Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages))

slog.Debug("Creating chat completion stream", "agent", a.Name())
stream, err := model.CreateChatCompletionStream(streamCtx, messages, agentTools)
if err != nil {
Expand Down Expand Up @@ -697,38 +709,10 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
slog.Debug("Skipping empty assistant message (no content and no tool calls)", "agent", a.Name())
}

var contextLimit int64
if m != nil {
contextLimit = int64(m.Limit.Context)
}
events <- TokenUsage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost)

if m != nil && r.sessionCompaction {
if sess.InputTokens+sess.OutputTokens > int64(float64(contextLimit)*0.9) {
// Avoid inserting a summary between assistant tool_use and tool_result messages.
// Defer compaction until after tool calls are processed in this iteration.
if len(res.Calls) == 0 {
events <- SessionCompaction(sess.ID, "start", r.currentAgent)
r.Summarize(ctx, sess, events)
events <- TokenUsage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost)
events <- SessionCompaction(sess.ID, "completed", r.currentAgent)
}
}
}

r.processToolCalls(ctx, sess, res.Calls, agentTools, events)

// If tool_use occurred, perform compaction after tool results are appended
// to avoid splitting assistant tool_use and user tool_result adjacency.
if m != nil && r.sessionCompaction && len(res.Calls) > 0 {
if sess.InputTokens+sess.OutputTokens > int64(float64(contextLimit)*0.9) {
events <- SessionCompaction(sess.ID, "start", r.currentAgent)
r.Summarize(ctx, sess, events)
events <- TokenUsage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost)
events <- SessionCompaction(sess.ID, "completed", r.currentAgent)
}
}

if res.Stopped {
slog.Debug("Conversation stopped", "agent", a.Name())
break
Expand Down
43 changes: 10 additions & 33 deletions pkg/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,12 +584,11 @@ func (m mockModelStoreWithLimit) GetModel(context.Context, string) (*modelsdev.M
return &modelsdev.Model{Limit: modelsdev.Limit{Context: m.limit}, Cost: &modelsdev.Cost{}}, nil
}

func TestCompactionOccursAfterToolResultsWhenToolUsePresent(t *testing.T) {
func TestCompaction(t *testing.T) {
// First stream: assistant replies with plain content and usage exceeds the 90% compaction threshold
mainStream := newStreamBuilder().
AddToolCallName("call_1", "test_tool").
AddToolCallArguments("call_1", "{}").
AddStopWithUsage(95, 0). // Context limit will be 100
AddContent("Hello there").
AddStopWithUsage(101, 0). // Context limit will be 100
Build()

// Second stream: summary generation (simple content)
Expand All @@ -600,57 +599,35 @@ func TestCompactionOccursAfterToolResultsWhenToolUsePresent(t *testing.T) {

prov := &queueProvider{id: "test/mock-model", streams: []chat.MessageStream{mainStream, summaryStream}}

// Provide an agent tool that will satisfy the tool call without requiring approvals
testTool := tools.Tool{
Name: "test_tool",
Description: "test",
Parameters: map[string]any{},
Annotations: tools.ToolAnnotations{ReadOnlyHint: true},
Handler: func(ctx context.Context, call tools.ToolCall) (*tools.ToolCallResult, error) {
return &tools.ToolCallResult{Output: "ok"}, nil
},
}

root := agent.New("root", "You are a test agent",
agent.WithModel(prov),
agent.WithTools(testTool),
)
root := agent.New("root", "You are a test agent", agent.WithModel(prov))
tm := team.New(team.WithAgents(root))

// Enable compaction and provide a model store with context limit = 100
rt, err := New(tm, WithSessionCompaction(true), WithModelStore(mockModelStoreWithLimit{limit: 100}))
require.NoError(t, err)

sess := session.New(session.WithUserMessage("Start"))
e := rt.RunStream(t.Context(), sess)
for range e {
}
sess.AddMessage(session.UserMessage("Again"))
events := rt.RunStream(t.Context(), sess)

// Collect events
var seen []Event
for ev := range events {
seen = append(seen, ev)
}

// Find indices of ToolCallResponse and compaction start (from RunStream)
toolRespIdx := -1
compactionStartIdx := -1
for i, ev := range seen {
switch e := ev.(type) {
case *ToolCallResponseEvent:
if toolRespIdx == -1 {
toolRespIdx = i
}
case *SessionCompactionEvent:
// We only want the RunStream-level "start" status (not Summarize's "started")
if e.Status == "start" && compactionStartIdx == -1 {
if e, ok := ev.(*SessionCompactionEvent); ok {
if e.Status == "started" && compactionStartIdx == -1 {
compactionStartIdx = i
}
}
}

require.NotEqual(t, -1, toolRespIdx, "expected a ToolCallResponseEvent")
require.NotEqual(t, -1, compactionStartIdx, "expected a SessionCompaction start event")

require.Greater(t, compactionStartIdx, toolRespIdx, "compaction should occur after tool results when tool_use is present")
}

func TestSessionWithoutUserMessage(t *testing.T) {
Expand Down