Conversation
Implement Agent Relay-style messaging for agent-to-agent communication: - Add relay_messages table for persistent message storage - Create internal/relay package with Relay manager, agent registry, and message routing - Support direct messages, broadcast (*), and case-insensitive agent names - Integrate with executor for agent registration on task start - Add idle detection for message delivery timing - Add CLI commands: ty relay send/read/list - Display relay messages in task detail logs with 📨 icon - Include comprehensive tests for relay functionality Messages are routed by agent name (derived from task title) and can be sent between running agents. Pending messages are delivered when agents are idle. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR implements an agent-to-agent messaging system inspired by Agent Relay, enabling tasks running as "agents" to communicate with each other. The implementation adds database persistence for messages, an in-memory relay manager, executor integration for agent lifecycle management, and CLI commands for sending and reading messages.
Changes:
- Added
relay_messagesdatabase table with indexed fields for efficient message retrieval - Created
internal/relaypackage with core messaging logic, agent registry, and database adapter - Integrated relay manager into executor with agent registration on task start and message delivery during idle periods
- Added three new CLI commands:
ty relay send,ty relay read, andty relay list - Enhanced task detail UI to display relay messages with 📨 icon
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/db/sqlite.go | Adds relay_messages table schema with indexes for efficient queries |
| internal/db/relay.go | Implements database layer for persisting and retrieving relay messages |
| internal/relay/relay.go | Core relay logic for agent registration, message routing, and broadcast support |
| internal/relay/store.go | Database adapter and helper methods for relay persistence |
| internal/relay/relay_test.go | Unit tests covering registration, messaging, broadcast, and command parsing |
| internal/executor/relay.go | Relay manager that integrates messaging with task execution and idle detection |
| internal/executor/executor.go | Hooks relay manager into executor lifecycle and worker loop |
| cmd/task/main.go | CLI commands for sending messages, reading messages, and listing agents |
| internal/ui/detail.go | Adds relay message icon for task detail logs |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
internal/relay/store.go
Outdated
| // LoadPendingMessages loads pending messages from the database into the relay. | ||
| func (r *Relay) LoadPendingMessages(database *db.DB) error { | ||
| // Get all agents and load their pending messages | ||
| r.mu.Lock() | ||
| agents := make([]string, 0, len(r.agents)) | ||
| for _, a := range r.agents { | ||
| agents = append(agents, a.Name) | ||
| } | ||
| r.mu.Unlock() | ||
|
|
||
| for _, name := range agents { | ||
| msgs, err := database.GetPendingRelayMessages(name) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| r.mu.Lock() | ||
| for _, m := range msgs { | ||
| r.messages = append(r.messages, &Message{ | ||
| ID: m.ID, | ||
| From: m.From, | ||
| To: m.To, | ||
| Content: m.Content, | ||
| TaskID: m.TaskID, | ||
| Status: m.Status, | ||
| CreatedAt: m.CreatedAt.Time, | ||
| }) | ||
| } | ||
| r.mu.Unlock() | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
The LoadPendingMessages function is defined as a method on *Relay, but it's placed in the store.go file. This is inconsistent with the package structure where relay.go contains Relay methods and store.go contains DBStore methods. Consider either moving this to relay.go or making it a standalone function if it doesn't need to be a method.
| // Register task as relay agent | ||
| if e.relay != nil { | ||
| e.relay.RegisterAgent(task) | ||
| } |
There was a problem hiding this comment.
Agents registered on line 887 are never unregistered when tasks finish executing. This can lead to a memory leak where agents remain in the relay registry even after tasks complete, fail, or are interrupted. The defer block starting at line 863 cleans up runningTasks but does not unregister the agent. Add agent unregistration to that defer block to ensure proper cleanup.
cmd/task/main.go
Outdated
| name = strings.Map(func(r rune) rune { | ||
| if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' { | ||
| return r | ||
| } | ||
| if r == ' ' { | ||
| return '-' | ||
| } | ||
| return -1 | ||
| }, name) | ||
| if len(name) > 32 { | ||
| name = name[:32] | ||
| } |
There was a problem hiding this comment.
The agent name generation logic is duplicated in two places: here in the CLI and in internal/executor/relay.go agentName() method (lines 62-70). This code duplication violates DRY principles and could lead to inconsistencies if one implementation is updated without the other. Consider extracting this logic to a shared function in the relay package that both the executor and CLI can use.
internal/relay/store.go
Outdated
| // GetAgentByTaskID finds an agent by their task ID. | ||
| func (r *Relay) GetAgentByTaskID(taskID int64) *Agent { | ||
| r.mu.RLock() | ||
| defer r.mu.RUnlock() | ||
|
|
||
| for _, a := range r.agents { | ||
| if a.TaskID == taskID { | ||
| return a | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Heartbeat updates an agent's last seen time. | ||
| func (r *Relay) Heartbeat(name string) { | ||
| r.mu.Lock() | ||
| defer r.mu.Unlock() | ||
|
|
||
| if agent, ok := r.agents[normalize(name)]; ok { | ||
| agent.LastSeen = time.Now() | ||
| } | ||
| } |
There was a problem hiding this comment.
The function GetAgentByTaskID and Heartbeat are also Relay methods placed in store.go, which is inconsistent with the package organization. These methods don't interact with the database store and should be in relay.go alongside other Relay methods for better code organization and maintainability.
internal/executor/executor.go
Outdated
| // Record activity for relay idle detection (except for relay messages themselves) | ||
| if e.relay != nil && lineType != "relay" { |
There was a problem hiding this comment.
The RecordActivity function is being called for all log types except "relay" messages (line 3041), but this means system messages, errors, and other non-output events will reset the idle timer. This could prevent relay messages from being delivered to agents that are actively processing but haven't produced output in 1.5 seconds. Consider being more selective about which log types reset the idle timer - perhaps only "output" type should count as activity, since that represents actual task progress.
| // Record activity for relay idle detection (except for relay messages themselves) | |
| if e.relay != nil && lineType != "relay" { | |
| // Record activity for relay idle detection only for actual output lines | |
| if e.relay != nil && lineType == "output" { |
internal/relay/relay_test.go
Outdated
| func contains(s, substr string) bool { | ||
| return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr)) | ||
| } | ||
|
|
||
| func containsHelper(s, substr string) bool { | ||
| for i := 0; i <= len(s)-len(substr); i++ { | ||
| if s[i:i+len(substr)] == substr { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } |
There was a problem hiding this comment.
The custom contains() and containsHelper() functions reinvent the wheel when strings.Contains() is available in the standard library and is used extensively throughout the codebase (found in 60+ files). Replace these with strings.Contains() for consistency with the rest of the codebase and to reduce unnecessary code.
cmd/task/main.go
Outdated
| blockedTasks, _ := database.ListTasks(db.ListTasksOptions{Status: db.StatusBlocked, Limit: 100}) | ||
| tasks = append(tasks, blockedTasks...) |
There was a problem hiding this comment.
Error from database.ListTasks() is being silently ignored with the blank identifier. If this query fails, blockedTasks will be empty and agents running in blocked state won't be shown to the user, which could be confusing. Consider logging the error or handling it appropriately to ensure users are aware when blocked tasks can't be retrieved.
| blockedTasks, _ := database.ListTasks(db.ListTasksOptions{Status: db.StatusBlocked, Limit: 100}) | |
| tasks = append(tasks, blockedTasks...) | |
| blockedTasks, err := database.ListTasks(db.ListTasksOptions{Status: db.StatusBlocked, Limit: 100}) | |
| if err != nil { | |
| fmt.Fprintln(os.Stderr, errorStyle.Render("Warning: failed to list blocked tasks: "+err.Error())) | |
| } else { | |
| tasks = append(tasks, blockedTasks...) | |
| } |
| if r.store != nil { | ||
| r.store.MarkDelivered(msgID) | ||
| } |
There was a problem hiding this comment.
The MarkDelivered and MarkRead store operations in lines 166 and 62 respectively ignore any errors returned from the database layer. If these operations fail, the message status won't be updated in the database but will be marked as delivered/read in memory, leading to inconsistent state. Consider logging errors at minimum, or handling them to ensure database and memory state stay in sync.
internal/executor/relay.go
Outdated
| // Limit length | ||
| if len(name) > 32 { | ||
| name = name[:32] | ||
| } |
There was a problem hiding this comment.
The agent name normalization in the relay package uses normalize() which does lowercase + trim, but the agentName() function in RelayManager uses strings.Map to clean special characters. This creates an asymmetry where agent names could potentially mismatch. For example, if a task title is "Test Agent!" it becomes "Test-Agent" but is stored/looked up as "test-agent". While this works due to normalize() being applied consistently in relay.go, it's fragile because the CLI must duplicate the exact cleaning logic. Consider having agentName() also call normalize() at the end for consistency.
| } | |
| } | |
| // Final normalization to match relay.normalize behavior (lowercase + trim) | |
| name = strings.ToLower(strings.TrimSpace(name)) |
internal/relay/store.go
Outdated
| func (r *Relay) LoadPendingMessages(database *db.DB) error { | ||
| // Get all agents and load their pending messages | ||
| r.mu.Lock() | ||
| agents := make([]string, 0, len(r.agents)) | ||
| for _, a := range r.agents { | ||
| agents = append(agents, a.Name) | ||
| } | ||
| r.mu.Unlock() | ||
|
|
||
| for _, name := range agents { | ||
| msgs, err := database.GetPendingRelayMessages(name) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| r.mu.Lock() | ||
| for _, m := range msgs { | ||
| r.messages = append(r.messages, &Message{ | ||
| ID: m.ID, | ||
| From: m.From, | ||
| To: m.To, | ||
| Content: m.Content, | ||
| TaskID: m.TaskID, | ||
| Status: m.Status, | ||
| CreatedAt: m.CreatedAt.Time, | ||
| }) | ||
| } | ||
| r.mu.Unlock() | ||
| } | ||
| return nil |
There was a problem hiding this comment.
The LoadPendingMessages function iterates through agents and loads pending messages from the database, but it doesn't check if those messages already exist in r.messages before appending them. This could lead to duplicate messages in memory if LoadPendingMessages is called multiple times. Consider either checking for duplicates before appending or clearing r.messages before loading.
PR Review Fixes AppliedAll Copilot review comments have been addressed:
Local verification
|
- Fix ineffectual assignment lint error (statusIcon variable) - Fix silent error handling for blockedTasks query - Add shared CleanAgentName function to avoid code duplication - Move GetAgentByTaskID and Heartbeat methods to relay.go - Add agent unregistration when tasks finish (fixes memory leak) - Track only "output" lines for idle detection - Replace custom contains() with strings.Contains in tests Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
aaa9955 to
cd5fccc
Compare
Summary
relay_messagestable for persistent message storageinternal/relaypackage with Relay manager, agent registry, and message routingty relay send/read/listImplementation Details
Database Schema:
relay_messagestable with id, from_agent, to_agent, content, task_id, status, timestampsRelay Package (
internal/relay/):Relaystruct manages agents and messages in memory with DB persistence*), and case-insensitive agent namesDBStoreadapter connects relay to database layerExecutor Integration:
CLI Commands:
ty relay send <to> <message>- Send message to agentty relay read [id]- Read message by ID or list for agentty relay list- List connected agentsUI:
Test plan
🤖 Generated with Claude Code