Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions backend/internal/cli/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"net/url"
"os"
"strings"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -41,6 +42,9 @@ func (c *commandContext) sendMessage(ctx context.Context, opts sendOptions) erro
return usageError{errors.New("usage: --message is required")}
}
message := opts.message
if sender := strings.TrimSpace(os.Getenv("AO_SESSION_ID")); sender != "" {
message = "[from " + sender + "] " + message
}
session := strings.TrimSpace(opts.session)
if session == "" {
return usageError{errors.New("usage: --session is required")}
Expand Down
51 changes: 51 additions & 0 deletions backend/internal/cli/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func sendServer(t *testing.T, status int, respBody string) (*httptest.Server, *s
}

func TestSend_Success(t *testing.T) {
t.Setenv("AO_SESSION_ID", "")
cfg := setConfigEnv(t)
srv, capture := sendServer(t, http.StatusOK,
`{"ok":true,"sessionId":"demo-1","message":"hello agent"}`)
Expand All @@ -83,7 +84,57 @@ func TestSend_Success(t *testing.T) {
}
}

func TestSend_PrefixesMessageWithSenderSessionID(t *testing.T) {
t.Setenv("AO_SESSION_ID", "aa-47")
cfg := setConfigEnv(t)
srv, capture := sendServer(t, http.StatusOK,
`{"ok":true,"sessionId":"demo-1","message":"hi"}`)
writeRunFileFor(t, cfg, srv)

_, errOut, err := executeCLI(t, Deps{
ProcessAlive: func(int) bool { return true },
}, "send", "--session", "demo-1", "--message", " hi ")
if err != nil {
t.Fatalf("unexpected error: %v\nstderr=%s", err, errOut)
}
var req struct {
Message string `json:"message"`
}
if err := json.Unmarshal([]byte(capture.body), &req); err != nil {
t.Fatalf("decode body: %v\nbody=%s", err, capture.body)
}
want := "[from aa-47] hi "
if req.Message != want {
t.Errorf("captured message = %q, want %q", req.Message, want)
}
}

func TestSend_BlankSenderSessionIDDoesNotPrefixMessage(t *testing.T) {
t.Setenv("AO_SESSION_ID", " \t ")
cfg := setConfigEnv(t)
srv, capture := sendServer(t, http.StatusOK,
`{"ok":true,"sessionId":"demo-1","message":"hello agent"}`)
writeRunFileFor(t, cfg, srv)

_, errOut, err := executeCLI(t, Deps{
ProcessAlive: func(int) bool { return true },
}, "send", "--session", "demo-1", "--message", "hello agent")
if err != nil {
t.Fatalf("unexpected error: %v\nstderr=%s", err, errOut)
}
var req struct {
Message string `json:"message"`
}
if err := json.Unmarshal([]byte(capture.body), &req); err != nil {
t.Fatalf("decode body: %v\nbody=%s", err, capture.body)
}
if req.Message != "hello agent" {
t.Errorf("captured message = %q, want %q", req.Message, "hello agent")
}
}

func TestSend_PreservesMessageWhitespace(t *testing.T) {
t.Setenv("AO_SESSION_ID", "")
cfg := setConfigEnv(t)
srv, capture := sendServer(t, http.StatusOK, `{"ok":true,"sessionId":"demo-1","message":"hi"}`)
writeRunFileFor(t, cfg, srv)
Expand Down
70 changes: 69 additions & 1 deletion backend/internal/session_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func New(d Deps) *Manager {
// workspace and runtime, then reports completion to the LCM. A failure after the
// row exists parks it as terminated and rolls back what was built.
func (m *Manager) Spawn(ctx context.Context, cfg ports.SpawnConfig) (domain.SessionRecord, error) {
prompt, err := m.buildSpawnPrompt(ctx, cfg)
if err != nil {
return domain.SessionRecord{}, fmt.Errorf("spawn: prompt: %w", err)
}

rec, err := m.store.CreateSession(ctx, seedRecord(cfg, m.clock()))
if err != nil {
return domain.SessionRecord{}, fmt.Errorf("spawn: create: %w", err)
Expand All @@ -118,7 +123,6 @@ func (m *Manager) Spawn(ctx context.Context, cfg ports.SpawnConfig) (domain.Sess
return domain.SessionRecord{}, fmt.Errorf("spawn %s: workspace: %w", id, err)
}

prompt := buildPrompt(cfg)
agent, ok := m.agents.Agent(cfg.Harness)
if !ok {
_ = m.workspace.Destroy(ctx, ws)
Expand Down Expand Up @@ -321,6 +325,70 @@ func buildPrompt(cfg ports.SpawnConfig) string {
}
}

func (m *Manager) buildSpawnPrompt(ctx context.Context, cfg ports.SpawnConfig) (string, error) {
prompt := buildPrompt(cfg)
switch cfg.Kind {
case domain.KindOrchestrator:
return appendPromptSection(orchestratorPrompt(cfg.ProjectID), prompt), nil
case domain.KindWorker:
orchestratorID, ok, err := m.activeOrchestratorSessionID(ctx, cfg.ProjectID)
if err != nil {
return "", err
}
if ok {
prompt = appendPromptSection(prompt, workerOrchestratorPrompt(orchestratorID))
}
}
return prompt, nil
}

func (m *Manager) activeOrchestratorSessionID(ctx context.Context, project domain.ProjectID) (domain.SessionID, bool, error) {
recs, err := m.store.ListSessions(ctx, project)
if err != nil {
return "", false, fmt.Errorf("list sessions for %s: %w", project, err)
}
for _, rec := range recs {
if rec.Kind == domain.KindOrchestrator && !rec.IsTerminated {
return rec.ID, true, nil
}
}
Comment thread
harshitsinghbhandari marked this conversation as resolved.
return "", false, nil
}

func orchestratorPrompt(project domain.ProjectID) string {
return fmt.Sprintf(`## Orchestrator role

You are the human-facing coordinator for project %s. Coordinate work for the human, keep the project moving, and avoid doing implementation yourself unless it is necessary.

Spawn worker sessions for implementation with:
`+"`ao spawn --project %s --prompt \"<clear worker task>\"`"+`

Message workers with `+"`ao send`"+`, for example:
`+"`ao send --session <worker-session-id> --message \"<your message>\"`"+`

Use workers for focused implementation tasks, track their progress, synthesize their results, and only step into implementation directly for true emergencies or small coordination fixes.`, project, project)
}

func workerOrchestratorPrompt(orchestratorID domain.SessionID) string {
return fmt.Sprintf(`## Orchestrator coordination

An active orchestrator session exists for this project. If you hit a true blocker or need cross-session coordination, message it with:
`+"`ao send --session %s --message \"<your message>\"`"+`

Only ping the orchestrator for true blockers, cross-session coordination, or decisions that cannot be resolved within your own task.`, orchestratorID)
}

func appendPromptSection(prompt, section string) string {
switch {
case prompt == "":
return section
case section == "":
return prompt
default:
return prompt + "\n\n" + section
}
}

func spawnEnv(id domain.SessionID, project domain.ProjectID, issue domain.IssueID, dataDir string) map[string]string {
return map[string]string{
EnvSessionID: string(id),
Expand Down
57 changes: 57 additions & 0 deletions backend/internal/session_manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -258,6 +259,62 @@ func TestSpawn_DefaultsBranchFromSessionID(t *testing.T) {
}
}

func TestSpawnWorker_AppendsActiveOrchestratorContact(t *testing.T) {
m, st, _, _ := newManager()
st.num = 1
st.sessions["mer-1"] = domain.SessionRecord{ID: "mer-1", ProjectID: "mer", Kind: domain.KindOrchestrator}

s, err := m.Spawn(ctx, ports.SpawnConfig{ProjectID: "mer", Kind: domain.KindWorker, Prompt: "do it"})
if err != nil {
t.Fatal(err)
}
prompt := st.sessions[s.ID].Metadata.Prompt
for _, want := range []string{
"do it",
"## Orchestrator coordination",
`ao send --session mer-1 --message "<your message>"`,
"Only ping the orchestrator for true blockers, cross-session coordination",
} {
if !strings.Contains(prompt, want) {
t.Fatalf("prompt missing %q:\n%s", want, prompt)
}
}
}

func TestSpawnWorker_SkipsTerminatedOrchestratorContact(t *testing.T) {
m, st, _, _ := newManager()
st.num = 1
st.sessions["mer-1"] = domain.SessionRecord{ID: "mer-1", ProjectID: "mer", Kind: domain.KindOrchestrator, IsTerminated: true}

s, err := m.Spawn(ctx, ports.SpawnConfig{ProjectID: "mer", Kind: domain.KindWorker, Prompt: "do it"})
if err != nil {
t.Fatal(err)
}
prompt := st.sessions[s.ID].Metadata.Prompt
if strings.Contains(prompt, "## Orchestrator coordination") || strings.Contains(prompt, "ao send --session mer-1") {
t.Fatalf("terminated orchestrator should not be added to prompt:\n%s", prompt)
}
}

func TestSpawnOrchestrator_UsesCoordinatorPrompt(t *testing.T) {
m, st, _, _ := newManager()
s, err := m.Spawn(ctx, ports.SpawnConfig{ProjectID: "mer", Kind: domain.KindOrchestrator})
if err != nil {
t.Fatal(err)
}
prompt := st.sessions[s.ID].Metadata.Prompt
for _, want := range []string{
"You are the human-facing coordinator for project mer",
`ao spawn --project mer --prompt "<clear worker task>"`,
"`ao send`",
"avoid doing implementation yourself unless it is necessary",
} {
if !strings.Contains(prompt, want) {
t.Fatalf("prompt missing %q:\n%s", want, prompt)
}
}
}

func TestSpawn_KeepsExplicitBranch(t *testing.T) {
m, st, _, _ := newManager()
s, err := m.Spawn(ctx, ports.SpawnConfig{ProjectID: "mer", Kind: domain.KindWorker, Branch: "feature/x"})
Expand Down
Loading