This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Commit c23bb4c
Add a unit test for BaseSubmissionStep.
mccheah committed Jul 5, 2017
1 parent: 9ff8c69
Showing 3 changed files with 136 additions and 35 deletions.
Client.scala
@@ -90,12 +90,10 @@ private[spark] class Client(
     // and/or PodBuilder#editSpec() safely.
     val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
     var currentDriverSpec = KubernetesDriverSpec(
-        driverPod = basePod,
-        driverContainer = new ContainerBuilder().build(),
-        driverSparkConf = submissionSparkConf.clone(),
-        otherKubernetesResources = Seq.empty[HasMetadata])
-    // This orchestrator determines which steps are necessary to take to resolve varying
-    // client arguments that are passed in
+      driverPod = basePod,
+      driverContainer = new ContainerBuilder().build(),
+      driverSparkConf = submissionSparkConf.clone(),
+      otherKubernetesResources = Seq.empty[HasMetadata])
     for (nextStep <- submissionSteps) {
       currentDriverSpec = nextStep.prepareSubmission(currentDriverSpec)
     }
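
The loop above is the heart of the submission flow: each configured step takes the driver spec accumulated so far and returns an amended copy. A minimal sketch of the contract implied by these calls — the trait and case class are defined elsewhere in the repository, so the exact shapes here are inferred from this diff rather than verbatim:

    import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}

    import org.apache.spark.SparkConf

    // Inferred from usage above; not the verbatim definitions.
    private[spark] case class KubernetesDriverSpec(
        driverPod: Pod,
        driverContainer: Container,
        driverSparkConf: SparkConf,
        otherKubernetesResources: Seq[HasMetadata])

    private[spark] trait KubernetesSubmissionStep {
      // Takes the spec built by earlier steps and returns an amended copy.
      def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
    }
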
@@ -119,10 +117,10 @@ private[spark] class Client(
         .endSpec()
         .build()
     Utils.tryWithResource(
-        kubernetesClient
-            .pods()
-            .withName(resolvedDriverPod.getMetadata.getName)
-            .watch(loggingPodStatusWatcher)) { _ =>
+      kubernetesClient
+        .pods()
+        .withName(resolvedDriverPod.getMetadata.getName)
+        .watch(loggingPodStatusWatcher)) { _ =>
       val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
       try {
         if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
BaseSubmissionStep.scala
@@ -28,27 +28,26 @@ import org.apache.spark.deploy.kubernetes.constants._
  * Represents the initial setup required for the driver.
  */
 private[spark] class BaseSubmissionStep(
-    kubernetesAppId: String,
-    kubernetesResourceNamePrefix: String,
-    driverLabels: Map[String, String],
-    dockerImagePullPolicy: String,
-    appName: String,
-    mainClass: String,
-    appArgs: Array[String],
-    submissionSparkConf: SparkConf)
-  extends KubernetesSubmissionStep {
+    kubernetesAppId: String,
+    kubernetesResourceNamePrefix: String,
+    driverLabels: Map[String, String],
+    dockerImagePullPolicy: String,
+    appName: String,
+    mainClass: String,
+    appArgs: Array[String],
+    submissionSparkConf: SparkConf) extends KubernetesSubmissionStep {
 
   private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
-      .getOrElse(s"$kubernetesResourceNamePrefix-driver")
+    .getOrElse(s"$kubernetesResourceNamePrefix-driver")
   private val driverExtraClasspath = submissionSparkConf.get(
     org.apache.spark.internal.config.DRIVER_CLASS_PATH)
   // CPU settings
   private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
-  private val driverLimitCores = submissionSparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES.key)
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
 
   // Memory settings
   private val driverMemoryMb = submissionSparkConf.get(
-      org.apache.spark.internal.config.DRIVER_MEMORY)
+    org.apache.spark.internal.config.DRIVER_MEMORY)
   private val memoryOverheadMb = submissionSparkConf
     .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
     .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMb).toInt,
@@ -64,6 +63,16 @@ private[spark] class BaseSubmissionStep(
         .withValue(classPath)
         .build()
     }
+    val driverCustomAnnotations = ConfigurationUtils
+      .combinePrefixedKeyValuePairsWithDeprecatedConf(
+        submissionSparkConf,
+        KUBERNETES_DRIVER_ANNOTATION_PREFIX,
+        KUBERNETES_DRIVER_ANNOTATIONS,
+        "annotation")
+    require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+      s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
+        s" Spark bookkeeping operations.")
+    val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
     val driverCpuQuantity = new QuantityBuilder(false)
       .withAmount(driverCpuCores)
       .build()
@@ -104,7 +113,7 @@
       .editOrNewMetadata()
         .withName(kubernetesDriverPodName)
         .addToLabels(driverLabels.asJava)
-        .addToAnnotations(getAllDriverAnnotations(submissionSparkConf).asJava)
+        .addToAnnotations(allDriverAnnotations.asJava)
         .endMetadata()
       .withNewSpec()
         .withRestartPolicy("Never")
@@ -119,16 +128,4 @@
       driverSparkConf = resolvedSparkConf,
       driverContainer = driverContainer)
   }
-
-  private def getAllDriverAnnotations(sparkConf: SparkConf): Map[String, String] = {
-    val driverCustomAnnotations = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
-      sparkConf,
-      KUBERNETES_DRIVER_ANNOTATION_PREFIX,
-      KUBERNETES_DRIVER_ANNOTATIONS,
-      "annotation")
-    require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
-      s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
-        s" Spark bookkeeping operations.")
-    driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
-  }
 }
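
The annotation handling, now inlined into the step, merges per-key annotation properties with the deprecated comma-separated property and then appends the reserved app-name annotation. A rough sketch of the merge semantics the test below exercises — ConfigurationUtils is not shown in this diff, so the parsing details here are assumptions, and appNameAnnotationKey stands in for SPARK_APP_NAME_ANNOTATION, whose literal value is also not shown:

    import org.apache.spark.SparkConf

    // Hedged approximation of ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf
    // plus the reserved-annotation bookkeeping.
    def allDriverAnnotations(
        conf: SparkConf,
        appNameAnnotationKey: String,
        appName: String): Map[String, String] = {
      // New style: spark.kubernetes.driver.annotation.<key>=<value>
      val prefix = "spark.kubernetes.driver.annotation."
      val prefixed = conf.getAll
        .filter { case (k, _) => k.startsWith(prefix) }
        .map { case (k, v) => (k.stripPrefix(prefix), v) }
        .toMap
      // Deprecated style: spark.kubernetes.driver.annotations=key1=value1,key2=value2
      val deprecated = conf.getOption("spark.kubernetes.driver.annotations")
        .map(_.split(",").map { pair =>
          val Array(k, v) = pair.split("=", 2)
          (k.trim, v.trim)
        }.toMap)
        .getOrElse(Map.empty[String, String])
      val custom = prefixed ++ deprecated
      require(!custom.contains(appNameAnnotationKey),
        s"Annotation with key $appNameAnnotationKey is reserved for Spark bookkeeping.")
      custom ++ Map(appNameAnnotationKey -> appName)
    }
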

BaseSubmissionStepSuite.scala (new file)
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.submitsteps

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._

private[spark] class BaseSubmissionStepSuite extends SparkFunSuite {

private val APP_ID = "spark-app-id"
private val RESOURCE_NAME_PREFIX = "spark"
private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
private val DOCKER_IMAGE_PULL_POLICY = "IfNotPresent"
private val APP_NAME = "spark-test"
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
private val APP_ARGS = Array("arg1", "arg2")
private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
private val DEPRECATED_CUSTOM_ANNOTATION_KEY = "customAnnotationDeprecated"
private val DEPRECATED_CUSTOM_ANNOTATION_VALUE = "customAnnotationDeprecatedValue"

test("Set all possible configurations from the user.") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, "/opt/spark/spark-examples.jar")
.set("spark.driver.cores", "2")
.set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
.set(org.apache.spark.internal.config.DRIVER_MEMORY, 256L)
.set(KUBERNETES_DRIVER_MEMORY_OVERHEAD, 200L)
.set(DRIVER_DOCKER_IMAGE, "spark-driver:latest")
.set(s"spark.kubernetes.driver.annotation.$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
.set("spark.kubernetes.driver.annotations",
s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
val submissionStep = new BaseSubmissionStep(
APP_ID,
RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
DOCKER_IMAGE_PULL_POLICY,
APP_NAME,
MAIN_CLASS,
APP_ARGS,
sparkConf)
val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
val baseDriverSpec = KubernetesDriverSpec(
driverPod = basePod,
driverContainer = new ContainerBuilder().build(),
driverSparkConf = new SparkConf(false),
otherKubernetesResources = Seq.empty[HasMetadata])

val preparedDriverSpec = submissionStep.prepareSubmission(baseDriverSpec)
assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
assert(preparedDriverSpec.driverContainer.getImagePullPolicy === DOCKER_IMAGE_PULL_POLICY)
val envs = preparedDriverSpec.driverContainer
.getEnv
.asScala
.map(env => (env.getName, env.getValue))
.toMap
assert(envs.size === 4)
assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar")
assert(envs(ENV_DRIVER_MEMORY) === "456m")
assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2")
val resourceRequirements = preparedDriverSpec.driverContainer.getResources
val requests = resourceRequirements.getRequests.asScala
assert(requests("cpu").getAmount === "2")
assert(requests("memory").getAmount === "256M")
val limits = resourceRequirements.getLimits.asScala
assert(limits("memory").getAmount === "456M")
assert(limits("cpu").getAmount === "4")
val driverPodMetadata = preparedDriverSpec.driverPod.getMetadata
assert(driverPodMetadata.getName === "spark-driver-pod")
assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
val expectedAnnotations = Map(
CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
DEPRECATED_CUSTOM_ANNOTATION_KEY -> DEPRECATED_CUSTOM_ANNOTATION_VALUE,
SPARK_APP_NAME_ANNOTATION -> APP_NAME)
assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")
val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
val expectedSparkConf = Map(
KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
"spark.app.id" -> APP_ID,
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX)
assert(resolvedSparkConf === expectedSparkConf)

}
}
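
A quick sanity check on the memory assertions above: the suite sets spark.driver.memory to 256 MiB with an explicit 200 MiB overhead, and expects the container's memory limit (and ENV_DRIVER_MEMORY) to be heap plus overhead while the request stays at the heap size. The arithmetic, spelled out — the heap-plus-overhead formula is inferred from the assertions, not from code shown in this diff:

    val driverMemoryMb = 256L       // org.apache.spark.internal.config.DRIVER_MEMORY in the test
    val memoryOverheadMb = 200L     // explicit KUBERNETES_DRIVER_MEMORY_OVERHEAD override
    val limitMb = driverMemoryMb + memoryOverheadMb
    assert(limitMb == 456L)         // requests("memory") === "256M", limits("memory") === "456M"
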
