Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,27 @@ class SyncExecutionResource extends LazyLogging {
val currentConsoleState = executionService.executionStateStore.consoleStore.getState
val currentStatsState = executionService.executionStateStore.statsStore.getState

// Multi-region operators (e.g., HashJoin: build region then probe region) report their
// aggregated logical state as COMPLETED for a brief window after the first region
// terminates and before the second region's workers are added to regionExecutions.
// Guard against firing during that window by also requiring every declared external
// input port to be present in the operator's input metrics — port-1 stats only appear
// once probe actually starts consuming, which closes the race.
val targetExpectedExternalInputs: Map[String, Int] = effectiveLogicalPlan.operators
.filter(op => request.targetOperatorIds.contains(op.operatorIdentifier.id))
.map(op => op.operatorIdentifier.id -> op.operatorInfo.inputPorts.count(!_.id.internal))
.toMap

// Require COMPLETED, not just "has output", so upstream operators finish flushing
// their data downstream before we tear the execution down.
def allTargetsCompleted(stats: ExecutionStatsStore): Boolean = {
request.targetOperatorIds.nonEmpty && request.targetOperatorIds.forall { opId =>
stats.operatorInfo.get(opId).exists { metrics =>
metrics.operatorState == COMPLETED
val externalInputPortsReporting =
metrics.operatorStatistics.inputMetrics.count(!_.portId.internal)
val expectedExternalInputs = targetExpectedExternalInputs.getOrElse(opId, 0)
metrics.operatorState == COMPLETED &&
externalInputPortsReporting >= expectedExternalInputs
}
}
}
Expand Down
Loading