diff --git a/backend/internal/cli/root.go b/backend/internal/cli/root.go index 293d431f..280cd7b1 100644 --- a/backend/internal/cli/root.go +++ b/backend/internal/cli/root.go @@ -148,6 +148,7 @@ func NewRootCommand(deps Deps) *cobra.Command { root.AddCommand(newStatusCommand(ctx)) root.AddCommand(newDoctorCommand(ctx)) root.AddCommand(newSpawnCommand(ctx)) + root.AddCommand(newSendCommand(ctx)) root.AddCommand(newProjectCommand(ctx)) root.AddCommand(newCompletionCommand()) root.AddCommand(newVersionCommand()) diff --git a/backend/internal/cli/send.go b/backend/internal/cli/send.go new file mode 100644 index 00000000..e1ecbcd5 --- /dev/null +++ b/backend/internal/cli/send.go @@ -0,0 +1,53 @@ +package cli + +import ( + "context" + "errors" + "net/url" + "strings" + + "github.com/spf13/cobra" +) + +type sendOptions struct { + session string + message string +} + +// sendAPIRequest mirrors the daemon's SendSessionMessageRequest body for +// POST /api/v1/sessions/{id}/send. The CLI keeps its own copy so it need not +// import httpd. +type sendAPIRequest struct { + Message string `json:"message"` +} + +func newSendCommand(ctx *commandContext) *cobra.Command { + var opts sendOptions + cmd := &cobra.Command{ + Use: "send", + Short: "Send a message to a running agent session", + Args: noArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + return ctx.sendMessage(cmd.Context(), opts) + }, + } + cmd.Flags().StringVar(&opts.session, "session", "", "Session id (required)") + cmd.Flags().StringVar(&opts.message, "message", "", "Message body (required)") + return cmd +} + +func (c *commandContext) sendMessage(ctx context.Context, opts sendOptions) error { + if strings.TrimSpace(opts.message) == "" { + return usageError{errors.New("usage: --message is required")} + } + message := opts.message + session := strings.TrimSpace(opts.session) + if session == "" { + return usageError{errors.New("usage: --session is required")} + } + + // PathEscape: session ids are already "-"/digit safe, but may later come + // from sanitized issue refs; keep the URL well-formed regardless. + path := "sessions/" + url.PathEscape(session) + "/send" + return c.postJSON(ctx, path, sendAPIRequest{Message: message}, nil) +} diff --git a/backend/internal/cli/send_test.go b/backend/internal/cli/send_test.go new file mode 100644 index 00000000..a68293ba --- /dev/null +++ b/backend/internal/cli/send_test.go @@ -0,0 +1,220 @@ +package cli + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/runfile" +) + +// sendServer wires an httptest server expecting POST /api/v1/sessions/{id}/send +// and captures the request body and path the CLI hit. +type sendCapture struct { + body string + path string +} + +// writeRunFileFor points the CLI's run-file at srv so postJSON dials the test +// server. It mirrors the run-file convention the other CLI tests use. +func writeRunFileFor(t *testing.T, cfg testConfig, srv *httptest.Server) { + t.Helper() + if err := runfile.Write(cfg.runFile, runfile.Info{ + PID: os.Getpid(), Port: serverPort(t, srv.URL), StartedAt: time.Unix(100, 0).UTC(), + }); err != nil { + t.Fatalf("write run-file: %v", err) + } +} + +func sendServer(t *testing.T, status int, respBody string) (*httptest.Server, *sendCapture) { + t.Helper() + capture := &sendCapture{} + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.NotFound(w, r) + return + } + if !strings.HasPrefix(r.URL.Path, "/api/v1/sessions/") || !strings.HasSuffix(r.URL.Path, "/send") { + http.NotFound(w, r) + return + } + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("read body: %v", err) + } + capture.body = string(body) + capture.path = r.URL.Path + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _, _ = io.WriteString(w, respBody) + })) + t.Cleanup(srv.Close) + return srv, capture +} + +func TestSend_Success(t *testing.T) { + cfg := setConfigEnv(t) + srv, capture := sendServer(t, http.StatusOK, + `{"ok":true,"sessionId":"demo-1","message":"hello agent"}`) + writeRunFileFor(t, cfg, srv) + + _, errOut, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "demo-1", "--message", "hello agent") + if err != nil { + t.Fatalf("unexpected error: %v\nstderr=%s", err, errOut) + } + if capture.path != "/api/v1/sessions/demo-1/send" { + t.Errorf("path = %q, want /api/v1/sessions/demo-1/send", capture.path) + } + var req struct { + Message string `json:"message"` + } + if err := json.Unmarshal([]byte(capture.body), &req); err != nil { + t.Fatalf("decode body: %v\nbody=%s", err, capture.body) + } + if req.Message != "hello agent" { + t.Errorf("captured message = %q, want %q", req.Message, "hello agent") + } +} + +func TestSend_PreservesMessageWhitespace(t *testing.T) { + cfg := setConfigEnv(t) + srv, capture := sendServer(t, http.StatusOK, `{"ok":true,"sessionId":"demo-1","message":"hi"}`) + writeRunFileFor(t, cfg, srv) + + _, _, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "demo-1", "--message", " hi ") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + var req struct { + Message string `json:"message"` + } + if err := json.Unmarshal([]byte(capture.body), &req); err != nil { + t.Fatalf("decode body: %v\nbody=%s", err, capture.body) + } + if req.Message != " hi " { + t.Errorf("server received %q, want preserved whitespace", req.Message) + } +} + +func TestSend_EmptyMessageIsUsageError(t *testing.T) { + setConfigEnv(t) + _, _, err := executeCLI(t, Deps{}, "send", "--session", "demo-1", "--message", " ") + if err == nil { + t.Fatal("expected usage error for empty message") + } + if got := ExitCode(err); got != 2 { + t.Fatalf("exit code = %d, want 2", got) + } + if !strings.Contains(err.Error(), "--message is required") { + t.Fatalf("error missing usage message: %v", err) + } +} + +func TestSend_MissingSessionIsUsageError(t *testing.T) { + setConfigEnv(t) + _, _, err := executeCLI(t, Deps{}, "send", "--message", "hi") + if err == nil { + t.Fatal("expected usage error for missing --session") + } + if got := ExitCode(err); got != 2 { + t.Fatalf("exit code = %d, want 2", got) + } +} + +func TestSend_ServerBadRequestExits1(t *testing.T) { + cfg := setConfigEnv(t) + srv, _ := sendServer(t, http.StatusBadRequest, + `{"error":"bad_request","code":"MESSAGE_REQUIRED","message":"Message is required"}`) + writeRunFileFor(t, cfg, srv) + + _, errOut, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "demo-1", "--message", "hi") + if err == nil { + t.Fatal("expected runtime error from 400") + } + if got := ExitCode(err); got != 1 { + t.Fatalf("exit code = %d, want 1", got) + } + if !strings.Contains(err.Error(), "MESSAGE_REQUIRED") && !strings.Contains(errOut, "MESSAGE_REQUIRED") { + t.Fatalf("error did not surface the server error envelope: %v\nstderr=%s", err, errOut) + } +} + +func TestSend_ServerNotFoundExits1(t *testing.T) { + cfg := setConfigEnv(t) + srv, _ := sendServer(t, http.StatusNotFound, + `{"error":"not_found","code":"SESSION_NOT_FOUND","message":"Unknown session"}`) + writeRunFileFor(t, cfg, srv) + + _, _, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "missing", "--message", "hi") + if err == nil { + t.Fatal("expected runtime error from 404") + } + if got := ExitCode(err); got != 1 { + t.Fatalf("exit code = %d, want 1", got) + } +} + +func TestSend_ServerInternalErrorExits1(t *testing.T) { + cfg := setConfigEnv(t) + srv, _ := sendServer(t, http.StatusInternalServerError, + `{"error":"internal","code":"SESSION_OPERATION_FAILED","message":"Session operation failed"}`) + writeRunFileFor(t, cfg, srv) + + _, errOut, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "demo-1", "--message", "hi") + if err == nil { + t.Fatal("expected runtime error from 500") + } + if got := ExitCode(err); got != 1 { + t.Fatalf("exit code = %d, want 1", got) + } + // Regression guard: a future change that swallows the API envelope and + // prints only "daemon returned HTTP 500" would silently hide what the + // daemon was trying to tell the operator. + if !strings.Contains(err.Error(), "SESSION_OPERATION_FAILED") && !strings.Contains(errOut, "SESSION_OPERATION_FAILED") { + t.Fatalf("error did not surface the server error envelope: %v\nstderr=%s", err, errOut) + } +} + +func TestSend_DaemonNotRunningExits1(t *testing.T) { + setConfigEnv(t) + _, _, err := executeCLI(t, Deps{}, "send", "--session", "demo-1", "--message", "hi") + if err == nil { + t.Fatal("expected error when daemon is not running") + } + if got := ExitCode(err); got != 1 { + t.Fatalf("exit code = %d, want 1", got) + } +} + +func TestSend_NetworkErrorExits1(t *testing.T) { + cfg := setConfigEnv(t) + // Start and immediately close a server so the run-file points at a closed port. + srv, _ := sendServer(t, http.StatusOK, "{}") + writeRunFileFor(t, cfg, srv) + srv.Close() + + _, _, err := executeCLI(t, Deps{ + ProcessAlive: func(int) bool { return true }, + }, "send", "--session", "demo-1", "--message", "hi") + if err == nil { + t.Fatal("expected runtime error from network failure") + } + if got := ExitCode(err); got != 1 { + t.Fatalf("exit code = %d, want 1", got) + } +} diff --git a/backend/internal/daemon/daemon.go b/backend/internal/daemon/daemon.go index d38fa4ea..97a2ab20 100644 --- a/backend/internal/daemon/daemon.go +++ b/backend/internal/daemon/daemon.go @@ -81,10 +81,15 @@ func Run() error { // change_log -> poller -> broadcaster) and gives startSession the shared LCM. lcStack := startLifecycle(ctx, store, runtimeAdapter, log) + // The agent messenger sends validated user input to the session's live + // zellij pane. Keep this path small until durable inbox semantics are needed. + messenger := newSessionMessenger(store, runtimeAdapter, log) + // Wire the controller-facing session service over the same store + LCM, the - // zellij runtime, a gitworktree workspace, and the per-session agent resolver - // (AO_AGENT default, validated here), then mount it on the API. - sessionSvc, err := startSession(cfg, runtimeAdapter, store, lcStack.LCM, log) + // zellij runtime, a gitworktree workspace, the per-session agent resolver + // (AO_AGENT default, validated here), and the agent messenger, then mount it + // on the API. + sessionSvc, err := startSession(cfg, runtimeAdapter, store, lcStack.LCM, messenger, log) if err != nil { stop() lcStack.Stop() diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index 51232299..69aae574 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -42,19 +42,11 @@ func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runt // passed to startLifecycle before calling Stop. func (l *lifecycleStack) Stop() { <-l.reaperDone } -// noopMessenger is a stub ports.AgentMessenger: durable writes and notifications -// work without it; only live agent nudges are absent until the runtime/agent -// nudge path is wired. -type noopMessenger struct{} - -func (noopMessenger) Send(context.Context, domain.SessionID, string) error { return nil } - // startSession builds the controller-facing session service: a session manager // over the real zellij runtime, a per-session gitworktree workspace, the shared -// store + LCM, and the per-session agent resolver (AO_AGENT default). The -// Messenger is a stub until the live agent-nudge path lands. The returned -// service is mounted at httpd APIDeps.Sessions. -func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, lcm *lifecycle.Manager, log *slog.Logger) (*sessionsvc.Service, error) { +// store + LCM, the per-session agent resolver (AO_AGENT default), and the +// agent messenger. The returned service is mounted at httpd APIDeps.Sessions. +func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, lcm *lifecycle.Manager, messenger ports.AgentMessenger, log *slog.Logger) (*sessionsvc.Service, error) { agents, err := buildAgentResolver(cfg.Agent, log) if err != nil { return nil, err @@ -76,13 +68,51 @@ func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, Agents: agents, Workspace: ws, Store: store, - Messenger: noopMessenger{}, + Messenger: messenger, Lifecycle: lcm, DataDir: cfg.DataDir, }) return sessionsvc.New(mgr, store), nil } +// runtimeMessageSender is the narrow part of the concrete runtime needed by +// ao send. zellij.Runtime already implements this via SendMessage. +type runtimeMessageSender interface { + SendMessage(ctx context.Context, handle ports.RuntimeHandle, message string) error +} + +// runtimeMessenger sends the user's message directly to the session's live +// runtime pane. The HTTP controller has already validated and sanitized the +// message body; this adapter only resolves the stored runtime handle. +type runtimeMessenger struct { + store *sqlite.Store + runtime runtimeMessageSender +} + +func (m runtimeMessenger) Send(ctx context.Context, id domain.SessionID, message string) error { + rec, ok, err := m.store.GetSession(ctx, id) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("session %s: %w", id, sessionmanager.ErrNotFound) + } + if rec.IsTerminated { + return fmt.Errorf("session %s: %w", id, sessionmanager.ErrTerminated) + } + handleID := rec.Metadata.RuntimeHandleID + if handleID == "" { + return fmt.Errorf("session %s: %w", id, sessionmanager.ErrIncompleteHandle) + } + return m.runtime.SendMessage(ctx, ports.RuntimeHandle{ID: handleID}, message) +} + +// newSessionMessenger assembles the per-daemon agent messenger. For now, ao +// send is intentionally minimal: submit the message to the live runtime pane. +func newSessionMessenger(store *sqlite.Store, runtime runtimeMessageSender, _ *slog.Logger) ports.AgentMessenger { + return runtimeMessenger{store: store, runtime: runtime} +} + // buildAgentRegistry returns a registry populated with the agent adapters the // daemon ships, keyed by manifest id. Registration only fails on an // empty/duplicate id — a programmer error, not a runtime condition. diff --git a/backend/internal/daemon/wiring_test.go b/backend/internal/daemon/wiring_test.go index d15f9dee..0350c373 100644 --- a/backend/internal/daemon/wiring_test.go +++ b/backend/internal/daemon/wiring_test.go @@ -126,7 +126,9 @@ func TestWiring_StartSessionBuildsSessionService(t *testing.T) { lcm := lifecycle.New(store, nil) cfg := config.Config{DataDir: t.TempDir()} - svc, err := startSession(cfg, zellij.New(zellij.Options{}), store, lcm, log) + runtime := zellij.New(zellij.Options{}) + messenger := newSessionMessenger(store, runtime, log) + svc, err := startSession(cfg, runtime, store, lcm, messenger, log) if err != nil { t.Fatalf("startSession: %v", err) } @@ -135,6 +137,122 @@ func TestWiring_StartSessionBuildsSessionService(t *testing.T) { } } +type captureRuntimeSender struct { + handle ports.RuntimeHandle + message string +} + +func (c *captureRuntimeSender) SendMessage(_ context.Context, handle ports.RuntimeHandle, message string) error { + c.handle = handle + c.message = message + return nil +} + +// TestWiring_SessionMessengerSendsToRuntimePane asserts the daemon wires ao +// send to the live runtime pane and resolves the handle from the shared store. +func TestWiring_SessionMessengerSendsToRuntimePane(t *testing.T) { + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = store.Close() }) + + runtime := &captureRuntimeSender{} + messenger := newSessionMessenger(store, runtime, nil) + + ctx := context.Background() + if err := store.UpsertProject(ctx, domain.ProjectRecord{ID: "p", Path: "/repo/p", RegisteredAt: time.Now()}); err != nil { + t.Fatal(err) + } + rec, err := store.CreateSession(ctx, domain.SessionRecord{ + ProjectID: "p", Kind: domain.KindWorker, + Activity: domain.Activity{State: domain.ActivityIdle, LastActivityAt: time.Now()}, + Metadata: domain.SessionMetadata{RuntimeHandleID: "ao-1/terminal_0"}, + }) + if err != nil { + t.Fatal(err) + } + if err := messenger.Send(ctx, rec.ID, "hello agent"); err != nil { + t.Fatalf("messenger.Send: %v", err) + } + if runtime.handle.ID != "ao-1/terminal_0" { + t.Fatalf("handle = %q, want ao-1/terminal_0", runtime.handle.ID) + } + if runtime.message != "hello agent" { + t.Fatalf("message = %q, want hello agent", runtime.message) + } +} + +func TestWiring_SessionMessengerWrapsLookupErrors(t *testing.T) { + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = store.Close() }) + + messenger := newSessionMessenger(store, &captureRuntimeSender{}, nil) + err = messenger.Send(context.Background(), "missing", "hello") + if !errors.Is(err, sessionmanager.ErrNotFound) { + t.Fatalf("missing session should wrap ErrNotFound, got %v", err) + } +} + +func TestWiring_SessionMessengerRequiresRuntimeHandle(t *testing.T) { + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = store.Close() }) + + ctx := context.Background() + if err := store.UpsertProject(ctx, domain.ProjectRecord{ID: "p", Path: "/repo/p", RegisteredAt: time.Now()}); err != nil { + t.Fatal(err) + } + rec, err := store.CreateSession(ctx, domain.SessionRecord{ + ProjectID: "p", Kind: domain.KindWorker, + Activity: domain.Activity{State: domain.ActivityIdle, LastActivityAt: time.Now()}, + }) + if err != nil { + t.Fatal(err) + } + messenger := newSessionMessenger(store, &captureRuntimeSender{}, nil) + err = messenger.Send(ctx, rec.ID, "hello") + if !errors.Is(err, sessionmanager.ErrIncompleteHandle) { + t.Fatalf("missing runtime handle should wrap ErrIncompleteHandle, got %v", err) + } +} + +func TestWiring_SessionMessengerRejectsTerminatedSession(t *testing.T) { + store, err := sqlite.Open(t.TempDir()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = store.Close() }) + + ctx := context.Background() + if err := store.UpsertProject(ctx, domain.ProjectRecord{ID: "p", Path: "/repo/p", RegisteredAt: time.Now()}); err != nil { + t.Fatal(err) + } + rec, err := store.CreateSession(ctx, domain.SessionRecord{ + ProjectID: "p", Kind: domain.KindWorker, + IsTerminated: true, + Activity: domain.Activity{State: domain.ActivityIdle, LastActivityAt: time.Now()}, + Metadata: domain.SessionMetadata{RuntimeHandleID: "ao-1/terminal_0"}, + }) + if err != nil { + t.Fatal(err) + } + runtime := &captureRuntimeSender{} + messenger := newSessionMessenger(store, runtime, nil) + err = messenger.Send(ctx, rec.ID, "hello") + if !errors.Is(err, sessionmanager.ErrTerminated) { + t.Fatalf("terminated session should wrap ErrTerminated, got %v", err) + } + if runtime.handle.ID != "" || runtime.message != "" { + t.Fatalf("runtime should not be called for terminated sessions, got handle=%q message=%q", runtime.handle.ID, runtime.message) + } +} + // TestProjectRepoResolver_ResolvesRegisteredProject asserts the DB-backed repo // resolver turns a registered project into its on-disk repo path (so spawns // materialise a worktree), and fails loudly for an unregistered project. diff --git a/backend/internal/httpd/controllers/sessions.go b/backend/internal/httpd/controllers/sessions.go index bc88fd72..632d31f4 100644 --- a/backend/internal/httpd/controllers/sessions.go +++ b/backend/internal/httpd/controllers/sessions.go @@ -251,6 +251,8 @@ func writeSessionError(w http.ResponseWriter, r *http.Request, err error) { envelope.WriteAPIError(w, r, http.StatusNotFound, "not_found", "SESSION_NOT_FOUND", "Unknown session", nil) case errors.Is(err, sessionmanager.ErrNotRestorable): envelope.WriteAPIError(w, r, http.StatusConflict, "conflict", "SESSION_NOT_RESTORABLE", "Session is not restorable", nil) + case errors.Is(err, sessionmanager.ErrTerminated): + envelope.WriteAPIError(w, r, http.StatusConflict, "conflict", "SESSION_TERMINATED", "Session is terminated", nil) case errors.Is(err, sessionmanager.ErrIncompleteHandle): envelope.WriteAPIError(w, r, http.StatusConflict, "conflict", "SESSION_INCOMPLETE_HANDLE", "Session is missing runtime or workspace handles", nil) case errors.Is(err, sessionmanager.ErrProjectNotResolvable): diff --git a/backend/internal/session_manager/manager.go b/backend/internal/session_manager/manager.go index fcf4277f..7c8d8233 100644 --- a/backend/internal/session_manager/manager.go +++ b/backend/internal/session_manager/manager.go @@ -16,6 +16,7 @@ import ( var ( ErrNotFound = errors.New("session: not found") ErrNotRestorable = errors.New("session: not restorable (not terminal)") + ErrTerminated = errors.New("session: terminated") ErrIncompleteHandle = errors.New("session: incomplete teardown handle") // ErrProjectNotResolvable means the spawn's project has no usable repo // (unregistered, archived, or missing a path). The API maps it to a 400.