[SPARK-24266][K8S] Restart the watcher when we receive a version changed from k8s

### What changes were proposed in this pull request?

Restart the watcher when it fails with an HTTP_GONE code from the Kubernetes API, which means the resource version has changed.

For more background, see fabric8io/kubernetes-client#1075.
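
A minimal, self-contained sketch of the idea (assuming the fabric8 kubernetes-client 4.x API that Spark used at the time; `RestartablePodWatcher` and `awaitDriverPod` are illustrative names, not the classes in the diff below): distinguish a watch that closed with HTTP 410 GONE (stale resourceVersion) from a watch that closed because the pod finished, and re-open the watch in the former case.

```scala
import java.net.HttpURLConnection.HTTP_GONE

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

// Distinguishes "the pod is done" from "the watch died with a stale resourceVersion".
class RestartablePodWatcher extends Watcher[Pod] {
  private var podCompleted = false
  private var resourceTooOld = false

  def reset(): Unit = synchronized { resourceTooOld = false }

  // Blocks until the pod completes or the watch needs to be restarted;
  // returns true when watching can stop for good.
  def watchOrStop(): Boolean = synchronized {
    while (!podCompleted && !resourceTooOld) { wait() }
    podCompleted
  }

  override def eventReceived(action: Action, pod: Pod): Unit = synchronized {
    val phase = Option(pod).flatMap(p => Option(p.getStatus)).map(_.getPhase).getOrElse("unknown")
    if (phase == "Succeeded" || phase == "Failed") {
      podCompleted = true
      notifyAll()
    }
  }

  override def onClose(e: KubernetesClientException): Unit = synchronized {
    if (e != null && e.getCode == HTTP_GONE) {
      resourceTooOld = true   // stale resourceVersion: the caller should open a new watch
    } else {
      podCompleted = true     // normal close: nothing left to watch
    }
    notifyAll()
  }
}

object DriverPodWatchLoop {
  // Submission-side loop: keep re-opening the watch until the watcher reports completion.
  def awaitDriverPod(client: KubernetesClient, driverPodName: String): Unit = {
    val watcher = new RestartablePodWatcher
    var done = false
    while (!done) {
      watcher.reset()
      val watch = client.pods().withName(driverPodName).watch(watcher)
      // Replay the current pod state so nothing is missed between two watches.
      watcher.eventReceived(Action.MODIFIED, client.pods().withName(driverPodName).get())
      done = watcher.watchOrStop()
      watch.close()
    }
  }
}
```

In the actual change, this split lives in `LoggingPodStatusWatcherImpl` (the HTTP_GONE check in `onClose` plus `reset()`/`watchOrStop()`) and in the `breakable` loop in `Client`, as shown in the diff below.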

### Why are the changes needed?

Without this, the submission client stops watching the driver pod as soon as the Kubernetes API server closes the watch with HTTP 410 (GONE) because the watched resource version has become too old, even though the application may still be running. Restarting the watch lets spark-submit keep tracking the driver pod until it actually completes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

By running spark-submit against a k8s cluster.

Not sure how to make an automated test for this. If someone can help me out that would be great.
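
On the automated-test question, one option is sketched below (a minimal, dependency-free sketch; the `PodWatcher` trait and the names in it are illustrative, not Spark's actual test code): factor the retry loop so it can be driven by a stub watcher that reports a stale resource version once and completion afterwards, then assert that the loop re-opened the watch.

```scala
object WatchLoopSketch extends App {

  trait PodWatcher {
    def reset(): Unit
    def watchOrStop(submissionId: String): Boolean // true = pod completed, stop watching
  }

  // The loop under test: keep re-opening the watch until watchOrStop returns true.
  def runWatchLoop(watcher: PodWatcher, sId: String, openWatch: () => AutoCloseable): Int = {
    var watchesOpened = 0
    var done = false
    while (!done) {
      watcher.reset()
      val watch = openWatch()
      watchesOpened += 1
      done = watcher.watchOrStop(sId)
      watch.close()
    }
    watchesOpened
  }

  // Stub: the first call simulates HTTP_GONE (watch must be restarted), the second completes.
  val stub = new PodWatcher {
    private var calls = 0
    override def reset(): Unit = ()
    override def watchOrStop(sId: String): Boolean = { calls += 1; calls >= 2 }
  }

  val opened = runWatchLoop(stub, "default:spark-driver", () => new AutoCloseable {
    override def close(): Unit = ()
  })
  assert(opened == 2, s"expected the watch to be re-established once, got $opened opens")
  println(s"watch was opened $opened times")
}
```

The `ClientSuite` change in this commit takes the simpler route of stubbing `watchOrStop` to return `true` so the existing test still terminates.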

Closes #28423 from stijndehaes/bugfix/k8s-submit-resource-version-change.

Authored-by: Stijn De Haes <stijndehaes@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
stijndehaes authored and Jim Kleckner committed Aug 30, 2020
1 parent 05144a5 commit 6449efa
Showing 3 changed files with 61 additions and 27 deletions.
@@ -21,9 +21,11 @@ import java.util.{Collections, UUID}
import java.util.Properties

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.mutable
import scala.util.control.NonFatal
import util.control.Breaks._

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
@@ -122,25 +124,37 @@ private[spark] class Client(
.endSpec()
.build()
val driverPodName = resolvedDriverPod.getMetadata.getName
Utils.tryWithResource(
kubernetesClient
.pods()
.withName(driverPodName)
.watch(watcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val otherKubernetesResources =
resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}

val sId = Seq(conf.namespace, driverPodName).mkString(":")
watcher.watchOrStop(sId)
var watch: Watch = null
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}
val sId = Seq(conf.namespace, driverPodName).mkString(":")
breakable {
while (true) {
val podWithName = kubernetesClient
.pods()
.withName(driverPodName)
// Reset the resource-too-old flag before we start the watch; this is important for race conditions
watcher.reset()
watch = podWithName.watch(watcher)

// Send the latest pod state we know to the watcher to make sure we didn't miss anything
watcher.eventReceived(Action.MODIFIED, podWithName.get())

// Break the while loop if the pod is completed or we don't want to wait
if(watcher.watchOrStop(sId)) {
watch.close()
break
}
}
}
}

@@ -19,14 +19,16 @@ package org.apache.spark.deploy.k8s.submit
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import java.net.HttpURLConnection.HTTP_GONE

import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.KubernetesDriverConf
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.internal.Logging

private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
def watchOrStop(submissionId: String): Unit
def watchOrStop(submissionId: String): Boolean
def reset(): Unit
}

/**
@@ -42,10 +44,16 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)

private var podCompleted = false

private var resourceTooOldReceived = false

private var pod = Option.empty[Pod]

private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")

override def reset(): Unit = {
resourceTooOldReceived = false
}

override def eventReceived(action: Action, pod: Pod): Unit = {
this.pod = Option(pod)
action match {
@@ -62,7 +70,12 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)

override def onClose(e: KubernetesClientException): Unit = {
logDebug(s"Stopping watching application $appId with last-observed phase $phase")
closeWatch()
if(e != null && e.getCode == HTTP_GONE) {
resourceTooOldReceived = true
logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
} else {
closeWatch()
}
}

private def logLongStatus(): Unit = {
@@ -78,20 +91,26 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
this.notifyAll()
}

override def watchOrStop(sId: String): Unit = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
val interval = conf.get(REPORT_INTERVAL)
synchronized {
while (!podCompleted) {
while (!podCompleted && !resourceTooOldReceived) {
wait(interval)
logInfo(s"Application status for $appId (phase: $phase)")
}
}
logInfo(
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
.getOrElse("No containers were found in the driver pod."))
logInfo(s"Application ${conf.appName} with submission ID $sId finished")

if(podCompleted) {
logInfo(
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
.getOrElse("No containers were found in the driver pod."))
logInfo(s"Application ${conf.appName} with submission ID $sId finished")
}
podCompleted
} else {
logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
// Always act like the application has completed since we don't want to wait for app completion
true
}
}
@@ -136,6 +136,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
doReturn(resourceList)
.when(kubernetesClient)
.resourceList(createdResourcesArgumentCaptor.capture())
