diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 25970e918ec42..dd4806e363ee9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -190,6 +190,13 @@ class ExecutorPodsAllocator( newlyCreatedExecutors.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1) newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet + // If an executor was created and removed within a short period, then it is possible that the + // creation was not captured by any snapshots. In this case, we should remove the executor from + // the schedulerKnownNewlyCreatedExecs list, otherwise it will get stuck in the list and a new + // executor will not be requested. + schedulerKnownNewlyCreatedExecs --= + schedulerKnownNewlyCreatedExecs.filterKeys(!schedulerKnownExecs.contains(_)).keySet + // For all executors we've created against the API but have not seen in a snapshot // yet - check the current time. 
If the current time has exceeded some threshold, // assume that the pod was either never created (the API server never properly diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 350a09f0218ba..2db335627b997 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -963,6 +963,38 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) } + test("SPARK-44609: Do not track an executor if it was removed from scheduler backend.") { + when(podOperations + .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) + .thenReturn(podOperations) + when(podOperations + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) + .thenReturn(podOperations) + when(podOperations + .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")) + .thenReturn(labeledPods) + + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) + + // Verify initial pod creation. + verify(podOperations).create(podWithAttachedContainerForId(1)) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) + + // Verify numOutstandingPods is 0 when pod is registered with scheduler backend. + when(schedulerBackend.getExecutorIds).thenReturn(Seq("1")) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) + + // Verify that allocator requests a new executor when scheduler backend does not track the + // existing executor anymore. 
+ when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) + verify(podOperations).create(podWithAttachedContainerForId(2)) + } + private def executorPodAnswer(): Answer[KubernetesExecutorSpec] = (invocation: InvocationOnMock) => { val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)