Skip to content

feat(pipeline-runner): execute pipelineplan.Plan with Iceberg providers (ADR-0045 Phase C.5)#75

Merged
DioCrafts merged 1 commit into
migration/spark-removal-phase-c-2-runtimefrom
migration/spark-removal-phase-c-5-runner
May 18, 2026
Merged

feat(pipeline-runner): execute pipelineplan.Plan with Iceberg providers (ADR-0045 Phase C.5)#75
DioCrafts merged 1 commit into
migration/spark-removal-phase-c-2-runtimefrom
migration/spark-removal-phase-c-5-runner

Conversation

@DioCrafts
Copy link
Copy Markdown
Owner

Summary

Phase C.5 of ADR-0045. The runner side of the new contract: this PR replaces the Scala/Spark pipeline-runner and the `spark-submit` shell-out with a distroless Go binary that decodes a `pipelineplan.Plan` from the `PIPELINE_PLAN_B64` env var (the dispatcher from #73 populates it) and executes it via `libs/pipeline-runtime` (#69) against new Iceberg providers introduced here.

Branch base: stacked on #69 (C.2) because the runner imports both `libs/pipeline-plan` and `libs/pipeline-runtime`. Merge order: C.1 → C.2 → C.5.

New: `internal/providers/`

File Implements Source pattern
`iceberg_reader.go` (`IcebergReader`) `pipelineruntime.Reader` Phase A indexer (`services/iceberg-object-indexer/internal/source/iceberg.go`)
`http_writer.go` (`HTTPWriter`) `pipelineruntime.Writer` Phase B sink (`services/audit-sink/internal/writer/iceberg.go`)

`IcebergReader` carries every Phase A discovery: blank import `_ "github.com/apache/iceberg-go/io/gocloud"` (S3/GCS/Azure scheme handlers), `s3.remote-signing-enabled=false` override (Lakekeeper advertises `s3.signer-uri` but iceberg-go v0.5.0's gocloud S3 adapter does not implement remote signing), and the same REST options (`WithCredential` / `WithAuthURI` / `WithScope` / `WithWarehouseLocation`).

`HTTPWriter` posts to `iceberg-catalog-service`'s `/openfoundry/iceberg/v1/append` (the Phase B pattern). Typed errors `ErrTableNotFound` (404) / `ErrSchemaMismatch` (409, 422) / `ErrCommitFailed` (5xx, transport). `X-Internal-Token` forwarded when set.

Rewritten: `internal/runner/`

File Change
`args.go` Drops `--pipeline-build-url` and the SparkApplication-shaped flags (`--inline-sql`, `--inline-format`). Adds the Iceberg/OAuth knobs the providers need (`--catalog-warehouse`, `--catalog-credential`, `--oauth-token-uri`, `--oauth-scope`, `--table-writer-url`, `--internal-token`, `--log-format`) plus env-var fallbacks. `--smoke` skips field validation so integration CI can hit it without any catalog config.
`run.go` Replaces `ResolveSpec + submitToSpark` with: `loadPlanFromEnv` → `pipelineplan.Plan.Validate` → `pipelineruntime.Executor{Reader, Writer}.Run(ctx, plan)`. Switches to `slog` (text or json via `--log-format`) so the dispatcher's structured logs flow through.
`spec.go` deleted — no more pipeline-build-service HTTP spec fetch. Plan is the wire contract.

Other touches

  • `cmd/pipeline-runner/main.go` documents the new CLI surface; one less import (`runner.Usage` removed).
  • `Dockerfile` collapses from a Spark-image multi-stage (Spark base + Iceberg JAR downloads + sbt-built Scala JAR + Go orchestrator) to a distroless static build mirroring `services/iceberg-object-indexer/Dockerfile` (Phase A). Image size drops from ~700 MB → ~25 MB.
  • `go.mod` adds `apache/iceberg-go@v0.5.0` and pins `substrait-protobuf/go@v0.81.0` — the same Phase A workaround (v0.85.0 removed `extensions.SimpleExtensionURI` which `substrait-go/v7` still references). `arrow-go/v18` trails along at v18.5.2.

Test plan

  • `go vet ./...` clean across the full repo
  • `go build ./...` clean across the full repo
  • `go test -count=1 ./services/pipeline-runner/...` — every package green:
    • `runner`: 10 sub-tests (happy-path arg parsing, env-var fallbacks, smoke-validation gate, unknown-flag rejection, mandatory-field matrix, dangling-flag, bad log-format, Plan env decode happy path, empty env, bad base64, bad JSON, `firstCatalogNameInPlan`)
    • `providers`: 7 sub-tests (HTTPWriter POST shape, token forwarding, 404 → `ErrTableNotFound`, 409/422 → `ErrSchemaMismatch`, 5xx → `ErrCommitFailed`, URL validation matrix)
    • `server`: unchanged

`IcebergReader` has no unit tests in this PR — its surface is too tightly coupled to `apache/iceberg-go` to fake cleanly without a real REST catalog. Smoke against a live Lakekeeper catalog is the validation gate per Phase A discovery (and per ADR-0045 § Phase C exit criterion).

Follow-ups

  • C.6 migrates `infra/dev/poc-pipeline-nodes.yaml` from the SQL-shaped node configs to the structured `planner.NodeConfig` shape (#74 defined it) so the four PoC pipelines run on the new runner end-to-end. That PR also exercises the smoke gate in the dev k3s cluster.

🤖 Generated with Claude Code

…rs (ADR-0045 Phase C.5)

Phase C.5 of ADR-0045 — the runner that consumes the Plan + Job
contract the dispatcher (Phase C.4.a) ships and runs it end-to-end
against Iceberg. The Scala/Spark runner and the spark-submit
shell-out are gone; the binary is now a distroless Go process that
decodes a base64-JSON pipelineplan.Plan from the PIPELINE_PLAN_B64
env var and executes it via libs/pipeline-runtime (Phase C.2)
against the Iceberg providers introduced in this PR.

New: internal/providers/

- iceberg_reader.go (IcebergReader) wraps apache/iceberg-go's REST
  catalog client. Lakekeeper-flavoured: WithCredential / WithAuthURI
  / WithScope / WithWarehouseLocation. Crucially registers
  S3RemoteSigningEnabled=false because iceberg-go v0.5.0's gocloud
  S3 adapter does not implement remote signing and Lakekeeper
  advertises s3.signer-uri on every LoadTable response (Phase A
  discovery). Implements pipelineruntime.Reader; rowStreamFromBatches
  adapts Arrow record batches into the iter.Seq2[Row, error] shape
  with GetOneForMarshal for cell encoding.
- http_writer.go (HTTPWriter) implements pipelineruntime.Writer by
  posting to iceberg-catalog-service's /openfoundry/iceberg/v1/append.
  Same pattern services/audit-sink and services/ai-sink (Phase B)
  use; iceberg-go's write-side is not stable enough for direct use
  (documented in audit-sink/README.md). Typed errors: ErrTableNotFound,
  ErrSchemaMismatch, ErrCommitFailed.

Rewritten: internal/runner/

- args.go drops --pipeline-build-url and the SparkApplication-shaped
  flags (--inline-sql, --inline-format), adds the Iceberg / OAuth
  knobs the providers need (--catalog-warehouse, --catalog-credential,
  --oauth-token-uri, --oauth-scope, --table-writer-url,
  --internal-token, --log-format) plus env-var fallbacks. Smoke
  mode skips field validation so integration CI can hit --smoke
  without any catalog config.
- run.go replaces ResolveSpec + submitToSpark with: load Plan from
  PIPELINE_PLAN_B64, build IcebergReader + HTTPWriter, hand to
  pipelineruntime.Executor.Run. Side-effect import
  `_ "github.com/apache/iceberg-go/io/gocloud"` registers the S3 /
  GCS / Azure scheme handlers (Phase A discovery: missing import
  yields ErrIOSchemeNotFound). Switches to slog (text or json via
  --log-format) so the dispatcher's structured logs flow through.
- spec.go deleted entirely — no more pipeline-build-service HTTP
  spec fetch; the Plan is the wire contract now.

Updates: cmd/pipeline-runner/main.go now documents the new flag
surface; Dockerfile collapses from a Spark-image multi-stage (Spark
base + Iceberg JAR downloads + sbt-built Scala JAR) to a distroless
static build mirroring services/iceberg-object-indexer/Dockerfile
(Phase A); image size drops from ~700 MB to ~25 MB.

go.mod: adds apache/iceberg-go v0.5.0 and pins
substrait-protobuf/go to v0.81.0 — the same Phase A workaround
(v0.85.0 removed extensions.SimpleExtensionURI which substrait-go/v7
still references). arrow-go/v18 trails along at v18.5.2.

Test plan:
- go vet ./... clean across the full repo.
- go build ./... clean across the full repo.
- go test -count=1 ./services/pipeline-runner/... green across all
  three packages:
  - runner: 10 sub-tests (happy-path arg parsing, env-var fallbacks,
    smoke validation gate, unknown-flag rejection, mandatory-field
    matrix, dangling-flag, bad log-format, Plan env decode happy
    path, empty env, bad base64, bad JSON, firstCatalogNameInPlan).
  - providers: 7 sub-tests (HTTPWriter POST shape, token forwarding,
    404 → ErrTableNotFound, 409/422 → ErrSchemaMismatch, 5xx →
    ErrCommitFailed, URL validation matrix). IcebergReader has no
    unit tests in this PR — its surface is too tightly coupled to
    apache/iceberg-go to fake cleanly; smoke against a live
    Lakekeeper catalog is the validation gate per Phase A discovery.
  - server: unchanged (/healthz + /metrics).

Follow-ups:
- C.6 migrates infra/dev/poc-pipeline-nodes.yaml to the structured
  planner.NodeConfig shape so the four PoC pipelines run on the new
  runner end-to-end. That PR also exercises the smoke gate in the
  dev k3s cluster.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@DioCrafts DioCrafts merged commit 3f5bbe3 into migration/spark-removal-phase-c-2-runtime May 18, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant