Replies: 5 comments, 9 replies
I like Design B. What's the disadvantage of Design B compared to Design A?
Agreed, Design B feels more natural: the CU as a pure executor feels clean. Two questions:

1. Effort & transition. How big is the change for Design B? Could we land Design A first (DB-credential removal, small delta) as a stepping stone, then evolve to B without a big-bang switchover?
2. Who orchestrates? In Design B's current sketch, the frontend coordinates compile → eid allocation → CU dispatch. That makes the frontend a critical control-plane component, which feels off to me. Would it be worth introducing a thin Execution Service that owns this orchestration? Any client would then just call …
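A minimal sketch of the thin Execution Service suggested above, assuming it receives the logical plan and delegates to three injected callables (compile, `eid` allocation, CU dispatch). `ExecutionService`, `submit`, `ExecutionHandle`, and the callable names are hypothetical, not existing Texera classes; the fakes at the bottom stand in for the compiling service, the Dashboard Service, and a CU.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

Plan = Dict[str, Any]


@dataclass(frozen=True)
class ExecutionHandle:
    """What the (hypothetical) Execution Service returns to any client."""
    eid: int
    dispatched: bool


class ExecutionService:
    """Owns compile -> eid allocation -> CU dispatch, so no client has to."""

    def __init__(
        self,
        compile_plan: Callable[[Plan], Plan],
        allocate_eid: Callable[[int], int],
        dispatch_to_cu: Callable[[Dict[str, Any]], None],
    ) -> None:
        self._compile_plan = compile_plan
        self._allocate_eid = allocate_eid
        self._dispatch_to_cu = dispatch_to_cu

    def submit(self, workflow_id: int, logical_plan: Plan) -> ExecutionHandle:
        # 1. Compile logical -> physical (e.g. via workflow-compiling-service).
        #    If this raises, no eid is allocated.
        physical_plan = self._compile_plan(logical_plan)
        # 2. Allocate the execution id (e.g. via the Dashboard Service).
        eid = self._allocate_eid(workflow_id)
        # 3. Send a self-contained spec to a Computing Unit.
        self._dispatch_to_cu({"eid": eid, "physicalPlan": physical_plan})
        return ExecutionHandle(eid=eid, dispatched=True)


# Demo with in-memory fakes standing in for the real services.
dispatched: List[Dict[str, Any]] = []
service = ExecutionService(
    compile_plan=lambda lp: {"operators": [], "links": []},
    allocate_eid=lambda workflow_id: 42,
    dispatch_to_cu=dispatched.append,
)
handle = service.submit(7, {"operators": [], "links": []})
```

Allocating the `eid` only after compilation succeeds is one possible ordering; it keeps failed compiles from leaving execution records behind, and the frontend becomes just one client of `submit`.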
First, I want to ask: what is the biggest issue with the current architecture? Exposing DB credentials could be one, but that's not that fundamental; I need more justification for the redesign. That aside, I always wanted to go with option B, and that was the reason we initially introduced the workflow compilation service. But there are some blockers:
A related note: I believe a workflow is currently compiled twice in its lifecycle, and we want to remove one of those compilations in the near future.
Here is the layout of the physical plan:

Physical Plan Spec

Layout:

```jsonc
{
  "operators": [...list of physical operators...],
  "links": [...list of physical links...]
}
```

Physical Operator Spec

```jsonc
{
  "id": {
    "logicalOpId": { "id": "CSVScanSource-operator-id" },
    "layerName": "main" // distinguishes physical stages from the same logical op: main | partial | final
  },
  "workflowId": { "id": 0 },
  "executionId": { "id": 1 },
  "opExecInitInfo": { // tells Amber how to construct the runtime executor
    // JVM operators use kind "className":
    "kind": "className",
    "className": "org.apache.texera.amber.operator.source.scan.csv.CSVScanSourceOpExec",
    "descString": "{...a JSON STRING that describes the properties of the physical operator...}"
    // For scan sources (CSV/JSONL/Arrow/file), the source path lives in descString as `fileName`,
    // e.g. `dataset:///dataset-15/versionHash/raw/data.csv`
    // (if the file is resolved on the local file system, it starts with `file:///...`).
    // For UDF operators, use kind "code" instead:
    // { "kind": "code", "code": "class ProcessTupleOperator(...): ...", "language": "python" }
  },
  "parallelizable": true,
  "locationPreference": { "type": "roundRobin" },
  "partitionRequirement": [], // what each INPUT expects (array: one entry per input port)
                              // null -> no requirement for that input
                              // { "type": "single" } -> gather into one partition
                              // { "type": "hash", "hashAttributeNames": ["id"] } -> hash-partitioned by attributes
                              // { "type": "broadcast" } -> broadcast to workers
                              // { "type": "oneToOne" } -> partitioning maps one-to-one
                              // { "type": "none" } -> no partitioning
  "partitionDeriveSpec": { "type": "passthrough" }, // what partitioning this operator PRODUCES
                              // passthrough -> preserve upstream partitioning
                              // toSingle -> produce a single partition
                              // toHash + hashAttributeNames -> produce hash partitioning
                              // toUnknown -> partitioning unknown
                              // projection -> derive through projection
  "inputPortsSerialized": {},  // map keyed "<portId>_<internalFlag>", e.g. "0_false"
  "outputPortsSerialized": {}, // value = 2-item array: [portMetadata, schema|null]
                               // portMetadata: { id:{id,internal}, displayName, blocking, mode }
                               // output `mode`: 0 = set snapshot | 1 = set delta | 2 = single snapshot
                               // schema: { attributes: [ { attributeName, attributeType }, ... ] } or null
                               // attributeType: string | integer (32-bit) | long (64-bit) | double |
                               //   boolean | timestamp | binary | large_binary (pointer-like)
  "isOneToManyOp": false,
  "suggestedWorkerNum": 1,
  "pveName": ""
}
```

Physical Link Spec

Each item in `links`:

```jsonc
{
  "fromOpId": {
    "logicalOpId": { "id": "source-op-id" },
    "layerName": "main"
  },
  "fromPortId": { "id": 0, "internal": false },
  "toOpId": {
    "logicalOpId": { "id": "target-op-id" },
    "layerName": "main"
  },
  "toPortId": { "id": 0, "internal": false }
}
```

@Yicong-Huang @aglinxinyuan @Xiao-zhen-Liu Is this interpretation accurate? If so, I don't think the physical plan contains any sensitive information, and we can safely expose it to the client.
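A small sketch of how a consumer might read this layout, assuming the plan arrives as the JSON above with the `//` comments stripped. It decodes the `"<portId>_<internalFlag>"` port-map keys and flags links whose endpoints are not operators in the plan. The helper names (`op_key`, `parse_port_key`, `dangling_links`) are hypothetical, and the demo operators carry only their `id` field.

```python
from typing import Any, Dict, List, Set, Tuple

OpKey = Tuple[str, str]  # (logicalOpId.id, layerName)


def op_key(op_id: Dict[str, Any]) -> OpKey:
    """Turn a physical operator id object into a hashable key."""
    return (op_id["logicalOpId"]["id"], op_id["layerName"])


def parse_port_key(key: str) -> Tuple[int, bool]:
    """Split a port-map key such as "0_false" into (portId, internal)."""
    port_id, internal = key.rsplit("_", 1)
    if internal not in ("true", "false"):
        raise ValueError(f"unexpected internal flag in port key: {key!r}")
    return int(port_id), internal == "true"


def dangling_links(plan: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the links whose source or target operator is missing from the plan."""
    known: Set[OpKey] = {op_key(op["id"]) for op in plan["operators"]}
    return [
        link
        for link in plan["links"]
        if op_key(link["fromOpId"]) not in known or op_key(link["toOpId"]) not in known
    ]


def _op_id(name: str) -> Dict[str, Any]:
    return {"logicalOpId": {"id": name}, "layerName": "main"}


# Demo: a two-operator plan with one link, mirroring the link spec above.
plan = {
    "operators": [{"id": _op_id("source-op-id")}, {"id": _op_id("target-op-id")}],
    "links": [
        {
            "fromOpId": _op_id("source-op-id"),
            "fromPortId": {"id": 0, "internal": False},
            "toOpId": _op_id("target-op-id"),
            "toPortId": {"id": 0, "internal": False},
        }
    ],
}
```

Keying operators by `(logicalOpId, layerName)` matters because one logical operator can expand into several physical layers (`main`, `partial`, `final`).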
Motivation of this refactoring
Currently, the Amber engine directly queries Postgres for execution status updates, workflow compilation, and cost-based optimization. Therefore, DB credentials have to be stored in the Computing Unit (CU) as environment variables. Removing the DB credentials from the Computing Unit makes each CU a better, safer sandbox for executing workflows, especially workflows with UDF operators.
Refactor Overview
The fix is to move privileged database work out of the executor, not to harden the sandbox.
The Iceberg REST catalog already takes Iceberg metadata operations off direct Postgres access. The remaining dependencies are what I want to decouple and discuss in this post.
Current System Architecture and Flow
Here are two diagrams describing the high-level traffic between services and storages when a workflow is being executed:
Phase 1: Workflow (logical plan) is submitted to Computing Unit

Phase 2: Physical plan is being executed

Execution Flow
Every call inside the red-shaded block below is a direct database hit from the Computing Unit.
```mermaid
sequenceDiagram
    autonumber
    actor U as Frontend
    participant M as CU Master
    participant PG as Postgres
    participant CAT as Catalog Service
    participant S3I as S3 (Iceberg Tables)
    U->>M: Run workflow (**logical plan** + JWT)
    rect rgb(255, 235, 235)
        M->>PG: Look up latest workflow version
        M->>PG: Create execution record (get eid)
        M->>PG: Clear previous run's output locations
        opt fault tolerance
            M->>PG: Save replay-log location
        end
        Note over M: compile **logical plan** → **physical plan** (in-process)
        M->>PG: Resolve dataset paths to storage locations
        Note over M: cost-based optimization (in-process)
        M->>PG: Read last run's stats (cost-based optimization)
        M->>PG: Record where results/console/stats go
    end
    M->>CAT: Commit result/console/stats tables (REST, no DB login)
    M->>S3I: Write result/console/stats data
    M-->>U: Live status, stats & errors (WebSocket)
```

Proposed Design
Design A: Proxy to the Backend
Keep the CU's logic in place; replace each `SqlServer` call with an HTTP call that forwards the user's JWT. The Dashboard Service authorizes the caller and runs the SQL.

Yellow highlights compilation and execution; in Design A, both happen inside the CU Master.
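A minimal sketch of one such Design A replacement, using Python's standard `urllib.request`. The `/api/executions` route, the `dashboard-service:8080` host, the `Bearer` scheme, and the `{"eid": ...}` response shape are all assumptions for illustration; the point is only that the CU forwards the user's JWT and never holds DB credentials.

```python
import json
import urllib.request

# Hypothetical host and route; the real Dashboard Service endpoints are not specified here.
DASHBOARD_URL = "http://dashboard-service:8080"


def build_create_execution_request(user_jwt: str, workflow_id: int) -> urllib.request.Request:
    """Build (but do not send) the HTTP call that replaces a direct SqlServer insert."""
    body = json.dumps({"workflowId": workflow_id}).encode("utf-8")
    return urllib.request.Request(
        url=f"{DASHBOARD_URL}/api/executions",
        data=body,
        method="POST",
        headers={
            # Forward the caller's JWT unchanged; the Dashboard Service
            # authorizes the user before it touches Postgres.
            "Authorization": f"Bearer {user_jwt}",
            "Content-Type": "application/json",
        },
    )


def create_execution(user_jwt: str, workflow_id: int) -> int:
    """Send the request and read the eid from the (assumed) JSON response."""
    req = build_create_execution_request(user_jwt, workflow_id)
    with urllib.request.urlopen(req, timeout=10) as resp:
        return int(json.load(resp)["eid"])


# Demo: inspect the request without any network access.
request = build_create_execution_request("header.payload.sig", workflow_id=7)
```

Dataset resolution against file-service would follow the same pattern with a different host and route.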
The main ideas of this design:
- Each `SqlServer` call becomes an HTTP call that forwards the user's JWT: execution-record create/update goes to the Dashboard Service, dataset resolution goes to file-service, and each service authorizes the caller before touching Postgres.

Design B: Pure Execution Backend (My Preference)
The CU becomes a stateless executor. It receives one self-contained `ExecutionSpec` (the already-compiled physical plan + `eid` + last execution stats), runs it, writes only to object storage through the REST catalog, and reports a completion manifest. The backend (Dashboard Service + workflow-compiling-service + file-service) does all compilation, dataset resolution, `eid` allocation, and persistence.

Yellow highlights compilation (in workflow-compiling-service) and execution (in CU Master), which are split across services, unlike in Design A.
The main ideas of this design:
- … `eid` + last execution stats. It never sees the physical plan and never resolves datasets.
- … `eid` + last execution stats (from the Dashboard Service), and sends it to the CU over the WebSocket.
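A sketch of the Design B contract, assuming `ExecutionSpec` and the completion manifest travel as plain JSON messages. The field names, status strings, and the `execute` helper are hypothetical; `run_plan` stands in for Amber actually running the physical plan and returning the tables it committed through the REST catalog.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class ExecutionSpec:
    """Everything the CU needs to run; nothing here requires DB access on the CU."""
    eid: int
    physical_plan: Dict[str, Any]         # already compiled, dataset paths already resolved
    last_execution_stats: Dict[str, Any]  # input for cost-based optimization

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @staticmethod
    def from_json(raw: str) -> "ExecutionSpec":
        data = json.loads(raw)
        return ExecutionSpec(data["eid"], data["physical_plan"], data["last_execution_stats"])


@dataclass(frozen=True)
class CompletionManifest:
    """What the CU reports back; the backend, not the CU, persists it to Postgres."""
    eid: int
    status: str
    committed_tables: List[str] = field(default_factory=list)


def execute(spec: ExecutionSpec, run_plan: Callable[[Dict[str, Any]], List[str]]) -> CompletionManifest:
    """Run the plan and summarize the outcome; no lookups happen here."""
    try:
        tables = run_plan(spec.physical_plan)
    except Exception:
        return CompletionManifest(spec.eid, "FAILED")
    return CompletionManifest(spec.eid, "COMPLETED", tables)


# Demo: round-trip the spec over the wire, then "run" it with a stub.
spec = ExecutionSpec(eid=1, physical_plan={"operators": [], "links": []}, last_execution_stats={})
received = ExecutionSpec.from_json(spec.to_json())
manifest = execute(received, lambda plan: ["result-table"])
```

Because the spec is the CU's only input and the manifest its only output, every Postgres write from the current flow moves to whichever backend service receives the manifest.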