diff --git a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala index 489c9016d57..d3047db5802 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala @@ -191,12 +191,27 @@ class SyncExecutionResource extends LazyLogging { val currentConsoleState = executionService.executionStateStore.consoleStore.getState val currentStatsState = executionService.executionStateStore.statsStore.getState + // Multi-region operators (e.g., HashJoin: build region then probe region) report their + // aggregated logical state as COMPLETED for a brief window after the first region + // terminates and before the second region's workers are added to regionExecutions. + // Guard against firing during that window by also requiring every declared external + // input port to be present in the operator's input metrics — port-1 stats only appear + // once probe actually starts consuming, which closes the race. + val targetExpectedExternalInputs: Map[String, Int] = effectiveLogicalPlan.operators + .filter(op => request.targetOperatorIds.contains(op.operatorIdentifier.id)) + .map(op => op.operatorIdentifier.id -> op.operatorInfo.inputPorts.count(!_.id.internal)) + .toMap + // Require COMPLETED, not just "has output", so upstream operators finish flushing // their data downstream before we tear the execution down. def allTargetsCompleted(stats: ExecutionStatsStore): Boolean = { request.targetOperatorIds.nonEmpty && request.targetOperatorIds.forall { opId => stats.operatorInfo.get(opId).exists { metrics => - metrics.operatorState == COMPLETED + val externalInputPortsReporting = + metrics.operatorStatistics.inputMetrics.count(!_.portId.internal) + val expectedExternalInputs = targetExpectedExternalInputs.getOrElse(opId, 0) + metrics.operatorState == COMPLETED && + externalInputPortsReporting >= expectedExternalInputs } } }