Skip to content

feat(action-log-sink): port ActionLogStreamSink to Go (ADR-0045 Phase B)#66

Merged
DioCrafts merged 1 commit into
mainfrom
migration/spark-removal-phase-b
May 17, 2026
Merged

feat(action-log-sink): port ActionLogStreamSink to Go (ADR-0045 Phase B)#66
DioCrafts merged 1 commit into
mainfrom
migration/spark-removal-phase-b

Conversation

@DioCrafts
Copy link
Copy Markdown
Owner

Summary

Phase B of ADR-0045 — new Go service services/action-log-sink/ that replaces the Scala com.openfoundry.audit.ActionLogStreamSink (Spark Structured Streaming) shipped in services/pipeline-runner-spark/. Consumes ontology.actions.applied.v1 Kafka topic and appends batches to the Iceberg lakekeeper.default.action_log table via the OpenFoundry Iceberg HTTP append adapter (POST /openfoundry/iceberg/v1/append, served by services/iceberg-catalog-service).

The dev SparkApplication CR at infra/dev/action-log-sink.yaml is rewritten as an apps/v1 Deployment + ClusterIP Service with terminationGracePeriodSeconds=90 and a rolling strategy of maxSurge=0 to prevent two consumers in the same group racing partitions during a deploy.

Architecture

Mirrors the established services/audit-sink / services/ai-sink pattern:

Package Responsibility
internal/envelope Wire format (15 required + optional fields from libs/ontology-kernel/handlers/actions/side_effects.go::publishActionAuditToKafka). Decode returns typed *DecodeError / *ValidateError; IsPoison classifies them.
internal/writer Writer interface + IcebergWriter (HTTP adapter) + JSONLWriter (dev fallback selected via ACTION_LOG_SINK_JSONL_PATH). Posts the canonical TableSpec shape with 16-column schema, partition day(applied_at_ms), sort applied_at_ms ASC.
internal/runtime Kafka → batch → Writer.AppendCommitMessages loop. At-least-once: offsets advance only after a successful append. Poison records carry their offset commit alongside the next good flush so the consumer does not head-of-line-block.
internal/server /healthz + /metrics on a service-local Prometheus registry.
cmd/action-log-sink/main.go Signal-aware lifecycle, env-driven wiring, dev-fallback switch.

At-least-once + dedup contract

Documented in services/action-log-sink/README.md and in the new ADR section is the deliberate downgrade from Spark Structured Streaming exactly-once to at-least-once with downstream DISTINCT event_id dedup. event_id is the producer-generated UUID, immutable across replays.

Any downstream query on action_log that aggregates without DISTINCT event_id (e.g. row-count metrics, audit-trail counters) must be reviewed before this sink ships to production. The README flags this explicitly.

Test plan

  • go build ./... from the worktree (full repo)
  • go vet ./... (full repo)
  • go test -race ./services/action-log-sink/... (4 packages, ~14 sub-tests)
    • envelope: happy decode, invalid JSON, every required field missing, applied_at_ms=0 rejected.
    • writer: 16-column body shape against httptest.Server, 404 → ErrTableNotFound, 409/422 → ErrSchemaMismatch, 5xx → ErrCommitFailed, empty-batch guard.
    • config: env-var parsing happy path, missing required, JSONL skips catalog requirement, invalid integer.
    • runtime: flush-by-MaxRecords with offset commits, poison records committed but not appended, writer error preserves batch (no commit), ctx-cancel triggers final flush.
  • Docker build + push + Deployment apply on the dev k3s cluster (deferred — same docker push parallelism issue as Phase A, plus broader cluster stability work tracked separately). Validation steps once the cluster is healthy:
    docker build -t localhost:5001/openfoundry/action-log-sink:0.1.0 -f services/action-log-sink/Dockerfile .
    # docker save + limactl cp + ctr import on the 4 nodes (Phase A workaround) OR
    # kubectl port-forward + docker push once the registry forwarder is stable
    kubectl apply -f infra/dev/action-log-sink.yaml
    kubectl -n openfoundry rollout status deploy/action-log-sink
    # produce a few test actions via the ontology API, then verify the rows landed:
    curl -s http://iceberg-catalog-service:8080/iceberg/v1/namespaces/default/tables/action_log | jq '.metadata.current-snapshot-id'

Deviation from ADR-0045 text

ADR § Decision point 5 reads "append via apache/iceberg-go writeBuilder against the Iceberg catalog". The established repo pattern (audit-sink, ai-sink) is the HTTP append adapter because iceberg-go's write-side is not stable end-to-end (documented in both sinks' READMEs). This PR follows the repo pattern, not the ADR text. Phase A already had to pin substrait-protobuf down to ship the read path of iceberg-go; the write path is in worse shape. Re-evaluation tracked as a follow-up when iceberg-go upstream catches up.

Follow-ups (out of scope)

  • Phase C: operator-plan runtime that collapses DISTRIBUTED + FASTER (the hard one).
  • Phase D: delete services/pipeline-runner-spark/, Spark Operator chart, related docs.
  • End-to-end smoke against a live Lakekeeper-backed cluster (this PR's deferred test plan items).

🤖 Generated with Claude Code

Phase B of ADR-0045 replaces the Scala `com.openfoundry.audit.ActionLogStreamSink`
(Spark Structured Streaming) with a new Go service `action-log-sink`
that consumes `ontology.actions.applied.v1` Kafka topic and appends
each batch to `lakekeeper.default.action_log` Iceberg table via the
OpenFoundry Iceberg HTTP append adapter served by
iceberg-catalog-service. Same topic, table, and 16-column schema as
the Scala original.

Architecture mirrors services/audit-sink and services/ai-sink:
- internal/envelope decodes + validates the wire format (15 required
  + optional fields from libs/ontology-kernel side_effects.go).
- internal/writer wraps the HTTP append adapter and adds a JSONL
  dev fallback selected via ACTION_LOG_SINK_JSONL_PATH.
- internal/runtime runs the Kafka -> batch -> Writer.Append ->
  CommitMessages loop. At-least-once semantics: offsets advance only
  after a successful Iceberg append. Poison records are committed
  alongside the next successful flush to avoid head-of-line blocking.
- internal/server exposes /healthz + /metrics on a service-local
  Prometheus registry (action_log_sink_lag_seconds / records_total /
  batch_size_records / commits_total{outcome}).

The dev infra/dev/action-log-sink.yaml SparkApplication CR is
rewritten as an apps/v1 Deployment + ClusterIP Service with
terminationGracePeriodSeconds=90 so the final flush completes before
SIGKILL. Rolling update is configured with maxSurge=0 to prevent two
consumers in the same group racing partitions during a deploy.

Note vs the ADR-0045 text: Phase B description says "append via
iceberg-go writeBuilder" but the established repo pattern (audit-sink,
ai-sink) is the HTTP adapter; iceberg-go's write-side is not stable
enough today. The Iceberg write path will be re-evaluated when
iceberg-go upstream catches up (tracked alongside Phase A's
gocloud/S3-signing pins).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@DioCrafts DioCrafts merged commit 862b940 into main May 17, 2026
4 of 12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant