Skip to content

Commit

Permalink
propagate user-set Spark configurations to the driver pod (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikerlandson authored and foxish committed Dec 2, 2016
1 parent 7566d27 commit 60700c0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 24 deletions.
Expand Up @@ -35,6 +35,11 @@ import org.apache.spark.util.Utils

import scala.util.Random

/**
 * Companion object holding shared defaults for the Kubernetes scheduler:
 * the namespace and service account used when the corresponding
 * `spark.kubernetes.namespace` / `spark.kubernetes.serviceAccountName`
 * settings are not supplied by the user.
 */
private[spark] object KubernetesClusterScheduler {
  // Fixed defaults are idiomatic as typed constant vals rather than
  // parameterless defs (a def re-evaluates on every access and leaves the
  // public member's type to inference).
  val defaultNameSpace: String = "default"
  val defaultServiceAccountName: String = "default"
}

/**
* This is a simple extension to ClusterScheduler
* */
Expand All @@ -45,14 +50,26 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf)
// Default CPU cores per executor when none is configured.
private val DEFAULT_CORES = 1.0

logInfo("Created KubernetesClusterScheduler instance")

// Kubernetes API client, plus lowercase names with a random 5-character
// alphanumeric suffix for the driver pod and its service so repeated
// submissions do not collide on resource names.
var client = setupKubernetesClient()
val driverName = s"spark-driver-${Random.alphanumeric take 5 mkString("")}".toLowerCase()
val svcName = s"spark-svc-${Random.alphanumeric take 5 mkString("")}".toLowerCase()
// Requested executor count; falls back to 1 when EXECUTOR_INSTANCES is unset.
val instances = conf.get(EXECUTOR_INSTANCES).getOrElse(1)
// NOTE(review): the two definitions immediately below are the PRE-commit
// (removed) lines captured by this diff scrape; they are superseded by the
// definitions after logWarning that read defaults from the companion object.
// Both versions cannot coexist in the real file — confirm against the repo.
val serviceAccountName = conf.get("spark.kubernetes.serviceAccountName", "default")
val nameSpace = conf.get("spark.kubernetes.namespace", "default")

logWarning("instances: " + instances)
// Namespace and service account for driver-pod creation; defaults come from
// the KubernetesClusterScheduler companion object.
val nameSpace = conf.get(
"spark.kubernetes.namespace",
KubernetesClusterScheduler.defaultNameSpace)
val serviceAccountName = conf.get(
"spark.kubernetes.serviceAccountName",
KubernetesClusterScheduler.defaultServiceAccountName)

// Anything that should either not be passed to driver config in the cluster, or
// that is going to be explicitly managed as command argument to the driver pod
val confBlackList = scala.collection.Set(
"spark.master",
"spark.app.name",
"spark.submit.deployMode",
"spark.executor.jar",
"spark.dynamicAllocation.enabled",
"spark.shuffle.service.enabled")

def start(args: ClientArguments): Unit = {
startDriver(client, args)
Expand All @@ -73,7 +90,7 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf)
val driverDescription = buildDriverDescription(args)

// image needs to support shim scripts "/opt/driver.sh" and "/opt/executor.sh"
val sparkDriverImage = conf.getOption("spark.kubernetes.sparkImage").getOrElse {
val sparkImage = conf.getOption("spark.kubernetes.sparkImage").getOrElse {
// TODO: this needs to default to some standard Apache Spark image
throw new SparkException("Spark image not set. Please configure spark.kubernetes.sparkImage")
}
Expand All @@ -91,10 +108,10 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf)
s"--class=${args.userClass}",
s"--master=$kubernetesHost",
s"--executor-memory=${driverDescription.mem}",
s"--conf=spark.executor.jar=$clientJarUri",
s"--conf=spark.executor.instances=$instances",
s"--conf=spark.kubernetes.namespace=$nameSpace",
s"--conf=spark.kubernetes.driver.image=$sparkDriverImage")
s"--conf spark.executor.jar=$clientJarUri")

submitArgs ++= conf.getAll.filter { case (name, _) => !confBlackList.contains(name) }
.map { case (name, value) => s"--conf ${name}=${value}" }

if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
submitArgs ++= Vector(
Expand All @@ -117,7 +134,7 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf)
.withServiceAccount(serviceAccountName)
.addNewContainer()
.withName("spark-driver")
.withImage(sparkDriverImage)
.withImage(sparkImage)
.withImagePullPolicy("Always")
.withCommand(s"/opt/driver.sh")
.withArgs(submitArgs :_*)
Expand Down
Expand Up @@ -50,15 +50,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
// Executor pods that have been asked to stop but are not yet gone.
var shutdownToPod = mutable.Map.empty[String, String] // pending shutdown
// Monotonically increasing counter used to name executor pods.
var executorID = 0

// NOTE(review): the next four definitions are the PRE-commit (removed) lines
// captured by this diff scrape; the commit replaces them with the `conf`-based
// definitions that follow. Both versions cannot coexist in the real file —
// confirm against the repo.
val sparkDriverImage = sc.getConf.get("spark.kubernetes.driver.image")
val clientJarUri = sc.getConf.get("spark.executor.jar")
val ns = sc.getConf.get("spark.kubernetes.namespace")
val dynamicExecutors = Utils.isDynamicAllocationEnabled(sc.getConf)
// Post-commit versions: read image, client jar, and namespace directly from
// `conf`, with the namespace default shared via the scheduler's companion
// object.
val sparkImage = conf.get("spark.kubernetes.sparkImage")
val clientJarUri = conf.get("spark.executor.jar")
val ns = conf.get(
"spark.kubernetes.namespace",
KubernetesClusterScheduler.defaultNameSpace)
val dynamicExecutors = Utils.isDynamicAllocationEnabled(conf)

// executor back-ends take their configuration this way
if (dynamicExecutors) {
// NOTE(review): the sc.getConf pair are the removed pre-commit lines; the
// conf pair below are their post-commit replacements.
sc.getConf.setExecutorEnv("spark.dynamicAllocation.enabled", "true")
sc.getConf.setExecutorEnv("spark.shuffle.service.enabled", "true")
conf.setExecutorEnv("spark.dynamicAllocation.enabled", "true")
conf.setExecutorEnv("spark.shuffle.service.enabled", "true")
}

override def start(): Unit = {
Expand Down Expand Up @@ -159,12 +161,7 @@ private[spark] class KubernetesClusterSchedulerBackend(

initialNumExecutors
} else {
val targetNumExecutors =
sys.env
.get("SPARK_EXECUTOR_INSTANCES")
.map(_.toInt)
.getOrElse(numExecutors)
conf.get(EXECUTOR_INSTANCES).getOrElse(targetNumExecutors)
conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
}
}

Expand Down Expand Up @@ -195,7 +192,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
.withNewSpec()
.withRestartPolicy("OnFailure")

.addNewContainer().withName("spark-executor").withImage(sparkDriverImage)
.addNewContainer().withName("spark-executor").withImage(sparkImage)
.withImagePullPolicy("IfNotPresent")
.withCommand("/opt/executor.sh")
.withArgs(submitArgs :_*)
Expand Down

0 comments on commit 60700c0

Please sign in to comment.