Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions alembic/versions/023_messaging_primitive_multi_shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Messaging primitive — multi-shell same-agent claims (PR-5a of Dock-readiness).

Adds the ``agent_shells`` table. Each row is one running process that
has registered to receive messages addressed to a given Agent.slug.
Same agent can have N concurrent shells (e.g., ``argus@govind`` running
in Claude Code AND Cursor on the same machine).

Push delivery fans out to all live shells. Poll-fetch returns the same
inbox to all shells; the SDK dedupes via Idempotency-Key on the agent
side (each shell sees the same message but only ONE acks it).

Schema overview:

* ``agent_shells`` — per-process registration. PK ``ash_<12 alphanum>``.
Each shell has its own ``webhook_url`` + ``webhook_secret`` (so
different shells can deliver to different local endpoints — Cursor's
port, Claude Code's port, etc.).

* ``agent_shells.last_heartbeat_at`` — used to prune dead shells.
A shell that has not sent a heartbeat in N minutes is treated as offline
(push delivery skips it; subsequent registrations may displace it).

The existing ``agents.webhook_url`` + ``agents.webhook_secret`` columns
are kept for backward compat. Treat them as the "legacy single-shell"
shape — when multi-shell is in use, those columns can be NULL and
shells own delivery. The service layer (in a follow-up PR or this same
deploy) reads from ``agent_shells`` when rows are present.

Backward-compat contract:

* Additive only. No column on ``agents`` removed.
* ``agent_shells`` is empty initially → existing single-shell semantics
preserved exactly. Existing tests pass without modification.
* Integrators who want multi-shell behavior register shells via
``POST /v1/agents/{ref}/shells`` and the service layer fans out.

Revision ID: 023
Revises: 022
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID

revision = "023"
down_revision = "022"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Create the ``agent_shells`` table and its composite lookup index."""
    table_name = "agent_shells"

    def _timestamp(column_name: str) -> sa.Column:
        # Timezone-aware, required, and filled in by the database on insert.
        return sa.Column(
            column_name,
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.func.now(),
        )

    columns = [
        sa.Column("id", sa.String(20), primary_key=True),
        # Owning agent; shell rows are removed when the agent row is deleted.
        # NOTE(review): ix_agent_shells_active (created below) also leads
        # with agent_id, so this single-column index may be redundant —
        # confirm against the model before dropping it.
        sa.Column(
            "agent_id",
            sa.String(20),
            sa.ForeignKey("agents.id", ondelete="CASCADE"),
            nullable=False,
            index=True,
        ),
        # The user scope is denormalized onto the shell so that push
        # delivery and the per-user concurrent-delivery cap can filter by
        # user without joining back through agents → users. It follows the
        # same source-of-truth FK chain.
        sa.Column(
            "user_id",
            UUID(as_uuid=True),
            sa.ForeignKey("users.id", ondelete="CASCADE"),
            nullable=False,
            index=True,
        ),
        # Per-shell delivery target, mirroring the agents.webhook_url shape.
        # NULL means a poll-only shell (it still receives inbox poll-fetch
        # hits, but no push). The URL and secret are set or unset together;
        # the paired check constraint below enforces that.
        sa.Column("webhook_url", sa.Text(), nullable=True),
        sa.Column("webhook_secret", sa.String(80), nullable=True),
        # Optional human-readable label so admins can tell shells apart in
        # /agents/{ref}/shells listings ("claude-code on laptop",
        # "cursor on desktop", etc.).
        sa.Column("label", sa.String(128), nullable=True),
        # Presence + heartbeat. last_heartbeat_at is bumped on every
        # successful poll-fetch and on
        # POST /v1/agents/{ref}/shells/{id}/heartbeat. Stale shells
        # (heartbeat older than N minutes) are skipped by push delivery and
        # may be pruned by a periodic cleanup task.
        sa.Column(
            "status",
            sa.String(16),
            nullable=False,
            server_default=sa.text("'online'"),
        ),
        _timestamp("last_heartbeat_at"),
        _timestamp("registered_at"),
    ]

    constraints = [
        # Restrict status to the three presence values.
        sa.CheckConstraint(
            "status IN ('online', 'offline', 'away')",
            name="valid_shell_status",
        ),
        # webhook_url and webhook_secret must both be NULL or both be set.
        sa.CheckConstraint(
            "(webhook_url IS NULL) = (webhook_secret IS NULL)",
            name="shell_webhook_url_secret_paired",
        ),
    ]

    op.create_table(table_name, *columns, *constraints)

    # Most common push-delivery query: the live shells of one agent, ordered
    # by heartbeat freshness so that push retries the freshest shell first.
    op.create_index(
        "ix_agent_shells_active",
        table_name,
        ["agent_id", "status", "last_heartbeat_at"],
    )


def downgrade() -> None:
    """Reverse ``upgrade``: drop the composite index, then the table."""
    table_name = "agent_shells"
    # Drop the explicitly created index first, then the table itself.
    op.drop_index("ix_agent_shells_active", table_name=table_name)
    op.drop_table(table_name)
3 changes: 2 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from app.middleware.rate_limit import RateLimitMiddleware
from app.middleware.request_id import RequestIdMiddleware
from app.redis import close_redis
from app.routers import agents, alerts, auth_routes, cues, device_code, echo, executions, health, internal_users, messages, usage, webhook_secret, workers
from app.routers import agent_shells, agents, alerts, auth_routes, cues, device_code, echo, executions, health, internal_users, messages, usage, webhook_secret, workers
from app.utils.logging import setup_logging


Expand Down Expand Up @@ -162,6 +162,7 @@ async def generic_error_handler(request: Request, exc: Exception):
app.include_router(webhook_secret.router)
app.include_router(alerts.router)
app.include_router(agents.router)
app.include_router(agent_shells.router)
app.include_router(messages.router)

# ─── PR-5c: external auth backend internal endpoints ──────────────
Expand Down
3 changes: 2 additions & 1 deletion app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
from app.models.worker import Worker
from app.models.alert import Alert
from app.models.agent import Agent
from app.models.agent_shell import AgentShell
from app.models.message import Message
from app.models.usage_messages_monthly import UsageMessagesMonthly

__all__ = [
"User", "Cue", "Execution", "DispatchOutbox", "UsageMonthly", "DeviceCode",
"Worker", "Alert", "Agent", "Message", "UsageMessagesMonthly",
"Worker", "Alert", "Agent", "AgentShell", "Message", "UsageMessagesMonthly",
]
93 changes: 93 additions & 0 deletions app/models/agent_shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""AgentShell — per-process registration for multi-shell agents (PR-5a).

Each row is one running process that has registered to receive
messages addressed to a given ``Agent.slug``. The same agent can have
N concurrent shells — e.g., ``argus@govind`` running in Claude Code
AND Cursor AND OpenClaw simultaneously, each delivering to its own
local webhook port.

Why this exists
---------------

The original v1 messaging spec §2.3 has slugs lock-after-set with one
``webhook_url`` per agent. That's correct for the single-process
mental model but breaks when the SAME agent identity runs in multiple
shells on the same machine — a common pattern for Dock Connect users
running multiple AI tools side-by-side.

This table lets that work: one canonical Agent identity, N live shells
each holding its own webhook target. Push delivery fans out; the
SDK dedupes message handling at the agent layer via the existing
Idempotency-Key path.

Status field (online / offline / away) is the same vocabulary as
``agents.status`` — shells inherit the presence concept from agents.
A shell that has not sent a heartbeat within
``MESSAGE_DELIVERY_STALE_AFTER_SECONDS`` is treated as offline by
push delivery (skipped). A periodic cleanup task can hard-delete
shells that have been offline > N hours.
"""
from __future__ import annotations

from sqlalchemy import (
CheckConstraint,
Column,
DateTime,
ForeignKey,
Index,
String,
Text,
func,
)
from sqlalchemy.dialects.postgresql import UUID

from app.database import Base


class AgentShell(Base):
    """ORM mapping for one row of ``agent_shells``.

    A row represents a single registered shell (running process) of an
    agent. Several rows may reference the same ``agent_id``; each carries
    its own optional webhook target, presence status and heartbeat time.
    """

    __tablename__ = "agent_shells"

    __table_args__ = (
        # Status is limited to the three presence values.
        CheckConstraint(
            "status IN ('online', 'offline', 'away')",
            name="valid_shell_status",
        ),
        # webhook_url and webhook_secret are either both NULL or both set.
        CheckConstraint(
            "(webhook_url IS NULL) = (webhook_secret IS NULL)",
            name="shell_webhook_url_secret_paired",
        ),
        # Composite index over (agent, status, heartbeat time).
        Index(
            "ix_agent_shells_active",
            "agent_id",
            "status",
            "last_heartbeat_at",
        ),
    )

    # Shell identifier (string primary key, at most 20 characters).
    id = Column(String(length=20), primary_key=True)

    # Owning agent; the row is deleted when the agent is deleted.
    # NOTE(review): ix_agent_shells_active also leads with agent_id, so the
    # single-column index requested here may be redundant — confirm.
    agent_id = Column(
        String(length=20),
        ForeignKey("agents.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )

    # Owning user, stored directly on the shell; the row is deleted when
    # the user is deleted.
    user_id = Column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )

    # Optional per-shell delivery target. Both columns are nullable and are
    # tied together by the paired check constraint above.
    webhook_url = Column(Text, nullable=True)
    webhook_secret = Column(String(length=80), nullable=True)

    # Optional free-form label for the shell.
    label = Column(String(length=128), nullable=True)

    # Presence state; defaults to "online" on both the ORM and DB side.
    status = Column(
        String(length=16),
        nullable=False,
        default="online",
        server_default="online",
    )

    # Timestamps are timezone-aware and default to the database clock.
    last_heartbeat_at = Column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    registered_at = Column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
Loading
Loading