
Agent Bus Queue (abq)


A minimal file-based message bus for coordinating AI agents across repositories, worktrees, and processes. Inspired by Efrit’s queue system but generalized for any agent-to-agent communication.

Problem Statement

AI agents (Claude Code, Cursor, Efrit) running in separate contexts - different repos, worktrees, or processes - need a simple way to coordinate:

  • Test agent in repo A needs to signal app agent in repo B when tests pass
  • Main branch agent needs to dispatch work to feature branch agents
  • Supervisor agent needs to monitor/coordinate worker agents

The solution should be:

  • Zero daemon / zero broker (no NATS, Redis, or RabbitMQ to run)
  • Language-agnostic (shell, Python, Ruby, Node, etc.)
  • Git-aware (agents identified by org/repo/branch)
  • Trivially debuggable (just JSON files in directories)

Architecture

~/.abq/
├── registry.json          # Agent/channel metadata
├── channels/
│   ├── test-runner/       # Channel: test-runner
│   │   ├── requests/      # Incoming messages
│   │   ├── processing/    # In-flight (prevents double-pickup)
│   │   ├── responses/     # Replies
│   │   └── archive/       # Completed
│   ├── app-agent/
│   └── broadcast/         # Global pubsub
├── agents/                # Per-agent state (optional)
└── logs/

Protocol

Request

{
  "id": "req_018d5a3b2c4e",
  "version": "1.0.0",
  "type": "signal",
  "from": {
    "agent": "github.com/myorg/tests/main",
    "pid": 12345,
    "pwd": "/path/to/repo"
  },
  "to": "app-agent",
  "content": "{\"signal\": \"tests_passed\", \"count\": 42}",
  "timestamp": "2025-01-28T12:00:00Z",
  "ttl": 300
}

Response

{
  "id": "req_018d5a3b2c4e",
  "version": "1.0.0",
  "status": "success",
  "from": {
    "agent": "github.com/myorg/app/main",
    "pid": 12346
  },
  "result": "acknowledged",
  "timestamp": "2025-01-28T12:00:01Z"
}
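A request matching the schema above can be built and expiry-checked in a few lines of Python. This is a sketch: the id scheme (a uuid4 hex prefix) and the ABQ_AGENT environment variable are illustrative assumptions, not abq's actual generator or configuration.

```python
import json, os, uuid
from datetime import datetime, timezone

def make_request(to, msg_type, content, ttl=300):
    # Field layout follows the Request schema above; the id scheme
    # (uuid4 hex prefix) is illustrative, not abq's real generator.
    return {
        "id": "req_" + uuid.uuid4().hex[:12],
        "version": "1.0.0",
        "type": msg_type,
        "from": {"agent": os.environ.get("ABQ_AGENT", "unknown"),
                 "pid": os.getpid(), "pwd": os.getcwd()},
        "to": to,
        "content": content,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "ttl": ttl,
    }

def expired(req):
    # A message older than its ttl (seconds) should be skipped by receivers.
    sent = datetime.strptime(req["timestamp"], "%Y-%m-%dT%H:%M:%SZ")
    sent = sent.replace(tzinfo=timezone.utc)
    return (datetime.now(timezone.utc) - sent).total_seconds() > req["ttl"]

req = make_request("app-agent", "signal",
                   json.dumps({"signal": "tests_passed", "count": 42}))
```

Note that content is a JSON string, not a nested object, matching the escaped form in the example above.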

Message Types

Type      Purpose                                    Content
signal    Notifications (tests passed, build done)   {signal: string, data?: any}
command   Natural language instructions              String
eval      Execute code/shell                         String
status    Health check / heartbeat                   {}

Quick Start

# Initialize
abq init

# Create channels
abq channel create test-runner
abq channel create app-agent

# Terminal 1: Watch for signals
abq watch app-agent --handler ./my-handler.sh

# Terminal 2: Send a signal
abq send app-agent signal '{"signal": "tests_passed"}'

Implementation

See abq-spec.org for the full tangleable implementation in Org mode.

Research: Communication Patterns for Agent Coordination

Related Systems

1. Efrit (Emacs Agent)

Efrit uses a file-based queue for AI agent communication:

.efrit/queues/
├── requests/
├── responses/
├── processing/
└── archive/

Efrit uses JSON messages with types: eval, command, chat, status. Other agents (Claude Code, Cursor) can interact via the Model Context Protocol or directly via the filesystem queue.

abq generalizes Efrit’s single-queue design to multiple named channels with per-channel state directories:

# Efrit: single queue at .efrit/queues/requests/
# abq: named channels, each with full state machine
abq channel create test-runner
abq channel create app-agent

ls ~/.abq/channels/
# broadcast/  test-runner/  app-agent/

# Each channel has the same structure as Efrit's queue
ls ~/.abq/channels/test-runner/
# requests/  processing/  responses/  archive/

# abq shares Efrit's message types (plus "status")
abq send test-runner eval 'print("hello from abq")'
abq send test-runner command "run lint"
abq send test-runner signal '{"signal": "tests_passed"}'

2. Unix Spool Pattern

The spool directory pattern is a venerable Unix approach:

  • Print spoolers (/var/spool/lpd)
  • Mail queues (/var/spool/mail)
  • Cron (/var/spool/cron)
  • Batch systems (HTCondor, PBS)

Key characteristics:

  • FIFO processing
  • State directories: new/cur/done/
  • Lock files prevent double-pickup
  • No daemon required for submitters

abq follows the spool pattern with directory-based state transitions instead of lock files:

# Spool pattern: new/ → cur/ → done/
# abq mapping:   requests/ → processing/ → archive/

# Submit a job (like lpr submitting to the spool)
abq send test-runner command "run full test suite"

# Worker picks up the job (atomic rename, no lock file needed)
abq recv test-runner
# Message moves: requests/req_018d5a3b.json → processing/req_018d5a3b.json

# Worker completes and responds (like moving to done/)
abq respond test-runner req_018d5a3b --status success --result '{"passed": 64}'
# Original moves: processing/ → archive/
# Response written: responses/req_018d5a3b.json

# FIFO ordering: messages sorted by timestamp in filename
ls ~/.abq/channels/test-runner/requests/
# req_018d5a3b2c4e.json  (oldest first)
# req_018d5a3b4f71.json
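The worker pickup step (requests/ → processing/ via atomic rename) can be sketched directly. The channel layout follows the structure above; the function itself is illustrative, not abq's actual implementation:

```python
import os
from pathlib import Path

def claim_oldest(channel_dir):
    # Claim the oldest pending message by atomically renaming it into
    # processing/. Only one concurrent claimant can win the rename;
    # losers get FileNotFoundError and move on to the next file.
    channel = Path(channel_dir)
    for msg in sorted((channel / "requests").glob("*.json")):  # FIFO by filename
        target = channel / "processing" / msg.name
        try:
            os.rename(msg, target)
            return target
        except FileNotFoundError:
            continue  # another worker claimed it first; try the next one
    return None  # queue is empty
```

Because os.rename is atomic on POSIX filesystems, no lock file is needed: the rename itself is the lock.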

3. Maildir Format

The Maildir format (used by qmail, Dovecot) solves similar problems:

  • Atomic file creation (write to tmp/, rename to new/)
  • No locking required
  • Multiple readers safe

abq uses the same atomic-rename trick that makes Maildir reliable:

# Maildir: write to tmp/, rename to new/ (atomic delivery)
# abq:     write to requests/ (atomic via rename), recv atomically renames to processing/

# send() writes atomically — partial messages never visible to recv()
abq send app-agent signal '{"signal": "deploy_ready"}'

# recv() claims via atomic rename — two concurrent receivers never get the same message
# Receiver A: rename requests/req_abc.json → processing/req_abc.json  ✓ (wins)
# Receiver B: rename requests/req_abc.json → processing/req_abc.json  ✗ (FileNotFoundError, retries next)

# Like Maildir, no advisory locks (flock/fcntl) needed — rename is the lock
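The write side of the Maildir trick can be sketched as well: write the whole message to a temp file on the same filesystem, then rename it into requests/. Paths follow the layout above; the function is illustrative, not abq's code.

```python
import json, os, tempfile
from pathlib import Path

def deliver(channel_dir, msg_id, message):
    # Maildir-style atomic delivery. The temp file carries a .tmp suffix,
    # so receivers globbing *.json never see a half-written message;
    # the final rename publishes it atomically.
    requests = Path(channel_dir) / "requests"
    fd, tmp = tempfile.mkstemp(dir=requests, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(message, f)
        f.flush()
        os.fsync(f.fileno())  # durable before it becomes visible
    os.rename(tmp, requests / (msg_id + ".json"))  # atomic publication
```

Maildir proper uses a separate tmp/ directory; writing with a .tmp suffix in the same directory achieves the same invariant as long as readers only match *.json.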

Theoretical Foundations

Actor Model

The Actor Model (Hewitt, 1973) defines agents with:

  • Mailbox: Queue of incoming messages
  • Behavior: How to process each message
  • State: Private, mutable only by the actor
  • Address: How other actors find this one

Each maps directly to abq:

Mailbox → requests/ directory

Each channel’s requests/ directory is its mailbox. Messages queue as JSON files, ordered by timestamp:

# Send two messages — they queue in the mailbox
abq send test-runner signal '{"signal": "tests_passed", "count": 42}'
abq send test-runner command "rerun flaky suite"

# Inspect the mailbox directly
ls ~/.abq/channels/test-runner/requests/
# req_018d5a3b2c4e.json  req_018d5a3b4f71.json

Behavior → message handler

The handler defines how the actor processes each message. This is a shell script, Python function, or any executable:

# Handler script: my-handler.sh
#!/bin/sh
# $1 is the JSON message file
TYPE=$(jq -r .type "$1")
case "$TYPE" in
  signal)  echo "Got signal: $(jq -r .content "$1")" ;;
  command) eval "$(jq -r .content "$1")" ;;
  eval)    python3 -c "$(jq -r .content "$1")" ;;
  status)  echo '{"status": "alive"}' ;;
esac

# Attach behavior to an actor
abq watch test-runner --handler ./my-handler.sh

State → channel directories + processing/

Actor state is the contents of its channel directories. The processing/ directory represents in-flight work — messages the actor has claimed but not yet completed:

# Actor claims a message (atomic rename: requests/ → processing/)
abq recv test-runner

# State is visible via filesystem
ls ~/.abq/channels/test-runner/
# requests/    — pending mailbox
# processing/  — in-flight (actor's private working state)
# responses/   — completed replies
# archive/     — finished messages

# Or via the status command
abq status
# Channel test-runner: 0 requests, 1 processing, 3 responses, 12 archived
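The per-channel summary above is just a file count per state directory. A minimal sketch (the real abq status command may format or compute this differently):

```python
from pathlib import Path

def channel_status(channel_dir):
    # One count per state directory -- the filesystem IS the state,
    # so status is a directory listing away.
    channel = Path(channel_dir)
    return {state: len(list((channel / state).glob("*.json")))
            for state in ("requests", "processing", "responses", "archive")}
```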

Address → github.com/org/repo/branch

Actors are addressed by their git context. The from field in every message identifies the sender; the to field names the destination channel:

# Send from repo A to an agent watching channel "app-agent"
abq send app-agent signal '{"signal": "deploy_ready"}'

# The message's "from" is auto-populated:
# {
#   "from": {
#     "agent": "github.com/myorg/tests/main",
#     "pid": 12345,
#     "pwd": "/home/user/repos/tests"
#   },
#   "to": "app-agent"
# }

# Any process on the machine can address any channel by name
abq send broadcast signal '{"signal": "all_clear"}'

From Erlang OTP:

  • Lightweight processes (~300 words of memory each)
  • Asynchronous message passing (no shared memory)
  • Supervision trees for fault tolerance
  • “Let it crash” philosophy

abq agents follow OTP principles:

# Lightweight processes — each agent is just a shell process watching a channel
abq watch worker-1 --handler ./process-task.sh &   # PID 1001
abq watch worker-2 --handler ./process-task.sh &   # PID 1002
abq watch worker-3 --handler ./process-task.sh &   # PID 1003
# Three agents, no shared memory, ~0 overhead when idle

# Asynchronous message passing — send is fire-and-forget
abq send worker-1 command "analyze dataset-a.csv"
# Returns immediately; worker-1 processes asynchronously

# Supervision — a supervisor script monitors workers and restarts on failure
#!/bin/sh
# supervisor.sh — "let it crash" with restart
while true; do
    abq send supervisor status '{}'
    for worker in worker-1 worker-2 worker-3; do
        # Check if worker process is alive
        if ! abq check --channel "$worker" --alive; then
            echo "Restarting $worker"
            abq watch "$worker" --handler ./process-task.sh &
        fi
    done
    sleep 10
done

# "Let it crash" — if a handler fails, the message stays in processing/
# The supervisor (or gc) can requeue it for another worker:
abq requeue worker-1
# Moves stale processing/ messages back to requests/
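A requeue pass like the one above can be sketched as a sweep over processing/ for stale claims. The age threshold and layout are assumptions; this is not abq's actual implementation:

```python
import os, time
from pathlib import Path

def requeue_stale(channel_dir, max_age=300.0):
    # Move processing/ messages whose claim is older than max_age seconds
    # back to requests/, so another worker can retry them. Because the
    # give-back is an atomic rename, a sweep and a live worker can't
    # both end up holding the same message.
    channel = Path(channel_dir)
    requeued = []
    for msg in (channel / "processing").glob("*.json"):
        if time.time() - msg.stat().st_mtime > max_age:
            os.rename(msg, channel / "requests" / msg.name)
            requeued.append(msg.name)
    return requeued
```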

IPC Patterns

From IPC literature:

Pattern          Transport           Use Case
Pipes            stdin/stdout        Linear pipelines
Message Queues   Kernel/filesystem   Async decoupled
Shared Memory    mmap                High bandwidth
Sockets          TCP/Unix            Network/local
Signals          Kernel              Simple notifications

Filesystem-based queues trade throughput for simplicity and debuggability.

abq implements the Message Queue pattern using the filesystem as transport:

# abq is a filesystem message queue — compare to the IPC patterns above:

# Pipes (stdin/stdout) — linear, ephemeral, one producer → one consumer
echo '{"test": true}' | my-agent

# abq — async, persistent, many-to-many, decoupled
abq send my-agent signal '{"test": true}'
# Message persists on disk until consumed; sender doesn't block

# The key advantage over pipes/sockets: debuggability
# When something goes wrong, inspect the queue:
cat ~/.abq/channels/my-agent/requests/req_018d5a3b2c4e.json | jq .
# {
#   "id": "req_018d5a3b2c4e",
#   "type": "signal",
#   "content": "{\"test\": true}",
#   "timestamp": "2026-01-28T12:00:00Z"
# }

Contract Net Protocol

The Contract Net Protocol (Smith, 1980), later standardized by FIPA, formalized multi-agent task allocation:

  1. Manager broadcasts call-for-proposals (CFP)
  2. Contractors submit bids (propose)
  3. Manager selects winner(s) (accept/reject)
  4. Contractor performs task, reports result

The abq protocol supports this:

# Manager: CFP
abq send broadcast signal '{"signal":"cfp","task":"analyze_logs"}'

# Contractors respond to reply_to channel
abq send manager signal '{"signal":"propose","bid":{"time":5}}'

# Manager accepts
abq send contractor-1 command "analyze /var/log/app.log"

Publish-Subscribe

Pub/Sub decouples producers from consumers:

  • Topic-based: Subscribe to named channels
  • Content-based: Subscribe to message patterns

The abq broadcast channel provides topic-based pub/sub. Content filtering happens in handlers:

# Topic-based: the broadcast channel is a built-in pub/sub topic
# Any agent can publish to it
abq send broadcast signal '{"signal": "build_complete", "version": "0.4.0"}'

# Multiple subscribers watch the same broadcast channel
# Each gets a copy (fan-out via polling/inotify)
abq watch broadcast --handler ./deploy-on-build.sh &    # subscriber 1
abq watch broadcast --handler ./notify-slack.sh &        # subscriber 2
abq watch broadcast --handler ./update-dashboard.sh &    # subscriber 3

# Content-based filtering: handlers inspect message content
# deploy-on-build.sh only acts on build_complete signals:
#!/bin/sh
SIGNAL=$(jq -r '.content | fromjson | .signal' "$1")
if [ "$SIGNAL" = "build_complete" ]; then
    VERSION=$(jq -r '.content | fromjson | .version' "$1")
    echo "Deploying $VERSION..."
fi
# Other signals are silently ignored

# Named channels as topics: create per-concern channels
abq channel create ci-events
abq channel create deploy-events
abq channel create alert-events

# Producers publish to the relevant topic
abq send ci-events signal '{"signal": "lint_passed"}'
abq send deploy-events signal '{"signal": "rollback", "reason": "health check failed"}'

Why Filesystem Queues?

For local agent coordination (two agents, same machine, “tests done, your turn”), here’s how options compare:

Option                  Verdict
Named FIFO              Tempting, zero deps, but unidirectional and no persistence. If the receiver isn't listening, the message is gone.
Shared file + flock     Race conditions, no structure, you have to invent your own locking. Pain.
SQLite                  Actually underrated. Single file, ACID, survives crashes. But feels heavy for "hey, tests passed".
Unix sockets            Fast (~0.1ms), but ephemeral. Requires both processes to be running.
D-Bus                   Designed for this on Linux. Heavyweight, weird API, overkill for agents.
Redis                   Good pub/sub, but now you're running a daemon. For two processes?
NATS                    Best "real" option if you need <10ms latency or complex routing. Still a daemon.
ZeroMQ/nanomsg          No broker, fast. But now you're linking a library everywhere.
MQTT (mosquitto)        IoT-focused, lightweight. Still a daemon.
Kafka                   lol no
IRC (ngircd/miniircd)   Unironically fun, but massive overkill.
Keybase                 If you need encryption + multi-device, sure. You don't.
Efrit/fs-queue          Debuggable with cat and jq. Survives restarts. Works from any language. No daemon.

The filesystem approach wins because the failure mode is "I can literally ls the queue and see what's stuck." That's worth a lot when debugging agent coordination at 2am.

When to Use Something Else

Scenario                                           Recommendation
Same machine, <10 agents, "tests passed" signals   Filesystem queue
Same machine, need <10ms latency                   Unix sockets or ZeroMQ
Multiple machines                                  NATS (single binary)
Complex routing/filtering                          NATS or Redis pub/sub
Already running Redis                              Just use Redis
Need persistence + queries                         SQLite
Enterprise / "we need Kafka"                       NATS JetStream

Alternative Transports

For higher throughput or more features:

System         Overhead   Features               Use When
Filesystem     ~1ms       Debuggable, zero deps  Local, low-volume
Unix sockets   ~0.1ms     Fast, stream-based     Same machine, high-volume
NATS           ~0.5ms     Pub/sub, req/reply     Multi-machine
ZeroMQ         ~0.1ms     No broker, patterns    Library integration
Redis          ~0.5ms     Pub/sub, persistence   Already running Redis

The abq design prioritizes simplicity over performance. For most AI agent coordination (messages/second, not messages/millisecond), filesystem is sufficient.

File Watching

Three approaches for detecting new messages:

1. Polling (Portable)

import time
from pathlib import Path

requests_dir = Path.home() / ".abq/channels/my-agent/requests"
while True:
    files = sorted(requests_dir.glob("*.json"))  # oldest first
    if files:
        process(files[0])  # your message handler
    time.sleep(0.1)

2. inotify (Linux, FreeBSD 15+)

import inotify.adapters  # pip install inotify

i = inotify.adapters.Inotify()
i.add_watch(str(requests_dir))
for event in i.event_gen(yield_nones=False):
    (_, type_names, path, filename) = event
    if 'IN_CREATE' in type_names:
        process(f"{path}/{filename}")  # your message handler

Note: FreeBSD 15 added inotify compatibility via libinotify, so the same code works on both Linux and modern FreeBSD.

3. FSEvents/kqueue (macOS/BSD)

# Via the watchdog library (FSEvents on macOS, kqueue on BSD)
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class OnCreate(FileSystemEventHandler):
    def on_created(self, event):
        process(event.src_path)  # handle the new message file

fswatch provides cross-platform file watching:

fswatch -0 ~/.abq/channels/my-agent/requests | xargs -0 -n1 ./handler.sh

The abq watch command uses polling (0.5s default) for portability. For lower latency, use inotify/FSEvents directly.

Security Considerations

Filesystem-based queues inherit Unix permissions:

  • Channel directories can be mode 700 (private) or 770 (group)
  • Messages are readable by channel owner
  • No authentication beyond filesystem permissions

For multi-user scenarios, consider:

  • Separate channels per user/group
  • Signed messages (JSON Web Signatures)
  • Encrypted content (age, GPG)
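Under the layout above, tightening permissions is one chmod per state directory. A sketch following the 700/770 modes from the bullets above (the function name and layout are illustrative):

```python
import os
from pathlib import Path

def create_channel(root, name, group_shared=False):
    # Owner-only (0700) by default, or group-readable/writable (0770).
    # mkdir's mode argument is masked by the process umask, so chmod
    # afterwards to set the permissions exactly.
    mode = 0o770 if group_shared else 0o700
    channel = Path(root) / "channels" / name
    for state in ("requests", "processing", "responses", "archive"):
        (channel / state).mkdir(parents=True, exist_ok=True)
        os.chmod(channel / state, mode)
    os.chmod(channel, mode)
    return channel
```

For group sharing, the channel directory's group must also be set appropriately (e.g. with chgrp), which is omitted here.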

References

Implementations

  • Efrit - Emacs AI agent with file-based queue
  • fswatch - Cross-platform file change monitor
  • Maildir - djb’s maildir specification

License

MIT
