Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/internal/cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func NewRootCommand(deps Deps) *cobra.Command {
root.AddCommand(newStatusCommand(ctx))
root.AddCommand(newDoctorCommand(ctx))
root.AddCommand(newSpawnCommand(ctx))
root.AddCommand(newSendCommand(ctx))
root.AddCommand(newProjectCommand(ctx))
root.AddCommand(newCompletionCommand())
root.AddCommand(newVersionCommand())
Expand Down
53 changes: 53 additions & 0 deletions backend/internal/cli/send.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package cli

import (
"context"
"errors"
"net/url"
"strings"

"github.com/spf13/cobra"
)

type sendOptions struct {
session string
message string
}

// sendAPIRequest mirrors the daemon's SendSessionMessageRequest body for
// POST /api/v1/sessions/{id}/send. The CLI keeps its own copy so it need not
// import httpd.
type sendAPIRequest struct {
Message string `json:"message"`
}

func newSendCommand(ctx *commandContext) *cobra.Command {
var opts sendOptions
cmd := &cobra.Command{
Use: "send",
Short: "Send a message to a running agent session",
Args: noArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return ctx.sendMessage(cmd.Context(), opts)
},
}
cmd.Flags().StringVar(&opts.session, "session", "", "Session id (required)")
cmd.Flags().StringVar(&opts.message, "message", "", "Message body (required)")
return cmd
}

func (c *commandContext) sendMessage(ctx context.Context, opts sendOptions) error {
if strings.TrimSpace(opts.message) == "" {
return usageError{errors.New("usage: --message is required")}
}
message := opts.message
session := strings.TrimSpace(opts.session)
if session == "" {
return usageError{errors.New("usage: --session is required")}
}

// PathEscape: session ids are already "-"/digit safe, but may later come
// from sanitized issue refs; keep the URL well-formed regardless.
path := "sessions/" + url.PathEscape(session) + "/send"
return c.postJSON(ctx, path, sendAPIRequest{Message: message}, nil)
}
220 changes: 220 additions & 0 deletions backend/internal/cli/send_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package cli

import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/runfile"
)

// sendServer wires an httptest server expecting POST /api/v1/sessions/{id}/send
// and captures the request body and path the CLI hit.
type sendCapture struct {
body string
path string
}

// writeRunFileFor points the CLI's run-file at srv so postJSON dials the test
// server. It mirrors the run-file convention the other CLI tests use.
func writeRunFileFor(t *testing.T, cfg testConfig, srv *httptest.Server) {
t.Helper()
if err := runfile.Write(cfg.runFile, runfile.Info{
PID: os.Getpid(), Port: serverPort(t, srv.URL), StartedAt: time.Unix(100, 0).UTC(),
}); err != nil {
t.Fatalf("write run-file: %v", err)
}
}

func sendServer(t *testing.T, status int, respBody string) (*httptest.Server, *sendCapture) {
t.Helper()
capture := &sendCapture{}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.NotFound(w, r)
return
}
if !strings.HasPrefix(r.URL.Path, "/api/v1/sessions/") || !strings.HasSuffix(r.URL.Path, "/send") {
http.NotFound(w, r)
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("read body: %v", err)
}
capture.body = string(body)
capture.path = r.URL.Path
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_, _ = io.WriteString(w, respBody)
}))
t.Cleanup(srv.Close)
return srv, capture
}

func TestSend_Success(t *testing.T) {
cfg := setConfigEnv(t)
srv, capture := sendServer(t, http.StatusOK,
`{"ok":true,"sessionId":"demo-1","message":"hello agent"}`)
writeRunFileFor(t, cfg, srv)

_, errOut, err := executeCLI(t, Deps{
ProcessAlive: func(int) bool { return true },
}, "send", "--session", "demo-1", "--message", "hello agent")
if err != nil {
t.Fatalf("unexpected error: %v\nstderr=%s", err, errOut)
}
if capture.path != "/api/v1/sessions/demo-1/send" {
t.Errorf("path = %q, want /api/v1/sessions/demo-1/send", capture.path)
}
var req struct {
Message string `json:"message"`
}
if err := json.Unmarshal([]byte(capture.body), &req); err != nil {
t.Fatalf("decode body: %v\nbody=%s", err, capture.body)
}
if req.Message != "hello agent" {
t.Errorf("captured message = %q, want %q", req.Message, "hello agent")
}
}

func TestSend_PreservesMessageWhitespace(t *testing.T) {
cfg := setConfigEnv(t)
srv, capture := sendServer(t, http.StatusOK, `{"ok":true,"sessionId":"demo-1","message":"hi"}`)
writeRunFileFor(t, cfg, srv)

_, _, err := executeCLI(t, Deps{
ProcessAlive: func(int) bool { return true },
}, "send", "--session", "demo-1", "--message", " hi ")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var req struct {
Message string `json:"message"`
}
if err := json.Unmarshal([]byte(capture.body), &req); err != nil {
t.Fatalf("decode body: %v\nbody=%s", err, capture.body)
}
if req.Message != " hi " {
t.Errorf("server received %q, want preserved whitespace", req.Message)
}
}

func TestSend_EmptyMessageIsUsageError(t *testing.T) {
setConfigEnv(t)
_, _, err := executeCLI(t, Deps{}, "send", "--session", "demo-1", "--message", " ")
if err == nil {
t.Fatal("expected usage error for empty message")
}
if got := ExitCode(err); got != 2 {
t.Fatalf("exit code = %d, want 2", got)
}
if !strings.Contains(err.Error(), "--message is required") {
t.Fatalf("error missing usage message: %v", err)
}
}

func TestSend_MissingSessionIsUsageError(t *testing.T) {
setConfigEnv(t)
_, _, err := executeCLI(t, Deps{}, "send", "--message", "hi")
if err == nil {
t.Fatal("expected usage error for missing --session")
}
if got := ExitCode(err); got != 2 {
t.Fatalf("exit code = %d, want 2", got)
}
}

func TestSend_ServerBadRequestExits1(t *testing.T) {
cfg := setConfigEnv(t)
srv, _ := sendServer(t, http.StatusBadRequest,
`{"error":"bad_request","code":"MESSAGE_REQUIRED","message":"Message is required"}`)
writeRunFileFor(t, cfg, srv)

_, errOut, err := executeCLI(t, Deps{
ProcessAlive: func(int) bool { return true },
}, "send", "--session", "demo-1", "--message", "hi")
if err == nil {
t.Fatal("expected runtime error from 400")
}
if got := ExitCode(err); got != 1 {
t.Fatalf("exit code = %d, want 1", got)
}
if !strings.Contains(err.Error(), "MESSAGE_REQUIRED") && !strings.Contains(errOut, "MESSAGE_REQUIRED") {
t.Fatalf("error did not surface the server error envelope: %v\nstderr=%s", err, errOut)
}
}

func TestSend_ServerNotFoundExits1(t *testing.T) {
cfg := setConfigEnv(t)
srv, _ := sendServer(t, http.StatusNotFound,
`{"error":"not_found","code":"SESSION_NOT_FOUND","message":"Unknown session"}`)
writeRunFileFor(t, cfg, srv)

_, _, err := executeCLI(t, Deps{
ProcessAlive: func(int) bool { return true },
}, "send", "--session", "missing", "--message", "hi")
if err == nil {
t.Fatal("expected runtime error from 404")
}
if got := ExitCode(err); got != 1 {
t.Fatalf("exit code = %d, want 1", got)
}
}

func TestSend_ServerInternalErrorExits1(t *testing.T) {
cfg := setConfigEnv(t)
srv, _ := sendServer(t, http.StatusInternalServerError,
`{"error":"internal","code":"SESSION_OPERATION_FAILED","message":"Session operation failed"}`)
writeRunFileFor(t, cfg, srv)

_, errOut, err := executeCLI(t, Deps{
ProcessAlive: func(int) bool { return true },
}, "send", "--session", "demo-1", "--message", "hi")
if err == nil {
t.Fatal("expected runtime error from 500")
}
if got := ExitCode(err); got != 1 {
t.Fatalf("exit code = %d, want 1", got)
}
// Regression guard: a future change that swallows the API envelope and
// prints only "daemon returned HTTP 500" would silently hide what the
// daemon was trying to tell the operator.
if !strings.Contains(err.Error(), "SESSION_OPERATION_FAILED") && !strings.Contains(errOut, "SESSION_OPERATION_FAILED") {
t.Fatalf("error did not surface the server error envelope: %v\nstderr=%s", err, errOut)
}
}

func TestSend_DaemonNotRunningExits1(t *testing.T) {
setConfigEnv(t)
_, _, err := executeCLI(t, Deps{}, "send", "--session", "demo-1", "--message", "hi")
if err == nil {
t.Fatal("expected error when daemon is not running")
}
if got := ExitCode(err); got != 1 {
t.Fatalf("exit code = %d, want 1", got)
}
}

func TestSend_NetworkErrorExits1(t *testing.T) {
cfg := setConfigEnv(t)
// Start and immediately close a server so the run-file points at a closed port.
srv, _ := sendServer(t, http.StatusOK, "{}")
writeRunFileFor(t, cfg, srv)
srv.Close()

_, _, err := executeCLI(t, Deps{
ProcessAlive: func(int) bool { return true },
}, "send", "--session", "demo-1", "--message", "hi")
if err == nil {
t.Fatal("expected runtime error from network failure")
}
if got := ExitCode(err); got != 1 {
t.Fatalf("exit code = %d, want 1", got)
}
}
11 changes: 8 additions & 3 deletions backend/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,15 @@ func Run() error {
// change_log -> poller -> broadcaster) and gives startSession the shared LCM.
lcStack := startLifecycle(ctx, store, runtimeAdapter, log)

// The agent messenger sends validated user input to the session's live
// zellij pane. Keep this path small until durable inbox semantics are needed.
messenger := newSessionMessenger(store, runtimeAdapter, log)

// Wire the controller-facing session service over the same store + LCM, the
// zellij runtime, a gitworktree workspace, and the per-session agent resolver
// (AO_AGENT default, validated here), then mount it on the API.
sessionSvc, err := startSession(cfg, runtimeAdapter, store, lcStack.LCM, log)
// zellij runtime, a gitworktree workspace, the per-session agent resolver
// (AO_AGENT default, validated here), and the agent messenger, then mount it
// on the API.
sessionSvc, err := startSession(cfg, runtimeAdapter, store, lcStack.LCM, messenger, log)
if err != nil {
stop()
lcStack.Stop()
Expand Down
54 changes: 42 additions & 12 deletions backend/internal/daemon/lifecycle_wiring.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,11 @@ func startLifecycle(ctx context.Context, store *sqlite.Store, runtime ports.Runt
// passed to startLifecycle before calling Stop.
func (l *lifecycleStack) Stop() { <-l.reaperDone }

// noopMessenger is a stub ports.AgentMessenger: durable writes and notifications
// work without it; only live agent nudges are absent until the runtime/agent
// nudge path is wired.
type noopMessenger struct{}

func (noopMessenger) Send(context.Context, domain.SessionID, string) error { return nil }

// startSession builds the controller-facing session service: a session manager
// over the real zellij runtime, a per-session gitworktree workspace, the shared
// store + LCM, and the per-session agent resolver (AO_AGENT default). The
// Messenger is a stub until the live agent-nudge path lands. The returned
// service is mounted at httpd APIDeps.Sessions.
func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, lcm *lifecycle.Manager, log *slog.Logger) (*sessionsvc.Service, error) {
// store + LCM, the per-session agent resolver (AO_AGENT default), and the
// agent messenger. The returned service is mounted at httpd APIDeps.Sessions.
func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store, lcm *lifecycle.Manager, messenger ports.AgentMessenger, log *slog.Logger) (*sessionsvc.Service, error) {
agents, err := buildAgentResolver(cfg.Agent, log)
if err != nil {
return nil, err
Expand All @@ -76,13 +68,51 @@ func startSession(cfg config.Config, runtime ports.Runtime, store *sqlite.Store,
Agents: agents,
Workspace: ws,
Store: store,
Messenger: noopMessenger{},
Messenger: messenger,
Lifecycle: lcm,
DataDir: cfg.DataDir,
})
return sessionsvc.New(mgr, store), nil
}

// runtimeMessageSender is the narrow part of the concrete runtime needed by
// ao send. zellij.Runtime already implements this via SendMessage.
type runtimeMessageSender interface {
SendMessage(ctx context.Context, handle ports.RuntimeHandle, message string) error
}

// runtimeMessenger sends the user's message directly to the session's live
// runtime pane. The HTTP controller has already validated and sanitized the
// message body; this adapter only resolves the stored runtime handle.
type runtimeMessenger struct {
store *sqlite.Store
runtime runtimeMessageSender
}

func (m runtimeMessenger) Send(ctx context.Context, id domain.SessionID, message string) error {
rec, ok, err := m.store.GetSession(ctx, id)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("session %s: %w", id, sessionmanager.ErrNotFound)
}
if rec.IsTerminated {
return fmt.Errorf("session %s: %w", id, sessionmanager.ErrTerminated)
}
handleID := rec.Metadata.RuntimeHandleID
if handleID == "" {
return fmt.Errorf("session %s: %w", id, sessionmanager.ErrIncompleteHandle)
}
return m.runtime.SendMessage(ctx, ports.RuntimeHandle{ID: handleID}, message)
}

// newSessionMessenger assembles the per-daemon agent messenger. For now, ao
// send is intentionally minimal: submit the message to the live runtime pane.
func newSessionMessenger(store *sqlite.Store, runtime runtimeMessageSender, _ *slog.Logger) ports.AgentMessenger {
return runtimeMessenger{store: store, runtime: runtime}
}

// buildAgentRegistry returns a registry populated with the agent adapters the
// daemon ships, keyed by manifest id. Registration only fails on an
// empty/duplicate id — a programmer error, not a runtime condition.
Expand Down
Loading
Loading