Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,24 @@ func (a *Agent) ToolSets() []tools.ToolSet {
// ensureToolSetsAreStarted calls Start on every toolset of the agent.
//
// A toolset whose Start fails is skipped for this pass. The failure is
// surfaced as a tool warning only when the toolset's ShouldReportFailure
// returns true; otherwise it is logged at debug level so repeated failed
// retries do not pile up duplicate warnings. When Start succeeds and the
// toolset's ConsumeRecovery returns true, a "<toolset> is now available"
// notice is queued through the same warning channel.
func (a *Agent) ensureToolSetsAreStarted(ctx context.Context) {
	for _, toolSet := range a.toolsets {
		if err := toolSet.Start(ctx); err != nil {
			desc := tools.DescribeToolSet(toolSet)
			// Only warn on the first failure in a streak; suppress duplicate
			// warnings for subsequent retries that also fail.
			if toolSet.ShouldReportFailure() {
				slog.Warn("Toolset start failed; will retry on next turn", "agent", a.Name(), "toolset", desc, "error", err)
				a.addToolWarning(fmt.Sprintf("%s start failed: %v", desc, err))
			} else {
				slog.Debug("Toolset still unavailable; retrying next turn", "agent", a.Name(), "toolset", desc, "error", err)
			}
			continue
		}

		// Emit a one-time notice when a previously-failed toolset recovers.
		if toolSet.ConsumeRecovery() {
			desc := tools.DescribeToolSet(toolSet)
			slog.Info("Toolset now available", "agent", a.Name(), "toolset", desc)
			a.addToolWarning(desc + " is now available")
		}
	}
}

Expand Down
86 changes: 86 additions & 0 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,34 @@ func (s *stubToolSet) Tools(context.Context) ([]tools.Tool, error) {
return s.tools, nil
}

// flappyToolSet is a test double implementing tools.ToolSet and
// tools.Startable. Each call to Start replays the next entry of a scripted
// result list; a nil entry stands for a successful start.
type flappyToolSet struct {
	errs    []error      // scripted Start results, consumed in order
	callIdx int          // index of the next entry of errs to return
	stubs   []tools.Tool // tools returned verbatim by Tools
}

// Compile-time checks that flappyToolSet satisfies the expected interfaces.
var (
	_ tools.ToolSet   = (*flappyToolSet)(nil)
	_ tools.Startable = (*flappyToolSet)(nil)
)

// Start returns the next scripted result and advances the cursor. Once the
// script has been fully consumed, every further call succeeds.
func (f *flappyToolSet) Start(context.Context) error {
	if f.callIdx < len(f.errs) {
		idx := f.callIdx
		f.callIdx = idx + 1
		return f.errs[idx]
	}
	return nil
}

// Stop is a no-op that always succeeds.
func (f *flappyToolSet) Stop(context.Context) error {
	return nil
}

// Tools returns the configured stub tools and never fails.
func (f *flappyToolSet) Tools(context.Context) ([]tools.Tool, error) {
	return f.stubs, nil
}

func TestAgentTools(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -210,3 +238,61 @@ func TestModelOverride_ConcurrentAccess(t *testing.T) {
<-done
// If we got here without a race condition panic, the test passes
}

// TestAgentReProbeEmitsWarningThenNotice walks a toolset through one failed
// start followed by a successful one and checks what the agent reports:
// the first turn yields a single "start failed" warning and no tools, the
// second turn yields the tool plus a single "now available" notice.
func TestAgentReProbeEmitsWarningThenNotice(t *testing.T) {
	t.Parallel()

	toolSet := &flappyToolSet{
		errs:  []error{errors.New("server unavailable"), nil},
		stubs: []tools.Tool{{Name: "mcp_ping", Parameters: map[string]any{}}},
	}
	agt := New("root", "test", WithToolSets(toolSet))

	// Turn 1: the scripted Start error leaves the toolset unavailable.
	firstTools, err := agt.Tools(t.Context())
	require.NoError(t, err)
	assert.Empty(t, firstTools, "turn 1: no tools while toolset is unavailable")

	failureWarnings := agt.DrainWarnings()
	require.Len(t, failureWarnings, 1, "turn 1: exactly one warning expected")
	assert.Contains(t, failureWarnings[0], "start failed")

	// Turn 2: Start now succeeds, so the tool shows up alongside a notice.
	secondTools, err := agt.Tools(t.Context())
	require.NoError(t, err)
	assert.Len(t, secondTools, 1, "turn 2: tool should be available after recovery")

	recoveryNotices := agt.DrainWarnings()
	require.Len(t, recoveryNotices, 1, "turn 2: exactly one recovery warning expected")
	assert.Contains(t, recoveryNotices[0], "now available", "turn 2: recovery warning must mention availability")
}

// TestAgentNoDuplicateStartWarnings checks that a toolset failing to start on
// three consecutive turns produces a warning on the first turn only; the two
// follow-up turns must leave the warning queue empty.
func TestAgentNoDuplicateStartWarnings(t *testing.T) {
	t.Parallel()

	startErr := errors.New("server unavailable")
	toolSet := &flappyToolSet{
		errs:  []error{startErr, startErr, startErr},
		stubs: []tools.Tool{{Name: "mcp_ping", Parameters: map[string]any{}}},
	}
	agt := New("root", "test", WithToolSets(toolSet))

	// Turn 1: the first failure of the streak is reported.
	_, err := agt.Tools(t.Context())
	require.NoError(t, err)
	require.Len(t, agt.DrainWarnings(), 1, "turn 1: exactly one warning on first failure")

	// Turns 2 and 3: the toolset keeps failing, but nothing new is reported.
	silentTurns := []string{
		"turn 2: no duplicate warning on repeated failure",
		"turn 3: no duplicate warning on repeated failure",
	}
	for _, msg := range silentTurns {
		_, err = agt.Tools(t.Context())
		require.NoError(t, err)
		assert.Empty(t, agt.DrainWarnings(), msg)
	}
}
94 changes: 77 additions & 17 deletions pkg/runtime/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
r.emitAgentWarnings(a, chanSend(events))
r.configureToolsetHandlers(a, events)

agentTools, err := r.getTools(ctx, a, sessionSpan, events)
agentTools, err := r.getTools(ctx, a, sessionSpan, events, true)
if err != nil {
events <- Error(fmt.Sprintf("failed to get tools: %v", err))
return
Expand Down Expand Up @@ -163,7 +163,7 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
r.emitAgentWarnings(a, chanSend(events))
r.configureToolsetHandlers(a, events)

agentTools, err := r.getTools(ctx, a, sessionSpan, events)
agentTools, err := r.getTools(ctx, a, sessionSpan, events, true)
if err != nil {
events <- Error(fmt.Sprintf("failed to get tools: %v", err))
return
Expand Down Expand Up @@ -382,6 +382,20 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c

r.processToolCalls(ctx, sess, res.Calls, agentTools, events)

// Re-probe toolsets after tool calls: an install/setup tool call may
// have made a previously-unavailable LSP or MCP connectable. reprobe()
// calls ensureToolSetsAreStarted, emits recovery notices, and updates
// the TUI tool-count immediately.
//
// The new tools are picked up by the next iteration's getTools() call
// at the top of this loop, so the model sees them on its very next
// response — within the same user turn, without requiring a new user
// message. reprobe returns nothing by design; the top-of-loop
// getTools() is the authoritative source for agentTools.
if len(res.Calls) > 0 {
r.reprobe(ctx, sess, a, agentTools, sessionSpan, events)
}

// Check for degenerate tool call loops
if loopDetector.record(res.Calls) {
toolName := "unknown"
Expand Down Expand Up @@ -575,17 +589,14 @@ func (r *LocalRuntime) compactIfNeeded(
r.Summarize(ctx, sess, "", events)
}

// getTools executes tool retrieval with automatic OAuth handling
func (r *LocalRuntime) getTools(ctx context.Context, a *agent.Agent, sessionSpan trace.Span, events chan Event) ([]tools.Tool, error) {
shouldEmitMCPInit := len(a.ToolSets()) > 0
if shouldEmitMCPInit {
// getTools executes tool retrieval with automatic OAuth handling.
// emitLifecycleEvents controls whether MCPInitStarted/Finished are emitted;
// pass false when calling from reprobe to avoid spurious TUI spinner flicker.
func (r *LocalRuntime) getTools(ctx context.Context, a *agent.Agent, sessionSpan trace.Span, events chan Event, emitLifecycleEvents bool) ([]tools.Tool, error) {
if emitLifecycleEvents && len(a.ToolSets()) > 0 {
events <- MCPInitStarted(a.Name())
defer func() { events <- MCPInitFinished(a.Name()) }()
}
defer func() {
if shouldEmitMCPInit {
events <- MCPInitFinished(a.Name())
}
}()

agentTools, err := a.Tools(ctx)
if err != nil {
Expand Down Expand Up @@ -616,15 +627,15 @@ func (r *LocalRuntime) configureToolsetHandlers(a *agent.Agent, events chan Even
}
}

// emitAgentWarnings drains and emits any agent initialization warnings.
// emitAgentWarnings drains and emits any pending toolset warnings as persistent
// TUI notifications. Both start failures and recovery notices are emitted as
// warnings so they remain visible until the user dismisses them.
func (r *LocalRuntime) emitAgentWarnings(a *agent.Agent, send func(Event)) {
warnings := a.DrainWarnings()
if len(warnings) == 0 {
return
if len(warnings) > 0 {
slog.Warn("Tool setup partially failed; continuing", "agent", a.Name(), "warnings", warnings)
send(Warning(formatToolWarning(a, warnings), a.Name()))
}

slog.Warn("Tool setup partially failed; continuing", "agent", a.Name(), "warnings", warnings)
send(Warning(formatToolWarning(a, warnings), a.Name()))
}

func formatToolWarning(a *agent.Agent, warnings []string) string {
Expand Down Expand Up @@ -669,3 +680,52 @@ func chanSend(ch chan Event) func(Event) {
}
}
}

// reprobe refreshes the agent's tool list after a batch of tool calls.
//
// It fetches the tools again through getTools with lifecycle events disabled,
// applies the session's excluded-tools filter, and emits whatever warnings or
// notices the agent has queued. It then diffs the refreshed tool names against
// currentTools; if at least one name is new, it logs the additions and sends a
// ToolsetInfo event carrying the refreshed tool count.
//
// If getTools fails, the error is logged and nothing else happens.
//
// reprobe has no return value on purpose: the getTools() call at the top of
// the run loop remains the single authoritative source for agentTools.
func (r *LocalRuntime) reprobe(
	ctx context.Context,
	sess *session.Session,
	a *agent.Agent,
	currentTools []tools.Tool,
	sessionSpan trace.Span,
	events chan Event,
) {
	refreshed, err := r.getTools(ctx, a, sessionSpan, events, false)
	if err != nil {
		slog.Warn("reprobe: getTools failed", "agent", a.Name(), "error", err)
		return
	}
	refreshed = filterExcludedTools(refreshed, sess.ExcludedTools)

	// Flush warnings/notices queued on the agent during the refresh.
	r.emitAgentWarnings(a, chanSend(events))

	// Diff by tool name rather than by count, so a toolset that swapped one
	// tool for another is still detected.
	known := make(map[string]struct{}, len(currentTools))
	for i := range currentTools {
		known[currentTools[i].Name] = struct{}{}
	}

	var newNames []string
	for i := range refreshed {
		name := refreshed[i].Name
		if _, seen := known[name]; seen {
			continue
		}
		newNames = append(newNames, name)
	}

	if len(newNames) > 0 {
		slog.Info("New tools available after toolset re-probe",
			"agent", a.Name(), "added", newNames)

		// Push the refreshed tool count to the TUI right away.
		chanSend(events)(ToolsetInfo(len(refreshed), false, a.Name()))
	}
}
Loading
Loading