[SPARK-2454] Do not ship spark home to Workers #1734

Closed
wants to merge 4 commits
Changes from 3 commits
@@ -22,7 +22,6 @@ private[spark] class ApplicationDescription(
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
-    val sparkHome: Option[String],
     var appUiUrl: String,
     val eventLogDir: Option[String] = None)
   extends Serializable {
@@ -56,7 +56,6 @@ private[spark] object JsonProtocol {
     ("cores" -> obj.maxCores) ~
     ("memoryperslave" -> obj.memoryPerSlave) ~
     ("user" -> obj.user) ~
-    ("sparkhome" -> obj.sparkHome) ~
     ("command" -> obj.command.toString)
   }
@@ -48,9 +48,8 @@ private[spark] object TestClient {
     val conf = new SparkConf
     val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = conf, securityManager = new SecurityManager(conf))
-    val desc = new ApplicationDescription(
-      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(),
-      Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored")
+    val desc = new ApplicationDescription("TestClient", Some(1), 512,
+      Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()
@@ -81,7 +81,8 @@ private[spark] class Worker(
   @volatile var registered = false
   @volatile var connected = false
   val workerId = generateWorkerId()
-  val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
+  val sparkHome =
+    new File(sys.props.get("spark.test.home").orElse(sys.env.get("SPARK_HOME")).getOrElse("."))
Contributor: is this needed to make the tests work?

Contributor (author): Yeah, otherwise all tests with local-cluster fail.
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]

@@ -233,9 +234,7 @@ private[spark] class Worker(
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host,
-            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
-            workDir, akkaUrl, conf, ExecutorState.RUNNING)
+            self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
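As the review exchange above notes, the Worker now consults the spark.test.home system property so that local-cluster tests can find a usable Spark directory. Below is a minimal, standalone sketch of that lookup order; the object and method names are hypothetical and only illustrate the precedence used in the diff above.

import java.io.File

object SparkHomeResolutionSketch {
  // Resolution order after this change (mirrors the Worker diff above):
  //   1. -Dspark.test.home, set by the build when it forks test JVMs
  //   2. the SPARK_HOME environment variable
  //   3. the worker's current working directory as a last resort
  def resolveSparkHome(): File = {
    val home = sys.props.get("spark.test.home")
      .orElse(sys.env.get("SPARK_HOME"))
      .getOrElse(".")
    new File(home)
  }

  def main(args: Array[String]): Unit =
    println(s"Worker would use: ${resolveSparkHome().getAbsolutePath}")
}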
@@ -60,9 +60,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
-    val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))

     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
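Taken together, the driver-side and worker-side changes mean the application description no longer carries a Spark home at all; each worker launches executors from its own installation. A hedged, self-contained sketch of that flow follows, using deliberately simplified stand-ins for ApplicationDescription and Command rather than Spark's real classes.

import java.io.File

// Simplified, hypothetical stand-ins for the real deploy classes.
case class Command(mainClass: String, arguments: Seq[String])
case class ApplicationDescription(
    name: String,
    maxCores: Option[Int],
    memoryPerSlave: Int,
    command: Command,
    appUiUrl: String) // note: no sparkHome field any more

object StandaloneFlowSketch {
  def main(args: Array[String]): Unit = {
    // Driver side: the description shipped to the cluster carries no spark home.
    val appDesc = ApplicationDescription("demo-app", Some(2), 512,
      Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", Seq("--demo")),
      "http://driver-host:4040")

    // Worker side: each worker resolves its own installation directory locally.
    val workerSparkHome = new File(sys.env.getOrElse("SPARK_HOME", "."))
    println(s"Launching executors for ${appDesc.name} from $workerSparkHome")
  }
}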
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ import scala.language.postfixOps
 class DriverSuite extends FunSuite with Timeouts {

   test("driver should exit after finishing") {
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.props("spark.test.home")
     // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
     val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
     forAll(masters) { (master: String) =>
@@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite {

   def createAppDesc(): ApplicationDescription = {
     val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
-    new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
+    new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
   }

   def createAppInfo() : ApplicationInfo = {

@@ -169,8 +169,7 @@ object JsonConstants {
   val appDescJsonStr =
     """
       |{"name":"name","cores":4,"memoryperslave":1234,
-      |"user":"%s","sparkhome":"sparkHome",
-      |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
+      |"user":"%s","command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),List())"}
     """.format(System.getProperty("user.name", "<unknown>")).stripMargin

   val executorRunnerJsonStr =
@@ -295,7 +295,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {

   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   def runSparkSubmit(args: Seq[String]): String = {
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.props("spark.test.home")
     Utils.executeAndGetOutput(
       Seq("./bin/spark-submit") ++ args,
       new File(sparkHome),
@@ -27,12 +27,11 @@ import org.apache.spark.SparkConf
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
+    val sparkHome = sys.props("spark.test.home")
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
-      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()),
-      sparkHome, "appUiUrl")
+      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
       f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)

     assert(er.getCommandSeq.last === appId)
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
@@ -328,7 +328,7 @@ object TestSettings {
   lazy val settings = Seq (
     // Fork new JVMs for tests and set Java options for those
     fork := true,
-    javaOptions in Test += "-Dspark.home=" + sparkHome,
+    javaOptions in Test += "-Dspark.test.home=" + sparkHome,
     javaOptions in Test += "-Dspark.testing=1",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
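The test suites changed above all read the forwarded property the same way. Here is a small sketch of that pattern, assuming the sbt setting above has populated spark.test.home in the forked test JVM; the object name and error message are illustrative, not part of this PR.

object TestHomeSketch {
  // Pattern used by the suites above: read spark.test.home directly and fail
  // fast if the tests were launched outside the build.
  def sparkTestHome: String = sys.props.getOrElse("spark.test.home",
    sys.error("spark.test.home not set; run tests through sbt so the JVM option is forwarded"))

  def main(args: Array[String]): Unit = println(sparkTestHome)
}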
2 changes: 1 addition & 1 deletion python/pyspark/conf.py
@@ -30,7 +30,7 @@
 u'local'
 >>> sc.appName
 u'My app'
->>> sc.sparkHome is None
+>>> sc.sparkHome is not None
 True

 >>> conf = SparkConf(loadDefaults=False)
6 changes: 2 additions & 4 deletions python/pyspark/context.py
@@ -84,7 +84,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         @param serializer: The serializer for RDDs.
         @param conf: A L{SparkConf} object setting Spark properties.
         @param gateway: Use an existing gateway and JVM, otherwise a new JVM
-               will be instatiated.
+               will be instantiated.


         >>> from pyspark.context import SparkContext

@@ -126,8 +126,6 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
             self._conf.setMaster(master)
         if appName:
             self._conf.setAppName(appName)
-        if sparkHome:
-            self._conf.setSparkHome(sparkHome)
         if environment:
Contributor: I don't see why we should disable this.

Contributor (author): This is part of an effort to shift away from calling setSparkHome within Spark completely. After all, all that does is set spark.home, which we established is only for Mesos.

             for key, value in environment.iteritems():
                 self._conf.setExecutorEnv(key, value)

@@ -144,7 +142,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         # the classpath or an external config file
         self.master = self._conf.get("spark.master")
         self.appName = self._conf.get("spark.app.name")
-        self.sparkHome = self._conf.get("spark.home", None)
+        self.sparkHome = os.environ.get("SPARK_HOME")
         for (k, v) in self._conf.getAll():
             if k.startswith("spark.executorEnv."):
                 varName = k[len("spark.executorEnv."):]
3 changes: 0 additions & 3 deletions repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -969,9 +969,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
-    if (System.getenv("SPARK_HOME") != null) {
-      conf.setSparkHome(System.getenv("SPARK_HOME"))
-    }
     sparkContext = new SparkContext(conf)
     logInfo("Created spark context..")
     sparkContext
@@ -35,7 +35,6 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   extends Logging with Serializable {
   val master = ssc.sc.master
   val framework = ssc.sc.appName
-  val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
   val jars = ssc.sc.jars
   val graph = ssc.graph
   val checkpointDir = ssc.checkpointDir