From 5995e363ed21c5dd0a36c11a7e34b2e8cf255b1b Mon Sep 17 00:00:00 2001 From: Jiadong Bai Date: Fri, 1 May 2026 13:29:26 -0700 Subject: [PATCH 1/2] fix(execution): close HashJoin sync-exec premature-termination race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `SyncExecutionResource.allTargetsCompleted` previously fired as soon as the target's aggregated `operatorState` reached `COMPLETED`. For logical operators that compile to multiple PhysicalOps separated by a blocking edge (HashJoin: build → probe; same shape applies to Aggregate), the build region's terminal state propagates briefly before the probe region is added to `regionExecutions`. During that window the per-logical-op aggregation only sees the build PhysicalOp and reports COMPLETED; the resource then takes the `TargetResultsReady` branch, kills the execution, and reads the probe's still-empty output storage. Also require every declared external input port to appear in the target's `inputMetrics` before treating it as completed. Port-1 stats only appear once the probe actually consumes data, which closes the race; source operators (no input) and single-input operators are unaffected. 
--- .../web/resource/SyncExecutionResource.scala | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala index 489c9016d57..413352c98e8 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala @@ -191,12 +191,29 @@ class SyncExecutionResource extends LazyLogging { val currentConsoleState = executionService.executionStateStore.consoleStore.getState val currentStatsState = executionService.executionStateStore.statsStore.getState + // Multi-region operators (e.g., HashJoin: build region then probe region) report their + // aggregated logical state as COMPLETED for a brief window after the first region + // terminates and before the second region's workers are added to regionExecutions. + // Guard against firing during that window by also requiring every declared external + // input port to be present in the operator's input metrics — port-1 stats only appear + // once probe actually starts consuming, which closes the race. + val targetExpectedExternalInputs: Map[String, Int] = effectiveLogicalPlan.operators + .filter(op => request.targetOperatorIds.contains(op.operatorIdentifier.id)) + .map(op => + op.operatorIdentifier.id -> op.operatorInfo.inputPorts.count(!_.id.internal) + ) + .toMap + // Require COMPLETED, not just "has output", so upstream operators finish flushing // their data downstream before we tear the execution down. 
def allTargetsCompleted(stats: ExecutionStatsStore): Boolean = { request.targetOperatorIds.nonEmpty && request.targetOperatorIds.forall { opId => stats.operatorInfo.get(opId).exists { metrics => - metrics.operatorState == COMPLETED + val externalInputPortsReporting = + metrics.operatorStatistics.inputMetrics.count(!_.portId.internal) + val expectedExternalInputs = targetExpectedExternalInputs.getOrElse(opId, 0) + metrics.operatorState == COMPLETED && + externalInputPortsReporting >= expectedExternalInputs } } } From 2638b6a72158f100a7059cdf57c24ad2f32cfaeb Mon Sep 17 00:00:00 2001 From: Jiadong Bai Date: Fri, 1 May 2026 13:56:25 -0700 Subject: [PATCH 2/2] fmt --- .../apache/texera/web/resource/SyncExecutionResource.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala index 413352c98e8..d3047db5802 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala @@ -199,9 +199,7 @@ class SyncExecutionResource extends LazyLogging { // once probe actually starts consuming, which closes the race. val targetExpectedExternalInputs: Map[String, Int] = effectiveLogicalPlan.operators .filter(op => request.targetOperatorIds.contains(op.operatorIdentifier.id)) - .map(op => - op.operatorIdentifier.id -> op.operatorInfo.inputPorts.count(!_.id.internal) - ) + .map(op => op.operatorIdentifier.id -> op.operatorInfo.inputPorts.count(!_.id.internal)) .toMap // Require COMPLETED, not just "has output", so upstream operators finish flushing