[SPARK-20992][Scheduler] Add support for Nomad as a scheduler backend #18209

Closed · wants to merge 2 commits
2 changes: 1 addition & 1 deletion .travis.yml
@@ -43,7 +43,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
- build/mvn -T 4 -q -DskipTests -Pmesos -Pnomad -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>nomad</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-nomad_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.tags;

import org.scalatest.TagAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@TagAnnotation
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface ExtendedNomadTest { }
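
Because ExtendedNomadTest has runtime retention and is meta-annotated with ScalaTest's @TagAnnotation, applying it at class level tags every test in a suite with the name org.apache.spark.tags.ExtendedNomadTest, so Nomad integration tests can be included or excluded as a group at test time. A minimal usage sketch, with an illustrative suite name that is not part of this change:

import org.apache.spark.tags.ExtendedNomadTest
import org.scalatest.FunSuite

// Illustrative suite: the class-level annotation tags every test in it,
// so a runner can include or exclude the whole group by tag name.
@ExtendedNomadTest
class NomadIntegrationSketchSuite extends FunSuite {
  test("runs only when the ExtendedNomadTest tag is included") {
    assert(Seq("nomad").nonEmpty)
  }
}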
21 changes: 15 additions & 6 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -426,6 +426,16 @@ private[spark] class SecurityManager(
* we throw an exception.
*/
private def generateSecretKey(): String = {

def generate(): String = {
val rnd = new SecureRandom()
val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
val secret = new Array[Byte](length)
rnd.nextBytes(secret)

HashCodes.fromBytes(secret).toString
}

if (!isAuthenticationEnabled) {
null
} else if (SparkHadoopUtil.get.isYarnMode) {
@@ -435,12 +445,7 @@
val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY)
if (secretKey == null || secretKey.length == 0) {
logDebug("generateSecretKey: yarn mode, secret key from credentials is null")
val rnd = new SecureRandom()
val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
val secret = new Array[Byte](length)
rnd.nextBytes(secret)

val cookie = HashCodes.fromBytes(secret).toString()
val cookie = generate()
SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie)
cookie
} else {
@@ -452,6 +457,10 @@
Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
.orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
case Some(value) => value
case None if sparkConf.getOption("spark.master").exists(_.startsWith("nomad")) =>
val secret = generate()
sparkConf.set(SecurityManager.SPARK_AUTH_SECRET_CONF, secret)
secret
case None =>
throw new IllegalArgumentException(
"Error: a secret key must be specified via the " +
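
In the SecurityManager change above, the random-secret logic is factored into a local generate() helper so the new case for Nomad masters can reuse it: when spark.master starts with "nomad" and no secret is configured, a secret is generated and stored back into the SparkConf under SPARK_AUTH_SECRET_CONF rather than failing. A standalone sketch of the generation step only, assuming the Guava version bundled with Spark (which still provides HashCodes); this is not the exact Spark code:

import java.security.SecureRandom
import com.google.common.hash.HashCodes

// Fill a byte array of the configured bit length with cryptographically
// strong random bytes and hex-encode it, mirroring generate() above.
def generateSecret(bitLength: Int = 256): String = {
  val secret = new Array[Byte](bitLength / java.lang.Byte.SIZE)
  new SecureRandom().nextBytes(secret)
  HashCodes.fromBytes(secret).toString
}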
83 changes: 66 additions & 17 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy

import java.io.{File, IOException}
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.net.{URI, URL}
import java.nio.file.Files
import java.security.PrivilegedExceptionAction
import java.text.ParseException
@@ -72,7 +72,8 @@ object SparkSubmit extends CommandLineUtils {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val NOMAD = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | NOMAD

// Deploy modes
private val CLIENT = 1
@@ -232,9 +233,10 @@ object SparkSubmit extends CommandLineUtils {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("nomad") => NOMAD
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
printErrorAndExit("Master must either be yarn or start with spark, mesos, nomad, local")
-1
}
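
Each cluster manager constant is a distinct power-of-two flag, so sets of managers can be combined with bitwise OR and membership tested with bitwise AND; the OptionAssigner entries further down rely on exactly this. A small sketch of the encoding (YARN = 1 is the existing value outside the visible hunk; the supports helper is illustrative):

val YARN = 1        // existing value, not shown in this hunk
val STANDALONE = 2
val MESOS = 4
val LOCAL = 8
val NOMAD = 16      // new flag added by this change
val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | NOMAD

// Illustrative membership test, mirroring how flag sets are checked.
def supports(allowed: Int, manager: Int): Boolean = (allowed & manager) != 0

assert(supports(YARN | NOMAD, NOMAD))         // Nomad is in this set
assert(!supports(STANDALONE | MESOS, NOMAD))  // but not in this one
assert(supports(ALL_CLUSTER_MGRS, NOMAD))     // and is part of the full set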

@@ -277,6 +279,7 @@ object SparkSubmit extends CommandLineUtils {
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isNomadCluster = clusterManager == NOMAD && deployMode == CLUSTER

// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
@@ -322,7 +325,8 @@ object SparkSubmit extends CommandLineUtils {
// Require all python files to be local, so we can add them to the PYTHONPATH
// In YARN cluster mode, python files are distributed as regular files, which can be non-local.
// In Mesos cluster mode, non-local python files are automatically downloaded by Mesos.
if (args.isPython && !isYarnCluster && !isMesosCluster) {
// In Nomad cluster mode, non-local python files are automatically downloaded by Nomad.
if (args.isPython && !isYarnCluster && !isMesosCluster && !isNomadCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local python files are supported: ${args.primaryResource}")
}
@@ -333,7 +337,7 @@
}

// Require all R files to be local
if (args.isR && !isYarnCluster && !isMesosCluster) {
if (args.isR && !isYarnCluster && !isMesosCluster && !isNomadCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local R files are supported: ${args.primaryResource}")
}
@@ -423,6 +427,11 @@
printErrorAndExit("Distributing R packages with mesos cluster is not supported.")
}

// TODO: Support distributing R packages with Nomad cluster
if (args.isR && clusterManager == NOMAD && !RUtils.rPackages.isEmpty) {
printErrorAndExit("Distributing R packages with Nomad cluster is not supported.")
}

// If we're running an R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
@@ -466,28 +475,41 @@

// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.instances"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),

// Nomad only
OptionAssigner(args.distribution, NOMAD, ALL_DEPLOY_MODES,
sysProp = "spark.nomad.sparkDistribution"),
OptionAssigner(args.dockerImage, NOMAD, ALL_DEPLOY_MODES,
sysProp = "spark.nomad.dockerImage"),
OptionAssigner(args.executorCpu, NOMAD, ALL_DEPLOY_MODES, sysProp = "spark.executor.cpu"),
OptionAssigner(args.driverCpu, NOMAD, CLUSTER, sysProp = "spark.driver.cpu"),
OptionAssigner(args.monitorUntil, NOMAD, CLUSTER,
sysProp = "spark.nomad.cluster.monitorUntil"),
OptionAssigner(args.nomadTemplate, NOMAD, ALL_DEPLOY_MODES,
sysProp = "spark.nomad.job.template"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.numExecutors, YARN | NOMAD, ALL_DEPLOY_MODES,
sysProp = "spark.executor.instances"),
OptionAssigner(args.executorCores, STANDALONE | YARN | NOMAD, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | NOMAD, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | NOMAD, ALL_DEPLOY_MODES,
sysProp = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.jars, STANDALONE | MESOS | NOMAD, ALL_DEPLOY_MODES,
sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | NOMAD, CLUSTER,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | NOMAD, CLUSTER,
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
@@ -622,6 +644,33 @@ object SparkSubmit extends CommandLineUtils {
}
}

if (isNomadCluster || (clusterManager == NOMAD && args.isNomadDryRun)) {
childMainClass =
if (args.isNomadDryRun) "org.apache.spark.deploy.nomad.NomadDryRun"
else "org.apache.spark.deploy.nomad.NomadClusterModeLauncher"
if (args.driverExtraClassPath != null) {
childClasspath ++= args.driverExtraClassPath.split(":")
}
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
} else if (args.isR) {
childArgs += ("--primary-r-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
@@ -651,10 +700,10 @@ object SparkSubmit extends CommandLineUtils {
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
val formattedPyFiles = if (!isYarnCluster && !isMesosCluster && !isNomadCluster) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
// Ignoring formatting python path in yarn and mesos cluster mode, these two modes
// Ignoring formatting python path in yarn, mesos and nomad cluster mode, these modes
// support dealing with remote python files, they could distribute and add python files
// locally.
resolvedPyFiles
@@ -812,14 +861,14 @@ object SparkSubmit extends CommandLineUtils {
* Return whether the given primary resource requires running python.
*/
private[deploy] def isPython(res: String): Boolean = {
res != null && res.endsWith(".py") || res == PYSPARK_SHELL
res != null && new URI(res).getPath.endsWith(".py") || res == PYSPARK_SHELL
}

/**
* Return whether the given primary resource requires running R.
*/
private[deploy] def isR(res: String): Boolean = {
res != null && res.endsWith(".R") || res == SPARKR_SHELL
res != null && new URI(res).getPath.endsWith(".R") || res == SPARKR_SHELL
}

private[deploy] def isInternal(res: String): Boolean = {
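
The isPython and isR changes at the end of the diff parse the primary resource as a URI and check its path, so resources that carry a query string or fragment are still recognized by extension, where a raw endsWith on the whole string would not be. A small sketch of the difference, with an illustrative path:

import java.net.URI

val res = "hdfs://namenode/apps/job.py#latest"   // illustrative remote resource with a fragment

res.endsWith(".py")                    // false: the raw string ends with "#latest"
new URI(res).getPath.endsWith(".py")   // true: getPath is "/apps/job.py"; the fragment is dropped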