From 2421482c1377651dc38226f968dc7c6393fb4c05 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Tue, 2 Jun 2026 18:54:07 +0530 Subject: [PATCH 1/6] feat(messenger): ao send + live zellij pane ping (live agent nudges) Replace the daemon's noopMessenger stub with a composite AgentMessenger that writes a durable inbox file (primary) and types a live pointer into the running zellij pane (best-effort secondary), plus the `ao send` CLI that drives the existing POST /api/v1/sessions/{id}/send route. - composite: fans Send to inbox then panep, pinning one timestamp so both derive the same filename; a secondary failure is logged at WARN and swallowed (the file is on disk), a primary failure aborts the call. - inbox: writes /.ao/inbox/_.md. - panep: types "new message at .ao/inbox/" + Enter via a new narrow zellij WriteChars seam (RuntimePaneWriter), kept off ports.Runtime. - wiring: newSessionMessenger composes inbox+panep over the shared store; startSession takes the messenger instead of the noop stub. Carries across @aa-43's work from PR #74 (staging), adapted to main's post-#65/#77 daemon wiring shape. Closes #79 Co-Authored-By: Claude Opus 4.7 --- .../adapters/messenger/composite/composite.go | 69 ++++++ .../messenger/composite/composite_test.go | 223 ++++++++++++++++++ .../adapters/messenger/inbox/inbox.go | 135 +++++++++++ .../adapters/messenger/inbox/inbox_test.go | 189 +++++++++++++++ .../adapters/messenger/panep/panep.go | 82 +++++++ .../adapters/messenger/panep/panep_test.go | 134 +++++++++++ .../adapters/runtime/zellij/commands.go | 4 + .../adapters/runtime/zellij/zellij.go | 19 ++ .../adapters/runtime/zellij/zellij_test.go | 29 +++ backend/internal/cli/root.go | 1 + backend/internal/cli/send.go | 53 +++++ backend/internal/cli/send_test.go | 220 +++++++++++++++++ backend/internal/daemon/daemon.go | 12 +- backend/internal/daemon/lifecycle_wiring.go | 64 ++++- backend/internal/daemon/wiring_test.go | 69 +++++- 15 files changed, 1287 insertions(+), 16 deletions(-) create mode 100644 backend/internal/adapters/messenger/composite/composite.go create mode 100644 backend/internal/adapters/messenger/composite/composite_test.go create mode 100644 backend/internal/adapters/messenger/inbox/inbox.go create mode 100644 backend/internal/adapters/messenger/inbox/inbox_test.go create mode 100644 backend/internal/adapters/messenger/panep/panep.go create mode 100644 backend/internal/adapters/messenger/panep/panep_test.go create mode 100644 backend/internal/cli/send.go create mode 100644 backend/internal/cli/send_test.go diff --git a/backend/internal/adapters/messenger/composite/composite.go b/backend/internal/adapters/messenger/composite/composite.go new file mode 100644 index 00000000..9d6cefaf --- /dev/null +++ b/backend/internal/adapters/messenger/composite/composite.go @@ -0,0 +1,69 @@ +// Package composite fans Send out to a primary AgentMessenger followed by +// best-effort secondaries. The primary's failure aborts the whole call (the +// message was never delivered); a secondary's failure is logged and swallowed +// (the message IS delivered — the secondary was a live nudge, not a record). +// +// This is how the daemon wires the inbox messenger (primary, durable file +// write) with the pane-ping messenger (secondary, live zellij nudge): if we +// could not write the file we must not tell the agent about a file that does +// not exist, but if we wrote the file and could not ping, the agent will see +// it on the next inbox check. +package composite + +import ( + "context" + "io" + "log/slog" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// Messenger fans Send out across Inner messengers with primary-then-secondary +// semantics. Inner[0] is the primary (must succeed); Inner[1:] are best-effort +// secondaries whose failures are logged at WARN and swallowed. Inner is public +// so wiring tests can assert the daemon assembles the composite with the +// expected ordering (inbox first, panep second). +// +// Each Send pins one timestamp via inbox.WithTime so inner messengers that +// derive a filename from (t, message) agree on the same name — without this, +// the inbox file write and a downstream panep ping would land on different +// nanoseconds and the ping would point at a file that does not exist. +type Messenger struct { + Inner []ports.AgentMessenger + Logger *slog.Logger + Clock func() time.Time +} + +// New builds a Messenger. A nil logger is replaced with a discard logger so a +// secondary failure never panics a misconfigured caller. Clock defaults to +// time.Now. +func New(inner []ports.AgentMessenger, logger *slog.Logger) *Messenger { + if logger == nil { + logger = slog.New(slog.NewTextHandler(io.Discard, nil)) + } + return &Messenger{Inner: inner, Logger: logger, Clock: time.Now} +} + +var _ ports.AgentMessenger = (*Messenger)(nil) + +// Send invokes Inner[0] (primary); on failure, no secondaries run and the +// error is returned. On primary success, every Inner[1:] is invoked in order +// and their errors are logged at WARN. +func (c *Messenger) Send(ctx context.Context, id domain.SessionID, message string) error { + if len(c.Inner) == 0 { + return nil + } + ctx = inbox.WithTime(ctx, c.Clock()) + if err := c.Inner[0].Send(ctx, id, message); err != nil { + return err + } + for _, m := range c.Inner[1:] { + if err := m.Send(ctx, id, message); err != nil { + c.Logger.Warn("composite: secondary messenger failed", "sessionId", id, "err", err) + } + } + return nil +} diff --git a/backend/internal/adapters/messenger/composite/composite_test.go b/backend/internal/adapters/messenger/composite/composite_test.go new file mode 100644 index 00000000..9d60a060 --- /dev/null +++ b/backend/internal/adapters/messenger/composite/composite_test.go @@ -0,0 +1,223 @@ +package composite_test + +import ( + "context" + "errors" + "io" + "log/slog" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/composite" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/panep" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +func TestSatisfiesAgentMessenger(t *testing.T) { + var _ ports.AgentMessenger = (*composite.Messenger)(nil) +} + +type recordingMessenger struct { + name string + err error + calls *[]string +} + +func (r *recordingMessenger) Send(_ context.Context, _ domain.SessionID, _ string) error { + *r.calls = append(*r.calls, r.name) + return r.err +} + +func nopLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func TestSend_FansOutInOrder(t *testing.T) { + var calls []string + primary := &recordingMessenger{name: "primary", calls: &calls} + secondary := &recordingMessenger{name: "secondary", calls: &calls} + + c := composite.New([]ports.AgentMessenger{primary, secondary}, nopLogger()) + if err := c.Send(context.Background(), "s-1", "hi"); err != nil { + t.Fatalf("Send: %v", err) + } + if len(calls) != 2 { + t.Fatalf("calls = %v, want 2", calls) + } + if calls[0] != "primary" || calls[1] != "secondary" { + t.Fatalf("call order = %v, want [primary secondary]", calls) + } +} + +func TestSend_PrimaryFailureSkipsSecondaries(t *testing.T) { + var calls []string + primary := &recordingMessenger{name: "primary", err: errors.New("disk full"), calls: &calls} + secondary := &recordingMessenger{name: "secondary", calls: &calls} + + c := composite.New([]ports.AgentMessenger{primary, secondary}, nopLogger()) + err := c.Send(context.Background(), "s-1", "hi") + if err == nil { + t.Fatal("expected error when primary fails") + } + if !strings.Contains(err.Error(), "disk full") { + t.Errorf("error should surface primary failure, got %v", err) + } + if len(calls) != 1 || calls[0] != "primary" { + t.Fatalf("calls = %v, want only [primary]", calls) + } +} + +func TestSend_SecondaryFailureIsLoggedAtWarnAndSwallowed(t *testing.T) { + var calls []string + primary := &recordingMessenger{name: "primary", calls: &calls} + secondary := &recordingMessenger{name: "secondary", err: errors.New("pipe broken"), calls: &calls} + + rec := &levelRecorder{} + logger := slog.New(rec) + + c := composite.New([]ports.AgentMessenger{primary, secondary}, logger) + if err := c.Send(context.Background(), "s-1", "hi"); err != nil { + t.Fatalf("Send must succeed when only the secondary fails, got %v", err) + } + if len(calls) != 2 { + t.Fatalf("calls = %v, want both invoked", calls) + } + if len(rec.records) != 1 { + t.Fatalf("want 1 log record for secondary failure, got %d", len(rec.records)) + } + if rec.records[0].Level != slog.LevelWarn { + t.Errorf("secondary failure logged at %v, want WARN", rec.records[0].Level) + } + if !strings.Contains(rec.records[0].Message+" "+rec.attrText(0), "pipe broken") { + t.Errorf("expected secondary failure surfaced in log, got message=%q attrs=%q", + rec.records[0].Message, rec.attrText(0)) + } +} + +// levelRecorder is a slog.Handler that captures full Records so tests can +// assert on level + message + attrs, not just a serialized substring. +type levelRecorder struct { + records []slog.Record + attrs [][]slog.Attr +} + +func (r *levelRecorder) Enabled(context.Context, slog.Level) bool { return true } + +func (r *levelRecorder) Handle(_ context.Context, rec slog.Record) error { + var collected []slog.Attr + rec.Attrs(func(a slog.Attr) bool { + collected = append(collected, a) + return true + }) + r.records = append(r.records, rec) + r.attrs = append(r.attrs, collected) + return nil +} + +func (r *levelRecorder) WithAttrs([]slog.Attr) slog.Handler { return r } +func (r *levelRecorder) WithGroup(string) slog.Handler { return r } + +func (r *levelRecorder) attrText(i int) string { + var b strings.Builder + for _, a := range r.attrs[i] { + b.WriteString(a.Key) + b.WriteString("=") + b.WriteString(a.Value.String()) + b.WriteString(" ") + } + return b.String() +} + +func TestSend_AllSecondariesAttemptedEvenIfOneFails(t *testing.T) { + var calls []string + primary := &recordingMessenger{name: "primary", calls: &calls} + sec1 := &recordingMessenger{name: "sec1", err: errors.New("transient"), calls: &calls} + sec2 := &recordingMessenger{name: "sec2", calls: &calls} + + c := composite.New([]ports.AgentMessenger{primary, sec1, sec2}, nopLogger()) + if err := c.Send(context.Background(), "s-1", "hi"); err != nil { + t.Fatalf("Send: %v", err) + } + if len(calls) != 3 || calls[0] != "primary" || calls[1] != "sec1" || calls[2] != "sec2" { + t.Fatalf("call order = %v, want [primary sec1 sec2]", calls) + } +} + +func TestSend_EmptyInnerListIsNoOp(t *testing.T) { + c := composite.New(nil, nopLogger()) + if err := c.Send(context.Background(), "s-1", "hi"); err != nil { + t.Fatalf("empty composite Send should be no-op, got %v", err) + } +} + +// inboxLookup adapts a tempdir into inbox.SessionWorkspace. +type fixedWorkspace struct{ path string } + +func (f fixedWorkspace) WorkspacePath(context.Context, domain.SessionID) (string, error) { + return f.path, nil +} + +// fixedSession adapts to panep.SessionLookup. +type fixedSession struct{ handle, workspace string } + +func (f fixedSession) SessionHandle(context.Context, domain.SessionID) (string, string, error) { + return f.handle, f.workspace, nil +} + +type recordingRuntime struct { + calls []string +} + +func (r *recordingRuntime) WriteChars(_ context.Context, _ ports.RuntimeHandle, s string) error { + r.calls = append(r.calls, s) + return nil +} + +// TestSend_PingFilenameMatchesOnDiskFilename is the end-to-end consistency +// proof the brief implicitly requires: a single composite.Send invocation must +// produce one inbox file on disk AND a ping body that names exactly that file. +// Without shared time, the inbox clock and panep clock fire at different +// nanoseconds and the filenames diverge. +func TestSend_PingFilenameMatchesOnDiskFilename(t *testing.T) { + workspace := t.TempDir() + inboxMsg := inbox.New(fixedWorkspace{path: workspace}) + rt := &recordingRuntime{} + panepMsg := panep.New(rt, fixedSession{handle: "sess-id/terminal_0", workspace: workspace}) + + c := composite.New([]ports.AgentMessenger{inboxMsg, panepMsg}, nopLogger()) + if err := c.Send(context.Background(), "s-1", "hello agent"); err != nil { + t.Fatalf("Send: %v", err) + } + + entries, err := os.ReadDir(filepath.Join(workspace, ".ao", "inbox")) + if err != nil { + t.Fatalf("read inbox dir: %v", err) + } + if len(entries) != 1 { + t.Fatalf("want 1 inbox file, got %d", len(entries)) + } + onDisk := entries[0].Name() + if len(rt.calls) == 0 { + t.Fatal("panep did not ping the pane") + } + if !strings.Contains(rt.calls[0], onDisk) { + t.Fatalf("ping body must reference the on-disk file %q, got %q", onDisk, rt.calls[0]) + } +} + +func TestNew_NilLoggerStillWorks(t *testing.T) { + // A nil logger should not panic — composite must default to a discard + // logger so misconfigured callers don't crash on the first secondary error. + var calls []string + primary := &recordingMessenger{name: "primary", calls: &calls} + secondary := &recordingMessenger{name: "secondary", err: errors.New("x"), calls: &calls} + + c := composite.New([]ports.AgentMessenger{primary, secondary}, nil) + if err := c.Send(context.Background(), "s-1", "hi"); err != nil { + t.Fatalf("Send with nil logger and secondary error must not return error, got %v", err) + } +} diff --git a/backend/internal/adapters/messenger/inbox/inbox.go b/backend/internal/adapters/messenger/inbox/inbox.go new file mode 100644 index 00000000..c55573ab --- /dev/null +++ b/backend/internal/adapters/messenger/inbox/inbox.go @@ -0,0 +1,135 @@ +// Package inbox implements ports.AgentMessenger by writing each message as a +// file in /.ao/inbox/. The agent reads its inbox on demand; +// pinging the runtime pane to consume new files is a separate concern that +// lives in the runtime adapter, not here. +package inbox + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// SessionWorkspace resolves a session id to the absolute path of its workspace. +// The sqlite store satisfies this via GetSession; the adapter is in +// daemon/lifecycle_wiring.go. +type SessionWorkspace interface { + WorkspacePath(ctx context.Context, id domain.SessionID) (string, error) +} + +// Messenger writes inbox files into per-session workspaces. +type Messenger struct { + lookup SessionWorkspace + clock func() time.Time +} + +// New builds a Messenger over the given workspace lookup. lookup is required. +func New(lookup SessionWorkspace) *Messenger { + return &Messenger{lookup: lookup, clock: time.Now} +} + +var _ ports.AgentMessenger = (*Messenger)(nil) + +// Send writes message into /.ao/inbox/_.md. +// +// Filename collisions are practically impossible: nanosecond timestamp plus an +// 8-char hash of the body. We do not retry on EEXIST. +// +// Symlink safety: if .ao or .ao/inbox already exists as a symlink, refuse. +// Otherwise os.MkdirAll creates real directories and os.WriteFile (which uses +// O_CREATE|O_WRONLY|O_TRUNC without O_NOFOLLOW) writes the message body. The +// inbox is owned by ao; a symlink there is either user misconfig or attack. +func (m *Messenger) Send(ctx context.Context, id domain.SessionID, message string) error { + ws, err := m.lookup.WorkspacePath(ctx, id) + if err != nil { + return fmt.Errorf("inbox: lookup workspace for %s: %w", id, err) + } + if ws == "" { + return fmt.Errorf("inbox: empty workspace path for %s", id) + } + + aoDir := filepath.Join(ws, ".ao") + if err := ensureRealDir(aoDir); err != nil { + return fmt.Errorf("inbox: prepare .ao for %s: %w", id, err) + } + inboxDir := filepath.Join(aoDir, "inbox") + if err := ensureRealDir(inboxDir); err != nil { + return fmt.Errorf("inbox: prepare inbox for %s: %w", id, err) + } + + name := FilenameFor(TimeFromContext(ctx, m.clock), message) + if err := os.WriteFile(filepath.Join(inboxDir, name), []byte(message), 0o600); err != nil { + return fmt.Errorf("inbox: write %s for %s: %w", name, id, err) + } + return nil +} + +// ensureRealDir creates path if missing (0755), refuses if path is a symlink. +// Lstat (not Stat) is used so a symlink isn't followed into a different tree. +// +// The workspace root itself is not Lstat-checked because gitworktree.Workspace +// resolves ManagedRoot to an absolute, symlink-free path at construction +// (gitworktree.physicalAbs); per-session workspaces under it are created by ao. +// A symlinked .ao or .ao/inbox inside an ao-owned workspace would be user +// misconfig or attack, and is the only segment that can be tampered with +// between Spawn and Send. +func ensureRealDir(path string) error { + info, err := os.Lstat(path) + switch { + case err == nil: + if info.Mode()&os.ModeSymlink != 0 { + return fmt.Errorf("%q is a symlink; refusing to follow", path) + } + if !info.IsDir() { + return fmt.Errorf("%q exists and is not a directory", path) + } + return nil + case errors.Is(err, os.ErrNotExist): + return os.MkdirAll(path, 0o750) + default: + return err + } +} + +// FilenameFor builds a sortable, collision-resistant name from the timestamp +// and message body. Underscore separator keeps the timestamp's own dashes +// distinguishable from the hash prefix. Exported so adapters that point at the +// file (e.g. the pane-ping messenger) can derive the same name the inbox +// messenger would write for the same (t, message). +func FilenameFor(t time.Time, message string) string { + sum := sha256.Sum256([]byte(message)) + hash := hex.EncodeToString(sum[:])[:8] + return strconv.FormatInt(t.UnixNano(), 10) + "_" + hash + ".md" +} + +// sendTimeKey scopes the shared "Send timestamp" composite injects so inbox +// and panep derive the same filename for one Send call. The key is unexported +// — only the inbox.WithTime / TimeFromContext helpers can read or write it. +type sendTimeKey struct{} + +// WithTime attaches t to ctx as the shared timestamp the inbox messenger and +// any peers (panep) should use when deriving a filename via FilenameFor. The +// composite messenger calls this so a single Send produces one filename across +// every inner messenger, regardless of how long inbox's file I/O takes. +func WithTime(ctx context.Context, t time.Time) context.Context { + return context.WithValue(ctx, sendTimeKey{}, t) +} + +// TimeFromContext returns the timestamp WithTime stashed on ctx, falling back +// to fallback() if none is present. Callers running outside the composite +// (e.g. tests, direct use of inbox.Messenger) just get the fallback. +func TimeFromContext(ctx context.Context, fallback func() time.Time) time.Time { + if t, ok := ctx.Value(sendTimeKey{}).(time.Time); ok { + return t + } + return fallback() +} diff --git a/backend/internal/adapters/messenger/inbox/inbox_test.go b/backend/internal/adapters/messenger/inbox/inbox_test.go new file mode 100644 index 00000000..5f29fced --- /dev/null +++ b/backend/internal/adapters/messenger/inbox/inbox_test.go @@ -0,0 +1,189 @@ +package inbox_test + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// FilenameFor is the shared helper used by both the inbox messenger (writes the +// file) and the panep messenger (points at the file). Both must agree on the +// exact name for the same (timestamp, message) input. +func TestFilenameFor_IsDeterministicForSameInput(t *testing.T) { + ts := time.Unix(1717171717, 42).UTC() + body := "ε ao test" + if got, want := inbox.FilenameFor(ts, body), inbox.FilenameFor(ts, body); got != want { + t.Fatalf("FilenameFor not deterministic: %q vs %q", got, want) + } +} + +func TestFilenameFor_MatchesTimestampNanoAndHashPrefix(t *testing.T) { + ts := time.Unix(1717171717, 42).UTC() + body := "hello agent" + + got := inbox.FilenameFor(ts, body) + + sum := sha256.Sum256([]byte(body)) + wantPrefix := strconv.FormatInt(ts.UnixNano(), 10) + "_" + hex.EncodeToString(sum[:])[:8] + ".md" + if got != wantPrefix { + t.Fatalf("FilenameFor(%v, %q) = %q, want %q", ts, body, got, wantPrefix) + } +} + +func TestFilenameFor_DiffersByMessage(t *testing.T) { + ts := time.Unix(1717171717, 42).UTC() + a := inbox.FilenameFor(ts, "alpha") + b := inbox.FilenameFor(ts, "beta") + if a == b { + t.Fatalf("different messages produced same filename: %q", a) + } +} + +func TestSatisfiesAgentMessenger(t *testing.T) { + var _ ports.AgentMessenger = (*inbox.Messenger)(nil) +} + +type fakeLookup struct { + path string + err error +} + +func (f fakeLookup) WorkspacePath(context.Context, domain.SessionID) (string, error) { + return f.path, f.err +} + +func TestSend_WritesMessageFile(t *testing.T) { + dir := t.TempDir() + m := inbox.New(fakeLookup{path: dir}) + if err := m.Send(context.Background(), "s-1", "hello agent"); err != nil { + t.Fatal(err) + } + inboxDir := filepath.Join(dir, ".ao", "inbox") + entries, err := os.ReadDir(inboxDir) + if err != nil { + t.Fatalf("inbox dir: %v", err) + } + if len(entries) != 1 { + t.Fatalf("want 1 file, got %d", len(entries)) + } + name := entries[0].Name() + if !strings.HasSuffix(name, ".md") { + t.Errorf("want .md suffix, got %q", name) + } + body, err := os.ReadFile(filepath.Join(inboxDir, name)) + if err != nil { + t.Fatal(err) + } + if string(body) != "hello agent" { + t.Errorf("body %q want %q", body, "hello agent") + } +} + +func TestSend_CreatesInboxDirIfMissing(t *testing.T) { + dir := t.TempDir() + // dir contains no .ao yet. + m := inbox.New(fakeLookup{path: dir}) + if err := m.Send(context.Background(), "s-1", "x"); err != nil { + t.Fatal(err) + } + if _, err := os.Stat(filepath.Join(dir, ".ao", "inbox")); err != nil { + t.Fatalf("inbox dir not created: %v", err) + } +} + +func TestSend_TwoSendsProduceTwoFiles(t *testing.T) { + dir := t.TempDir() + m := inbox.New(fakeLookup{path: dir}) + ctx := context.Background() + if err := m.Send(ctx, "s-1", "first"); err != nil { + t.Fatal(err) + } + if err := m.Send(ctx, "s-1", "second"); err != nil { + t.Fatal(err) + } + entries, _ := os.ReadDir(filepath.Join(dir, ".ao", "inbox")) + if len(entries) != 2 { + t.Fatalf("want 2 files, got %d", len(entries)) + } +} + +func TestSend_UnknownSessionReturnsError(t *testing.T) { + m := inbox.New(fakeLookup{err: errors.New("not found")}) + err := m.Send(context.Background(), "s-1", "x") + if err == nil { + t.Fatal("expected error when workspace lookup fails") + } + if !strings.Contains(err.Error(), "not found") { + t.Errorf("error should wrap lookup error, got %v", err) + } +} + +func TestSend_EmptyWorkspacePathReturnsError(t *testing.T) { + // A spawned-but-not-yet-mark-spawned row has WorkspacePath == "". The + // messenger must refuse rather than write into "/.ao/inbox/...". + m := inbox.New(fakeLookup{path: ""}) + if err := m.Send(context.Background(), "s-1", "x"); err == nil { + t.Fatal("expected error for empty workspace path") + } +} + +func TestSend_SymlinkedInboxIsRefused(t *testing.T) { + dir := t.TempDir() + // Create .ao/inbox as a symlink to a sibling directory. + target := t.TempDir() + if err := os.MkdirAll(filepath.Join(dir, ".ao"), 0o755); err != nil { + t.Fatal(err) + } + if err := os.Symlink(target, filepath.Join(dir, ".ao", "inbox")); err != nil { + t.Skipf("symlink not supported: %v", err) + } + m := inbox.New(fakeLookup{path: dir}) + err := m.Send(context.Background(), "s-1", "x") + if err == nil { + t.Fatal("expected refusal when inbox is a symlink") + } + if entries, _ := os.ReadDir(target); len(entries) != 0 { + t.Errorf("symlink target should not have received writes, got %d entries", len(entries)) + } +} + +func TestSend_EmptyMessageStillWritesAFile(t *testing.T) { + dir := t.TempDir() + m := inbox.New(fakeLookup{path: dir}) + if err := m.Send(context.Background(), "s-1", ""); err != nil { + t.Fatal(err) + } + entries, _ := os.ReadDir(filepath.Join(dir, ".ao", "inbox")) + if len(entries) != 1 { + t.Fatalf("want 1 file even for empty message, got %d", len(entries)) + } +} + +func TestSend_FilenameContainsTimestampAndHashPrefix(t *testing.T) { + dir := t.TempDir() + m := inbox.New(fakeLookup{path: dir}) + if err := m.Send(context.Background(), "s-1", "payload"); err != nil { + t.Fatal(err) + } + entries, _ := os.ReadDir(filepath.Join(dir, ".ao", "inbox")) + name := strings.TrimSuffix(entries[0].Name(), ".md") + // Format: _; underscore separator avoids the timestamp's own dashes. + parts := strings.SplitN(name, "_", 2) + if len(parts) != 2 { + t.Fatalf("filename should be _.md, got %q", entries[0].Name()) + } + if len(parts[1]) < 4 { + t.Errorf("hash prefix too short: %q", parts[1]) + } +} diff --git a/backend/internal/adapters/messenger/panep/panep.go b/backend/internal/adapters/messenger/panep/panep.go new file mode 100644 index 00000000..eb4586ad --- /dev/null +++ b/backend/internal/adapters/messenger/panep/panep.go @@ -0,0 +1,82 @@ +// Package panep ("pane ping") implements ports.AgentMessenger by typing a +// short pointer into the agent's runtime pane: "📥 ao: new message at +// .ao/inbox/ — please read it". The inbox messenger writes the file; +// panep merely tells the agent to read it. Multi-line message bodies typed +// verbatim fight with the agent's input handler, so a pointer is robust where +// pasting the full body is not. +// +// panep is best-effort: composite.Messenger logs and swallows panep errors so a +// missed nudge never loses the message — the inbox file is still on disk. +package panep + +import ( + "context" + "fmt" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// SessionLookup resolves a session id to its runtime handle id and workspace +// path. The sqlite store satisfies this via a small adapter in +// daemon/lifecycle_wiring.go. The workspace path is required so panep can prove +// the inbox messenger had somewhere to write before pointing at the file. +type SessionLookup interface { + SessionHandle(ctx context.Context, id domain.SessionID) (handleID, workspacePath string, err error) +} + +// RuntimePaneWriter is the narrow runtime contract panep depends on: type +// characters into the pane identified by handle. Kept separate from +// ports.Runtime so adding a pane-ping does not widen the runtime port. +type RuntimePaneWriter interface { + WriteChars(ctx context.Context, handle ports.RuntimeHandle, s string) error +} + +// Messenger pings the agent's pane with a pointer at a freshly-written inbox +// file. It does not write the file itself — that is the inbox messenger's job. +type Messenger struct { + runtime RuntimePaneWriter + lookup SessionLookup + clock func() time.Time +} + +// New constructs a Messenger. Both deps are required. +func New(runtime RuntimePaneWriter, lookup SessionLookup) *Messenger { + return &Messenger{runtime: runtime, lookup: lookup, clock: time.Now} +} + +var _ ports.AgentMessenger = (*Messenger)(nil) + +// Send pings the agent pane with a pointer to .ao/inbox/. The filename +// is derived from the same (clock(), message) inputs the inbox messenger uses, +// so the two adapters agree on a single name when invoked together by the +// composite messenger. +func (m *Messenger) Send(ctx context.Context, id domain.SessionID, message string) error { + handleID, ws, err := m.lookup.SessionHandle(ctx, id) + if err != nil { + return fmt.Errorf("panep: lookup handle for %s: %w", id, err) + } + if handleID == "" { + return fmt.Errorf("panep: empty runtime handle for %s", id) + } + if ws == "" { + return fmt.Errorf("panep: empty workspace path for %s", id) + } + + // Reads the timestamp the composite messenger stashed via inbox.WithTime + // so the filename here matches what the inbox messenger just wrote. Outside + // a composite, falls back to m.clock — useful for tests and any future + // caller that uses panep stand-alone. + filename := inbox.FilenameFor(inbox.TimeFromContext(ctx, m.clock), message) + body := fmt.Sprintf("📥 ao: new message at .ao/inbox/%s — please read it", filename) + handle := ports.RuntimeHandle{ID: handleID} + if err := m.runtime.WriteChars(ctx, handle, body); err != nil { + return fmt.Errorf("panep: write ping for %s: %w", id, err) + } + if err := m.runtime.WriteChars(ctx, handle, "\n"); err != nil { + return fmt.Errorf("panep: submit ping for %s: %w", id, err) + } + return nil +} diff --git a/backend/internal/adapters/messenger/panep/panep_test.go b/backend/internal/adapters/messenger/panep/panep_test.go new file mode 100644 index 00000000..578e7d40 --- /dev/null +++ b/backend/internal/adapters/messenger/panep/panep_test.go @@ -0,0 +1,134 @@ +package panep_test + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/panep" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +func TestSatisfiesAgentMessenger(t *testing.T) { + var _ ports.AgentMessenger = (*panep.Messenger)(nil) +} + +type fakeLookup struct { + handle string + workspace string + err error +} + +func (f fakeLookup) SessionHandle(context.Context, domain.SessionID) (string, string, error) { + return f.handle, f.workspace, f.err +} + +type writeCall struct { + handle ports.RuntimeHandle + s string +} + +type fakeRuntime struct { + err error + calls []writeCall +} + +func (f *fakeRuntime) WriteChars(_ context.Context, handle ports.RuntimeHandle, s string) error { + f.calls = append(f.calls, writeCall{handle: handle, s: s}) + return f.err +} + +func TestSend_PingsPaneWithFilenameMatchingInbox(t *testing.T) { + lookup := fakeLookup{handle: "sess-id/terminal_0", workspace: "/ws"} + rt := &fakeRuntime{} + fixed := time.Unix(1717171717, 42).UTC() + m := panep.New(rt, lookup) + + // In production the composite injects the timestamp via inbox.WithTime so + // the panep ping filename matches what inbox just wrote. Pin it here. + ctx := inbox.WithTime(context.Background(), fixed) + if err := m.Send(ctx, "s-1", "ε hello"); err != nil { + t.Fatalf("Send: %v", err) + } + if len(rt.calls) != 2 { + t.Fatalf("want 2 WriteChars calls (body + newline), got %d", len(rt.calls)) + } + // The exact filename the inbox messenger would write for the same + // (clock, message); panep must point at it. + wantFilename := inbox.FilenameFor(fixed, "ε hello") + if !strings.Contains(rt.calls[0].s, wantFilename) { + t.Errorf("ping body should reference inbox filename %q, got %q", wantFilename, rt.calls[0].s) + } + if !strings.Contains(rt.calls[0].s, ".ao/inbox/") { + t.Errorf("ping body should mention .ao/inbox/, got %q", rt.calls[0].s) + } + if rt.calls[0].handle.ID != "sess-id/terminal_0" { + t.Errorf("handle ID = %q, want sess-id/terminal_0", rt.calls[0].handle.ID) + } + if rt.calls[1].s != "\n" { + t.Errorf("second WriteChars should be newline to submit, got %q", rt.calls[1].s) + } +} + +func TestSend_LookupErrorIsWrapped(t *testing.T) { + lookup := fakeLookup{err: errors.New("db dead")} + rt := &fakeRuntime{} + m := panep.New(rt, lookup) + + err := m.Send(context.Background(), "s-1", "x") + if err == nil { + t.Fatal("expected error when SessionHandle fails") + } + if !strings.Contains(err.Error(), "db dead") { + t.Errorf("error should wrap lookup error, got %v", err) + } + if len(rt.calls) != 0 { + t.Errorf("runtime must not be called when lookup fails, got %d calls", len(rt.calls)) + } +} + +func TestSend_EmptyHandleIsError(t *testing.T) { + lookup := fakeLookup{handle: "", workspace: "/ws"} + rt := &fakeRuntime{} + m := panep.New(rt, lookup) + + if err := m.Send(context.Background(), "s-1", "x"); err == nil { + t.Fatal("expected error when runtime handle is empty") + } + if len(rt.calls) != 0 { + t.Errorf("runtime must not be called for empty handle, got %d calls", len(rt.calls)) + } +} + +func TestSend_EmptyWorkspacePathIsError(t *testing.T) { + // The ping body references .ao/inbox/; with no workspace path we + // cannot trust the inbox messenger wrote a real file there. + lookup := fakeLookup{handle: "sess-id/terminal_0", workspace: ""} + rt := &fakeRuntime{} + m := panep.New(rt, lookup) + + if err := m.Send(context.Background(), "s-1", "x"); err == nil { + t.Fatal("expected error when workspace path is empty") + } + if len(rt.calls) != 0 { + t.Errorf("runtime must not be called for empty workspace, got %d calls", len(rt.calls)) + } +} + +func TestSend_RuntimeErrorIsWrapped(t *testing.T) { + lookup := fakeLookup{handle: "sess-id/terminal_0", workspace: "/ws"} + rt := &fakeRuntime{err: errors.New("zellij: pipe broken")} + m := panep.New(rt, lookup) + + err := m.Send(context.Background(), "s-1", "x") + if err == nil { + t.Fatal("expected error when WriteChars fails") + } + if !strings.Contains(err.Error(), "zellij: pipe broken") { + t.Errorf("error should wrap runtime error, got %v", err) + } +} diff --git a/backend/internal/adapters/runtime/zellij/commands.go b/backend/internal/adapters/runtime/zellij/commands.go index 20446dde..96dd250e 100644 --- a/backend/internal/adapters/runtime/zellij/commands.go +++ b/backend/internal/adapters/runtime/zellij/commands.go @@ -42,6 +42,10 @@ func sendEnterArgs(id, paneID string) []string { return []string{"--session", id, "action", "send-keys", "--pane-id", paneID, "Enter"} } +func writeCharsArgs(id, paneID, s string) []string { + return []string{"--session", id, "action", "write-chars", "--pane-id", paneID, s} +} + func dumpScreenArgs(id, paneID string) []string { return []string{"--session", id, "action", "dump-screen", "--pane-id", paneID, "--full"} } diff --git a/backend/internal/adapters/runtime/zellij/zellij.go b/backend/internal/adapters/runtime/zellij/zellij.go index 536ad28a..09cff2c7 100644 --- a/backend/internal/adapters/runtime/zellij/zellij.go +++ b/backend/internal/adapters/runtime/zellij/zellij.go @@ -169,6 +169,25 @@ func (r *Runtime) Destroy(ctx context.Context, handle ports.RuntimeHandle) error return nil } +// WriteChars types s into the session's pane via `zellij action write-chars`. +// Single shot, no chunking, no follow-up Enter. The pane-ping messenger +// (panep) sequences body + newline as two separate calls so the agent's input +// handler sees one well-formed line followed by a submit. +// +// Not part of ports.Runtime — runtime adapters that don't model a typing +// surface (none of the message-injection adapters), and panep depends on the +// narrow RuntimePaneWriter contract instead of the full Runtime interface. +func (r *Runtime) WriteChars(ctx context.Context, handle ports.RuntimeHandle, s string) error { + id, paneID, err := handleID(handle) + if err != nil { + return err + } + if _, err := r.run(ctx, writeCharsArgs(id, paneID, s)...); err != nil { + return fmt.Errorf("zellij runtime: write-chars %s/%s: %w", id, paneID, err) + } + return nil +} + // SendMessage pastes a message into the session's pane (chunked) and presses // Enter to submit it. func (r *Runtime) SendMessage(ctx context.Context, handle ports.RuntimeHandle, message string) error { diff --git a/backend/internal/adapters/runtime/zellij/zellij_test.go b/backend/internal/adapters/runtime/zellij/zellij_test.go index 22e2e509..327f5b34 100644 --- a/backend/internal/adapters/runtime/zellij/zellij_test.go +++ b/backend/internal/adapters/runtime/zellij/zellij_test.go @@ -316,6 +316,35 @@ func TestParseVersion(t *testing.T) { } } +// WriteChars is the narrow seam panep's RuntimePaneWriter calls — a single +// `zellij action write-chars` against the handle's pane, no chunking, no +// follow-up Enter. The composite messenger sequences body + newline as two +// separate calls. +func TestWriteCharsInvokesZellijAction(t *testing.T) { + fr := &fakeRunner{} + r := New(Options{Timeout: time.Second}) + r.runner = fr + + if err := r.WriteChars(context.Background(), ports.RuntimeHandle{ID: "sess-1/terminal_0"}, "hi"); err != nil { + t.Fatalf("WriteChars: %v", err) + } + if len(fr.calls) != 1 { + t.Fatalf("calls = %d, want 1", len(fr.calls)) + } + want := []string{"--session", "sess-1", "action", "write-chars", "--pane-id", "terminal_0", "hi"} + if got := fr.calls[0].args; !reflect.DeepEqual(got, want) { + t.Fatalf("WriteChars args = %#v, want %#v", got, want) + } +} + +func TestWriteCharsRejectsBadHandle(t *testing.T) { + r := New(Options{Timeout: time.Second}) + r.runner = &fakeRunner{} + if err := r.WriteChars(context.Background(), ports.RuntimeHandle{ID: ""}, "hi"); err == nil { + t.Fatal("WriteChars with empty handle: got nil, want error") + } +} + func TestSendMessageChunksAndSendsEnter(t *testing.T) { fr := &fakeRunner{} r := New(Options{Timeout: time.Second, ChunkSize: 5}) diff --git a/backend/internal/cli/root.go b/backend/internal/cli/root.go index 293d431f..280cd7b1 100644 --- a/backend/internal/cli/root.go +++ b/backend/internal/cli/root.go @@ -148,6 +148,7 @@ func NewRootCommand(deps Deps) *cobra.Command { root.AddCommand(newStatusCommand(ctx)) root.AddCommand(newDoctorCommand(ctx)) root.AddCommand(newSpawnCommand(ctx)) + root.AddCommand(newSendCommand(ctx)) root.AddCommand(newProjectCommand(ctx)) root.AddCommand(newCompletionCommand()) root.AddCommand(newVersionCommand()) diff --git a/backend/internal/cli/send.go b/backend/internal/cli/send.go new file mode 100644 index 00000000..1effb2d1 --- /dev/null +++ b/backend/internal/cli/send.go @@ -0,0 +1,53 @@ +package cli + +import ( + "context" + "errors" + "net/url" + "strings" + + "github.com/spf13/cobra" +) + +type sendOptions struct { + session string + message string +} + +// sendAPIRequest mirrors the daemon's SendSessionMessageRequest body for +// POST /api/v1/sessions/{id}/send. The CLI keeps its own copy so it need not +// import httpd. +type sendAPIRequest struct { + Message string `json:"message"` +} + +func newSendCommand(ctx *commandContext) *cobra.Command { + var opts sendOptions + cmd := &cobra.Command{ + Use: "send", + Short: "Send a message to a running agent session", + Args: noArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + return ctx.sendMessage(cmd.Context(), opts) + }, + } + cmd.Flags().StringVar(&opts.session, "session", "", "Session id (required)") + cmd.Flags().StringVar(&opts.message, "message", "", "Message body (required)") + return cmd +} + +func (c *commandContext) sendMessage(ctx context.Context, opts sendOptions) error { + message := strings.TrimSpace(opts.message) + if message == "" { + return usageError{errors.New("usage: --message is required")} + } + session := strings.TrimSpace(opts.session) + if session == "" { + return usageError{errors.New("usage: --session is required")} + } + + // PathEscape: session ids are already "-"/digit safe, but may later come + // from sanitized issue refs; keep the URL well-formed regardless. + path := "sessions/" + url.PathEscape(session) + "/send" + return c.postJSON(ctx, path, sendAPIRequest{Message: message}, nil) +} diff --git a/backend/internal/cli/send_test.go b/backend/internal/cli/send_test.go new file mode 100644 index 00000000..cd61a0fb --- /dev/null +++ b/backend/internal/cli/send_test.go @@ -0,0 +1,220 @@ +package cli + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/runfile" +) + +// sendServer wires an httptest server expecting POST /api/v1/sessions/{id}/send +// and captures the request body and path the CLI hit. +type sendCapture struct { + body string + path string +} + +// writeRunFileFor points the CLI's run-file at srv so postJSON dials the test +// server. It mirrors the run-file convention the other CLI tests use. +func writeRunFileFor(t *testing.T, cfg testConfig, srv *httptest.Server) { + t.Helper() + if err := runfile.Write(cfg.runFile, runfile.Info{ + PID: os.Getpid(), Port: serverPort(t, srv.URL), StartedAt: time.Unix(100, 0).UTC(), + }); err != nil { + t.Fatalf("write run-file: %v", err) + } +} + +func sendServer(t *testing.T, status int, respBody string) (*httptest.Server, *sendCapture) { + t.Helper() + capture := &sendCapture{} + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.NotFound(w, r) + return + } + if !strings.HasPrefix(r.URL.Path, "/api/v1/sessions/") || !strings.HasSuffix(r.URL.Path, "/send") { + http.NotFound(w, r) + return + } + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("read body: %v", err) + } + capture.body = string(body) + capture.path = r.URL.Path + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _, _ = io.WriteString(w, respBody) + })) + t.Cleanup(srv.Close) + return srv, capture +} + +func TestSend_Success(t *testing.T) { + cfg := setConfigEnv(t) + srv, capture := sendServer(t, http.StatusOK, + `{"ok":true,"sessionId":"demo-1","message":"hello agent"}`) + writeRunFileFor(t, cfg, srv) + + _, errOut, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "demo-1", "--message", "hello agent") + if err != nil { + t.Fatalf("unexpected error: %v\nstderr=%s", err, errOut) + } + if capture.path != "/api/v1/sessions/demo-1/send" { + t.Errorf("path = %q, want /api/v1/sessions/demo-1/send", capture.path) + } + var req struct { + Message string `json:"message"` + } + if err := json.Unmarshal([]byte(capture.body), &req); err != nil { + t.Fatalf("decode body: %v\nbody=%s", err, capture.body) + } + if req.Message != "hello agent" { + t.Errorf("captured message = %q, want %q", req.Message, "hello agent") + } +} + +func TestSend_TrimsLeadingAndTrailingWhitespace(t *testing.T) { + cfg := setConfigEnv(t) + srv, capture := sendServer(t, http.StatusOK, `{"ok":true,"sessionId":"demo-1","message":"hi"}`) + writeRunFileFor(t, cfg, srv) + + _, _, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "demo-1", "--message", " hi ") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + var req struct { + Message string `json:"message"` + } + if err := json.Unmarshal([]byte(capture.body), &req); err != nil { + t.Fatalf("decode body: %v\nbody=%s", err, capture.body) + } + if req.Message != "hi" { + t.Errorf("server received %q, want trimmed %q", req.Message, "hi") + } +} + +func TestSend_EmptyMessageIsUsageError(t *testing.T) { + setConfigEnv(t) + _, _, err := executeCLI(t, Deps{}, "send", "--session", "demo-1", "--message", " ") + if err == nil { + t.Fatal("expected usage error for empty message") + } + if got := ExitCode(err); got != 2 { + t.Fatalf("exit code = %d, want 2", got) + } + if !strings.Contains(err.Error(), "--message is required") { + t.Fatalf("error missing usage message: %v", err) + } +} + +func TestSend_MissingSessionIsUsageError(t *testing.T) { + setConfigEnv(t) + _, _, err := executeCLI(t, Deps{}, "send", "--message", "hi") + if err == nil { + t.Fatal("expected usage error for missing --session") + } + if got := ExitCode(err); got != 2 { + t.Fatalf("exit code = %d, want 2", got) + } +} + +func TestSend_ServerBadRequestExits1(t *testing.T) { + cfg := setConfigEnv(t) + srv, _ := sendServer(t, http.StatusBadRequest, + `{"error":"bad_request","code":"MESSAGE_REQUIRED","message":"Message is required"}`) + writeRunFileFor(t, cfg, srv) + + _, errOut, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "demo-1", "--message", "hi") + if err == nil { + t.Fatal("expected runtime error from 400") + } + if got := ExitCode(err); got != 1 { + t.Fatalf("exit code = %d, want 1", got) + } + if !strings.Contains(err.Error(), "MESSAGE_REQUIRED") && !strings.Contains(errOut, "MESSAGE_REQUIRED") { + t.Fatalf("error did not surface the server error envelope: %v\nstderr=%s", err, errOut) + } +} + +func TestSend_ServerNotFoundExits1(t *testing.T) { + cfg := setConfigEnv(t) + srv, _ := sendServer(t, http.StatusNotFound, + `{"error":"not_found","code":"SESSION_NOT_FOUND","message":"Unknown session"}`) + writeRunFileFor(t, cfg, srv) + + _, _, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "missing", "--message", "hi") + if err == nil { + t.Fatal("expected runtime error from 404") + } + if got := ExitCode(err); got != 1 { + t.Fatalf("exit code = %d, want 1", got) + } +} + +func TestSend_ServerInternalErrorExits1(t *testing.T) { + cfg := setConfigEnv(t) + srv, _ := sendServer(t, http.StatusInternalServerError, + `{"error":"internal","code":"SESSION_OPERATION_FAILED","message":"Session operation failed"}`) + writeRunFileFor(t, cfg, srv) + + _, errOut, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "demo-1", "--message", "hi") + if err == nil { + t.Fatal("expected runtime error from 500") + } + if got := ExitCode(err); got != 1 { + t.Fatalf("exit code = %d, want 1", got) + } + // Regression guard: a future change that swallows the API envelope and + // prints only "daemon returned HTTP 500" would silently hide what the + // daemon was trying to tell the operator. + if !strings.Contains(err.Error(), "SESSION_OPERATION_FAILED") && !strings.Contains(errOut, "SESSION_OPERATION_FAILED") { + t.Fatalf("error did not surface the server error envelope: %v\nstderr=%s", err, errOut) + } +} + +func TestSend_DaemonNotRunningExits1(t *testing.T) { + setConfigEnv(t) + _, _, err := executeCLI(t, Deps{}, "send", "--session", "demo-1", "--message", "hi") + if err == nil { + t.Fatal("expected error when daemon is not running") + } + if got := ExitCode(err); got != 1 { + t.Fatalf("exit code = %d, want 1", got) + } +} + +func TestSend_NetworkErrorExits1(t *testing.T) { + cfg := setConfigEnv(t) + // Start and immediately close a server so the run-file points at a closed port. + srv, _ := sendServer(t, http.StatusOK, "{}") + writeRunFileFor(t, cfg, srv) + srv.Close() + + _, _, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "demo-1", "--message", "hi") + if err == nil { + t.Fatal("expected runtime error from network failure") + } + if got := ExitCode(err); got != 1 { + t.Fatalf("exit code = %d, want 1", got) + } +} diff --git a/backend/internal/daemon/daemon.go b/backend/internal/daemon/daemon.go index d38fa4ea..6d7c6e73 100644 --- a/backend/internal/daemon/daemon.go +++ b/backend/internal/daemon/daemon.go @@ -81,10 +81,16 @@ func Run() error { // change_log -> poller -> broadcaster) and gives startSession the shared LCM. lcStack := startLifecycle(ctx, store, runtimeAdapter, log) + // The agent messenger: inbox file write (durable, primary) composed with a + // live zellij pane ping (best-effort secondary). runtimeAdapter is the + // concrete zellij runtime, so it satisfies panep's RuntimePaneWriter seam. + messenger := newSessionMessenger(store, runtimeAdapter, log) + // Wire the controller-facing session service over the same store + LCM, the - // zellij runtime, a gitworktree workspace, and the per-session agent resolver - // (AO_AGENT default, validated here), then mount it on the API. - sessionSvc, err := startSession(cfg, runtimeAdapter, store, lcStack.LCM, log) + // zellij runtime, a gitworktree workspace, the per-session agent resolver + // (AO_AGENT default, validated here), and the agent messenger, then mount it + // on the API. + sessionSvc, err := startSession(cfg, runtimeAdapter, store, lcStack.LCM, messenger, log) if err != nil { stop() lcStack.Stop() diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index 51232299..57fd5409 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -9,6 +9,9 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/adapters" "github.com/aoagents/agent-orchestrator/backend/internal/adapters/agent/claudecode" "github.com/aoagents/agent-orchestrator/backend/internal/adapters/agent/codex" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/composite" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/panep" "github.com/aoagents/agent-orchestrator/backend/internal/adapters/workspace/gitworktree" "github.com/aoagents/agent-orchestrator/backend/internal/config" "github.com/aoagents/agent-orchestrator/backend/internal/domain" @@ -42,19 +45,12 @@ func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runt // passed to startLifecycle before calling Stop. func (l *lifecycleStack) Stop() { <-l.reaperDone } -// noopMessenger is a stub ports.AgentMessenger: durable writes and notifications -// work without it; only live agent nudges are absent until the runtime/agent -// nudge path is wired. -type noopMessenger struct{} - -func (noopMessenger) Send(context.Context, domain.SessionID, string) error { return nil } - // startSession builds the controller-facing session service: a session manager // over the real zellij runtime, a per-session gitworktree workspace, the shared -// store + LCM, and the per-session agent resolver (AO_AGENT default). The -// Messenger is a stub until the live agent-nudge path lands. The returned -// service is mounted at httpd APIDeps.Sessions. -func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, lcm *lifecycle.Manager, log *slog.Logger) (*sessionsvc.Service, error) { +// store + LCM, the per-session agent resolver (AO_AGENT default), and the +// composite agent messenger (inbox file write + live zellij pane ping). The +// returned service is mounted at httpd APIDeps.Sessions. +func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, lcm *lifecycle.Manager, messenger ports.AgentMessenger, log *slog.Logger) (*sessionsvc.Service, error) { agents, err := buildAgentResolver(cfg.Agent, log) if err != nil { return nil, err @@ -76,13 +72,57 @@ func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, Agents: agents, Workspace: ws, Store: store, - Messenger: noopMessenger{}, + Messenger: messenger, Lifecycle: lcm, DataDir: cfg.DataDir, }) return sessionsvc.New(mgr, store), nil } +// storeWorkspaceLookup adapts the sqlite store to the SessionWorkspace lookup +// the inbox messenger needs. WorkspacePath becomes meaningful only after the +// LCM records spawn metadata, so a session that exists but has no path is an +// error — Send must not invent a destination. +type storeWorkspaceLookup struct{ store *sqlite.Store } + +func (s storeWorkspaceLookup) WorkspacePath(ctx context.Context, id domain.SessionID) (string, error) { + rec, ok, err := s.store.GetSession(ctx, id) + if err != nil { + return "", err + } + if !ok { + return "", fmt.Errorf("session %s not found", id) + } + return rec.Metadata.WorkspacePath, nil +} + +// storeSessionHandleLookup adapts the sqlite store to panep.SessionLookup. +// panep needs the runtime handle id (to address the right zellij pane) and the +// workspace path (proof the inbox messenger had a real directory to write to). +type storeSessionHandleLookup struct{ store *sqlite.Store } + +func (s storeSessionHandleLookup) SessionHandle(ctx context.Context, id domain.SessionID) (string, string, error) { + rec, ok, err := s.store.GetSession(ctx, id) + if err != nil { + return "", "", err + } + if !ok { + return "", "", fmt.Errorf("session %s not found", id) + } + return rec.Metadata.RuntimeHandleID, rec.Metadata.WorkspacePath, nil +} + +// newSessionMessenger assembles the per-daemon agent messenger: inbox (durable +// file write, primary) wrapped in a composite with panep (live pane ping, +// best-effort secondary). Ordering matters — see composite.Messenger for the +// "primary must succeed, secondaries are nudges" contract. Replaces the old +// noopMessenger stub that silently dropped every agent nudge. +func newSessionMessenger(store *sqlite.Store, runtime panep.RuntimePaneWriter, logger *slog.Logger) ports.AgentMessenger { + inboxMsg := inbox.New(storeWorkspaceLookup{store: store}) + panepMsg := panep.New(runtime, storeSessionHandleLookup{store: store}) + return composite.New([]ports.AgentMessenger{inboxMsg, panepMsg}, logger) +} + // buildAgentRegistry returns a registry populated with the agent adapters the // daemon ships, keyed by manifest id. Registration only fails on an // empty/duplicate id — a programmer error, not a runtime condition. diff --git a/backend/internal/daemon/wiring_test.go b/backend/internal/daemon/wiring_test.go index d15f9dee..38ff2fce 100644 --- a/backend/internal/daemon/wiring_test.go +++ b/backend/internal/daemon/wiring_test.go @@ -5,11 +5,16 @@ import ( "errors" "io" "log/slog" + "os" + "path/filepath" "sync" "testing" "time" "github.com/aoagents/agent-orchestrator/backend/internal/adapters" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/composite" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/panep" "github.com/aoagents/agent-orchestrator/backend/internal/adapters/runtime/zellij" "github.com/aoagents/agent-orchestrator/backend/internal/cdc" "github.com/aoagents/agent-orchestrator/backend/internal/config" @@ -126,7 +131,9 @@ func TestWiring_StartSessionBuildsSessionService(t *testing.T) { lcm := lifecycle.New(store, nil) cfg := config.Config{DataDir: t.TempDir()} - svc, err := startSession(cfg, zellij.New(zellij.Options{}), store, lcm, log) + runtime := zellij.New(zellij.Options{}) + messenger := newSessionMessenger(store, runtime, log) + svc, err := startSession(cfg, runtime, store, lcm, messenger, log) if err != nil { t.Fatalf("startSession: %v", err) } @@ -135,6 +142,66 @@ func TestWiring_StartSessionBuildsSessionService(t *testing.T) { } } +// TestWiring_SessionMessengerIsInboxThenPanepComposite asserts the daemon wires +// the agent messenger as a composite of inbox (primary, durable file write) +// then panep (secondary, live pane ping) — the ordering the "primary must +// succeed, secondaries are nudges" contract depends on. It also proves the +// messenger reaches the same store the SM reads: a Send through a row the store +// owns lands an inbox file under that row's workspace. This is the switch that +// replaced the old noopMessenger, which silently dropped every nudge. +func TestWiring_SessionMessengerIsInboxThenPanepComposite(t *testing.T) { + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = store.Close() }) + + runtime := zellij.New(zellij.Options{}) + messenger := newSessionMessenger(store, runtime, nil) + + comp, ok := messenger.(*composite.Messenger) + if !ok { + t.Fatalf("session messenger should be *composite.Messenger, got %T", messenger) + } + if len(comp.Inner) != 2 { + t.Fatalf("composite should wrap exactly 2 inner messengers (inbox + panep), got %d", len(comp.Inner)) + } + if _, ok := comp.Inner[0].(*inbox.Messenger); !ok { + t.Errorf("composite Inner[0] should be *inbox.Messenger (primary), got %T", comp.Inner[0]) + } + if _, ok := comp.Inner[1].(*panep.Messenger); !ok { + t.Errorf("composite Inner[1] should be *panep.Messenger (secondary), got %T", comp.Inner[1]) + } + + // End-to-end: a session row in the shared store is reachable through the + // messenger. A second store would surface as "session not found" here. + ctx := context.Background() + if err := store.UpsertProject(ctx, domain.ProjectRecord{ID: "p", Path: "/repo/p", RegisteredAt: time.Now()}); err != nil { + t.Fatal(err) + } + workspaceDir := t.TempDir() + rec, err := store.CreateSession(ctx, domain.SessionRecord{ + ProjectID: "p", Kind: domain.KindWorker, + Activity: domain.Activity{State: domain.ActivityIdle, LastActivityAt: time.Now()}, + Metadata: domain.SessionMetadata{WorkspacePath: workspaceDir}, + }) + if err != nil { + t.Fatal(err) + } + // panep will fail (no live zellij pane), but it is best-effort: Send must + // still succeed because the inbox file write (primary) succeeded. + if err := messenger.Send(ctx, rec.ID, "hello agent"); err != nil { + t.Fatalf("messenger.Send through shared store lookup: %v", err) + } + entries, err := os.ReadDir(filepath.Join(workspaceDir, ".ao", "inbox")) + if err != nil { + t.Fatalf("inbox dir: %v", err) + } + if len(entries) != 1 { + t.Fatalf("want 1 inbox file, got %d", len(entries)) + } +} + // TestProjectRepoResolver_ResolvesRegisteredProject asserts the DB-backed repo // resolver turns a registered project into its on-disk repo path (so spawns // materialise a worktree), and fails loudly for an unregistered project. From 00a728843b12966d8a9cc352ddc83983992011d7 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Tue, 2 Jun 2026 19:04:54 +0530 Subject: [PATCH 2/6] fix(inbox): use O_EXCL so a filename collision errors instead of clobbering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit os.WriteFile opens with O_CREATE|O_WRONLY|O_TRUNC, which silently overwrites an existing file. The doc comment already stated the intent ("we do not retry on EEXIST"), but O_TRUNC never yields EEXIST — two identical messages sent on the same composite-pinned nanosecond would produce the same filename and the second Send would silently lose the first message. Switch to O_CREATE|O_EXCL|O_WRONLY so a collision surfaces as an error; O_EXCL also refuses to follow a symlink at the final path component. Add a regression test. Addresses greptile review on PR #83. Co-Authored-By: Claude Opus 4.7 --- .../adapters/messenger/inbox/inbox.go | 21 ++++++++++++++----- .../adapters/messenger/inbox/inbox_test.go | 21 +++++++++++++++++++ 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/backend/internal/adapters/messenger/inbox/inbox.go b/backend/internal/adapters/messenger/inbox/inbox.go index c55573ab..2f3d901c 100644 --- a/backend/internal/adapters/messenger/inbox/inbox.go +++ b/backend/internal/adapters/messenger/inbox/inbox.go @@ -42,12 +42,15 @@ var _ ports.AgentMessenger = (*Messenger)(nil) // Send writes message into /.ao/inbox/_.md. // // Filename collisions are practically impossible: nanosecond timestamp plus an -// 8-char hash of the body. We do not retry on EEXIST. +// 8-char hash of the body. The write uses O_EXCL so a collision (two identical +// messages on the same pinned nanosecond) surfaces as an error rather than +// silently clobbering the earlier message; we do not retry on EEXIST. // // Symlink safety: if .ao or .ao/inbox already exists as a symlink, refuse. -// Otherwise os.MkdirAll creates real directories and os.WriteFile (which uses -// O_CREATE|O_WRONLY|O_TRUNC without O_NOFOLLOW) writes the message body. The -// inbox is owned by ao; a symlink there is either user misconfig or attack. +// Otherwise os.MkdirAll creates real directories and the O_CREATE|O_EXCL open +// writes the message body. O_EXCL also refuses to follow a symlink at the final +// path component. The inbox is owned by ao; a symlink there is either user +// misconfig or attack. func (m *Messenger) Send(ctx context.Context, id domain.SessionID, message string) error { ws, err := m.lookup.WorkspacePath(ctx, id) if err != nil { @@ -67,9 +70,17 @@ func (m *Messenger) Send(ctx context.Context, id domain.SessionID, message strin } name := FilenameFor(TimeFromContext(ctx, m.clock), message) - if err := os.WriteFile(filepath.Join(inboxDir, name), []byte(message), 0o600); err != nil { + f, err := os.OpenFile(filepath.Join(inboxDir, name), os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) + if err != nil { return fmt.Errorf("inbox: write %s for %s: %w", name, id, err) } + if _, err := f.WriteString(message); err != nil { + _ = f.Close() + return fmt.Errorf("inbox: write body %s for %s: %w", name, id, err) + } + if err := f.Close(); err != nil { + return fmt.Errorf("inbox: close %s for %s: %w", name, id, err) + } return nil } diff --git a/backend/internal/adapters/messenger/inbox/inbox_test.go b/backend/internal/adapters/messenger/inbox/inbox_test.go index 5f29fced..3c0e6f47 100644 --- a/backend/internal/adapters/messenger/inbox/inbox_test.go +++ b/backend/internal/adapters/messenger/inbox/inbox_test.go @@ -118,6 +118,27 @@ func TestSend_TwoSendsProduceTwoFiles(t *testing.T) { } } +// TestSend_CollisionOnPinnedTimeSurfacesError guards the O_EXCL write: when the +// composite pins one timestamp for the whole fan-out (inbox.WithTime) and the +// same message is sent twice on that timestamp, the second write must error +// rather than silently clobber the first message's file. +func TestSend_CollisionOnPinnedTimeSurfacesError(t *testing.T) { + dir := t.TempDir() + m := inbox.New(fakeLookup{path: dir}) + ctx := inbox.WithTime(context.Background(), time.Unix(0, 1_700_000_000_123_456_789)) + + if err := m.Send(ctx, "s-1", "dup"); err != nil { + t.Fatalf("first send: %v", err) + } + if err := m.Send(ctx, "s-1", "dup"); err == nil { + t.Fatal("second send with the same pinned time + message should error, not clobber") + } + entries, _ := os.ReadDir(filepath.Join(dir, ".ao", "inbox")) + if len(entries) != 1 { + t.Fatalf("want exactly 1 file preserved, got %d", len(entries)) + } +} + func TestSend_UnknownSessionReturnsError(t *testing.T) { m := inbox.New(fakeLookup{err: errors.New("not found")}) err := m.Send(context.Background(), "s-1", "x") From 49af7baf6114859c01af4d009456cc89b668b578 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Tue, 2 Jun 2026 19:12:44 +0530 Subject: [PATCH 3/6] fix(inbox): remove the freshly-created file when write or close fails The O_EXCL switch creates the inbox file before writing its body; if WriteString or Close then fails, the empty/partial .md was left on disk and the agent's next inbox scan would pick up a truncated ghost message. Remove the file on those error paths. O_EXCL guarantees the file did not exist before this call, so the cleanup can only delete our own partial write, never a legitimate earlier message. Addresses greptile review on PR #83. Co-Authored-By: Claude Opus 4.7 --- backend/internal/adapters/messenger/inbox/inbox.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/backend/internal/adapters/messenger/inbox/inbox.go b/backend/internal/adapters/messenger/inbox/inbox.go index 2f3d901c..e8b235b6 100644 --- a/backend/internal/adapters/messenger/inbox/inbox.go +++ b/backend/internal/adapters/messenger/inbox/inbox.go @@ -70,15 +70,22 @@ func (m *Messenger) Send(ctx context.Context, id domain.SessionID, message strin } name := FilenameFor(TimeFromContext(ctx, m.clock), message) - f, err := os.OpenFile(filepath.Join(inboxDir, name), os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) + path := filepath.Join(inboxDir, name) + f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) if err != nil { return fmt.Errorf("inbox: write %s for %s: %w", name, id, err) } + // On a failed write/close, remove the file we just created: O_EXCL proved it + // did not exist before this call, so the cleanup can only delete our own + // empty/partial file — never a legitimate earlier message — and keeps the + // agent's next inbox scan from picking up a truncated ghost. if _, err := f.WriteString(message); err != nil { _ = f.Close() + _ = os.Remove(path) return fmt.Errorf("inbox: write body %s for %s: %w", name, id, err) } if err := f.Close(); err != nil { + _ = os.Remove(path) return fmt.Errorf("inbox: close %s for %s: %w", name, id, err) } return nil From f6278153e91e8b2f30fa8fcad20078ec4e04ab49 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Tue, 2 Jun 2026 19:37:10 +0530 Subject: [PATCH 4/6] fix(messenger): reduce ao send to live pane delivery --- .../adapters/messenger/composite/composite.go | 69 ------ .../messenger/composite/composite_test.go | 223 ------------------ .../adapters/messenger/inbox/inbox.go | 153 ------------ .../adapters/messenger/inbox/inbox_test.go | 210 ----------------- .../adapters/messenger/panep/panep.go | 82 ------- .../adapters/messenger/panep/panep_test.go | 134 ----------- .../adapters/runtime/zellij/commands.go | 4 - .../adapters/runtime/zellij/zellij.go | 19 -- .../adapters/runtime/zellij/zellij_test.go | 29 --- backend/internal/daemon/daemon.go | 5 +- backend/internal/daemon/lifecycle_wiring.go | 63 ++--- backend/internal/daemon/wiring_test.go | 61 ++--- 12 files changed, 48 insertions(+), 1004 deletions(-) delete mode 100644 backend/internal/adapters/messenger/composite/composite.go delete mode 100644 backend/internal/adapters/messenger/composite/composite_test.go delete mode 100644 backend/internal/adapters/messenger/inbox/inbox.go delete mode 100644 backend/internal/adapters/messenger/inbox/inbox_test.go delete mode 100644 backend/internal/adapters/messenger/panep/panep.go delete mode 100644 backend/internal/adapters/messenger/panep/panep_test.go diff --git a/backend/internal/adapters/messenger/composite/composite.go b/backend/internal/adapters/messenger/composite/composite.go deleted file mode 100644 index 9d6cefaf..00000000 --- a/backend/internal/adapters/messenger/composite/composite.go +++ /dev/null @@ -1,69 +0,0 @@ -// Package composite fans Send out to a primary AgentMessenger followed by -// best-effort secondaries. The primary's failure aborts the whole call (the -// message was never delivered); a secondary's failure is logged and swallowed -// (the message IS delivered — the secondary was a live nudge, not a record). -// -// This is how the daemon wires the inbox messenger (primary, durable file -// write) with the pane-ping messenger (secondary, live zellij nudge): if we -// could not write the file we must not tell the agent about a file that does -// not exist, but if we wrote the file and could not ping, the agent will see -// it on the next inbox check. -package composite - -import ( - "context" - "io" - "log/slog" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -// Messenger fans Send out across Inner messengers with primary-then-secondary -// semantics. Inner[0] is the primary (must succeed); Inner[1:] are best-effort -// secondaries whose failures are logged at WARN and swallowed. Inner is public -// so wiring tests can assert the daemon assembles the composite with the -// expected ordering (inbox first, panep second). -// -// Each Send pins one timestamp via inbox.WithTime so inner messengers that -// derive a filename from (t, message) agree on the same name — without this, -// the inbox file write and a downstream panep ping would land on different -// nanoseconds and the ping would point at a file that does not exist. -type Messenger struct { - Inner []ports.AgentMessenger - Logger *slog.Logger - Clock func() time.Time -} - -// New builds a Messenger. A nil logger is replaced with a discard logger so a -// secondary failure never panics a misconfigured caller. Clock defaults to -// time.Now. -func New(inner []ports.AgentMessenger, logger *slog.Logger) *Messenger { - if logger == nil { - logger = slog.New(slog.NewTextHandler(io.Discard, nil)) - } - return &Messenger{Inner: inner, Logger: logger, Clock: time.Now} -} - -var _ ports.AgentMessenger = (*Messenger)(nil) - -// Send invokes Inner[0] (primary); on failure, no secondaries run and the -// error is returned. On primary success, every Inner[1:] is invoked in order -// and their errors are logged at WARN. -func (c *Messenger) Send(ctx context.Context, id domain.SessionID, message string) error { - if len(c.Inner) == 0 { - return nil - } - ctx = inbox.WithTime(ctx, c.Clock()) - if err := c.Inner[0].Send(ctx, id, message); err != nil { - return err - } - for _, m := range c.Inner[1:] { - if err := m.Send(ctx, id, message); err != nil { - c.Logger.Warn("composite: secondary messenger failed", "sessionId", id, "err", err) - } - } - return nil -} diff --git a/backend/internal/adapters/messenger/composite/composite_test.go b/backend/internal/adapters/messenger/composite/composite_test.go deleted file mode 100644 index 9d60a060..00000000 --- a/backend/internal/adapters/messenger/composite/composite_test.go +++ /dev/null @@ -1,223 +0,0 @@ -package composite_test - -import ( - "context" - "errors" - "io" - "log/slog" - "os" - "path/filepath" - "strings" - "testing" - - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/composite" - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/panep" - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -func TestSatisfiesAgentMessenger(t *testing.T) { - var _ ports.AgentMessenger = (*composite.Messenger)(nil) -} - -type recordingMessenger struct { - name string - err error - calls *[]string -} - -func (r *recordingMessenger) Send(_ context.Context, _ domain.SessionID, _ string) error { - *r.calls = append(*r.calls, r.name) - return r.err -} - -func nopLogger() *slog.Logger { - return slog.New(slog.NewTextHandler(io.Discard, nil)) -} - -func TestSend_FansOutInOrder(t *testing.T) { - var calls []string - primary := &recordingMessenger{name: "primary", calls: &calls} - secondary := &recordingMessenger{name: "secondary", calls: &calls} - - c := composite.New([]ports.AgentMessenger{primary, secondary}, nopLogger()) - if err := c.Send(context.Background(), "s-1", "hi"); err != nil { - t.Fatalf("Send: %v", err) - } - if len(calls) != 2 { - t.Fatalf("calls = %v, want 2", calls) - } - if calls[0] != "primary" || calls[1] != "secondary" { - t.Fatalf("call order = %v, want [primary secondary]", calls) - } -} - -func TestSend_PrimaryFailureSkipsSecondaries(t *testing.T) { - var calls []string - primary := &recordingMessenger{name: "primary", err: errors.New("disk full"), calls: &calls} - secondary := &recordingMessenger{name: "secondary", calls: &calls} - - c := composite.New([]ports.AgentMessenger{primary, secondary}, nopLogger()) - err := c.Send(context.Background(), "s-1", "hi") - if err == nil { - t.Fatal("expected error when primary fails") - } - if !strings.Contains(err.Error(), "disk full") { - t.Errorf("error should surface primary failure, got %v", err) - } - if len(calls) != 1 || calls[0] != "primary" { - t.Fatalf("calls = %v, want only [primary]", calls) - } -} - -func TestSend_SecondaryFailureIsLoggedAtWarnAndSwallowed(t *testing.T) { - var calls []string - primary := &recordingMessenger{name: "primary", calls: &calls} - secondary := &recordingMessenger{name: "secondary", err: errors.New("pipe broken"), calls: &calls} - - rec := &levelRecorder{} - logger := slog.New(rec) - - c := composite.New([]ports.AgentMessenger{primary, secondary}, logger) - if err := c.Send(context.Background(), "s-1", "hi"); err != nil { - t.Fatalf("Send must succeed when only the secondary fails, got %v", err) - } - if len(calls) != 2 { - t.Fatalf("calls = %v, want both invoked", calls) - } - if len(rec.records) != 1 { - t.Fatalf("want 1 log record for secondary failure, got %d", len(rec.records)) - } - if rec.records[0].Level != slog.LevelWarn { - t.Errorf("secondary failure logged at %v, want WARN", rec.records[0].Level) - } - if !strings.Contains(rec.records[0].Message+" "+rec.attrText(0), "pipe broken") { - t.Errorf("expected secondary failure surfaced in log, got message=%q attrs=%q", - rec.records[0].Message, rec.attrText(0)) - } -} - -// levelRecorder is a slog.Handler that captures full Records so tests can -// assert on level + message + attrs, not just a serialized substring. -type levelRecorder struct { - records []slog.Record - attrs [][]slog.Attr -} - -func (r *levelRecorder) Enabled(context.Context, slog.Level) bool { return true } - -func (r *levelRecorder) Handle(_ context.Context, rec slog.Record) error { - var collected []slog.Attr - rec.Attrs(func(a slog.Attr) bool { - collected = append(collected, a) - return true - }) - r.records = append(r.records, rec) - r.attrs = append(r.attrs, collected) - return nil -} - -func (r *levelRecorder) WithAttrs([]slog.Attr) slog.Handler { return r } -func (r *levelRecorder) WithGroup(string) slog.Handler { return r } - -func (r *levelRecorder) attrText(i int) string { - var b strings.Builder - for _, a := range r.attrs[i] { - b.WriteString(a.Key) - b.WriteString("=") - b.WriteString(a.Value.String()) - b.WriteString(" ") - } - return b.String() -} - -func TestSend_AllSecondariesAttemptedEvenIfOneFails(t *testing.T) { - var calls []string - primary := &recordingMessenger{name: "primary", calls: &calls} - sec1 := &recordingMessenger{name: "sec1", err: errors.New("transient"), calls: &calls} - sec2 := &recordingMessenger{name: "sec2", calls: &calls} - - c := composite.New([]ports.AgentMessenger{primary, sec1, sec2}, nopLogger()) - if err := c.Send(context.Background(), "s-1", "hi"); err != nil { - t.Fatalf("Send: %v", err) - } - if len(calls) != 3 || calls[0] != "primary" || calls[1] != "sec1" || calls[2] != "sec2" { - t.Fatalf("call order = %v, want [primary sec1 sec2]", calls) - } -} - -func TestSend_EmptyInnerListIsNoOp(t *testing.T) { - c := composite.New(nil, nopLogger()) - if err := c.Send(context.Background(), "s-1", "hi"); err != nil { - t.Fatalf("empty composite Send should be no-op, got %v", err) - } -} - -// inboxLookup adapts a tempdir into inbox.SessionWorkspace. -type fixedWorkspace struct{ path string } - -func (f fixedWorkspace) WorkspacePath(context.Context, domain.SessionID) (string, error) { - return f.path, nil -} - -// fixedSession adapts to panep.SessionLookup. -type fixedSession struct{ handle, workspace string } - -func (f fixedSession) SessionHandle(context.Context, domain.SessionID) (string, string, error) { - return f.handle, f.workspace, nil -} - -type recordingRuntime struct { - calls []string -} - -func (r *recordingRuntime) WriteChars(_ context.Context, _ ports.RuntimeHandle, s string) error { - r.calls = append(r.calls, s) - return nil -} - -// TestSend_PingFilenameMatchesOnDiskFilename is the end-to-end consistency -// proof the brief implicitly requires: a single composite.Send invocation must -// produce one inbox file on disk AND a ping body that names exactly that file. -// Without shared time, the inbox clock and panep clock fire at different -// nanoseconds and the filenames diverge. -func TestSend_PingFilenameMatchesOnDiskFilename(t *testing.T) { - workspace := t.TempDir() - inboxMsg := inbox.New(fixedWorkspace{path: workspace}) - rt := &recordingRuntime{} - panepMsg := panep.New(rt, fixedSession{handle: "sess-id/terminal_0", workspace: workspace}) - - c := composite.New([]ports.AgentMessenger{inboxMsg, panepMsg}, nopLogger()) - if err := c.Send(context.Background(), "s-1", "hello agent"); err != nil { - t.Fatalf("Send: %v", err) - } - - entries, err := os.ReadDir(filepath.Join(workspace, ".ao", "inbox")) - if err != nil { - t.Fatalf("read inbox dir: %v", err) - } - if len(entries) != 1 { - t.Fatalf("want 1 inbox file, got %d", len(entries)) - } - onDisk := entries[0].Name() - if len(rt.calls) == 0 { - t.Fatal("panep did not ping the pane") - } - if !strings.Contains(rt.calls[0], onDisk) { - t.Fatalf("ping body must reference the on-disk file %q, got %q", onDisk, rt.calls[0]) - } -} - -func TestNew_NilLoggerStillWorks(t *testing.T) { - // A nil logger should not panic — composite must default to a discard - // logger so misconfigured callers don't crash on the first secondary error. - var calls []string - primary := &recordingMessenger{name: "primary", calls: &calls} - secondary := &recordingMessenger{name: "secondary", err: errors.New("x"), calls: &calls} - - c := composite.New([]ports.AgentMessenger{primary, secondary}, nil) - if err := c.Send(context.Background(), "s-1", "hi"); err != nil { - t.Fatalf("Send with nil logger and secondary error must not return error, got %v", err) - } -} diff --git a/backend/internal/adapters/messenger/inbox/inbox.go b/backend/internal/adapters/messenger/inbox/inbox.go deleted file mode 100644 index e8b235b6..00000000 --- a/backend/internal/adapters/messenger/inbox/inbox.go +++ /dev/null @@ -1,153 +0,0 @@ -// Package inbox implements ports.AgentMessenger by writing each message as a -// file in /.ao/inbox/. The agent reads its inbox on demand; -// pinging the runtime pane to consume new files is a separate concern that -// lives in the runtime adapter, not here. -package inbox - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "errors" - "fmt" - "os" - "path/filepath" - "strconv" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -// SessionWorkspace resolves a session id to the absolute path of its workspace. -// The sqlite store satisfies this via GetSession; the adapter is in -// daemon/lifecycle_wiring.go. -type SessionWorkspace interface { - WorkspacePath(ctx context.Context, id domain.SessionID) (string, error) -} - -// Messenger writes inbox files into per-session workspaces. -type Messenger struct { - lookup SessionWorkspace - clock func() time.Time -} - -// New builds a Messenger over the given workspace lookup. lookup is required. -func New(lookup SessionWorkspace) *Messenger { - return &Messenger{lookup: lookup, clock: time.Now} -} - -var _ ports.AgentMessenger = (*Messenger)(nil) - -// Send writes message into /.ao/inbox/_.md. -// -// Filename collisions are practically impossible: nanosecond timestamp plus an -// 8-char hash of the body. The write uses O_EXCL so a collision (two identical -// messages on the same pinned nanosecond) surfaces as an error rather than -// silently clobbering the earlier message; we do not retry on EEXIST. -// -// Symlink safety: if .ao or .ao/inbox already exists as a symlink, refuse. -// Otherwise os.MkdirAll creates real directories and the O_CREATE|O_EXCL open -// writes the message body. O_EXCL also refuses to follow a symlink at the final -// path component. The inbox is owned by ao; a symlink there is either user -// misconfig or attack. -func (m *Messenger) Send(ctx context.Context, id domain.SessionID, message string) error { - ws, err := m.lookup.WorkspacePath(ctx, id) - if err != nil { - return fmt.Errorf("inbox: lookup workspace for %s: %w", id, err) - } - if ws == "" { - return fmt.Errorf("inbox: empty workspace path for %s", id) - } - - aoDir := filepath.Join(ws, ".ao") - if err := ensureRealDir(aoDir); err != nil { - return fmt.Errorf("inbox: prepare .ao for %s: %w", id, err) - } - inboxDir := filepath.Join(aoDir, "inbox") - if err := ensureRealDir(inboxDir); err != nil { - return fmt.Errorf("inbox: prepare inbox for %s: %w", id, err) - } - - name := FilenameFor(TimeFromContext(ctx, m.clock), message) - path := filepath.Join(inboxDir, name) - f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) - if err != nil { - return fmt.Errorf("inbox: write %s for %s: %w", name, id, err) - } - // On a failed write/close, remove the file we just created: O_EXCL proved it - // did not exist before this call, so the cleanup can only delete our own - // empty/partial file — never a legitimate earlier message — and keeps the - // agent's next inbox scan from picking up a truncated ghost. - if _, err := f.WriteString(message); err != nil { - _ = f.Close() - _ = os.Remove(path) - return fmt.Errorf("inbox: write body %s for %s: %w", name, id, err) - } - if err := f.Close(); err != nil { - _ = os.Remove(path) - return fmt.Errorf("inbox: close %s for %s: %w", name, id, err) - } - return nil -} - -// ensureRealDir creates path if missing (0755), refuses if path is a symlink. -// Lstat (not Stat) is used so a symlink isn't followed into a different tree. -// -// The workspace root itself is not Lstat-checked because gitworktree.Workspace -// resolves ManagedRoot to an absolute, symlink-free path at construction -// (gitworktree.physicalAbs); per-session workspaces under it are created by ao. -// A symlinked .ao or .ao/inbox inside an ao-owned workspace would be user -// misconfig or attack, and is the only segment that can be tampered with -// between Spawn and Send. -func ensureRealDir(path string) error { - info, err := os.Lstat(path) - switch { - case err == nil: - if info.Mode()&os.ModeSymlink != 0 { - return fmt.Errorf("%q is a symlink; refusing to follow", path) - } - if !info.IsDir() { - return fmt.Errorf("%q exists and is not a directory", path) - } - return nil - case errors.Is(err, os.ErrNotExist): - return os.MkdirAll(path, 0o750) - default: - return err - } -} - -// FilenameFor builds a sortable, collision-resistant name from the timestamp -// and message body. Underscore separator keeps the timestamp's own dashes -// distinguishable from the hash prefix. Exported so adapters that point at the -// file (e.g. the pane-ping messenger) can derive the same name the inbox -// messenger would write for the same (t, message). -func FilenameFor(t time.Time, message string) string { - sum := sha256.Sum256([]byte(message)) - hash := hex.EncodeToString(sum[:])[:8] - return strconv.FormatInt(t.UnixNano(), 10) + "_" + hash + ".md" -} - -// sendTimeKey scopes the shared "Send timestamp" composite injects so inbox -// and panep derive the same filename for one Send call. The key is unexported -// — only the inbox.WithTime / TimeFromContext helpers can read or write it. -type sendTimeKey struct{} - -// WithTime attaches t to ctx as the shared timestamp the inbox messenger and -// any peers (panep) should use when deriving a filename via FilenameFor. The -// composite messenger calls this so a single Send produces one filename across -// every inner messenger, regardless of how long inbox's file I/O takes. -func WithTime(ctx context.Context, t time.Time) context.Context { - return context.WithValue(ctx, sendTimeKey{}, t) -} - -// TimeFromContext returns the timestamp WithTime stashed on ctx, falling back -// to fallback() if none is present. Callers running outside the composite -// (e.g. tests, direct use of inbox.Messenger) just get the fallback. -func TimeFromContext(ctx context.Context, fallback func() time.Time) time.Time { - if t, ok := ctx.Value(sendTimeKey{}).(time.Time); ok { - return t - } - return fallback() -} diff --git a/backend/internal/adapters/messenger/inbox/inbox_test.go b/backend/internal/adapters/messenger/inbox/inbox_test.go deleted file mode 100644 index 3c0e6f47..00000000 --- a/backend/internal/adapters/messenger/inbox/inbox_test.go +++ /dev/null @@ -1,210 +0,0 @@ -package inbox_test - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "errors" - "os" - "path/filepath" - "strconv" - "strings" - "testing" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -// FilenameFor is the shared helper used by both the inbox messenger (writes the -// file) and the panep messenger (points at the file). Both must agree on the -// exact name for the same (timestamp, message) input. -func TestFilenameFor_IsDeterministicForSameInput(t *testing.T) { - ts := time.Unix(1717171717, 42).UTC() - body := "ε ao test" - if got, want := inbox.FilenameFor(ts, body), inbox.FilenameFor(ts, body); got != want { - t.Fatalf("FilenameFor not deterministic: %q vs %q", got, want) - } -} - -func TestFilenameFor_MatchesTimestampNanoAndHashPrefix(t *testing.T) { - ts := time.Unix(1717171717, 42).UTC() - body := "hello agent" - - got := inbox.FilenameFor(ts, body) - - sum := sha256.Sum256([]byte(body)) - wantPrefix := strconv.FormatInt(ts.UnixNano(), 10) + "_" + hex.EncodeToString(sum[:])[:8] + ".md" - if got != wantPrefix { - t.Fatalf("FilenameFor(%v, %q) = %q, want %q", ts, body, got, wantPrefix) - } -} - -func TestFilenameFor_DiffersByMessage(t *testing.T) { - ts := time.Unix(1717171717, 42).UTC() - a := inbox.FilenameFor(ts, "alpha") - b := inbox.FilenameFor(ts, "beta") - if a == b { - t.Fatalf("different messages produced same filename: %q", a) - } -} - -func TestSatisfiesAgentMessenger(t *testing.T) { - var _ ports.AgentMessenger = (*inbox.Messenger)(nil) -} - -type fakeLookup struct { - path string - err error -} - -func (f fakeLookup) WorkspacePath(context.Context, domain.SessionID) (string, error) { - return f.path, f.err -} - -func TestSend_WritesMessageFile(t *testing.T) { - dir := t.TempDir() - m := inbox.New(fakeLookup{path: dir}) - if err := m.Send(context.Background(), "s-1", "hello agent"); err != nil { - t.Fatal(err) - } - inboxDir := filepath.Join(dir, ".ao", "inbox") - entries, err := os.ReadDir(inboxDir) - if err != nil { - t.Fatalf("inbox dir: %v", err) - } - if len(entries) != 1 { - t.Fatalf("want 1 file, got %d", len(entries)) - } - name := entries[0].Name() - if !strings.HasSuffix(name, ".md") { - t.Errorf("want .md suffix, got %q", name) - } - body, err := os.ReadFile(filepath.Join(inboxDir, name)) - if err != nil { - t.Fatal(err) - } - if string(body) != "hello agent" { - t.Errorf("body %q want %q", body, "hello agent") - } -} - -func TestSend_CreatesInboxDirIfMissing(t *testing.T) { - dir := t.TempDir() - // dir contains no .ao yet. - m := inbox.New(fakeLookup{path: dir}) - if err := m.Send(context.Background(), "s-1", "x"); err != nil { - t.Fatal(err) - } - if _, err := os.Stat(filepath.Join(dir, ".ao", "inbox")); err != nil { - t.Fatalf("inbox dir not created: %v", err) - } -} - -func TestSend_TwoSendsProduceTwoFiles(t *testing.T) { - dir := t.TempDir() - m := inbox.New(fakeLookup{path: dir}) - ctx := context.Background() - if err := m.Send(ctx, "s-1", "first"); err != nil { - t.Fatal(err) - } - if err := m.Send(ctx, "s-1", "second"); err != nil { - t.Fatal(err) - } - entries, _ := os.ReadDir(filepath.Join(dir, ".ao", "inbox")) - if len(entries) != 2 { - t.Fatalf("want 2 files, got %d", len(entries)) - } -} - -// TestSend_CollisionOnPinnedTimeSurfacesError guards the O_EXCL write: when the -// composite pins one timestamp for the whole fan-out (inbox.WithTime) and the -// same message is sent twice on that timestamp, the second write must error -// rather than silently clobber the first message's file. -func TestSend_CollisionOnPinnedTimeSurfacesError(t *testing.T) { - dir := t.TempDir() - m := inbox.New(fakeLookup{path: dir}) - ctx := inbox.WithTime(context.Background(), time.Unix(0, 1_700_000_000_123_456_789)) - - if err := m.Send(ctx, "s-1", "dup"); err != nil { - t.Fatalf("first send: %v", err) - } - if err := m.Send(ctx, "s-1", "dup"); err == nil { - t.Fatal("second send with the same pinned time + message should error, not clobber") - } - entries, _ := os.ReadDir(filepath.Join(dir, ".ao", "inbox")) - if len(entries) != 1 { - t.Fatalf("want exactly 1 file preserved, got %d", len(entries)) - } -} - -func TestSend_UnknownSessionReturnsError(t *testing.T) { - m := inbox.New(fakeLookup{err: errors.New("not found")}) - err := m.Send(context.Background(), "s-1", "x") - if err == nil { - t.Fatal("expected error when workspace lookup fails") - } - if !strings.Contains(err.Error(), "not found") { - t.Errorf("error should wrap lookup error, got %v", err) - } -} - -func TestSend_EmptyWorkspacePathReturnsError(t *testing.T) { - // A spawned-but-not-yet-mark-spawned row has WorkspacePath == "". The - // messenger must refuse rather than write into "/.ao/inbox/...". - m := inbox.New(fakeLookup{path: ""}) - if err := m.Send(context.Background(), "s-1", "x"); err == nil { - t.Fatal("expected error for empty workspace path") - } -} - -func TestSend_SymlinkedInboxIsRefused(t *testing.T) { - dir := t.TempDir() - // Create .ao/inbox as a symlink to a sibling directory. - target := t.TempDir() - if err := os.MkdirAll(filepath.Join(dir, ".ao"), 0o755); err != nil { - t.Fatal(err) - } - if err := os.Symlink(target, filepath.Join(dir, ".ao", "inbox")); err != nil { - t.Skipf("symlink not supported: %v", err) - } - m := inbox.New(fakeLookup{path: dir}) - err := m.Send(context.Background(), "s-1", "x") - if err == nil { - t.Fatal("expected refusal when inbox is a symlink") - } - if entries, _ := os.ReadDir(target); len(entries) != 0 { - t.Errorf("symlink target should not have received writes, got %d entries", len(entries)) - } -} - -func TestSend_EmptyMessageStillWritesAFile(t *testing.T) { - dir := t.TempDir() - m := inbox.New(fakeLookup{path: dir}) - if err := m.Send(context.Background(), "s-1", ""); err != nil { - t.Fatal(err) - } - entries, _ := os.ReadDir(filepath.Join(dir, ".ao", "inbox")) - if len(entries) != 1 { - t.Fatalf("want 1 file even for empty message, got %d", len(entries)) - } -} - -func TestSend_FilenameContainsTimestampAndHashPrefix(t *testing.T) { - dir := t.TempDir() - m := inbox.New(fakeLookup{path: dir}) - if err := m.Send(context.Background(), "s-1", "payload"); err != nil { - t.Fatal(err) - } - entries, _ := os.ReadDir(filepath.Join(dir, ".ao", "inbox")) - name := strings.TrimSuffix(entries[0].Name(), ".md") - // Format: _; underscore separator avoids the timestamp's own dashes. - parts := strings.SplitN(name, "_", 2) - if len(parts) != 2 { - t.Fatalf("filename should be _.md, got %q", entries[0].Name()) - } - if len(parts[1]) < 4 { - t.Errorf("hash prefix too short: %q", parts[1]) - } -} diff --git a/backend/internal/adapters/messenger/panep/panep.go b/backend/internal/adapters/messenger/panep/panep.go deleted file mode 100644 index eb4586ad..00000000 --- a/backend/internal/adapters/messenger/panep/panep.go +++ /dev/null @@ -1,82 +0,0 @@ -// Package panep ("pane ping") implements ports.AgentMessenger by typing a -// short pointer into the agent's runtime pane: "📥 ao: new message at -// .ao/inbox/ — please read it". The inbox messenger writes the file; -// panep merely tells the agent to read it. Multi-line message bodies typed -// verbatim fight with the agent's input handler, so a pointer is robust where -// pasting the full body is not. -// -// panep is best-effort: composite.Messenger logs and swallows panep errors so a -// missed nudge never loses the message — the inbox file is still on disk. -package panep - -import ( - "context" - "fmt" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -// SessionLookup resolves a session id to its runtime handle id and workspace -// path. The sqlite store satisfies this via a small adapter in -// daemon/lifecycle_wiring.go. The workspace path is required so panep can prove -// the inbox messenger had somewhere to write before pointing at the file. -type SessionLookup interface { - SessionHandle(ctx context.Context, id domain.SessionID) (handleID, workspacePath string, err error) -} - -// RuntimePaneWriter is the narrow runtime contract panep depends on: type -// characters into the pane identified by handle. Kept separate from -// ports.Runtime so adding a pane-ping does not widen the runtime port. -type RuntimePaneWriter interface { - WriteChars(ctx context.Context, handle ports.RuntimeHandle, s string) error -} - -// Messenger pings the agent's pane with a pointer at a freshly-written inbox -// file. It does not write the file itself — that is the inbox messenger's job. -type Messenger struct { - runtime RuntimePaneWriter - lookup SessionLookup - clock func() time.Time -} - -// New constructs a Messenger. Both deps are required. -func New(runtime RuntimePaneWriter, lookup SessionLookup) *Messenger { - return &Messenger{runtime: runtime, lookup: lookup, clock: time.Now} -} - -var _ ports.AgentMessenger = (*Messenger)(nil) - -// Send pings the agent pane with a pointer to .ao/inbox/. The filename -// is derived from the same (clock(), message) inputs the inbox messenger uses, -// so the two adapters agree on a single name when invoked together by the -// composite messenger. -func (m *Messenger) Send(ctx context.Context, id domain.SessionID, message string) error { - handleID, ws, err := m.lookup.SessionHandle(ctx, id) - if err != nil { - return fmt.Errorf("panep: lookup handle for %s: %w", id, err) - } - if handleID == "" { - return fmt.Errorf("panep: empty runtime handle for %s", id) - } - if ws == "" { - return fmt.Errorf("panep: empty workspace path for %s", id) - } - - // Reads the timestamp the composite messenger stashed via inbox.WithTime - // so the filename here matches what the inbox messenger just wrote. Outside - // a composite, falls back to m.clock — useful for tests and any future - // caller that uses panep stand-alone. - filename := inbox.FilenameFor(inbox.TimeFromContext(ctx, m.clock), message) - body := fmt.Sprintf("📥 ao: new message at .ao/inbox/%s — please read it", filename) - handle := ports.RuntimeHandle{ID: handleID} - if err := m.runtime.WriteChars(ctx, handle, body); err != nil { - return fmt.Errorf("panep: write ping for %s: %w", id, err) - } - if err := m.runtime.WriteChars(ctx, handle, "\n"); err != nil { - return fmt.Errorf("panep: submit ping for %s: %w", id, err) - } - return nil -} diff --git a/backend/internal/adapters/messenger/panep/panep_test.go b/backend/internal/adapters/messenger/panep/panep_test.go deleted file mode 100644 index 578e7d40..00000000 --- a/backend/internal/adapters/messenger/panep/panep_test.go +++ /dev/null @@ -1,134 +0,0 @@ -package panep_test - -import ( - "context" - "errors" - "strings" - "testing" - "time" - - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/panep" - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" -) - -func TestSatisfiesAgentMessenger(t *testing.T) { - var _ ports.AgentMessenger = (*panep.Messenger)(nil) -} - -type fakeLookup struct { - handle string - workspace string - err error -} - -func (f fakeLookup) SessionHandle(context.Context, domain.SessionID) (string, string, error) { - return f.handle, f.workspace, f.err -} - -type writeCall struct { - handle ports.RuntimeHandle - s string -} - -type fakeRuntime struct { - err error - calls []writeCall -} - -func (f *fakeRuntime) WriteChars(_ context.Context, handle ports.RuntimeHandle, s string) error { - f.calls = append(f.calls, writeCall{handle: handle, s: s}) - return f.err -} - -func TestSend_PingsPaneWithFilenameMatchingInbox(t *testing.T) { - lookup := fakeLookup{handle: "sess-id/terminal_0", workspace: "/ws"} - rt := &fakeRuntime{} - fixed := time.Unix(1717171717, 42).UTC() - m := panep.New(rt, lookup) - - // In production the composite injects the timestamp via inbox.WithTime so - // the panep ping filename matches what inbox just wrote. Pin it here. - ctx := inbox.WithTime(context.Background(), fixed) - if err := m.Send(ctx, "s-1", "ε hello"); err != nil { - t.Fatalf("Send: %v", err) - } - if len(rt.calls) != 2 { - t.Fatalf("want 2 WriteChars calls (body + newline), got %d", len(rt.calls)) - } - // The exact filename the inbox messenger would write for the same - // (clock, message); panep must point at it. - wantFilename := inbox.FilenameFor(fixed, "ε hello") - if !strings.Contains(rt.calls[0].s, wantFilename) { - t.Errorf("ping body should reference inbox filename %q, got %q", wantFilename, rt.calls[0].s) - } - if !strings.Contains(rt.calls[0].s, ".ao/inbox/") { - t.Errorf("ping body should mention .ao/inbox/, got %q", rt.calls[0].s) - } - if rt.calls[0].handle.ID != "sess-id/terminal_0" { - t.Errorf("handle ID = %q, want sess-id/terminal_0", rt.calls[0].handle.ID) - } - if rt.calls[1].s != "\n" { - t.Errorf("second WriteChars should be newline to submit, got %q", rt.calls[1].s) - } -} - -func TestSend_LookupErrorIsWrapped(t *testing.T) { - lookup := fakeLookup{err: errors.New("db dead")} - rt := &fakeRuntime{} - m := panep.New(rt, lookup) - - err := m.Send(context.Background(), "s-1", "x") - if err == nil { - t.Fatal("expected error when SessionHandle fails") - } - if !strings.Contains(err.Error(), "db dead") { - t.Errorf("error should wrap lookup error, got %v", err) - } - if len(rt.calls) != 0 { - t.Errorf("runtime must not be called when lookup fails, got %d calls", len(rt.calls)) - } -} - -func TestSend_EmptyHandleIsError(t *testing.T) { - lookup := fakeLookup{handle: "", workspace: "/ws"} - rt := &fakeRuntime{} - m := panep.New(rt, lookup) - - if err := m.Send(context.Background(), "s-1", "x"); err == nil { - t.Fatal("expected error when runtime handle is empty") - } - if len(rt.calls) != 0 { - t.Errorf("runtime must not be called for empty handle, got %d calls", len(rt.calls)) - } -} - -func TestSend_EmptyWorkspacePathIsError(t *testing.T) { - // The ping body references .ao/inbox/; with no workspace path we - // cannot trust the inbox messenger wrote a real file there. - lookup := fakeLookup{handle: "sess-id/terminal_0", workspace: ""} - rt := &fakeRuntime{} - m := panep.New(rt, lookup) - - if err := m.Send(context.Background(), "s-1", "x"); err == nil { - t.Fatal("expected error when workspace path is empty") - } - if len(rt.calls) != 0 { - t.Errorf("runtime must not be called for empty workspace, got %d calls", len(rt.calls)) - } -} - -func TestSend_RuntimeErrorIsWrapped(t *testing.T) { - lookup := fakeLookup{handle: "sess-id/terminal_0", workspace: "/ws"} - rt := &fakeRuntime{err: errors.New("zellij: pipe broken")} - m := panep.New(rt, lookup) - - err := m.Send(context.Background(), "s-1", "x") - if err == nil { - t.Fatal("expected error when WriteChars fails") - } - if !strings.Contains(err.Error(), "zellij: pipe broken") { - t.Errorf("error should wrap runtime error, got %v", err) - } -} diff --git a/backend/internal/adapters/runtime/zellij/commands.go b/backend/internal/adapters/runtime/zellij/commands.go index 96dd250e..20446dde 100644 --- a/backend/internal/adapters/runtime/zellij/commands.go +++ b/backend/internal/adapters/runtime/zellij/commands.go @@ -42,10 +42,6 @@ func sendEnterArgs(id, paneID string) []string { return []string{"--session", id, "action", "send-keys", "--pane-id", paneID, "Enter"} } -func writeCharsArgs(id, paneID, s string) []string { - return []string{"--session", id, "action", "write-chars", "--pane-id", paneID, s} -} - func dumpScreenArgs(id, paneID string) []string { return []string{"--session", id, "action", "dump-screen", "--pane-id", paneID, "--full"} } diff --git a/backend/internal/adapters/runtime/zellij/zellij.go b/backend/internal/adapters/runtime/zellij/zellij.go index 09cff2c7..536ad28a 100644 --- a/backend/internal/adapters/runtime/zellij/zellij.go +++ b/backend/internal/adapters/runtime/zellij/zellij.go @@ -169,25 +169,6 @@ func (r *Runtime) Destroy(ctx context.Context, handle ports.RuntimeHandle) error return nil } -// WriteChars types s into the session's pane via `zellij action write-chars`. -// Single shot, no chunking, no follow-up Enter. The pane-ping messenger -// (panep) sequences body + newline as two separate calls so the agent's input -// handler sees one well-formed line followed by a submit. -// -// Not part of ports.Runtime — runtime adapters that don't model a typing -// surface (none of the message-injection adapters), and panep depends on the -// narrow RuntimePaneWriter contract instead of the full Runtime interface. -func (r *Runtime) WriteChars(ctx context.Context, handle ports.RuntimeHandle, s string) error { - id, paneID, err := handleID(handle) - if err != nil { - return err - } - if _, err := r.run(ctx, writeCharsArgs(id, paneID, s)...); err != nil { - return fmt.Errorf("zellij runtime: write-chars %s/%s: %w", id, paneID, err) - } - return nil -} - // SendMessage pastes a message into the session's pane (chunked) and presses // Enter to submit it. func (r *Runtime) SendMessage(ctx context.Context, handle ports.RuntimeHandle, message string) error { diff --git a/backend/internal/adapters/runtime/zellij/zellij_test.go b/backend/internal/adapters/runtime/zellij/zellij_test.go index 327f5b34..22e2e509 100644 --- a/backend/internal/adapters/runtime/zellij/zellij_test.go +++ b/backend/internal/adapters/runtime/zellij/zellij_test.go @@ -316,35 +316,6 @@ func TestParseVersion(t *testing.T) { } } -// WriteChars is the narrow seam panep's RuntimePaneWriter calls — a single -// `zellij action write-chars` against the handle's pane, no chunking, no -// follow-up Enter. The composite messenger sequences body + newline as two -// separate calls. -func TestWriteCharsInvokesZellijAction(t *testing.T) { - fr := &fakeRunner{} - r := New(Options{Timeout: time.Second}) - r.runner = fr - - if err := r.WriteChars(context.Background(), ports.RuntimeHandle{ID: "sess-1/terminal_0"}, "hi"); err != nil { - t.Fatalf("WriteChars: %v", err) - } - if len(fr.calls) != 1 { - t.Fatalf("calls = %d, want 1", len(fr.calls)) - } - want := []string{"--session", "sess-1", "action", "write-chars", "--pane-id", "terminal_0", "hi"} - if got := fr.calls[0].args; !reflect.DeepEqual(got, want) { - t.Fatalf("WriteChars args = %#v, want %#v", got, want) - } -} - -func TestWriteCharsRejectsBadHandle(t *testing.T) { - r := New(Options{Timeout: time.Second}) - r.runner = &fakeRunner{} - if err := r.WriteChars(context.Background(), ports.RuntimeHandle{ID: ""}, "hi"); err == nil { - t.Fatal("WriteChars with empty handle: got nil, want error") - } -} - func TestSendMessageChunksAndSendsEnter(t *testing.T) { fr := &fakeRunner{} r := New(Options{Timeout: time.Second, ChunkSize: 5}) diff --git a/backend/internal/daemon/daemon.go b/backend/internal/daemon/daemon.go index 6d7c6e73..97a2ab20 100644 --- a/backend/internal/daemon/daemon.go +++ b/backend/internal/daemon/daemon.go @@ -81,9 +81,8 @@ func Run() error { // change_log -> poller -> broadcaster) and gives startSession the shared LCM. lcStack := startLifecycle(ctx, store, runtimeAdapter, log) - // The agent messenger: inbox file write (durable, primary) composed with a - // live zellij pane ping (best-effort secondary). runtimeAdapter is the - // concrete zellij runtime, so it satisfies panep's RuntimePaneWriter seam. + // The agent messenger sends validated user input to the session's live + // zellij pane. Keep this path small until durable inbox semantics are needed. messenger := newSessionMessenger(store, runtimeAdapter, log) // Wire the controller-facing session service over the same store + LCM, the diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index 57fd5409..1ee101fa 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -9,9 +9,6 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/adapters" "github.com/aoagents/agent-orchestrator/backend/internal/adapters/agent/claudecode" "github.com/aoagents/agent-orchestrator/backend/internal/adapters/agent/codex" - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/composite" - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/panep" "github.com/aoagents/agent-orchestrator/backend/internal/adapters/workspace/gitworktree" "github.com/aoagents/agent-orchestrator/backend/internal/config" "github.com/aoagents/agent-orchestrator/backend/internal/domain" @@ -48,8 +45,7 @@ func (l *lifecycleStack) Stop() { <-l.reaperDone } // startSession builds the controller-facing session service: a session manager // over the real zellij runtime, a per-session gitworktree workspace, the shared // store + LCM, the per-session agent resolver (AO_AGENT default), and the -// composite agent messenger (inbox file write + live zellij pane ping). The -// returned service is mounted at httpd APIDeps.Sessions. +// agent messenger. The returned service is mounted at httpd APIDeps.Sessions. func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, lcm *lifecycle.Manager, messenger ports.AgentMessenger, log *slog.Logger) (*sessionsvc.Service, error) { agents, err := buildAgentResolver(cfg.Agent, log) if err != nil { @@ -79,48 +75,39 @@ func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, return sessionsvc.New(mgr, store), nil } -// storeWorkspaceLookup adapts the sqlite store to the SessionWorkspace lookup -// the inbox messenger needs. WorkspacePath becomes meaningful only after the -// LCM records spawn metadata, so a session that exists but has no path is an -// error — Send must not invent a destination. -type storeWorkspaceLookup struct{ store *sqlite.Store } - -func (s storeWorkspaceLookup) WorkspacePath(ctx context.Context, id domain.SessionID) (string, error) { - rec, ok, err := s.store.GetSession(ctx, id) - if err != nil { - return "", err - } - if !ok { - return "", fmt.Errorf("session %s not found", id) - } - return rec.Metadata.WorkspacePath, nil +// runtimeMessageSender is the narrow part of the concrete runtime needed by +// ao send. zellij.Runtime already implements this via SendMessage. +type runtimeMessageSender interface { + SendMessage(ctx context.Context, handle ports.RuntimeHandle, message string) error } -// storeSessionHandleLookup adapts the sqlite store to panep.SessionLookup. -// panep needs the runtime handle id (to address the right zellij pane) and the -// workspace path (proof the inbox messenger had a real directory to write to). -type storeSessionHandleLookup struct{ store *sqlite.Store } +// runtimeMessenger sends the user's message directly to the session's live +// runtime pane. The HTTP controller has already validated and sanitized the +// message body; this adapter only resolves the stored runtime handle. +type runtimeMessenger struct { + store *sqlite.Store + runtime runtimeMessageSender +} -func (s storeSessionHandleLookup) SessionHandle(ctx context.Context, id domain.SessionID) (string, string, error) { - rec, ok, err := s.store.GetSession(ctx, id) +func (m runtimeMessenger) Send(ctx context.Context, id domain.SessionID, message string) error { + rec, ok, err := m.store.GetSession(ctx, id) if err != nil { - return "", "", err + return err } if !ok { - return "", "", fmt.Errorf("session %s not found", id) + return fmt.Errorf("session %s not found", id) + } + handleID := rec.Metadata.RuntimeHandleID + if handleID == "" { + return fmt.Errorf("session %s has no runtime handle", id) } - return rec.Metadata.RuntimeHandleID, rec.Metadata.WorkspacePath, nil + return m.runtime.SendMessage(ctx, ports.RuntimeHandle{ID: handleID}, message) } -// newSessionMessenger assembles the per-daemon agent messenger: inbox (durable -// file write, primary) wrapped in a composite with panep (live pane ping, -// best-effort secondary). Ordering matters — see composite.Messenger for the -// "primary must succeed, secondaries are nudges" contract. Replaces the old -// noopMessenger stub that silently dropped every agent nudge. -func newSessionMessenger(store *sqlite.Store, runtime panep.RuntimePaneWriter, logger *slog.Logger) ports.AgentMessenger { - inboxMsg := inbox.New(storeWorkspaceLookup{store: store}) - panepMsg := panep.New(runtime, storeSessionHandleLookup{store: store}) - return composite.New([]ports.AgentMessenger{inboxMsg, panepMsg}, logger) +// newSessionMessenger assembles the per-daemon agent messenger. For now, ao +// send is intentionally minimal: submit the message to the live runtime pane. +func newSessionMessenger(store *sqlite.Store, runtime runtimeMessageSender, _ *slog.Logger) ports.AgentMessenger { + return runtimeMessenger{store: store, runtime: runtime} } // buildAgentRegistry returns a registry populated with the agent adapters the diff --git a/backend/internal/daemon/wiring_test.go b/backend/internal/daemon/wiring_test.go index 38ff2fce..1388194f 100644 --- a/backend/internal/daemon/wiring_test.go +++ b/backend/internal/daemon/wiring_test.go @@ -5,16 +5,11 @@ import ( "errors" "io" "log/slog" - "os" - "path/filepath" "sync" "testing" "time" "github.com/aoagents/agent-orchestrator/backend/internal/adapters" - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/composite" - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/inbox" - "github.com/aoagents/agent-orchestrator/backend/internal/adapters/messenger/panep" "github.com/aoagents/agent-orchestrator/backend/internal/adapters/runtime/zellij" "github.com/aoagents/agent-orchestrator/backend/internal/cdc" "github.com/aoagents/agent-orchestrator/backend/internal/config" @@ -142,63 +137,49 @@ func TestWiring_StartSessionBuildsSessionService(t *testing.T) { } } -// TestWiring_SessionMessengerIsInboxThenPanepComposite asserts the daemon wires -// the agent messenger as a composite of inbox (primary, durable file write) -// then panep (secondary, live pane ping) — the ordering the "primary must -// succeed, secondaries are nudges" contract depends on. It also proves the -// messenger reaches the same store the SM reads: a Send through a row the store -// owns lands an inbox file under that row's workspace. This is the switch that -// replaced the old noopMessenger, which silently dropped every nudge. -func TestWiring_SessionMessengerIsInboxThenPanepComposite(t *testing.T) { +type captureRuntimeSender struct { + handle ports.RuntimeHandle + message string +} + +func (c *captureRuntimeSender) SendMessage(_ context.Context, handle ports.RuntimeHandle, message string) error { + c.handle = handle + c.message = message + return nil +} + +// TestWiring_SessionMessengerSendsToRuntimePane asserts the daemon wires ao +// send to the live runtime pane and resolves the handle from the shared store. +func TestWiring_SessionMessengerSendsToRuntimePane(t *testing.T) { store, err := sqlite.Open(t.TempDir()) if err != nil { t.Fatal(err) } t.Cleanup(func() { _ = store.Close() }) - runtime := zellij.New(zellij.Options{}) + runtime := &captureRuntimeSender{} messenger := newSessionMessenger(store, runtime, nil) - comp, ok := messenger.(*composite.Messenger) - if !ok { - t.Fatalf("session messenger should be *composite.Messenger, got %T", messenger) - } - if len(comp.Inner) != 2 { - t.Fatalf("composite should wrap exactly 2 inner messengers (inbox + panep), got %d", len(comp.Inner)) - } - if _, ok := comp.Inner[0].(*inbox.Messenger); !ok { - t.Errorf("composite Inner[0] should be *inbox.Messenger (primary), got %T", comp.Inner[0]) - } - if _, ok := comp.Inner[1].(*panep.Messenger); !ok { - t.Errorf("composite Inner[1] should be *panep.Messenger (secondary), got %T", comp.Inner[1]) - } - - // End-to-end: a session row in the shared store is reachable through the - // messenger. A second store would surface as "session not found" here. ctx := context.Background() if err := store.UpsertProject(ctx, domain.ProjectRecord{ID: "p", Path: "/repo/p", RegisteredAt: time.Now()}); err != nil { t.Fatal(err) } - workspaceDir := t.TempDir() rec, err := store.CreateSession(ctx, domain.SessionRecord{ ProjectID: "p", Kind: domain.KindWorker, Activity: domain.Activity{State: domain.ActivityIdle, LastActivityAt: time.Now()}, - Metadata: domain.SessionMetadata{WorkspacePath: workspaceDir}, + Metadata: domain.SessionMetadata{RuntimeHandleID: "ao-1/terminal_0"}, }) if err != nil { t.Fatal(err) } - // panep will fail (no live zellij pane), but it is best-effort: Send must - // still succeed because the inbox file write (primary) succeeded. if err := messenger.Send(ctx, rec.ID, "hello agent"); err != nil { - t.Fatalf("messenger.Send through shared store lookup: %v", err) + t.Fatalf("messenger.Send: %v", err) } - entries, err := os.ReadDir(filepath.Join(workspaceDir, ".ao", "inbox")) - if err != nil { - t.Fatalf("inbox dir: %v", err) + if runtime.handle.ID != "ao-1/terminal_0" { + t.Fatalf("handle = %q, want ao-1/terminal_0", runtime.handle.ID) } - if len(entries) != 1 { - t.Fatalf("want 1 inbox file, got %d", len(entries)) + if runtime.message != "hello agent" { + t.Fatalf("message = %q, want hello agent", runtime.message) } } From 13450ae64209a325c30d9049b8fde9dbd3e7a6d8 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Tue, 2 Jun 2026 19:52:34 +0530 Subject: [PATCH 5/6] fix(send): preserve messages and map lookup errors --- backend/internal/cli/send.go | 4 +-- backend/internal/cli/send_test.go | 6 ++-- backend/internal/daemon/lifecycle_wiring.go | 4 +-- backend/internal/daemon/wiring_test.go | 39 +++++++++++++++++++++ 4 files changed, 46 insertions(+), 7 deletions(-) diff --git a/backend/internal/cli/send.go b/backend/internal/cli/send.go index 1effb2d1..e1ecbcd5 100644 --- a/backend/internal/cli/send.go +++ b/backend/internal/cli/send.go @@ -37,10 +37,10 @@ func newSendCommand(ctx *commandContext) *cobra.Command { } func (c *commandContext) sendMessage(ctx context.Context, opts sendOptions) error { - message := strings.TrimSpace(opts.message) - if message == "" { + if strings.TrimSpace(opts.message) == "" { return usageError{errors.New("usage: --message is required")} } + message := opts.message session := strings.TrimSpace(opts.session) if session == "" { return usageError{errors.New("usage: --session is required")} diff --git a/backend/internal/cli/send_test.go b/backend/internal/cli/send_test.go index cd61a0fb..a68293ba 100644 --- a/backend/internal/cli/send_test.go +++ b/backend/internal/cli/send_test.go @@ -83,7 +83,7 @@ func TestSend_Success(t *testing.T) { } } -func TestSend_TrimsLeadingAndTrailingWhitespace(t *testing.T) { +func TestSend_PreservesMessageWhitespace(t *testing.T) { cfg := setConfigEnv(t) srv, capture := sendServer(t, http.StatusOK, `{"ok":true,"sessionId":"demo-1","message":"hi"}`) writeRunFileFor(t, cfg, srv) @@ -100,8 +100,8 @@ func TestSend_TrimsLeadingAndTrailingWhitespace(t *testing.T) { if err := json.Unmarshal([]byte(capture.body), &req); err != nil { t.Fatalf("decode body: %v\nbody=%s", err, capture.body) } - if req.Message != "hi" { - t.Errorf("server received %q, want trimmed %q", req.Message, "hi") + if req.Message != " hi " { + t.Errorf("server received %q, want preserved whitespace", req.Message) } } diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index 1ee101fa..28b32fa7 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -95,11 +95,11 @@ func (m runtimeMessenger) Send(ctx context.Context, id domain.SessionID, message return err } if !ok { - return fmt.Errorf("session %s not found", id) + return fmt.Errorf("session %s: %w", id, sessionmanager.ErrNotFound) } handleID := rec.Metadata.RuntimeHandleID if handleID == "" { - return fmt.Errorf("session %s has no runtime handle", id) + return fmt.Errorf("session %s: %w", id, sessionmanager.ErrIncompleteHandle) } return m.runtime.SendMessage(ctx, ports.RuntimeHandle{ID: handleID}, message) } diff --git a/backend/internal/daemon/wiring_test.go b/backend/internal/daemon/wiring_test.go index 1388194f..1925a0c1 100644 --- a/backend/internal/daemon/wiring_test.go +++ b/backend/internal/daemon/wiring_test.go @@ -183,6 +183,45 @@ func TestWiring_SessionMessengerSendsToRuntimePane(t *testing.T) { } } +func TestWiring_SessionMessengerWrapsLookupErrors(t *testing.T) { + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = store.Close() }) + + messenger := newSessionMessenger(store, &captureRuntimeSender{}, nil) + err = messenger.Send(context.Background(), "missing", "hello") + if !errors.Is(err, sessionmanager.ErrNotFound) { + t.Fatalf("missing session should wrap ErrNotFound, got %v", err) + } +} + +func TestWiring_SessionMessengerRequiresRuntimeHandle(t *testing.T) { + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = store.Close() }) + + ctx := context.Background() + if err := store.UpsertProject(ctx, domain.ProjectRecord{ID: "p", Path: "/repo/p", RegisteredAt: time.Now()}); err != nil { + t.Fatal(err) + } + rec, err := store.CreateSession(ctx, domain.SessionRecord{ + ProjectID: "p", Kind: domain.KindWorker, + Activity: domain.Activity{State: domain.ActivityIdle, LastActivityAt: time.Now()}, + }) + if err != nil { + t.Fatal(err) + } + messenger := newSessionMessenger(store, &captureRuntimeSender{}, nil) + err = messenger.Send(ctx, rec.ID, "hello") + if !errors.Is(err, sessionmanager.ErrIncompleteHandle) { + t.Fatalf("missing runtime handle should wrap ErrIncompleteHandle, got %v", err) + } +} + // TestProjectRepoResolver_ResolvesRegisteredProject asserts the DB-backed repo // resolver turns a registered project into its on-disk repo path (so spawns // materialise a worktree), and fails loudly for an unregistered project. From 4bcd58420a872e21e1cee196546ca250a18d44b0 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Tue, 2 Jun 2026 19:58:33 +0530 Subject: [PATCH 6/6] fix(send): reject terminated sessions --- backend/internal/daemon/lifecycle_wiring.go | 3 ++ backend/internal/daemon/wiring_test.go | 31 +++++++++++++++++++ .../internal/httpd/controllers/sessions.go | 2 ++ backend/internal/session_manager/manager.go | 1 + 4 files changed, 37 insertions(+) diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index 28b32fa7..69aae574 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -97,6 +97,9 @@ func (m runtimeMessenger) Send(ctx context.Context, id domain.SessionID, message if !ok { return fmt.Errorf("session %s: %w", id, sessionmanager.ErrNotFound) } + if rec.IsTerminated { + return fmt.Errorf("session %s: %w", id, sessionmanager.ErrTerminated) + } handleID := rec.Metadata.RuntimeHandleID if handleID == "" { return fmt.Errorf("session %s: %w", id, sessionmanager.ErrIncompleteHandle) diff --git a/backend/internal/daemon/wiring_test.go b/backend/internal/daemon/wiring_test.go index 1925a0c1..0350c373 100644 --- a/backend/internal/daemon/wiring_test.go +++ b/backend/internal/daemon/wiring_test.go @@ -222,6 +222,37 @@ func TestWiring_SessionMessengerRequiresRuntimeHandle(t *testing.T) { } } +func TestWiring_SessionMessengerRejectsTerminatedSession(t *testing.T) { + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = store.Close() }) + + ctx := context.Background() + if err := store.UpsertProject(ctx, domain.ProjectRecord{ID: "p", Path: "/repo/p", RegisteredAt: time.Now()}); err != nil { + t.Fatal(err) + } + rec, err := store.CreateSession(ctx, domain.SessionRecord{ + ProjectID: "p", Kind: domain.KindWorker, + IsTerminated: true, + Activity: domain.Activity{State: domain.ActivityIdle, LastActivityAt: time.Now()}, + Metadata: domain.SessionMetadata{RuntimeHandleID: "ao-1/terminal_0"}, + }) + if err != nil { + t.Fatal(err) + } + runtime := &captureRuntimeSender{} + messenger := newSessionMessenger(store, runtime, nil) + err = messenger.Send(ctx, rec.ID, "hello") + if !errors.Is(err, sessionmanager.ErrTerminated) { + t.Fatalf("terminated session should wrap ErrTerminated, got %v", err) + } + if runtime.handle.ID != "" || runtime.message != "" { + t.Fatalf("runtime should not be called for terminated sessions, got handle=%q message=%q", runtime.handle.ID, runtime.message) + } +} + // TestProjectRepoResolver_ResolvesRegisteredProject asserts the DB-backed repo // resolver turns a registered project into its on-disk repo path (so spawns // materialise a worktree), and fails loudly for an unregistered project. diff --git a/backend/internal/httpd/controllers/sessions.go b/backend/internal/httpd/controllers/sessions.go index bc88fd72..632d31f4 100644 --- a/backend/internal/httpd/controllers/sessions.go +++ b/backend/internal/httpd/controllers/sessions.go @@ -251,6 +251,8 @@ func writeSessionError(w http.ResponseWriter, r *http.Request, err error) { envelope.WriteAPIError(w, r, http.StatusNotFound, "not_found", "SESSION_NOT_FOUND", "Unknown session", nil) case errors.Is(err, sessionmanager.ErrNotRestorable): envelope.WriteAPIError(w, r, http.StatusConflict, "conflict", "SESSION_NOT_RESTORABLE", "Session is not restorable", nil) + case errors.Is(err, sessionmanager.ErrTerminated): + envelope.WriteAPIError(w, r, http.StatusConflict, "conflict", "SESSION_TERMINATED", "Session is terminated", nil) case errors.Is(err, sessionmanager.ErrIncompleteHandle): envelope.WriteAPIError(w, r, http.StatusConflict, "conflict", "SESSION_INCOMPLETE_HANDLE", "Session is missing runtime or workspace handles", nil) case errors.Is(err, sessionmanager.ErrProjectNotResolvable): diff --git a/backend/internal/session_manager/manager.go b/backend/internal/session_manager/manager.go index fcf4277f..7c8d8233 100644 --- a/backend/internal/session_manager/manager.go +++ b/backend/internal/session_manager/manager.go @@ -16,6 +16,7 @@ import ( var ( ErrNotFound = errors.New("session: not found") ErrNotRestorable = errors.New("session: not restorable (not terminal)") + ErrTerminated = errors.New("session: terminated") ErrIncompleteHandle = errors.New("session: incomplete teardown handle") // ErrProjectNotResolvable means the spawn's project has no usable repo // (unregistered, archived, or missing a path). The API maps it to a 400.