diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 051b7e048b08..9a385dca0f42 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -105,8 +105,7 @@ class ExecutorPodsAllocator(
 
   protected val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf)
 
-  // visible for tests
-  val numOutstandingPods = new AtomicInteger()
+  protected val numOutstandingPods = new AtomicInteger()
 
   protected var lastSnapshot = ExecutorPodsSnapshot()
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index d4f7b9f67fd6..a95c93724fe4 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -116,6 +116,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
   private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
 
+  private val numOutstandingPods = PrivateMethod[AtomicInteger](Symbol("numOutstandingPods"))()
+
   val appId = "testapp"
 
   before {
@@ -203,7 +205,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
     podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 3)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, defaultProfile.id))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id))
@@ -214,7 +216,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id))
     snapshotsStore.updatePod(pendingExecutor(3, rp.id))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 3)
     verify(podResource, times(3)).create()
     verify(labeledPods, never()).delete()
 
@@ -223,7 +225,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 3))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 3)
     verify(labeledPods, times(1)).delete()
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, rp.id))
 
@@ -231,7 +233,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(runningExecutor(3, rp.id))
     snapshotsStore.updatePod(pendingExecutor(4, rp.id))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 3)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, rp.id))
     verify(labeledPods, times(1)).delete()
   }
@@ -239,7 +241,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   test("Initially request executors in batches. Do not request another batch if the" +
     " first has not finished.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 5)
     for (nextId <- 1 to podAllocationSize) {
       verify(podsWithNamespace).resource(podWithAttachedContainerForId(nextId))
     }
@@ -254,23 +256,23 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> (podAllocationSize + 1)))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 5)
     for (execId <- 1 until podAllocationSize) {
       snapshotsStore.updatePod(runningExecutor(execId))
     }
     assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 5)
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
     verify(podsWithNamespace, never())
       .resource(podWithAttachedContainerForId(podAllocationSize + 1))
     verify(podResource, times(podAllocationSize)).create()
 
     snapshotsStore.updatePod(runningExecutor(podAllocationSize))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(podAllocationSize + 1))
 
     snapshotsStore.updatePod(runningExecutor(podAllocationSize))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
     verify(podResource, times(podAllocationSize + 1)).create()
   }
@@ -278,14 +280,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     " them on the next batch.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> podAllocationSize))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 5)
     for (execId <- 1 until podAllocationSize) {
       snapshotsStore.updatePod(runningExecutor(execId))
     }
     val failedPod = failedExecutorWithoutDeletion(podAllocationSize)
     snapshotsStore.updatePod(failedPod)
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(podAllocationSize + 1))
   }
 
@@ -313,11 +315,11 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       .thenReturn(labeledPods)
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> 1))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
     waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
     verify(labeledPods).delete()
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
   }
@@ -342,13 +344,13 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // Target 1 executor, make sure it's requested, even with an empty initial snapshot.
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> 1))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
 
     // Mark executor as running, verify that subsequent allocation cycle is a no-op.
     snapshotsStore.updatePod(runningExecutor(1))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     verify(podResource, times(1)).create()
     verify(labeledPods, never()).delete()
 
@@ -356,7 +358,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> 4))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 3)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(3))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(4))
@@ -365,7 +367,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(runningExecutor(2))
     snapshotsStore.updatePod(pendingExecutor(3))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 2)
     verify(podResource, times(4)).create()
     verify(labeledPods, never()).delete()
 
@@ -374,7 +376,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> 1))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     verify(podResource, times(4)).create()
     verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
     verify(labeledPods).delete()
@@ -387,7 +389,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(deletedExecutor(4))
     snapshotsStore.removeDeletedExecutors()
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     assert(!podsAllocatorUnderTest.isDeleted("3"))
     assert(!podsAllocatorUnderTest.isDeleted("4"))
   }
@@ -455,7 +457,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     waitForExecutorPodsClock.setTime(startTime)
 
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 5))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 5)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(3))
@@ -469,7 +471,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // Newly created executors (both acknowledged and not) are protected by executorIdleTimeout
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 0))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 5)
     verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5")
     verify(podResource, never()).delete()
 
@@ -481,7 +483,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // this is why they are not counted into the outstanding PODs and /they are not removed even
     // though executor 1 is still in pending state and executor 3 and 4 are new request without
     // any state reported by kubernetes and all the three are already timed out
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "5")
     verify(labeledPods).delete()
   }
@@ -547,7 +549,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
     // 0) request 3 PODs for the default and 4 PODs for the other resource profile
    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 4))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 7)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 7)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, defaultProfile.id))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, defaultProfile.id))
@@ -559,7 +561,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // 1) make 1 POD known by the scheduler backend for each resource profile
     when(schedulerBackend.getExecutorIds()).thenReturn(Seq("1", "4"))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5,
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 5,
       "scheduler backend known PODs are not outstanding")
     verify(podResource, times(7)).create()
 
@@ -571,14 +573,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(pendingExecutor(5, rp.id))
     snapshotsStore.updatePod(pendingExecutor(6, rp.id))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 3)
     verify(podResource, times(7)).create()
 
     // 3) downscale to 1 POD for default and 1 POD for the other resource profile
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     verify(podResource, times(7)).create()
     verify(labeledPods, times(2)).delete()
     assert(podsAllocatorUnderTest.isDeleted("3"))
@@ -590,13 +592,13 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 2))
     snapshotsStore.notifySubscribers()
     verify(podResource, times(7)).create()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     verify(podResource, times(7)).create()
 
     // 5) requesting 1 more executor for each resource
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 3))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 2)
     verify(podResource, times(9)).create()
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(8, defaultProfile.id))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(9, rp.id))
@@ -630,7 +632,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // Target 1 executor for default profile, 2 for other profile,
     // make sure it's requested, even with an empty initial snapshot.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 2))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 3)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, rp.id))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id))
@@ -640,7 +642,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(runningExecutor(2, rp.id))
     snapshotsStore.updatePod(runningExecutor(3, rp.id))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     verify(podResource, times(3)).create()
     verify(podResource, never()).delete()
 
@@ -648,7 +650,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // make sure all are requested.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4, rp -> 3))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 4)
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, defaultProfile.id))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, defaultProfile.id))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(6, defaultProfile.id))
@@ -659,7 +661,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(pendingExecutor(5, defaultProfile.id))
     snapshotsStore.updatePod(pendingExecutor(7, rp.id))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 3)
     verify(podResource, times(7)).create()
     verify(podResource, never()).delete()
 
@@ -668,7 +670,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     verify(podResource, times(7)).create()
     verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6")
     verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7")
@@ -684,7 +686,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(deletedExecutor(7))
     snapshotsStore.removeDeletedExecutors()
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     assert(!podsAllocatorUnderTest.isDeleted("5"))
     assert(!podsAllocatorUnderTest.isDeleted("6"))
     assert(!podsAllocatorUnderTest.isDeleted("7"))
@@ -705,7 +707,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       .thenReturn(labeledPods)
 
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 6))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 5)
     // Initial request of pods
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
@@ -721,7 +723,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // We move forward one allocation cycle
     waitForExecutorPodsClock.setTime(podAllocationDelay + 1)
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 2)
     // We request pod 6
     verify(podsWithNamespace).resource(podWithAttachedContainerForId(6))
   }
@@ -773,20 +775,20 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // Target 1 executor, make sure it's requested, even with an empty initial snapshot.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
     verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(1))
 
     // Mark executor as running, verify that subsequent allocation cycle is a no-op.
     snapshotsStore.updatePod(runningExecutor(1))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     verify(podResource, times(1)).create()
     verify(podResource, never()).delete()
 
     // Request a new executor, make sure it's using reused PVC
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
     verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(2))
     verify(pvcWithNamespace, never()).resource(any())
   }
 
@@ -874,14 +876,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
     // Target 1 executor, make sure it's requested, even with an empty initial snapshot.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
     verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(1))
     assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
 
     // Mark executor as running, verify that subsequent allocation cycle is a no-op.
     snapshotsStore.updatePod(runningExecutor(1))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     verify(podResource, times(1)).create()
     verify(podResource, never()).delete()
     verify(pvcWithNamespace, times(1)).resource(any())
@@ -890,7 +892,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // Request a new executor, make sure that no new pod and pvc are created
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
     snapshotsStore.notifySubscribers()
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
     verify(podResource, times(1)).create()
     assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
   }
@@ -933,7 +935,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
     }
     assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
-    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
   }
 
   private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
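Note on the test-side pattern: with `numOutstandingPods` now `protected`, the suite reads it through ScalaTest's `PrivateMethodTester`. A Scala `val` compiles to a no-argument accessor method, so `PrivateMethod[AtomicInteger](Symbol("numOutstandingPods"))()` names that accessor and binds an empty argument list, and `invokePrivate` calls it reflectively on the target object. The snippet below is a minimal, self-contained sketch of the same pattern, not Spark code; `Allocator` and `AllocatorSuite` are hypothetical stand-ins.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.scalatest.PrivateMethodTester
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical stand-in for ExecutorPodsAllocator: the counter is not
// publicly readable, mirroring the `protected val` in the patch above.
class Allocator {
  protected val numOutstandingPods = new AtomicInteger()
}

// Mixing in PrivateMethodTester brings PrivateMethod and invokePrivate
// into scope, just as the real suite relies on.
class AllocatorSuite extends AnyFunSuite with PrivateMethodTester {
  test("read a non-public counter reflectively") {
    // Symbol names the compiled accessor of the val; the trailing empty
    // argument list turns the PrivateMethod into an invocation, which is
    // why the suite stores PrivateMethod[AtomicInteger](...)() directly.
    val numOutstandingPods = PrivateMethod[AtomicInteger](Symbol("numOutstandingPods"))()
    val allocator = new Allocator
    // A fresh AtomicInteger starts at 0.
    assert(allocator.invokePrivate(numOutstandingPods).get() == 0)
  }
}
```

The trade-off mirrors the production change: the counter disappears from the class's public surface, while the tests keep asserting on it at the cost of a reflective lookup that is checked only at runtime.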