Skip to content

feat: add outbound WebSocket sources#134

Draft
tofarr wants to merge 5 commits into
mainfrom
feat/outbound-websocket-sources
Draft

feat: add outbound WebSocket sources#134
tofarr wants to merge 5 commits into
mainfrom
feat/outbound-websocket-sources

Conversation

@tofarr
Copy link
Copy Markdown
Contributor

@tofarr tofarr commented May 22, 2026

Summary

Adds outbound WebSocket sources — a new kind of event source that complements the existing inbound webhook system.

Where custom webhooks require an external service to POST to us (and a URL-verification handshake), WebSocket sources flip the direction: the automation service initiates and maintains a persistent connection to the external service and receives pushed events over it. This is the architecture required for Slack Socket Mode, which was blocked from using inbound webhooks due to Slack's URL-verification challenge.

New components

OutboundWebSocketSource model + migration 007

Single-table with a kind discriminator ("generic" | "slack"):

  • Common: id, org_id, name, source slug, enabled
  • JMESPath: event_key_expr, payload_expr, filter_expr
  • generic: url (static wss://), headers (upgrade headers)
  • slack: app_token (xapp-… for apps.connections.open)
  • Runtime state: status, status_detail, connected_at, last_event_at

Pydantic schemas (discriminated union on kind)

  • GenericWebSocketSourceCreate — validates wss:// URL + JMESPath expressions
  • SlackWebSocketSourceCreate — validates xapp-… token; sets Slack-appropriate defaults (event_key_expr="payload.event.type", payload_expr="payload.event")
  • WebSocketSourceUpdate — partial updates; kind and source slug are immutable
  • WebSocketSourceResponse — credentials (headers, app_token) never returned

CRUD router (/v1/websocket-sources)

POST   /v1/websocket-sources                 create + notify SocketManager
GET    /v1/websocket-sources                 list (org-scoped)
GET    /v1/websocket-sources/{id}            get with live status
PATCH  /v1/websocket-sources/{id}            update + reconnect if credentials change
DELETE /v1/websocket-sources/{id}            delete + close connection
POST   /v1/websocket-sources/{id}/reconnect  force immediate reconnect

SocketManager background service

One asyncio.Task per enabled source with exponential back-off reconnect (1 s → 5 min cap, pauses after 10 consecutive failures):

  • generic: connects to static wss:// URL with optional HTTP headers
  • slack: calls apps.connections.open to get a fresh URL on each connect; sends per-message envelope ACKs before any dispatch work (meets Slack's ~3 s deadline); handles server-initiated disconnect events with a clean reconnect

Dispatch pipeline (shared with inbound webhooks after the socket receive step):

ACK (if required) → connection-level filter_expr → event_key_expr
→ payload_expr (unwrap envelope) → trigger matching → create PENDING runs

Reuses get_event_automations, matches_trigger, and create_automation_run unchanged.

App wiring

SocketManager sSocketManager sSocketManager sSocketManager s`SocketMae main automation router to avoid UUID route conflicts.

What stays the same

The automation trigger model is unchanged: event-triggered automations reference a WebSocket source by its source slug in trigger.source, use trigger.on for event-key pattern matching, and trigger.filter for JMESPath payload filtering �The automation trigger model is unchanged: event-triggered automations reference a WebSocket source by its source slug in trigger.source, use trigger.on for event-key pattern matching, and trigger.filter for JMESPath paytype is a shell builtin
type is a shell builtin
type is a shell builtin``
ype" \ve "type" \ve "type" \ve "type" \ve "typeter": \ng.ype" \ve R was created by an AI agent (OpenHands) on behalf of the user._

Adds client-initiated outbound WebSocket connections as a new event
source kind, complementing the existing inbound webhook system.

Where custom webhooks require an external service to POST to the
automation service, WebSocket sources flip the direction: the
automation service initiates and maintains a persistent connection
to the external service and receives pushed events over it. This
eliminates the URL-verification handshake problem that blocks
services like Slack Socket Mode from using inbound webhooks.

## New components

### OutboundWebSocketSource model (models.py, migration 007)
Single-table with a 'kind' discriminator ('generic' | 'slack'):
- Common fields: id, org_id, name, source slug, enabled
- JMESPath fields: event_key_expr, payload_expr, filter_expr
- generic fields: url (static wss://), headers (upgrade headers)
- slack fields: app_token (xapp-… for apps.connections.open)
- Runtime state: status, status_detail, connected_at, last_event_at

### Pydantic schemas (schemas.py)
Discriminated union on 'kind' for create requests:
- GenericWebSocketSourceCreate – validates wss:// URL + JMESPath exprs
- SlackWebSocketSourceCreate – validates xapp-… token + Slack defaults
- WebSocketSour- WebSocketSour- WebSocketSour- WebSocketSour- WebSocketSourebSocketSourceResponse – credentials (headers, app_token) never returned

### CRUD router (websocket_source_router.py)
/v1/websocket/v1/websocket/v1/websocket/v1/websocket/v1/websocket/v1/websreate + notify SocketManager
  GET    /v1/websocket-sources       GET    /v1/weboped)
  GET    /v1/websocket-sources/{id}    get with live statu  GET    /v1/websocket-sources/{id}    get with live statu  GE needed
  DELETE /v1/websocket-sources/{id}    delete + close connection
  POST   /v1/websocket-sources/{id}/reconnect  force reconnect

### SocketManager background service (socket_ma### SocketManager background service (socket_ma### SocketManal back-off reconnect:
- generic: connects to static wss:// URL with opt- generic: connects to static wss apps.connections.open to get a fresh URL on each connect,
  sends per-message envelope ACKs before dispatch (meets Slack's ~3s
  deadline), handles server-initiated disconnect events
Dispatch pipeline (ACK → pre-filter → event_key_expr → payload_expr →
trigger matching → create PENDING runs) reuses get_event_automations,
matches_trigger, and create_automation_run from the webhook pipeline.

### App wiring (app.py)
SocketManager started/stopped in the FastAPI lifespan alongside the
scheduler, dispatcher, and watchdog. Router registered before the main
automation router to avoid UUID route conflicts.

### Dependency (pyproject.toml)
websockets>=12 added for client-swebsockets>=12 added for client-swebsockets>=12 added for con trigger model is unchanged: event-triggered automations
reference a WebSocket source by its 'source' slug in trigger.source,
use trigger.on for event-key pattern matching, and trigger.filter for
JMESPath payload filtering — identical to inbound webhooks.

Co-authored-by: openhands <openhands@all-hands.dev>
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 22, 2026

🚀 Deploy Preview PR Created/Updated

A deploy preview has been created/updated for this PR.

Deploy PR: https://github.com/OpenHands/deploy/pull/4388
Automation SHA: 1e442d345adb34dbaca231af50732efd9896664a
Last updated: May 22, 2026, 08:06:24 AM ET

Once the deploy PR's CI passes, the automation service will be deployed to the feature environment.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 22, 2026

Coverage

Copy link
Copy Markdown

@all-hands-bot all-hands-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taste Rating: 🟡 Acceptable — Core architecture is sound, but has security and validation gaps that need addressing.

This is a well-structured implementation of outbound WebSocket sources. The single-table discriminator pattern works well, the task management is clean, and tests are comprehensive. However, there are some critical security and validation issues that must be fixed before merging.


[RISK ASSESSMENT]

  • [Overall PR] ⚠️ Risk Assessment: 🟡 MEDIUM

This feature handles sensitive credentials (Slack app tokens, auth headers) and manages persistent background tasks. The main risks are: (1) credentials stored in plaintext in the database with no encryption layer, (2) potential for misconfigured sources to enter failure loops due to missing validation, and (3) broad exception handling that could mask database connection issues. The WebSocket connection management itself is solid with proper backoff and cleanup.

Recommendation: Address the credential storage and validation issues before merging. Consider adding encryption-at-rest for sensitive fields and strengthening validation in the update endpoint.


VERDICT:
Worth merging after fixes: The architecture is clean and the implementation is thorough, but the security and validation issues listed below must be addressed.

KEY INSIGHT:
The discriminated union pattern for multi-kind sources works well here, but required-field validation needs to happen at both the API layer (for immediate feedback) and the runtime layer (for defense in depth) — currently only the runtime layer has it.


Was this automated review useful? React with 👍 or 👎 to this review to help us measure review quality.
Workflow run: https://github.com/OpenHands/automation/actions/runs/26267703709

Comment thread openhands/automation/models.py Outdated
Comment thread openhands/automation/models.py Outdated
Comment thread openhands/automation/websocket_source_router.py
Comment thread openhands/automation/socket_manager.py
Comment thread openhands/automation/socket_manager.py
Comment thread migrations/versions/007_outbound_websocket_sources.py
Comment thread openhands/automation/socket_manager.py
Comment thread openhands/automation/socket_manager.py
Resolves all flagged review comments on PR #134.

## Security: encrypt app_token and headers at rest (🟠 x2)

Implements the same Fernet-based cipher pattern used by StaticSecret and
LookupSecret in the OpenHands SDK (openhands.sdk.utils.cipher):

utils/cipher.py
  Cipher class with SHA-256 key derivation from AUTOMATION_SECRET_KEY /
  OH_SECRET_KEY env var (checked in that order).  is_ciphertext() uses the
  'gAAAAA' Fernet token prefix to distinguish encrypted values from legacy
  plaintext rows — no data migration required when the key is introduced.
  Falls back to plaintext (with a one-time WARNING) when neither key is set.

utils/encrypted_fields.py
  EncryptedString — SQLAlchemy TypeDecorator for single-string columns.
  Mirrors StaticSecret.value: encrypts on process_bind_param (write),
  decrypts on process_result_value (read).

  EncryptedJSONHeaders — TypeDecorator for dict[str, str] header columns.
  Mirrors LookupSecret._serialize_secrets/_validate_secrets: only encrypts
  header values whose key matches SECRET_KEY_PATTERNS (Authorization, Cookie,
  X-Api-Key, Token, etc.); non-sensitive headers are stored unencrypted.

models.py
  app_token: String(255) -> EncryptedString(255)
  headers:   JSON        -> EncryptedJSONHeaders

cryptography>=42 added as an explicit pyproject.toml dependency (was
transitively available via openhands-sdk; now declared directly).

## Validation: fix update endpoint kind-specific guard (🔴)

websocket_source_router.py
  After applying PATCH updates, validates that kind-required fields are still
  set: url for generic sources, app_token for slack sources.  Returns 422 if
  either is cleared, preventing silent runtime failures in SocketManager.

## Error handling: expose CancelledError in DB helpers (🟠 x2)

socket_manager.py
  _set_status and _record_event now re-raise asyncio.CancelledError before
  the broad except-Exception handler so that task cancellation (graceful
  shutdown) is never masked.  logger.exception (ERROR + traceback) is
  preserved for all other DB write failures.

Co-authored-by: openhands <openhands@all-hands.dev>
Fixes all pre-commit failures caught by ci/backend:

- E501: shorten error-message strings in websocket_source_router.py
- F841: remove unused captured_payload variable in test
- ARG002: remove unused ack_ws/ack_id params from _dispatch (and all
  call sites); prefix dialect params with _ in TypeDecorators
- ruff format: reformat app.py, models.py, encrypted_fields.py,
  test_cipher_and_encryption.py, test_websocket_source_router.py

Co-authored-by: openhands <openhands@all-hands.dev>
Three distinct issues caught by running pre-commit locally:

ruff lint (import order)
  migrations/007: Boolean before JSON - ruff isort wanted JSON first.
  Applied auto-fix.

pyright reportIncompatibleMethodOverride
  encrypted_fields.py: renaming the TypeDecorator dialect parameter to
  _dialect (to silence ruff ARG002) changed the parameter name relative
  to the SQLAlchemy base class, which pyright flags as an incompatible
  override in standard mode.
  Fix: restore the name to 'dialect' and annotate it as 'Any' instead.
  'Any' is bidirectionally compatible with 'Dialect' in pyright, so the
  override check passes; noqa: ARG002 silences ruff.

pyright reportOptionalSubscript (x10 in tests)
  process_bind_param/process_result_value return dict | None; tests were
  subscripting the result without a None guard, which pyright flags.
  Fix: added 'assert stored/restored/result is not None' before each
  subscript in the affected test cases.

Co-authored-by: openhands <openhands@all-hands.dev>
Two changes requested in code review:

## 1. Use openhands.sdk.utils.cipher.Cipher (not a local copy)

  - Delete utils/cipher.py — the automation service was duplicating the
    SDK's Cipher class verbatim. The SDK is already a declared dependency
    so there is no reason to maintain a second copy.

  - utils/encrypted_fields.py: import Cipher and FERNET_TOKEN_PREFIX from
    openhands.sdk.utils.cipher. Adapt callers to the SDK interface:
      · cipher.encrypt(SecretStr(value)) instead of cipher.encrypt(value)
      · cipher.try_decrypt_str(v) instead of cipher.decrypt(v)
      · v.startswith(FERNET_TOKEN_PREFIX) instead of cipher.is_ciphertext(v)

  - Remove cryptography>=42 from pyproject.toml — the SDK already brings
    this in transitively.

## 2. Use DiscriminatedUnionMixin instead of a hand-rolled discriminated union

  Following the SecretSource / StaticSecret / LookupSecret pattern:

  schemas.py
    - _WebSocketSourceBase(BaseModel) → WebSocketSourceCreate(DiscriminatedUnionMixin, ABC)
                       s the computed 'kind' field (= class name) and the
      model_validator that dispatches to the correct subclass — no need for
      explicit 'kind: Literal[...]' fields or an Annotated union type alias.
    - GenericWebSocketSourceCreate → GenericWebSocketSource (kind = class name)
    - SlackWebSocketSourceCreate   → SlackWebSocketSource  (kind = class name)
    - Remove WebSocketSourceCreate = Annotated[...] union alias (not needed)
    - Move event_key_expr / payload_expr to the abstract base class (common
      to both kinds); SlackWebSocketSource overrides their defaults.

  Kind values change from lowercase slugs to class names:
    generic → GenericWebSocketSource
    slack   → SlackWebSocketSource
  Updated everywhere: router, socket_manager, models comments, tests.

Co-authored-by: openhands <openhands@all-hands.dev>
@tofarr tofarr marked this pull request as draft May 22, 2026 19:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants