diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 3afd3fdef..bb3c18996 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -621,9 +621,6 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c return } slog.Debug("Starting conversation loop iteration", "agent", a.Name()) - // Looping, get the updated messages from the session - messages := sess.GetMessages(a) - slog.Debug("Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages)) streamCtx, streamSpan := r.startSpan(ctx, "runtime.stream", trace.WithAttributes( attribute.String("agent", a.Name()), @@ -639,6 +636,21 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c slog.Debug("Failed to get model definition", "error", err) } + var contextLimit int64 + if m != nil { + contextLimit = int64(m.Limit.Context) + } + + if m != nil && r.sessionCompaction { + if sess.InputTokens+sess.OutputTokens > int64(float64(contextLimit)*0.9) { + r.Summarize(ctx, sess, events) + events <- TokenUsage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost) + } + } + + messages := sess.GetMessages(a) + slog.Debug("Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages)) + slog.Debug("Creating chat completion stream", "agent", a.Name()) stream, err := model.CreateChatCompletionStream(streamCtx, messages, agentTools) if err != nil { @@ -697,38 +709,10 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c slog.Debug("Skipping empty assistant message (no content and no tool calls)", "agent", a.Name()) } - var contextLimit int64 - if m != nil { - contextLimit = int64(m.Limit.Context) - } events <- TokenUsage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost) - if m != nil && r.sessionCompaction { - if sess.InputTokens+sess.OutputTokens > int64(float64(contextLimit)*0.9) { - // Avoid inserting a summary between assistant tool_use and tool_result messages. - // Defer compaction until after tool calls are processed in this iteration. - if len(res.Calls) == 0 { - events <- SessionCompaction(sess.ID, "start", r.currentAgent) - r.Summarize(ctx, sess, events) - events <- TokenUsage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost) - events <- SessionCompaction(sess.ID, "completed", r.currentAgent) - } - } - } - r.processToolCalls(ctx, sess, res.Calls, agentTools, events) - // If tool_use occurred, perform compaction after tool results are appended - // to avoid splitting assistant tool_use and user tool_result adjacency. - if m != nil && r.sessionCompaction && len(res.Calls) > 0 { - if sess.InputTokens+sess.OutputTokens > int64(float64(contextLimit)*0.9) { - events <- SessionCompaction(sess.ID, "start", r.currentAgent) - r.Summarize(ctx, sess, events) - events <- TokenUsage(sess.ID, r.currentAgent, sess.InputTokens, sess.OutputTokens, sess.InputTokens+sess.OutputTokens, contextLimit, sess.Cost) - events <- SessionCompaction(sess.ID, "completed", r.currentAgent) - } - } - if res.Stopped { slog.Debug("Conversation stopped", "agent", a.Name()) break diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 608ac4778..93fbb81e9 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -584,12 +584,11 @@ func (m mockModelStoreWithLimit) GetModel(context.Context, string) (*modelsdev.M return &modelsdev.Model{Limit: modelsdev.Limit{Context: m.limit}, Cost: &modelsdev.Cost{}}, nil } -func TestCompactionOccursAfterToolResultsWhenToolUsePresent(t *testing.T) { +func TestCompaction(t *testing.T) { // First stream: assistant issues a tool call and usage exceeds 90% threshold mainStream := newStreamBuilder(). - AddToolCallName("call_1", "test_tool"). - AddToolCallArguments("call_1", "{}"). - AddStopWithUsage(95, 0). // Context limit will be 100 + AddContent("Hello there"). + AddStopWithUsage(101, 0). // Context limit will be 100 Build() // Second stream: summary generation (simple content) @@ -600,21 +599,7 @@ func TestCompactionOccursAfterToolResultsWhenToolUsePresent(t *testing.T) { prov := &queueProvider{id: "test/mock-model", streams: []chat.MessageStream{mainStream, summaryStream}} - // Provide an agent tool that will satisfy the tool call without requiring approvals - testTool := tools.Tool{ - Name: "test_tool", - Description: "test", - Parameters: map[string]any{}, - Annotations: tools.ToolAnnotations{ReadOnlyHint: true}, - Handler: func(ctx context.Context, call tools.ToolCall) (*tools.ToolCallResult, error) { - return &tools.ToolCallResult{Output: "ok"}, nil - }, - } - - root := agent.New("root", "You are a test agent", - agent.WithModel(prov), - agent.WithTools(testTool), - ) + root := agent.New("root", "You are a test agent", agent.WithModel(prov)) tm := team.New(team.WithAgents(root)) // Enable compaction and provide a model store with context limit = 100 @@ -622,35 +607,27 @@ func TestCompactionOccursAfterToolResultsWhenToolUsePresent(t *testing.T) { require.NoError(t, err) sess := session.New(session.WithUserMessage("Start")) + e := rt.RunStream(t.Context(), sess) + for range e { + } + sess.AddMessage(session.UserMessage("Again")) events := rt.RunStream(t.Context(), sess) - // Collect events var seen []Event for ev := range events { seen = append(seen, ev) } - // Find indices of ToolCallResponse and compaction start (from RunStream) - toolRespIdx := -1 compactionStartIdx := -1 for i, ev := range seen { - switch e := ev.(type) { - case *ToolCallResponseEvent: - if toolRespIdx == -1 { - toolRespIdx = i - } - case *SessionCompactionEvent: - // We only want the RunStream-level "start" status (not Summarize's "started") - if e.Status == "start" && compactionStartIdx == -1 { + if e, ok := ev.(*SessionCompactionEvent); ok { + if e.Status == "started" && compactionStartIdx == -1 { compactionStartIdx = i } } } - require.NotEqual(t, -1, toolRespIdx, "expected a ToolCallResponseEvent") require.NotEqual(t, -1, compactionStartIdx, "expected a SessionCompaction start event") - - require.Greater(t, compactionStartIdx, toolRespIdx, "compaction should occur after tool results when tool_use is present") } func TestSessionWithoutUserMessage(t *testing.T) {