Skip to content

Commit

Permalink
executor initial pod test
Browse files Browse the repository at this point in the history
  • Loading branch information
yifeih committed Aug 23, 2018
1 parent 81e5a66 commit 7f4ff5a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ private[spark] class BasicExecutorFeatureStep(
val executorPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(name)
.withLabels(kubernetesConf.roleLabels.asJava)
.withAnnotations(kubernetesConf.roleAnnotations.asJava)
.addToLabels(kubernetesConf.roleLabels.asJava)
.addToAnnotations(kubernetesConf.roleAnnotations.asJava)
.addToOwnerReferences(ownerReference.toSeq: _*)
.endMetadata()
.editOrNewSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ import org.mockito.Mockito._

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config.CONTAINER_IMAGE
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.Config.{CONTAINER_IMAGE, KUBERNETES_DRIVER_CONTAINER_NAME, KUBERNETES_DRIVER_PODTEMPLATE_FILE, KUBERNETES_EXECUTOR_PODTEMPLATE_FILE}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep, _}
import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}

class KubernetesDriverBuilderSuite extends SparkFunSuite {
Expand Down Expand Up @@ -260,7 +259,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
test("Apply template volume step if executor template is present.") {
val sparkConf = spy(new SparkConf(false))
doReturn(Option("filename")).when(sparkConf)
.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
val conf = KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
Expand Down Expand Up @@ -305,27 +304,27 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
}

test("Starts with template if specified") {
val spec = getSpecWithPodTemplate(
val spec = constructSpecWithPodTemplate(
new PodBuilder()
.withNewMetadata()
.addToLabels("test-label-key", "test-label-value")
.endMetadata()
.withNewSpec()
.addNewContainer()
.withName(Config.KUBERNETES_DRIVER_CONTAINER_NAME.defaultValueString)
.withName(KUBERNETES_DRIVER_CONTAINER_NAME.defaultValueString)
.endContainer()
.endSpec()
.build())

assert(spec.pod.pod.getMetadata.getLabels.containsKey("test-label-key"))
assert(spec.pod.pod.getMetadata.getLabels.get("test-label-key") === "test-label-value")
assert(spec.pod.container.getName ===
Config.KUBERNETES_DRIVER_CONTAINER_NAME.defaultValueString)
KUBERNETES_DRIVER_CONTAINER_NAME.defaultValueString)
}

test("Throws on misconfigured pod template") {
val exception = intercept[SparkException] {
getSpecWithPodTemplate(
constructSpecWithPodTemplate(
new PodBuilder()
.withNewMetadata()
.addToLabels("test-label-key", "test-label-value")
Expand All @@ -335,7 +334,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
assert(exception.getMessage.contains("Could not load driver pod from template file."))
}

private def getSpecWithPodTemplate(pod: Pod) : KubernetesDriverSpec = {
private def constructSpecWithPodTemplate(pod: Pod) : KubernetesDriverSpec = {
val kubernetesClient = mock(classOf[KubernetesClient])
val pods =
mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]])
Expand All @@ -346,7 +345,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {

val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml")
.set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml")

val kubernetesConf = new KubernetesConf(
sparkConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
*/
package org.apache.spark.scheduler.cluster.k8s

import io.fabric8.kubernetes.api.model.PodBuilder
import java.io.File

import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource}
import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, never, verify, when}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
Expand Down Expand Up @@ -119,4 +125,64 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
assert(resolvedPod.pod.getMetadata.getLabels.get(stepType) === stepType)
}
}

test("Starts with empty executor pod if template is not specified") {
val kubernetesClient = mock(classOf[KubernetesClient])
val executorBuilder = KubernetesExecutorBuilder.apply(kubernetesClient, new SparkConf())
verify(kubernetesClient, never()).pods()
}

test("Starts with executor template if specified") {
val pod = constructPodWithPodTemplate(
new PodBuilder()
.withNewMetadata()
.addToLabels("test-label-key", "test-label-value")
.endMetadata()
.withNewSpec()
.addNewContainer()
.withName(Config.KUBERNETES_EXECUTOR_CONTAINER_NAME.defaultValueString)
.endContainer()
.endSpec()
.build())

assert(pod.pod.getMetadata.getLabels.containsKey("test-label-key"))
assert(pod.pod.getMetadata.getLabels.get("test-label-key") === "test-label-value")
assert(pod.container.getName ===
Config.KUBERNETES_EXECUTOR_CONTAINER_NAME.defaultValueString)
}

private def constructPodWithPodTemplate(pod: Pod) : SparkPod = {
val kubernetesClient = mock(classOf[KubernetesClient])
val pods =
mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]])
val podResource = mock(classOf[PodResource[Pod, DoneablePod]])
when(kubernetesClient.pods()).thenReturn(pods)
when(pods.load(any(classOf[File]))).thenReturn(podResource)
when(podResource.get()).thenReturn(pod)

val sparkConf = new SparkConf(false)
.set("spark.driver.host", "https://driver.host.com")
.set(Config.CONTAINER_IMAGE, "spark-executor:latest")
.set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "template-file.yaml")

val kubernetesConf = KubernetesConf(
sparkConf,
KubernetesExecutorSpecificConf(
"executor-id", Some(new PodBuilder()
.withNewMetadata()
.withName("driver")
.endMetadata()
.build())),
"prefix",
"appId",
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Nil,
Seq.empty[String])

KubernetesExecutorBuilder.apply(kubernetesClient, sparkConf).buildFromFeatures(kubernetesConf)
}
}

0 comments on commit 7f4ff5a

Please sign in to comment.