Skip to content

fix(execution-api): fix multi-input operator's execution termination condition#4615

Merged
bobbai00 merged 4 commits into
apache:mainfrom
bobbai00:fix/hashjoin-sync-execution-race
May 1, 2026
Merged

fix(execution-api): fix multi-input operator's execution termination condition#4615
bobbai00 merged 4 commits into
apache:mainfrom
bobbai00:fix/hashjoin-sync-execution-race

Conversation

@bobbai00
Copy link
Copy Markdown
Contributor

@bobbai00 bobbai00 commented May 1, 2026

What changes were proposed in this PR?

This PR fixes a race in SyncExecutionResource.allTargetsCompleted that causes the sync execution API (POST /api/execution/{wid}/{cuid}/run) to terminate before a HashJoin's probe phase produces output, returning an empty result.

Root cause. HashJoinOpDesc.getPhysicalPlan produces two PhysicalOps (build, probe) sharing one logical id, separated by a blocking edge. The scheduler places them in two regions and runs them sequentially. WorkflowExecution.getAllRegionExecutionsStats aggregates per-logical-op state by groupBy(_._1.logicalOpId.id) over only the registered RegionExecutions. Between "build region completed" and "probe region instantiated," only the build PhysicalOp is registered, so aggregateStates(Iterable(COMPLETED)) returns COMPLETED. The sync resource then takes the TargetResultsReady branch, calls killExecution, and reads the probe's still-empty Iceberg output. The same shape applies to any logical operator whose physical plan contains multiple PhysicalOps separated by a blocking edge (e.g., Aggregate). It does not surface in the regular WebSocket-driven frontend execution because the frontend waits for full workflow termination.

Fix. Strengthen allTargetsCompleted to require, in addition to operatorState == COMPLETED, that every declared external input port of the target is already present in OperatorMetrics.operatorStatistics.inputMetrics. Port-1 metrics only appear after the probe actually consumes data, which closes the race window. Internal ports (e.g., HashJoin's build→probe internal edge) are filtered out on both sides of the comparison so the predicate matches what aggregateMetrics already exposes. Source operators (zero declared inputs) and single-input operators are unaffected; for empty-input edge cases, terminalStateObservable continues to provide the fallback signal.

val targetExpectedExternalInputs: Map[String, Int] = effectiveLogicalPlan.operators
  .filter(op => request.targetOperatorIds.contains(op.operatorIdentifier.id))
  .map(op =>
    op.operatorIdentifier.id -> op.operatorInfo.inputPorts.count(!_.id.internal)
  )
  .toMap

def allTargetsCompleted(stats: ExecutionStatsStore): Boolean = {
  request.targetOperatorIds.nonEmpty && request.targetOperatorIds.forall { opId =>
    stats.operatorInfo.get(opId).exists { metrics =>
      val externalInputPortsReporting =
        metrics.operatorStatistics.inputMetrics.count(!_.portId.internal)
      val expectedExternalInputs = targetExpectedExternalInputs.getOrElse(opId, 0)
      metrics.operatorState == COMPLETED &&
      externalInputPortsReporting >= expectedExternalInputs
    }
  }
}

Any related issues, documentation, discussions?

Closes #4576

How was this PR tested?

Manually reproduced and verified end-to-end against ComputingUnitMaster on port 8085 with a 3-operator DAG (CSVFileScan movies + CSVFileScan ratings → HashJoin on movieId) executed via POST /api/execution/{wid}/{cuid}/run with targetOperatorIds = [HashJoinId]. Inputs: movies.csv (1000 rows) and ratings.csv (10 311 rows).

Steps to reproduce / verify:

# 1. Start the master
sbt "project WorkflowExecutionService" compile
java ... org.apache.texera.web.ComputingUnitMaster   # listens on :8085

# 2. Get a JWT
curl -s -X POST http://localhost:8080/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username":"<user>","password":"<pw>"}'

# 3. POST the request (CSV → CSV → HashJoin, target = HashJoin)
curl -s -X POST http://localhost:8085/api/execution/<wid>/<cuid>/run \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer <token>" \
  --data @sync-exec-request.json

Existing tests pass (sbt "project WorkflowExecutionService" compile succeeds). No new unit test was added because the failure is a timing race in the controller's region-registration sequence relative to the sync resource's observable; reproducing it deterministically in a unit test would require either mocking ExecutionStatsStore to emit a build-only snapshot followed by a build+probe snapshot, or driving the full controller actor system, both of which are out of scope for this targeted fix. Manual reproduction is reliable on every run because the race window is several hundred milliseconds wide and Observable.amb consistently selects the (incorrect) target-completion signal first prior to this fix.

Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.7)

`SyncExecutionResource.allTargetsCompleted` previously fired as soon as
the target's aggregated `operatorState` reached `COMPLETED`. For logical
operators that compile to multiple PhysicalOps separated by a blocking
edge (HashJoin: build → probe; same shape applies to Aggregate), the
build region's terminal state propagates briefly before the probe
region is added to `regionExecutions`. During that window the per-
logical-op aggregation only sees the build PhysicalOp and reports
COMPLETED, the resource then takes the `TargetResultsReady` branch,
kills the execution, and reads the probe's still-empty output storage.

Also require every declared external input port to appear in the
target's `inputMetrics` before treating it as completed. Port-1 stats
only appear once the probe actually consumes data, which closes the
race; source operators (no input) and single-input operators are
unaffected.
@bobbai00 bobbai00 requested a review from aglinxinyuan May 1, 2026 20:54
@aglinxinyuan aglinxinyuan requested a review from Xiao-zhen-Liu May 1, 2026 20:55
@bobbai00 bobbai00 added the release/v1.1.0-incubating back porting to release/v1.1.0-incubating label May 1, 2026
@aglinxinyuan aglinxinyuan removed their request for review May 1, 2026 20:55
@aglinxinyuan
Copy link
Copy Markdown
Contributor

aglinxinyuan commented May 1, 2026

@Xiao-zhen-Liu, please review this PR. It's about the region lifecycle and hashjoin.

@bobbai00 bobbai00 requested review from Xiao-zhen-Liu and aglinxinyuan and removed request for Xiao-zhen-Liu May 1, 2026 20:56
@bobbai00 bobbai00 changed the title fix(execution): close HashJoin sync-exec premature-termination race fix(execution-api): fix multi-input operator's execution termination condition May 1, 2026
Copy link
Copy Markdown
Contributor

@Xiao-zhen-Liu Xiao-zhen-Liu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@bobbai00 bobbai00 enabled auto-merge (squash) May 1, 2026 22:47
@Xiao-zhen-Liu
Copy link
Copy Markdown
Contributor

@bobbai00 But I think the issue is not about multi-input operator? It is multi-physical operators for one logical operator.

@bobbai00 bobbai00 merged commit 8383e19 into apache:main May 1, 2026
26 checks passed
github-actions Bot pushed a commit that referenced this pull request May 1, 2026
…condition (#4615)

### What changes were proposed in this PR?

This PR fixes a race in `SyncExecutionResource.allTargetsCompleted` that
causes the sync execution API (`POST /api/execution/{wid}/{cuid}/run`)
to terminate before a HashJoin's probe phase produces output, returning
an empty result.

**Root cause.** `HashJoinOpDesc.getPhysicalPlan` produces two
PhysicalOps (`build`, `probe`) sharing one logical id, separated by a
blocking edge. The scheduler places them in two regions and runs them
sequentially. `WorkflowExecution.getAllRegionExecutionsStats` aggregates
per-logical-op state by `groupBy(_._1.logicalOpId.id)` over only the
*registered* `RegionExecution`s. Between "build region completed" and
"probe region instantiated," only the build PhysicalOp is registered, so
`aggregateStates(Iterable(COMPLETED))` returns `COMPLETED`. The sync
resource then takes the `TargetResultsReady` branch, calls
`killExecution`, and reads the probe's still-empty Iceberg output. The
same shape applies to any logical operator whose physical plan contains
multiple PhysicalOps separated by a blocking edge (e.g., `Aggregate`).
It does not surface in the regular WebSocket-driven frontend execution
because the frontend waits for full workflow termination.

**Fix.** Strengthen `allTargetsCompleted` to require, in addition to
`operatorState == COMPLETED`, that every declared external input port of
the target is already present in
`OperatorMetrics.operatorStatistics.inputMetrics`. Port-1 metrics only
appear after the probe actually consumes data, which closes the race
window. Internal ports (e.g., HashJoin's build→probe internal edge) are
filtered out on both sides of the comparison so the predicate matches
what `aggregateMetrics` already exposes. Source operators (zero declared
inputs) and single-input operators are unaffected; for empty-input edge
cases, `terminalStateObservable` continues to provide the fallback
signal.

```scala
val targetExpectedExternalInputs: Map[String, Int] = effectiveLogicalPlan.operators
  .filter(op => request.targetOperatorIds.contains(op.operatorIdentifier.id))
  .map(op =>
    op.operatorIdentifier.id -> op.operatorInfo.inputPorts.count(!_.id.internal)
  )
  .toMap

def allTargetsCompleted(stats: ExecutionStatsStore): Boolean = {
  request.targetOperatorIds.nonEmpty && request.targetOperatorIds.forall { opId =>
    stats.operatorInfo.get(opId).exists { metrics =>
      val externalInputPortsReporting =
        metrics.operatorStatistics.inputMetrics.count(!_.portId.internal)
      val expectedExternalInputs = targetExpectedExternalInputs.getOrElse(opId, 0)
      metrics.operatorState == COMPLETED &&
      externalInputPortsReporting >= expectedExternalInputs
    }
  }
}
```

### Any related issues, documentation, discussions?

Closes #4576

### How was this PR tested?

Manually reproduced and verified end-to-end against
`ComputingUnitMaster` on port 8085 with a 3-operator DAG (CSVFileScan
movies + CSVFileScan ratings → HashJoin on `movieId`) executed via `POST
/api/execution/{wid}/{cuid}/run` with `targetOperatorIds =
[HashJoinId]`. Inputs: `movies.csv` (1000 rows) and `ratings.csv` (10
311 rows).

Steps to reproduce / verify:

```
# 1. Start the master
sbt "project WorkflowExecutionService" compile
java ... org.apache.texera.web.ComputingUnitMaster   # listens on :8085

# 2. Get a JWT
curl -s -X POST http://localhost:8080/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username":"<user>","password":"<pw>"}'

# 3. POST the request (CSV → CSV → HashJoin, target = HashJoin)
curl -s -X POST http://localhost:8085/api/execution/<wid>/<cuid>/run \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer <token>" \
  --data @sync-exec-request.json
```

Existing tests pass (`sbt "project WorkflowExecutionService" compile`
succeeds). No new unit test was added because the failure is a timing
race in the controller's region-registration sequence relative to the
sync resource's observable; reproducing it deterministically in a unit
test would require either mocking `ExecutionStatsStore` to emit a
build-only snapshot followed by a build+probe snapshot, or driving the
full controller actor system, both of which are out of scope for this
targeted fix. Manual reproduction is reliable on every run because the
race window is several hundred milliseconds wide and `Observable.amb`
consistently selects the (incorrect) target-completion signal first
prior to this fix.

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.7)

---------

Co-authored-by: Xinyuan Lin <xinyual3@uci.edu>

(backported from commit 8383e19)
SarahAsad23 pushed a commit to SarahAsad23/texera that referenced this pull request May 4, 2026
…condition (apache#4615)

### What changes were proposed in this PR?

This PR fixes a race in `SyncExecutionResource.allTargetsCompleted` that
causes the sync execution API (`POST /api/execution/{wid}/{cuid}/run`)
to terminate before a HashJoin's probe phase produces output, returning
an empty result.

**Root cause.** `HashJoinOpDesc.getPhysicalPlan` produces two
PhysicalOps (`build`, `probe`) sharing one logical id, separated by a
blocking edge. The scheduler places them in two regions and runs them
sequentially. `WorkflowExecution.getAllRegionExecutionsStats` aggregates
per-logical-op state by `groupBy(_._1.logicalOpId.id)` over only the
*registered* `RegionExecution`s. Between "build region completed" and
"probe region instantiated," only the build PhysicalOp is registered, so
`aggregateStates(Iterable(COMPLETED))` returns `COMPLETED`. The sync
resource then takes the `TargetResultsReady` branch, calls
`killExecution`, and reads the probe's still-empty Iceberg output. The
same shape applies to any logical operator whose physical plan contains
multiple PhysicalOps separated by a blocking edge (e.g., `Aggregate`).
It does not surface in the regular WebSocket-driven frontend execution
because the frontend waits for full workflow termination.

**Fix.** Strengthen `allTargetsCompleted` to require, in addition to
`operatorState == COMPLETED`, that every declared external input port of
the target is already present in
`OperatorMetrics.operatorStatistics.inputMetrics`. Port-1 metrics only
appear after the probe actually consumes data, which closes the race
window. Internal ports (e.g., HashJoin's build→probe internal edge) are
filtered out on both sides of the comparison so the predicate matches
what `aggregateMetrics` already exposes. Source operators (zero declared
inputs) and single-input operators are unaffected; for empty-input edge
cases, `terminalStateObservable` continues to provide the fallback
signal.

```scala
val targetExpectedExternalInputs: Map[String, Int] = effectiveLogicalPlan.operators
  .filter(op => request.targetOperatorIds.contains(op.operatorIdentifier.id))
  .map(op =>
    op.operatorIdentifier.id -> op.operatorInfo.inputPorts.count(!_.id.internal)
  )
  .toMap

def allTargetsCompleted(stats: ExecutionStatsStore): Boolean = {
  request.targetOperatorIds.nonEmpty && request.targetOperatorIds.forall { opId =>
    stats.operatorInfo.get(opId).exists { metrics =>
      val externalInputPortsReporting =
        metrics.operatorStatistics.inputMetrics.count(!_.portId.internal)
      val expectedExternalInputs = targetExpectedExternalInputs.getOrElse(opId, 0)
      metrics.operatorState == COMPLETED &&
      externalInputPortsReporting >= expectedExternalInputs
    }
  }
}
```

### Any related issues, documentation, discussions?

Closes apache#4576

### How was this PR tested?

Manually reproduced and verified end-to-end against
`ComputingUnitMaster` on port 8085 with a 3-operator DAG (CSVFileScan
movies + CSVFileScan ratings → HashJoin on `movieId`) executed via `POST
/api/execution/{wid}/{cuid}/run` with `targetOperatorIds =
[HashJoinId]`. Inputs: `movies.csv` (1000 rows) and `ratings.csv` (10
311 rows).

Steps to reproduce / verify:

```
# 1. Start the master
sbt "project WorkflowExecutionService" compile
java ... org.apache.texera.web.ComputingUnitMaster   # listens on :8085

# 2. Get a JWT
curl -s -X POST http://localhost:8080/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username":"<user>","password":"<pw>"}'

# 3. POST the request (CSV → CSV → HashJoin, target = HashJoin)
curl -s -X POST http://localhost:8085/api/execution/<wid>/<cuid>/run \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer <token>" \
  --data @sync-exec-request.json
```

Existing tests pass (`sbt "project WorkflowExecutionService" compile`
succeeds). No new unit test was added because the failure is a timing
race in the controller's region-registration sequence relative to the
sync resource's observable; reproducing it deterministically in a unit
test would require either mocking `ExecutionStatsStore` to emit a
build-only snapshot followed by a build+probe snapshot, or driving the
full controller actor system, both of which are out of scope for this
targeted fix. Manual reproduction is reliable on every run because the
race window is several hundred milliseconds wide and `Observable.amb`
consistently selects the (incorrect) target-completion signal first
prior to this fix.

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.7)

---------

Co-authored-by: Xinyuan Lin <xinyual3@uci.edu>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

engine fix release/v1.1.0-incubating back porting to release/v1.1.0-incubating

Projects

None yet

Development

Successfully merging this pull request may close these issues.

HashJoin returns empty result via sync execution API

3 participants