
Event driven architecture#115

Open
tuanknguyen wants to merge 17 commits into main from feat/event-driven-messaging

Conversation

@tuanknguyen
Contributor

Notes to Reviewers

This PR replaces the watchdog-based polling architecture with an event-driven pub/sub system for terminal output processing, status detection, and inbox message delivery. Terminal output now streams through named FIFOs into an in-process event bus, eliminating filesystem polling and expensive tmux subprocess calls.

| Category | Files | Review Priority |
|---|---|---|
| Event bus core | event_bus.py, fifo_reader.py, status_monitor.py, log_writer.py | Critical — new infrastructure, threading/async boundary |
| Async conversion | base.py, all 5 providers, terminal.py (utils) | High — `time.sleep` → `await asyncio.sleep` throughout init chain |
| Service rewrites | inbox_service.py, terminal_service.py, main.py (lifespan) | High — wiring changes, startup/shutdown lifecycle |
| Cleanup paths | session_service.py, cleanup_service.py, terminal_service.delete_terminal | Medium — FIFO + state cleanup on delete |
| Docs & config | event-driven-architecture.md, CODEBASE.md, constants.py, pyproject.toml | Low |
| Tests | test_codex_provider_unit.py, test_gemini_cli_unit.py, test_inbox_service.py | Low — adapted to new signatures |

Start here: Read docs/event-driven-architecture.md for the full architecture overview, ASCII data-flow diagram, and component roles before diving into code.

Key changes:

  • Introduced an in-process EventBus with wildcard topic matching and thread-safe publishing
  • Replaced watchdog PollingObserver with FIFO readers (named pipes + os.read()) for real-time terminal output streaming
  • Added StatusMonitor (rolling 8KB buffer + provider pattern matching) as the single source of truth for terminal status
  • Added LogWriter consumer for debug log persistence
  • Refactored InboxService from file-watching handler to event-driven consumer
  • Converted the entire initialization chain to async (initialize(), wait_for_shell(), wait_until_status())
  • Removed watchdog and aiofiles dependencies
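As a rough illustration of the first bullet, an in-process bus with wildcard topic matching and thread-safe publishing can be sketched like this (hypothetical names and structure; see event_bus.py for the actual implementation):

```python
import fnmatch
import threading
from collections import defaultdict
from typing import Callable


class EventBus:
    """Minimal in-process pub/sub sketch with wildcard topic matching."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers: dict[str, list[Callable]] = defaultdict(list)

    def subscribe(self, pattern: str, handler: Callable) -> None:
        # pattern may contain wildcards, e.g. "terminal.*.output"
        with self._lock:
            self._subscribers[pattern].append(handler)

    def publish(self, topic: str, data: dict) -> None:
        # Snapshot matching handlers under the lock, invoke them outside it
        with self._lock:
            handlers = [h for pat, hs in self._subscribers.items()
                        if fnmatch.fnmatch(topic, pat) for h in hs]
        event = {"topic": topic, "data": data}
        for handler in handlers:
            handler(event)
```

Consumers like the StatusMonitor or LogWriter would then subscribe to a per-terminal output topic pattern and receive events as FIFO readers publish chunks.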

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@tuanknguyen tuanknguyen requested review from a team and haofeif March 13, 2026 02:10
@haofeif
Contributor

haofeif commented Mar 13, 2026

@patricka3125 feel free to have a look at this PR too.

@haofeif haofeif added the enhancement label Mar 13, 2026
@haofeif
Contributor

haofeif commented Mar 13, 2026

@tuanknguyen is this document still correct? The change won't impact the user experience, right? https://github.com/awslabs/cli-agent-orchestrator/tree/main/examples/assign

Also, would you mind fixing the unit test errors?

@tuanknguyen
Contributor Author

@haofeif that's correct, this does not affect the assign, handoff, or send-message patterns at all. All unit tests also passed.

Separately, we need to consolidate the provider implementations; there's quite a bit of code duplication across the different providers.

```python
terminal_id = terminal_id_from_topic(event["topic"])
log_path = TERMINAL_LOG_DIR / f"{terminal_id}.log"
with open(log_path, "a") as f:
    f.write(event["data"]["data"])
```
Collaborator


Could consider using asyncio.to_thread here to minimize blocking in the event loop.

Contributor


This is a sync file write inside an async loop; every output event will briefly block the event loop.

Contributor Author


good catch! I'll update it.
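For reference, the asyncio.to_thread version might look roughly like this (a sketch only; the log directory and the topic-parsing logic are stand-ins for the real `TERMINAL_LOG_DIR` constant and `terminal_id_from_topic` helper):

```python
import asyncio
from pathlib import Path

# Placeholder directory for illustration; the real constant lives elsewhere
TERMINAL_LOG_DIR = Path("/tmp/cao-terminal-logs")
TERMINAL_LOG_DIR.mkdir(parents=True, exist_ok=True)


def _append_log(log_path: Path, text: str) -> None:
    # Plain blocking write, now executed off the event loop
    with open(log_path, "a") as f:
        f.write(text)


async def handle_output_event(event: dict) -> None:
    # Stand-in for terminal_id_from_topic from the snippet under review
    terminal_id = event["topic"].split(".")[1]
    log_path = TERMINAL_LOG_DIR / f"{terminal_id}.log"
    # Offload the sync write so it no longer blocks the event loop
    await asyncio.to_thread(_append_log, log_path, event["data"]["data"])
```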


```python
    Returns:
        bool: True if a message was sent, False otherwise

def deliver_pending(self, terminal_id: str) -> None:
```
Collaborator


would we ever consider delivering all messages at once here? Or perhaps add some way for user to specify message size as a flag?

Contributor Author


Sure thing. I'll add the number of messages as an optional param, defaulting to 1.
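A hypothetical shape for that change (the parameter name and queue internals are illustrative, not the actual inbox_service.py API):

```python
from collections import deque


class InboxService:
    """Sketch: per-terminal message queues with batched delivery."""

    def __init__(self) -> None:
        self._queues: dict[str, deque[str]] = {}

    def enqueue(self, terminal_id: str, message: str) -> None:
        self._queues.setdefault(terminal_id, deque()).append(message)

    def deliver_pending(self, terminal_id: str, max_messages: int = 1) -> list[str]:
        # Pop up to max_messages queued messages; the default of 1
        # preserves the current one-at-a-time behaviour
        queue = self._queues.get(terminal_id)
        delivered: list[str] = []
        while queue and len(delivered) < max_messages:
            delivered.append(queue.popleft())
        return delivered
```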

@haofeif
Contributor

haofeif commented Mar 16, 2026

@tuanknguyen thanks for it. Also, it looks like the current change breaks the handoff function. In my test using code_supervisor, developer, and reviewer agents, I found some issues (in kiro-cli):

  • code_supervisor is able to hand off to developer, but when developer completes the job, the code_supervisor agent is not aware of its completion (I have to Ctrl+C the handoff from the supervisor agent and then tell it that it is completed)
  • code_supervisor then hands off to reviewer, but no extra tmux agent is spun up. It hung with the messages below and no further tmux session was created.
```
Let me retry the handoff to the Code Reviewer Agent.
Running tool handoff with the param (from mcp server: cao-mcp-server)
 ⋮  {
 ⋮    "agent_profile": "reviewer",
 ⋮    "message": "You are the Code Reviewer Agent. Please read the review task description at the following absolute path and perform a code review:\n\n/Users/haofeif/Amazon-WorkDocs/Code/AIMLAoD/AIAgents/OpenshiftToEKS/cli-agent-orchestrator/cli-agent-orchestrator/tasks/hello-world-review.md\n\nRead the code file referenced in the task and provide your review. End with a clear verdict: APPROVED or NEEDS CHANGES (with specific feedback)."
 ⋮  }
```

When testing the Assign examples, similar issues hung the handoff session.

@haofeif
Contributor

haofeif commented Mar 16, 2026

@tuanknguyen Also, I ran the e2e tests for 3 providers. Let's test Kiro and Claude Code on your end and see how it goes.

E2E Test Results — feat/event-driven-messaging

Summary

| Provider | main (baseline) | feat/event-driven-messaging | Regression |
|---|---|---|---|
| Kiro CLI | 8/8 ✅ | 0/8 ❌ | Yes — COMPLETED never detected |
| Claude Code | 8/8 ✅ | 0/8 ❌ | Yes — IDLE never detected during init |
| Kimi CLI | 8/8 ✅ | 6-7/8 ⚠️ | Partial — 1 consistent fail, 1 flaky |

Test Location

All E2E tests are in test/e2e/:

| Test File | Test Classes | What It Tests |
|---|---|---|
| test_assign.py | TestKiroCliAssign, TestClaudeCodeAssign, TestKimiCliAssign | Create worker terminal, send task, verify COMPLETED, extract output |
| test_handoff.py | TestKiroCliHandoff, TestClaudeCodeHandoff, TestKimiCliHandoff | Create terminal, send task, poll for COMPLETED, extract response |
| test_send_message.py | TestKiroCliSendMessage, TestClaudeCodeSendMessage, TestKimiCliSendMessage | Create 2 terminals, send inbox message, verify delivery |
| test_supervisor_orchestration.py | TestKiroCliSupervisorOrchestration, TestClaudeCodeSupervisorOrchestration, TestKimiCliSupervisorOrchestration | Supervisor delegates via handoff/assign MCP tools |

Failure Details

Kiro CLI — 0/8 (all failed)

All 8 tests fail at wait_for_status(terminal_id, "completed"). Terminal creation and init succeed (IDLE detected), message is sent successfully, but COMPLETED is never detected after the agent finishes.

| Test | Error |
|---|---|
| test_assign.py::TestKiroCliAssign::test_assign_data_analyst | Worker did not reach COMPLETED within 180s (provider=kiro_cli) |
| test_assign.py::TestKiroCliAssign::test_assign_report_generator | Worker did not reach COMPLETED within 180s (provider=kiro_cli) |
| test_assign.py::TestKiroCliAssign::test_assign_with_callback | Worker did not reach COMPLETED within 180s (provider=kiro_cli) |
| test_handoff.py::TestKiroCliHandoff::test_handoff_simple_function | Terminal did not reach COMPLETED within 180s (provider=kiro_cli) |
| test_handoff.py::TestKiroCliHandoff::test_handoff_second_task | Terminal did not reach COMPLETED within 180s (provider=kiro_cli) |
| test_send_message.py::TestKiroCliSendMessage::test_send_message_to_inbox | Receiver should have transitioned from IDLE after inbox delivery within 60s, got: idle |
| test_supervisor_orchestration.py::TestKiroCliSupervisorOrchestration::test_supervisor_handoff | Supervisor did not reach COMPLETED within 300s. Last status: idle |
| test_supervisor_orchestration.py::TestKiroCliSupervisorOrchestration::test_supervisor_assign_and_handoff | Supervisor did not reach COMPLETED within 300s. Last status: idle |

Possible root cause: StatusMonitor returns IDLE instead of COMPLETED. Kiro CLI's get_status() needs both a green-arrow pattern (response) AND an idle prompt after it to return COMPLETED. The 8KB rolling buffer loses the green arrow as the response grows, leaving only the idle prompt, so it returns IDLE.
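The truncation failure mode is easy to reproduce with a toy rolling buffer (illustrative only; the marker strings are stand-ins for Kiro CLI's real patterns, and 8192 matches the buffer size described above):

```python
BUFFER_SIZE = 8192


def rolling_append(buffer: str, chunk: str, size: int = BUFFER_SIZE) -> str:
    # Keep only the last `size` characters, as a rolling buffer does
    return (buffer + chunk)[-size:]


buf = rolling_append("", "\x1b[32m>\x1b[0m ")  # green-arrow response marker
buf = rolling_append(buf, "x" * 10000)         # long agent response
buf = rolling_append(buf, "\n$ ")              # idle prompt
# The green arrow has been pushed out of the window: only the idle prompt
# survives, so a detector requiring marker AND prompt reports IDLE, not
# COMPLETED.
```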

Claude Code — 0/8 (all failed)

All 8 tests fail at create_terminal() — the terminal never initializes.

| Test | Error |
|---|---|
| test_assign.py::TestClaudeCodeAssign::test_assign_data_analyst | Claude Code initialization timed out after 30 seconds |
| test_assign.py::TestClaudeCodeAssign::test_assign_report_generator | Claude Code initialization timed out after 30 seconds |
| test_assign.py::TestClaudeCodeAssign::test_assign_with_callback | Claude Code initialization timed out after 30 seconds |
| test_handoff.py::TestClaudeCodeHandoff::test_handoff_simple_function | Claude Code initialization timed out after 30 seconds |
| test_handoff.py::TestClaudeCodeHandoff::test_handoff_second_task | Claude Code initialization timed out after 30 seconds |
| test_send_message.py::TestClaudeCodeSendMessage::test_send_message_to_inbox | Claude Code initialization timed out after 30 seconds |
| test_supervisor_orchestration.py::TestClaudeCodeSupervisorOrchestration::test_supervisor_handoff | Claude Code initialization timed out after 30 seconds |
| test_supervisor_orchestration.py::TestClaudeCodeSupervisorOrchestration::test_supervisor_assign_and_handoff | Claude Code initialization timed out after 30 seconds |

Possible root cause: StatusMonitor never detects IDLE during provider.initialize(). The FIFO → EventBus → StatusMonitor pipeline doesn't deliver output fast enough (or at all) for Claude Code's get_status() to match the IDLE prompt pattern within the 30s timeout.

Possible Architectural Root Cause

On main, get_terminal() calls provider.get_status() which reads fresh tmux scrollback (tmux capture-pane) on every poll — full history available.

On feat/event-driven-messaging, get_terminal() calls status_monitor.get_status() which returns a cached status derived from an 8KB rolling buffer fed by the FIFO pipeline.

This causes two problems:

  1. Buffer truncation: Long agent responses push early patterns (e.g., Kiro CLI's green arrow) out of the 8KB window, breaking COMPLETED detection
  2. Pipeline timing: The FIFO → EventBus → StatusMonitor pipeline may not deliver output fast enough during provider initialization, breaking IDLE detection for Claude Code

@tuanknguyen
Contributor Author

@haofeif thanks for flagging the issue. I found that the problem was with the regex for COMPLETED status detection. We now read from the buffer directly, which means the text contains raw control characters and ANSI escape sequences. It's not due to sequencing or the event bus not delivering output fast enough. I'll update and push the changes.
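Stripping ANSI escape sequences before pattern matching might look like this (the regex below is a common CSI-sequence matcher, not necessarily the one used in the actual fix):

```python
import re

# Matches CSI escape sequences such as \x1b[32m (colors, cursor moves, etc.)
ANSI_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")


def strip_ansi(text: str) -> str:
    """Remove CSI escape sequences so status regexes see plain text."""
    return ANSI_RE.sub("", text)
```

With this in place, a buffer chunk like `"\x1b[32m>\x1b[0m done"` is normalized to `"> done"` before the provider's status patterns run against it.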

@patricka3125
Collaborator

Any updates on the progress of this PR? I have also been encountering the issues mentioned in #131 on a main branch build, and this seems a promising fix 👀

@haofeif
Contributor

haofeif commented Apr 7, 2026

> Any updates on the progress of this PR? I have also been encountering the issues mentioned in #131 on a main branch build, and this seems a promising fix 👀

Yes, this will continue to be worked on.
