Skip to content

Commit

Permalink
Move the wait for driver readiness
Browse files Browse the repository at this point in the history
Move the wait from onNewSnapshots() to start() in ExecutorPodsAllocator,
so that the check is done only once, at startup.

Mock waitUntilReady() so that existing tests do not break
  • Loading branch information
cchriswu committed Jun 2, 2021
1 parent 1e4ad67 commit 66981fe
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ private[spark] class ExecutorPodsAllocator(
@volatile private var deletedExecutorIds = Set.empty[Long]

def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
// wait until the driver pod is ready to ensure executors can connect to driver svc
try {
kubernetesClient.pods()
.withName(kubernetesDriverPodName.get)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES)
} catch {
case e: InterruptedException =>
logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready in " +
s"namespace $namespace")
}
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, schedulerBackend, _)
}
Expand Down Expand Up @@ -348,16 +358,6 @@ private[spark] class ExecutorPodsAllocator(
s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
// Check reusable PVCs for this executor allocation batch
val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
// wait until the driver pod is ready to ensure executors can connect to driver svc
if (numExecutorsToAllocate > 0) {
try {
kubernetesClient.pods().inNamespace(namespace).withName(kubernetesDriverPodName.get).
waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES)
} catch {
case e: InterruptedException =>
logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready")
}
}
for ( _ <- 0 until numExecutorsToAllocate) {
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
val executorConf = KubernetesConf.createExecutorConf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
package org.apache.spark.scheduler.cluster.k8s

import java.time.Instant
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.ArgumentMatchers.{any, anyInt, eq => meq}
import org.mockito.Mockito.{never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
Expand Down Expand Up @@ -104,6 +105,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
when(driverPodOperations.get).thenReturn(driverPod)
when(driverPodOperations.waitUntilReady(anyInt(), any(classOf[TimeUnit])))
.thenReturn(driverPod)
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
meq(kubernetesClient), any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer())
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
Expand Down

0 comments on commit 66981fe

Please sign in to comment.