Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ class ExecutorPodsAllocator(
newlyCreatedExecutors.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1)
newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet

// If executor was created and removed in a short period, then it is possible that the creation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate a little more about this situation?

If executor was created and removed in a short period

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun this happens when the underlying node is decommissioned in Kubernetes. If that happens shortly after an executor is scheduled on that node, we hit the bug in question.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @dongjoon-hyun, sorry for the late reply. Here's an example of a failure scenario:

  • Driver requests an executor
  • exec-1 gets created and registers with driver
  • exec-1 is moved from newlyCreatedExecutors to schedulerKnownNewlyCreatedExecs
  • exec-1 got deleted very quickly (~1-30 sec) after registration
  • ExecutorPodsWatchSnapshotSource fails to catch the creation of the pod (e.g. websocket connection was reset, k8s-apiserver was down, etc.)
  • ExecutorPodsPollingSnapshotSource fails to catch the creation because it runs every 30 secs, but the executor was removed much sooner than that after creation
  • exec-1 is never removed from schedulerKnownNewlyCreatedExecs
  • ExecutorPodsAllocator will never request a new executor because its slot is occupied by exec-1, due to schedulerKnownNewlyCreatedExecs never being cleared.

// was not captured by any snapshots. In this case, we should remove the executor from
// schedulerKnownNewlyCreatedExecs; otherwise it will get stuck there and a new
// executor will not be requested.
schedulerKnownNewlyCreatedExecs --=
schedulerKnownNewlyCreatedExecs.filterKeys(!schedulerKnownExecs.contains(_)).keySet

// For all executors we've created against the API but have not seen in a snapshot
// yet - check the current time. If the current time has exceeded some threshold,
// assume that the pod was either never created (the API server never properly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,38 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
}

// Regression test for SPARK-44609: once an executor id reported by the scheduler backend
// is no longer reported, the allocator must stop counting it and request a replacement.
test("SPARK-44609: Do not track an executor if it was removed from scheduler backend.") {
// Stub the label-filter chain on podOperations: the app-id and executor-role filters
// return the same mock, and the lookup for executor id "1" returns labeledPods.
when(podOperations
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(podOperations)
when(podOperations
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(podOperations)
when(podOperations
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
.thenReturn(labeledPods)

// Ask for exactly one executor on the default profile; one pod should now be outstanding.
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)

// Verify initial pod creation.
verify(podOperations).create(podWithAttachedContainerForId(1))
// Notify subscribers before any executor id has been stubbed on the scheduler backend in
// this test; the pod is expected to remain outstanding.
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)

// Verify numOutstandingPods is 0 when pod is registered with scheduler backend.
when(schedulerBackend.getExecutorIds).thenReturn(Seq("1"))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)

// Verify that allocator requests a new executor when scheduler backend does not track the
// existing executor anymore.
// The scheduler backend now reports no executor ids (presumably modelling executor 1 being
// removed before any snapshot captured its pod -- confirm against the allocator logic).
when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty)
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
// The replacement request must be issued for the next executor id, 2.
verify(podOperations).create(podWithAttachedContainerForId(2))
}

private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
(invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
Expand Down