Skip to content

feat(libs/pipeline-plan): typed operator-plan IR (ADR-0045 Phase C.1)#67

Merged
DioCrafts merged 1 commit into
mainfrom
migration/spark-removal-phase-c-1-plan-schema
May 17, 2026
Merged

feat(libs/pipeline-plan): typed operator-plan IR (ADR-0045 Phase C.1)#67
DioCrafts merged 1 commit into
mainfrom
migration/spark-removal-phase-c-1-plan-schema

Conversation

@DioCrafts
Copy link
Copy Markdown
Owner

Summary

Phase C.1 of ADR-0045 — defines the typed operator-plan IR that replaces the free-form --inline-sql contract the Spark-backed runner consumed. This is the foundational schema every other Phase C sub-PR depends on; merging it first unblocks parallel work on the interpreter (C.2), the build-service emitter (C.4), and the runner consumer (C.5).

A Plan is a directed acyclic graph of typed Op nodes. Each Op has one of nine v1 Kinds and exactly one populated per-kind config field. Inputs reference upstream Op.IDs.

Scope

  • Schema only: types, JSON serde, structural validation.
  • No interpreter — that lives in libs/pipeline-runtime/ (Phase C.2).
  • No emitter or consumerpipeline-build-service (C.4) and pipeline-runner (C.5) follow.

Keeping the schema independent means the contract package does not drag the Iceberg client and its dep graph (apache/iceberg-go, the substrait-protobuf pin from Phase A, gocloud, …) into every consumer's build.

Operator set (v1)

Kind Inputs Purpose
read_table 0 Source — pulls every row from lakekeeper.<ns>.<table> at the current snapshot
filter 1 Keeps rows where a pipeline-expression DSL string evaluates to BOOLEAN
project 1 Rewrites the row to exactly the listed columns; empty Expr = passthrough
rename 1 Relabels columns by from → to mapping
cast 1 Converts listed columns to a pipeline-expression.PipelineType
aggregate 1 GROUP BY + the seven canonical aggregations (sum, count, count_distinct, avg, stddev, min, max)
union ≥2 Concatenates streams; schemas must align by name
limit 1 Caps row count (N > 0)
write_table 1 Terminal — publishes the result as a new Iceberg snapshot. Modes: create_or_replace (matches Spark's df.writeTo(target).createOrReplace()) or append

join is deferred to v2. Phase 0's inventory (docs/migration/pipeline-runner-spark-to-go-inventory.md) found every concrete pipeline in the repo re-expressed with these nine; if a follow-up audit against production pipeline_authoring.published_dag turns up a pipeline that needs join, adding OpKindJoin is wire-compatible.

Validation

Plan.Validate() returns a ValidationErrors slice so the authoring UI can highlight every broken node in one pass rather than one error at a time. Sentinel ErrEmptyPlan is reachable via errors.Is. Eleven structural checks:

  1. Plan has ≥1 op.
  2. Op.IDs non-empty and unique.
  3. Kind is a known constant.
  4. Exactly the per-kind config field is populated; siblings nil.
  5. Inputs reference existing IDs (no dangling, no self-reference).
  6. Source ops (read_table) have zero inputs.
  7. Non-source non-union ops have exactly one input.
  8. Union ops have ≥2 inputs.
  9. ≥1 source and ≥1 terminal in the graph.
  10. No cycles (3-colour DFS).
  11. Per-kind invariants (limit n > 0, aggregation function token known, write mode valid, non-empty column names, no duplicate target columns, etc.).

Test plan

  • go build ./... (full repo)
  • go vet ./... (full repo)
  • go test -race ./libs/pipeline-plan/... — 4 sub-tests in plan_test.go + 11 sub-test groups in validate_test.go covering every error branch:
    • 4 known-good plans (the two PoC pipelines transactions_clean and customer_metrics from the Phase 0 inventory, plus a rename + cast + limit chain and a 2-source union so every v1 op is exercised by at least one valid case)
    • JSON round-trip stability across all 4
    • Empty plan → ErrEmptyPlan via errors.Is
    • Missing source / missing terminal
    • Duplicate / empty op IDs
    • Kind-vs-config mismatch (declared kind missing config, sibling kind populated)
    • Input arity per kind (source-with-inputs, single-input-without-one, union-with-one, unknown-id, self-reference)
    • Cycle detection
    • Per-op invariants (filter empty expr, project no columns / duplicate column, aggregate unknown function / missing source_column / duplicate target, limit ≤0, write_table unknown mode, read_table missing catalog, cast empty column, rename empty pair)

Follow-ups

  • C.2 libs/pipeline-runtime/ — interpreter that executes a Plan against Iceberg via the apache/iceberg-go reads from Phase A and the HTTP append adapter from Phase B.
  • C.3 Promote aggregate from planned to available in services/pipeline-build-service/internal/handler/transform_catalog.go:528 (catalog entry exists, runtime path is missing).
  • C.4 Rewrite services/pipeline-build-service/internal/spark/spark.go → emit Plan instead of SparkApplication CR; rename spark package to dispatch.
  • C.5 Rewrite services/pipeline-runner/internal/runner/run.go → fetch + execute Plan. Delete --inline-sql flags.
  • C.6 Migrate infra/dev/poc-pipeline-nodes.yaml (6 pipelines with SQL → Plan JSON) and update READMEs.

🤖 Generated with Claude Code

Phase C.1 of ADR-0045 — defines the typed operator-plan IR that
replaces the free-form `--inline-sql` flag the Spark-backed runner
consumed. A Plan is a directed acyclic graph of typed Op nodes;
each Op has one of nine v1 kinds (read_table, filter, project,
rename, cast, aggregate, union, limit, write_table) and exactly
one populated per-kind config field. Inputs reference upstream
Op.IDs.

Scope is schema only — types, JSON serde, structural validation.
The interpreter that consumes a Plan and executes it against
Iceberg lives in a separate package (Phase C.2). Keeping the
schema independent means pipeline-build-service, pipeline-runner,
and any test harness can depend on the contract without dragging
the Iceberg client and its dep graph (apache/iceberg-go,
substrait-protobuf pin from Phase A, gocloud, …) into their build.

The operator set is the one Phase 0's inventory
(docs/migration/pipeline-runner-spark-to-go-inventory.md) found
sufficient for every concrete pipeline in the repo. `join` is
deferred to v2; adding OpKindJoin later is wire-compatible.

Validate() runs 11 structural checks (uniqueness, kind/config
consistency, input arity by kind, source/terminal presence, cycle
detection via 3-colour DFS, per-kind invariants) and returns every
finding in one pass so the authoring UI can highlight all broken
nodes simultaneously. The ValidationError shape is JSON-serialised
and ready to flow back through the existing per-node validation
endpoint pipeline-authoring-service exposes.

No new dependencies; reuses libs/pipeline-expression for column
types only.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@DioCrafts DioCrafts merged commit 0a3d32c into main May 17, 2026
3 of 10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant