Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
Add spark.nomad.cluster.passive mode
Browse files Browse the repository at this point in the history
  • Loading branch information
barnardb committed Jun 18, 2018
1 parent 70d3ccb commit ee3f1ef
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 4 deletions.
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.util.Utils
*/
private[spark] case class NomadClusterModeConf(
    backend: NomadClusterManagerConf,
    // When true, the job is registered with a driver count of 0 and the user
    // must raise the count to 1 to actually start the application
    // (see NomadClusterModeConf.PASSIVE_MODE).
    passive: Boolean,
    expectImmediateScheduling: Boolean,
    monitorUntil: Option[Milestone]
)
Expand All @@ -55,6 +56,13 @@ private[spark] object NomadClusterModeConf {
.stringConf
.createOptional

  // Config entry for "spark.nomad.cluster.passive". Defaults to false so that
  // existing jobs keep the active behaviour (driver count of 1, application
  // starts immediately after spark-submit registers the Nomad job).
  val PASSIVE_MODE =
    ConfigBuilder("spark.nomad.cluster.passive")
      .doc("When true, spark-submit will register a Nomad job with a driver count of 0. " +
        "It is up to the user to change the count to 1 to start the Spark application")
      .booleanConf
      .createWithDefault(false)

val SYSTEM_EXIT_ON_MAIN_COMPLETION =
ConfigBuilder("spark.nomad.cluster.systemExitOnMainCompletion")
.doc("When true, the application will be terminated if the main thread completes. " +
Expand All @@ -77,6 +85,7 @@ private[spark] object NomadClusterModeConf {

NomadClusterModeConf(
backend = backendConf,
passive = conf.get(PASSIVE_MODE),
expectImmediateScheduling = expectImmediateScheduling,
monitorUntil = conf.get(MONITOR_UNTIL).map(_.toLowerCase() match {
case "submitted" if expectImmediateScheduling =>
Expand Down
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster.nomad
import com.hashicorp.nomad.apimodel.TaskGroup

import org.apache.spark.SparkConf
import org.apache.spark.deploy.nomad.NomadClusterModeConf

private[spark] object DriverTaskGroup extends SparkNomadTaskGroupType("driver", DriverTask) {

Expand All @@ -30,6 +31,13 @@ private[spark] object DriverTaskGroup extends SparkNomadTaskGroupType("driver",
parameters: DriverTask.Parameters
): Unit = {
configureCommonSettings(jobConf, conf, group)

if (conf.get(NomadClusterModeConf.PASSIVE_MODE)) {
group.setCount(0)
} else {
SparkNomadJob.applyDefault(group.getCount)(group.setCount(1))
}

DriverTask.configure(jobConf, conf, findOrAdd(group, DriverTask), parameters)
}

Expand Down
Expand Up @@ -303,7 +303,7 @@ private[spark] object SparkNomadJob extends Logging {
}
}

private def applyDefault[A](getter: => AnyRef)(set: => Unit): Unit =
def applyDefault[A](getter: => AnyRef)(set: => Unit): Unit =
if (getter == null) {
set
}
Expand Down
Expand Up @@ -64,8 +64,6 @@ private[spark] abstract class SparkNomadTaskGroupType(
group.setName(role)
}

group.setCount(1)

val policy = Option(group.getRestartPolicy).getOrElse {
val p = new RestartPolicy
group.setRestartPolicy(p)
Expand Down
Expand Up @@ -186,7 +186,8 @@ abstract class BaseNomadClusterSuite extends SparkFunSuite with BeforeAndAfterEa
extraJars: Seq[File] = Nil,
extraFiles: Seq[File] = Nil,
extraConf: Map[String, String] = Map(),
extraEnv: Map[String, String] = Map()): State = {
extraEnv: Map[String, String] = Map(),
actionBeforeWaiting: SparkAppHandle => Unit = _ => ()): State = {

val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)

Expand Down Expand Up @@ -226,6 +227,9 @@ abstract class BaseNomadClusterSuite extends SparkFunSuite with BeforeAndAfterEa
launcher.setVerbose(true)

val handle = launcher.startApplication()

actionBeforeWaiting(handle)

try {
eventually(timeout(3 minutes), interval(1 second)) {
val state = handle.getState()
Expand Down
Expand Up @@ -23,6 +23,8 @@ import java.nio.charset.StandardCharsets.UTF_8
import scala.concurrent.duration._
import scala.language.postfixOps

import com.hashicorp.nomad.javasdk.NomadApiConfiguration
import com.hashicorp.nomad.scalasdk.NomadScalaApi
import org.apache.commons.io.IOUtils
import org.scalatest.concurrent.Eventually._

Expand Down Expand Up @@ -388,6 +390,45 @@ class NomadClusterSuite extends BaseNomadClusterSuite {
executorBar.get should not be "NOT SET"
}

test("run in passive cluster mode") {
val configBuilder = new NomadApiConfiguration.Builder()
nomadTestAddress.foreach(configBuilder.setAddress)
val nomadApi = NomadScalaApi(configBuilder.build())
val finalState = try {
runSpark(
ClusterMode,
nomadTestApp[EnvironmentPostingTestDriver.type],
appArgs = Seq(httpServer.url("/driver/"), httpServer.url("/executor/"),
"NOMAD_ADDR_some_driver_sidecar_foo",
"NOMAD_ADDR_some_executor_sidecar_bar"
),
extraConf = Map(
"spark.nomad.cluster.passive" -> "true"
),
actionBeforeWaiting = handle => {
eventually(timeout(1 minute), interval(1 second)) {
assert(handle.getState == SparkAppHandle.State.SUBMITTED)
}
val job = nomadApi.jobs.info(handle.getAppId).getValue
val driverGroup = SparkNomadJob.find(job, DriverTaskGroup).get
assert(driverGroup.getCount == 0)
driverGroup.setCount(1)
nomadApi.jobs.register(job)
}
)
} finally {
nomadApi.close()
}
checkResult(finalState,
"/driver/NOMAD_ADDR_some_executor_sidecar_bar" -> "NOT SET",
"/executor/NOMAD_ADDR_some_driver_sidecar_foo" -> "NOT SET")

val driverFoo = httpServer.valuePutToPath("/driver/NOMAD_ADDR_some_driver_sidecar_foo")
val executorBar = httpServer.valuePutToPath("/executor/NOMAD_ADDR_some_executor_sidecar_bar")

driverFoo.get should not be "NOT SET"
executorBar.get should not be "NOT SET"
}

}

Expand Down

0 comments on commit ee3f1ef

Please sign in to comment.