
feat: operation events system #807

Merged
alexluong merged 13 commits into main from feat/opevents-core
Apr 8, 2026
@alexluong alexluong commented Apr 6, 2026

Summary

  • Add internal/opevents/ package: Emitter interface with topic filtering + 3-retry, Sink interface (HTTP with HMAC-SHA256 signing, MQ adapter wrapping internal/mqs, Noop), OperationEvent envelope, config with nested sink sub-structs
  • Move alert evaluation from deliverymq fire-and-forget goroutine into logmq batch processor with per-entry ack/nack for at-least-once delivery
  • alert.destination.consecutive_failure — emitted at percentage thresholds (hard-coded 50/70/90/100%)
  • alert.destination.disabled — emitted when destination auto-disabled at 100% threshold
  • alert.event.exhausted_retries — emitted when delivery exhausts all retry attempts, with per-destination suppression window via idempotence package
  • tenant.subscription.updated — emitted from API handlers on destination create/update/delete/disable/enable when tenant-level topics or destinations_count changes (best-effort)
  • Replace legacy alert HTTP callback e2e tests with opevents e2e suite (3 flows: consecutive failure + disable, exhausted retries, subscription updated)
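
A minimal sketch of the emitter behaviour named in the first bullet (topic filtering plus retry), written in Python for illustration only. The class name, the callable sink, and the reading of "3-retry" as one initial attempt plus up to three retries are assumptions, not the actual `internal/opevents` API.

```python
import time
from typing import Callable, Iterable


class Emitter:
    """Illustrative emitter: filters by topic, retries the sink on failure."""

    def __init__(self, sink: Callable[[dict], None], topics: Iterable[str],
                 max_retries: int = 3, backoff_seconds: float = 0.0):
        self.sink = sink                  # hypothetical sink: any callable taking the event
        self.topics = set(topics)         # only these topics are forwarded
        self.max_retries = max_retries    # assumption: retries after the first attempt
        self.backoff_seconds = backoff_seconds

    def emit(self, event: dict) -> bool:
        """Return True if the event was sent, False if its topic is filtered out."""
        if event["topic"] not in self.topics:
            return False
        last_error = None
        for attempt in range(self.max_retries + 1):
            try:
                self.sink(event)
                return True
            except Exception as err:  # a real sink would catch narrower errors
                last_error = err
                if attempt < self.max_retries:
                    time.sleep(self.backoff_seconds)
        # All attempts failed: surface the error so the caller can nack.
        raise last_error
```

Raising after the last attempt, rather than swallowing the error, is what would let a caller such as the logmq batch processor nack the entry and rely on redelivery.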

HTTP Sink Signature

The HTTP sink signs each request body with HMAC-SHA256 and sends the signature in the X-Outpost-Signature header. The format matches the default webhook destination signature scheme:

  • Signed content: raw JSON body
  • Algorithm: HMAC-SHA256
  • Encoding: hex
  • Header format: X-Outpost-Signature: v0=<hex>

Verification example:

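```python
import hashlib
import hmac

# body is the raw request bytes; signing_secret is the sink's configured secret.
expected = hmac.new(signing_secret.encode(), body, hashlib.sha256).hexdigest()
assert hmac.compare_digest(signature_header, f"v0={expected}")
```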
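
Because the signed content is the raw JSON body, a receiver has to verify the bytes exactly as received, before parsing. The sketch below wraps the check in two helper functions (`sign` and `verify` are hypothetical names, not part of Outpost) and shows that a re-serialized body would no longer match.

```python
import hashlib
import hmac
import json


def sign(body: bytes, signing_secret: str) -> str:
    """Build the header value in the v0=<hex> format described above."""
    digest = hmac.new(signing_secret.encode(), body, hashlib.sha256).hexdigest()
    return f"v0={digest}"


def verify(body: bytes, signature_header: str, signing_secret: str) -> bool:
    """Constant-time comparison of the expected and received header values."""
    expected = sign(body, signing_secret)
    return hmac.compare_digest(expected.encode(), signature_header.encode())


raw = b'{"id":"abc",  "topic":"alert.destination.disabled"}'
header = sign(raw, "example-secret")

# Parsing and re-dumping changes whitespace, so the signature stops matching.
reserialized = json.dumps(json.loads(raw)).encode()
```

Here `verify(raw, header, "example-secret")` succeeds, while `verify(reserialized, header, "example-secret")` fails even though both byte strings decode to the same JSON object.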

Commits

  1. 4941a835 — opevents core (envelope, emitter, sink interface, noop sink, config)
  2. 362e7496 — HTTP sink (HMAC-SHA256) + MQ sink (adapter wrapping mqs.Queue)
  3. dec44cc0 — Move consecutive_failure alert from deliverymq → logmq
  4. c8ab9f67 — Destination auto-disable + disabled alert at 100% threshold
  5. 7d7c8e5d — exhausted_retries with idempotency window suppression
  6. 5bff3d18 — tenant.subscription.updated from API handlers
  7. f31cb810 — Replace alert e2e suite with opevents e2e suite
  8. 50652d61 — Align HTTP sink signature format with webhook default (v0=)

Test plan

  • Unit tests: go test ./internal/alert/... ./internal/opevents/... ./internal/logmq/... ./internal/deliverymq/... ./internal/apirouter/... -count=1
  • E2e tests: make test/e2e (requires full infra)
  • QA: all 4 opevent topics verified end-to-end against local HTTP sink (14/14 pass)

🤖 Generated with Claude Code

alexluong added a commit that referenced this pull request Apr 7, 2026
- Redact secrets in opevents sink configs with json:"-" tags
- Fix router_test emitter harness to not silently ignore injected emitters
- Add comment clarifying Nack safety (InsertMany is idempotent by attempt ID)
- Fix %% escaping in e2e test assertion message
- Drain HTTP response body for connection reuse; include body in 4xx errors
- Panic early in NewAlertMonitor if emitter is nil
- Skip exhausted_retries check when retryMaxLimit=0 (retries disabled)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@alexluong alexluong marked this pull request as ready for review April 7, 2026 12:31
@alexluong alexluong requested a review from alexbouchardd April 7, 2026 12:31
alexluong commented Apr 7, 2026

QA Results: Operation Events

PASS (14/14) — all 4 opevent topics verified end-to-end against a local HTTP sink.

| # | Test | Result |
|---|------|--------|
| 1 | Create destination emits subscription update | Pass |
| 2 | Verify tenant.subscription.updated event received | Pass |
| 3 | Update destination topics | Pass |
| 4 | Verify subscription update reflects topic change | Pass |
| 5 | Delete destination | Pass |
| 6 | Verify subscription update after delete (count 1→0) | Pass |
| 7 | Create failing destination (→ 500) | Pass |
| 8 | Publish 10 events to trigger consecutive failures | Pass |
| 9 | Verify 4 alert.destination.consecutive_failure events at 50/70/90/100% | Pass |
| 10 | Verify alert.destination.disabled event (reason=consecutive_failure) | Pass |
| 11 | Confirm destination disabled via API | Pass |
| 12 | Create destination for exhausted retries | Pass |
| 13 | Publish retry-eligible event | Pass |
| 14 | Verify alert.event.exhausted_retries event (attempt_number=3 > limit=2) | Pass |

Config overrides used: consecutive_failure_count: 10, retry_max_limit: 2, retry_interval_seconds: 1.


Test Cases

tenant.subscription.updated

Trigger: Destination create, update (topics change), delete, disable, enable — when tenant-level `topics` or `destinations_count` changes.

  1. Create destination — `POST /tenants/{id}/destinations` with `topics: ["user.updated"]`

    • Assert: event emitted with `destinations_count: 1`, `previous_destinations_count: 0`, `topics: ["user.updated"]`, `previous_topics: []`
  2. Update destination topics — `PATCH /tenants/{id}/destinations/{id}` with `topics: ["user.deleted"]`

    • Assert: event emitted with `topics: ["user.deleted"]`, `previous_topics: ["user.updated"]`, count unchanged
  3. Update config only (no topic change) — should NOT emit (no-op guard on unchanged topics/count)

  4. Delete destination — `DELETE /tenants/{id}/destinations/{id}`

    • Assert: event emitted with `destinations_count: 0`, `previous_destinations_count: 1`, `topics: []`
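
The snapshot-and-compare logic behind these cases can be sketched as follows. This is an illustration in Python, not the handler code: `tenant_topics` and `subscription_update` are hypothetical names, the caller is assumed to pass only the destinations that count toward the tenant (for example, excluding disabled ones), and sorting the topic list is an assumption made for stable comparison.

```python
def tenant_topics(destinations):
    """Union of destination topics; ["*"] if any destination subscribes to "*"."""
    topics = set()
    for dest in destinations:
        topics.update(dest["topics"])
    return ["*"] if "*" in topics else sorted(topics)


def subscription_update(tenant_id, before, after):
    """Return the event data if topics or count changed, otherwise None."""
    previous_topics = tenant_topics(before)
    topics = tenant_topics(after)
    if previous_topics == topics and len(before) == len(after):
        return None  # no-op guard: nothing tenant-level changed
    return {
        "tenant_id": tenant_id,
        "topics": topics,
        "previous_topics": previous_topics,
        "destinations_count": len(after),
        "previous_destinations_count": len(before),
    }
```

For the create case, `subscription_update("t1", [], [{"topics": ["user.updated"]}])` yields a count change from 0 to 1 with `previous_topics: []`; a config-only update passes identical snapshots and returns `None`.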

alert.destination.consecutive_failure

Trigger: Delivery failures hitting threshold percentages of `consecutive_failure_count`.

  • With `consecutive_failure_count: 10` and default thresholds `[50, 70, 90, 100]`:

    • Failure 5 → 50% threshold
    • Failure 7 → 70% threshold
    • Failure 9 → 90% threshold
    • Failure 10 → 100% threshold
  • Test: Publish 10 events to a destination returning 500. Wait for batch processor (10s). Assert 4 events with `consecutive_failures.threshold` values 50, 70, 90, 100.

  • Success resets counter: A successful delivery between failure batches resets the counter (verified by unit tests).

alert.destination.disabled

Trigger: Destination auto-disabled at 100% consecutive failure threshold (requires `auto_disable_destination: true`).

  • Test: Same flow as above — after 10 consecutive failures, 1 disabled event is emitted with `reason: "consecutive_failure"`.
  • Without disabler: If `auto_disable_destination: false`, consecutive_failure alerts still fire but no disabled event is emitted.
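
The threshold arithmetic and the auto-disable gate from the two sections above can be sketched like this. The rounding rule (a threshold fires at the smallest failure count that reaches it) is an assumption that reproduces the 5/7/9/10 sequence for `consecutive_failure_count: 10`; the function names are hypothetical.

```python
THRESHOLDS = (50, 70, 90, 100)  # percentages, as listed in the PR


def crossed_threshold(failures: int, max_failures: int):
    """Return the threshold first reached at exactly this failure count, else None.

    If several thresholds land on the same count (small max_failures),
    the highest one is returned.
    """
    hit = None
    for threshold in THRESHOLDS:
        # Smallest count n with n / max_failures >= threshold / 100 (integer ceil).
        first_count = (threshold * max_failures + 99) // 100
        if failures == first_count:
            hit = threshold
    return hit


def should_disable(failures: int, max_failures: int, auto_disable: bool) -> bool:
    """The disabled event is only relevant when auto-disable is turned on."""
    return auto_disable and failures >= max_failures


alerts = [(n, crossed_threshold(n, 10)) for n in range(1, 11)]
alerts = [(n, t) for n, t in alerts if t is not None]
```

With a limit of 10, `alerts` holds the four pairs (5, 50), (7, 70), (9, 90), (10, 100), matching the four events asserted in the test flow.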

alert.event.exhausted_retries

Trigger: Delivery exhausts all retry attempts (`attempt_number > retry_max_limit`) for a retry-eligible event.

  • With `retry_max_limit: 2`: initial attempt (1) + 2 retries (2, 3). Attempt 3 triggers the event.
  • Idempotency: Same event+destination pair is suppressed within `exhausted_retries_window_seconds`. Different events on the same destination each emit their own alert.
  • Non-eligible events: Events with `eligible_for_retry: false` never emit this alert.
  • Test: Publish 1 retry-eligible event to a failing destination. Wait for retries to exhaust (~5s + batch processing). Assert 1 event with `attempt.attempt_number: 3`.
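
A rough sketch of the decision described above, using an in-memory dictionary in place of the idempotence package. The class and method names are hypothetical; the `retry_max_limit == 0` skip and the key-clearing on emit failure follow the commit notes in this PR.

```python
import time


class ExhaustedRetriesGate:
    """Decides whether an exhausted-retries alert should be emitted."""

    def __init__(self, retry_max_limit, window_seconds, clock=time.monotonic):
        self.retry_max_limit = retry_max_limit
        self.window_seconds = window_seconds
        self.clock = clock
        self._seen = {}  # (event_id, destination_id) -> expiry timestamp

    def should_emit(self, event_id, destination_id, attempt_number, eligible_for_retry):
        if self.retry_max_limit == 0 or not eligible_for_retry:
            return False  # retries disabled, or event never retried
        if attempt_number <= self.retry_max_limit:
            return False  # retries not exhausted yet
        key = (event_id, destination_id)
        now = self.clock()
        expiry = self._seen.get(key)
        if expiry is not None and now < expiry:
            return False  # same pair already alerted inside the window
        self._seen[key] = now + self.window_seconds
        return True

    def clear(self, event_id, destination_id):
        """Forget the pair, e.g. after a failed emit, so a replay can alert again."""
        self._seen.pop((event_id, destination_id), None)
```

With `retry_max_limit=2`, attempt 3 for a given event and destination emits once; a replay of the same pair inside the window is suppressed, while a different event on the same destination still emits.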

Payload Schemas

OperationEvent envelope

All events share this envelope:

```json
{
"id": "eR19VlFLH41rQB0uezAG7GIaHD",
"topic": "alert.destination.consecutive_failure",
"time": "2026-04-07T12:25:39.580Z",
"deployment_id": "deployment_1",
"tenant_id": "qa_tenant_opevents",
"data": { ... }
}
```

  • `id` — unique event ID (nanoid)
  • `deployment_id` — omitted if empty (`omitempty`)
  • `tenant_id` — omitted if empty
  • HTTP sink signs payloads with HMAC-SHA256: `X-Outpost-Signature: v0=<hex>` (matches default webhook destination signature format)
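
On the receiving side, a consumer of this envelope might look roughly like the sketch below. It treats `tenant_id` and `deployment_id` as optional and deduplicates by `id`, since delivery is at-least-once. `EnvelopeConsumer` is a hypothetical name, and the unbounded in-memory set is a simplification; a real consumer would likely use a store with expiry.

```python
import json


class EnvelopeConsumer:
    """Parses OperationEvent envelopes and drops redelivered duplicates."""

    def __init__(self):
        self._seen_ids = set()
        self.handled = []  # (topic, tenant_id, deployment_id) tuples

    def handle(self, raw_body: bytes) -> bool:
        """Return True if the event was processed, False if it was a duplicate."""
        envelope = json.loads(raw_body)
        if envelope["id"] in self._seen_ids:
            return False
        self._seen_ids.add(envelope["id"])
        # Both fields are omitted from the JSON when empty, so use .get().
        tenant_id = envelope.get("tenant_id")
        deployment_id = envelope.get("deployment_id")
        self.handled.append((envelope["topic"], tenant_id, deployment_id))
        return True
```

Signature verification would happen on `raw_body` before `handle` is called, since parsing discards the exact bytes that were signed.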

tenant.subscription.updated

```json
{
"id": "7qL3rbbHleqD0g2Ej7K21AJFrb",
"topic": "tenant.subscription.updated",
"time": "2026-04-07T12:24:54.046Z",
"deployment_id": "deployment_1",
"tenant_id": "qa_tenant_opevents",
"data": {
"tenant_id": "qa_tenant_opevents",
"topics": ["user.updated"],
"previous_topics": [],
"destinations_count": 1,
"previous_destinations_count": 0
}
}
```


alert.destination.consecutive_failure

```json
{
"topic": "alert.destination.consecutive_failure",
"data": {
"tenant_id": "qa_tenant_opevents",
"event": {
"id": "evt_...",
"tenant_id": "qa_tenant_opevents",
"destination_id": "des_...",
"topic": "user.created",
"eligible_for_retry": false,
"time": "2026-04-07T12:25:31.899Z",
"data": { "index": 5 }
},
"attempt": {
"id": "atm_...",
"attempt_number": 1,
"status": "failed",
"code": "500",
"response_data": { "body": "error", "status": 500 }
},
"destination": {
"id": "des_...",
"tenant_id": "qa_tenant_opevents",
"type": "webhook",
"topics": ["user.created"],
"disabled_at": null
},
"consecutive_failures": {
"current": 5,
"max": 10,
"threshold": 50
}
}
}
```


alert.destination.disabled

```json
{
"topic": "alert.destination.disabled",
"data": {
"tenant_id": "qa_tenant_opevents",
"destination": {
"id": "des_...",
"type": "webhook",
"disabled_at": "2026-04-07T12:25:39.580Z"
},
"disabled_at": "2026-04-07T12:25:39.580Z",
"reason": "consecutive_failure",
"event": { "id": "evt_...", "topic": "user.created" },
"attempt": { "id": "atm_...", "attempt_number": 1, "status": "failed", "code": "500" }
}
}
```


alert.event.exhausted_retries

```json
{
"topic": "alert.event.exhausted_retries",
"data": {
"tenant_id": "qa_tenant_opevents",
"event": {
"id": "evt_lPHfLyXbKZazmNLWumz05tiplT",
"topic": "user.deleted",
"eligible_for_retry": true,
"time": "2026-04-07T12:26:04.760Z",
"data": { "test": "exhausted_retries" }
},
"attempt": {
"id": "atm_4vVkCafJkHUIkzgGabdjHhfX1Z",
"attempt_number": 3,
"status": "failed",
"code": "500",
"response_data": { "body": "error", "status": 500 }
},
"destination": {
"id": "des_oGh8ouGw3Aci2DyDOmrbLA8ipg",
"type": "webhook",
"topics": ["user.deleted"],
"disabled_at": null
}
}
}
```

🤖 Generated with Claude Code

Comment on lines +31 to +32
Topics []string `json:"topics"`
PreviousTopics []string `json:"previous_topics"`
Are all topics represented as ['*'], or do we send a list of topics regardless?

It uses the same logic as when we retrieve a Tenant: it's * when any destination subscribes to *. Otherwise it's a list.

@alexbouchardd
Contributor

2 things I'm doubtful about

  1. alert.event.exhausted_retries: I'm not sure that's the right nomenclature. Is it the event that was exhausted, the destination that exhausted its retries, or the attempt that reached "exhaustion" of retries?

alert.destination.exhausted_retries or alert.attempt.exhausted_retries

  2. Having second thoughts on including tenant_id at the top level of the event, since we end up repeating it everywhere in the data.

@alexluong
Collaborator Author

1: maybe alert.delivery.exhausted_retries? I don't fully get the destination part since retries must be tied to a particular event/delivery.
2: I think it's okay, yes a bit duplicate but could be helpful if people want to apply logic for it. Easier to keep it at top level? Doesn't hurt, I guess.

@alexbouchardd
Contributor

We don't use the delivery terminology from an operator POV anymore, so alert.delivery.exhausted_retries doesn't work IMO. The most logical to me is alert.attempt.exhausted_retries

@alexbouchardd
Contributor

> 2: I think it's okay, yes a bit duplicate but could be helpful if people want to apply logic for it. Easier to keep it at top level? Doesn't hurt, I guess.

Doesn't hurt until we need a new event type that's not associated with a tenant 😅

@alexluong
Collaborator Author

tenant_id can be nullable/omitted. It's already implemented that way.

alexluong and others added 8 commits April 8, 2026 15:42
Introduces internal/opevents/ — the foundation for the operation events
system. Includes OperationEvent envelope, Emitter with topic filtering
and retry, Sink interface, NoopSink, and config integration.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
HTTP sink with HMAC-SHA256 payload signing (X-Outpost-Signature header).
MQ sink as thin adapter wrapping mqs.Queue for SQS, Pub/Sub, RabbitMQ.
Config with nested sub-structs per sink type and NewSink() factory.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…events emitter

Replaces AlertNotifier with AlertEmitter (satisfied by opevents.Emitter).
Consecutive failure alert evaluation now happens in logmq batch processor
after log persistence, with per-entry ack/nack for at-least-once delivery.
LogEntry carries Destination for alert context.

Disable logic (DestinationDisabler, disabled alert) removed — deferred to Phase 4.

- alert/: replace AlertNotifier with AlertEmitter, update data types to RFC format
- logmq/: add AlertMonitor to BatchProcessor, per-entry alert eval post-InsertMany
- deliverymq/: remove fire-and-forget alert goroutine, populate LogEntry.Destination
- services/: move alert wiring from BuildDeliveryWorker to BuildLogWorker

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When consecutive failures reach 100%, disables the destination via
DestinationDisabler and emits alert.destination.disabled event.

Both disable action and alert emission return errors on failure,
causing the logmq entry to be nacked and redelivered. Replay is safe:
SADD is idempotent, DisableDestination is idempotent, consumers
deduplicate events by ID.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Emit alert.event.exhausted_retries when a delivery exhausts all retry
attempts. Uses the idempotence package to suppress duplicate alerts per
destination within a configurable TTL window (default 1h). On emit
failure, the idempotency key is cleared so MQ replay retries correctly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Snapshot tenant topics and destinations_count before each destination
mutation (create, update, delete, disable, enable), re-fetch after, and
emit tenant.subscription.updated if either value changed. Best-effort:
emission errors are logged but do not block the API response.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove legacy alert HTTP callback tests and mock server. Add opevents
mock server that captures OperationEvent envelopes with HMAC signature
verification. Three e2e test flows: consecutive failure + auto-disable,
exhausted retries, and subscription updated (create, update topics,
delete).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Redact secrets in opevents sink configs with json:"-" tags
- Fix router_test emitter harness to not silently ignore injected emitters
- Add comment clarifying Nack safety (InsertMany is idempotent by attempt ID)
- Fix %% escaping in e2e test assertion message
- Drain HTTP response body for connection reuse; include body in 4xx errors
- Panic early in NewAlertMonitor if emitter is nil
- Skip exhausted_retries check when retryMaxLimit=0 (retries disabled)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
alexluong and others added 5 commits April 8, 2026 15:42
Previously the key was destination-only, so if two distinct events
exhausted retries on the same destination within the TTL window, only
the first emitted an alert. The key now includes event ID so each
event's exhaustion is tracked independently.

Also fixes the WindowSuppression test which was accidentally asserting
the buggy behavior (two different events, expecting 1 alert). It now
tests same-event replay suppression. A new PerEvent test covers the
distinct-event case.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… sink

- Remove duplicate private topic consts from alert/monitor.go; use the
  exported opevents.Topic* constants from internal/opevents/event.go
- NewSink now returns an error when topics are configured but no sink is
  set, preventing silent event loss

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Change X-Outpost-Signature header from `sha256=<hex>` to `v0=<hex>` to
match the default webhook destination signature format configured in
config.go (SignatureHeaderTemplate: "v0={{.Signatures | join ","}}").

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
ConsecutiveFailureCount 20→100, AutoDisableDestination true→false.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…sted_retries

Per review feedback — "attempt" better describes the entity that
exhausted its retries than "event".

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@alexluong alexluong force-pushed the feat/opevents-core branch from 4c8472d to 590e16e Compare April 8, 2026 09:13
@alexluong alexluong merged commit 9c287ac into main Apr 8, 2026
4 checks passed
@alexluong alexluong deleted the feat/opevents-core branch April 8, 2026 12:27