Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-42813][K8S] Print application info when waitAppCompletion is false #40444

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -180,8 +180,8 @@ private[spark] class Client(
throw e
}

val sId = Client.submissionId(conf.namespace, driverPodName)
if (conf.get(WAIT_FOR_APP_COMPLETION)) {
val sId = Seq(conf.namespace, driverPodName).mkString(":")
breakable {
while (true) {
val podWithName = kubernetesClient
Expand All @@ -202,10 +202,17 @@ private[spark] class Client(
}
}
}
} else {
logInfo(s"Deployed Spark application ${conf.appName} with application ID ${conf.appName} " +
s"and submission ID $sId into Kubernetes")
}
}
}

private[spark] object Client {
def submissionId(namespace: String, driverPodName: String): String = s"$namespace:$driverPodName"
}

/**
* Main class and entry point of application submission in KUBERNETES mode.
*/
Expand Down
Expand Up @@ -95,8 +95,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
this.notifyAll()
}

override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
override def watchOrStop(sId: String): Boolean = {
logInfo(s"Waiting for application ${conf.appName} with application ID ${conf.appId} " +
s"and submission ID $sId to finish...")
val interval = conf.get(REPORT_INTERVAL)
synchronized {
while (!podCompleted && !resourceTooOldReceived) {
Expand All @@ -109,12 +110,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
logInfo(
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
.getOrElse("No containers were found in the driver pod."))
logInfo(s"Application ${conf.appName} with submission ID $sId finished")
logInfo(s"Application ${conf.appName} with application ID ${conf.appId} " +
s"and submission ID $sId finished")
}
podCompleted
} else {
logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
// Always act like the application has completed since we don't want to wait for app completion
true
}
}
Expand Up @@ -33,8 +33,10 @@ import org.scalatestplus.mockito.MockitoSugar._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{Config, _}
import org.apache.spark.deploy.k8s.Config.WAIT_FOR_APP_COMPLETION
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.deploy.k8s.submit.Client.submissionId
import org.apache.spark.util.Utils

class ClientSuite extends SparkFunSuite with BeforeAndAfter {
Expand Down Expand Up @@ -181,7 +183,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods)
when(namedPods.create()).thenReturn(podWithOwnerReference())
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
val sId = submissionId(kconf.namespace, POD_NAME)
when(loggingPodStatusWatcher.watchOrStop(sId)).thenReturn(true)
doReturn(resourceList)
.when(kubernetesClient)
.resourceList(createdResourcesArgumentCaptor.capture())
Expand Down Expand Up @@ -343,6 +346,31 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesClient,
loggingPodStatusWatcher)
submissionClient.run()
verify(loggingPodStatusWatcher).watchOrStop(kconf.namespace + ":driver")
verify(loggingPodStatusWatcher).watchOrStop(submissionId(kconf.namespace, POD_NAME))
}

test("SPARK-42813: Print application info when waitAppCompletion is false") {
val appName = "SPARK-42813"
val logAppender = new LogAppender
withLogAppender(logAppender) {
val sparkConf = new SparkConf(loadDefaults = false)
.set("spark.app.name", appName)
.set(WAIT_FOR_APP_COMPLETION, false)
kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf,
resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
when(driverBuilder.buildFromFeatures(kconf, kubernetesClient))
.thenReturn(BUILT_KUBERNETES_SPEC)
val submissionClient = new Client(
kconf,
driverBuilder,
kubernetesClient,
loggingPodStatusWatcher)
submissionClient.run()
}
val appId = KubernetesTestConf.APP_ID
val sId = submissionId(kconf.namespace, POD_NAME)
logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).exists { line =>
line === s"Application $appName with application ID $appId and submission ID $sId finished"
}
}
}