Skip to content

Commit

Permalink
[SPARK-22574][MESOS][SUBMIT] Check submission request parameters
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

PR closed with all the comments -> apache#19793

It solves the problem when submitting a wrong CreateSubmissionRequest to Spark Dispatcher was causing a bad state of Dispatcher and making it inactive as a Mesos framework.

https://issues.apache.org/jira/browse/SPARK-22574

## How was this patch tested?

All spark test passed successfully.

It was tested sending a wrong request (without appArgs) before and after the change. The point is easy, check if the value is null before being accessed.

This was before the change, leaving the dispatcher inactive:

```
Exception in thread "Thread-22" java.lang.NullPointerException
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.getDriverCommandValue(MesosClusterScheduler.scala:444)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.buildDriverCommand(MesosClusterScheduler.scala:451)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.org$apache$spark$scheduler$cluster$mesos$MesosClusterScheduler$$createTaskInfo(MesosClusterScheduler.scala:538)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler$$anonfun$scheduleTasks$1.apply(MesosClusterScheduler.scala:570)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler$$anonfun$scheduleTasks$1.apply(MesosClusterScheduler.scala:555)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.scheduleTasks(MesosClusterScheduler.scala:555)
	at org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler.resourceOffers(MesosClusterScheduler.scala:621)
```

And after:

```
  "message" : "Malformed request: org.apache.spark.deploy.rest.SubmitRestProtocolException: Validation of message CreateSubmissionRequest failed!\n\torg.apache.spark.deploy.rest.SubmitRestProtocolMessage.validate(SubmitRestProtocolMessage.scala:70)\n\torg.apache.spark.deploy.rest.SubmitRequestServlet.doPost(RestSubmissionServer.scala:272)\n\tjavax.servlet.http.HttpServlet.service(HttpServlet.java:707)\n\tjavax.servlet.http.HttpServlet.service(HttpServlet.java:790)\n\torg.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)\n\torg.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)\n\torg.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)\n\torg.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)\n\torg.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)\n\torg.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)\n\torg.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)\n\torg.spark_project.jetty.server.Server.handle(Server.java:524)\n\torg.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)\n\torg.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)\n\torg.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)\n\torg.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)\n\torg.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)\n\torg.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)\n\torg.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)\n\torg.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)\n\tjava.lang.Thread.run(Thread.java:745)"
```

Author: German Schiavon <germanschiavon@gmail.com>

Closes apache#19966 from Gschiavon/fix-submission-request.

(cherry picked from commit 0bdb4e5)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
  • Loading branch information
Gschiavon authored and MatthewRBruce committed Jul 31, 2018
1 parent f175fbb commit e96da06
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest {
super.doValidate()
assert(sparkProperties != null, "No Spark properties set!")
assertFieldIsSet(appResource, "appResource")
assertFieldIsSet(appArgs, "appArgs")
assertFieldIsSet(environmentVariables, "environmentVariables")
assertPropertyIsSet("spark.app.name")
assertPropertyIsBoolean("spark.driver.supervise")
assertPropertyIsNumeric("spark.driver.cores")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
message.clientSparkVersion = "1.2.3"
message.appResource = "honey-walnut-cherry.jar"
message.mainClass = "org.apache.spark.examples.SparkPie"
message.appArgs = Array("two slices")
message.environmentVariables = Map("PATH" -> "/dev/null")
val conf = new SparkConf(false)
conf.set("spark.app.name", "SparkPie")
message.sparkProperties = conf.getAll.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,17 @@ private[mesos] class MesosSubmitRequestServlet(
private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
// Required fields, including the main class because python is not yet supported
val appResource = Option(request.appResource).getOrElse {
throw new SubmitRestMissingFieldException("Application jar is missing.")
throw new SubmitRestMissingFieldException("Application jar 'appResource' is missing.")
}
val mainClass = Option(request.mainClass).getOrElse {
throw new SubmitRestMissingFieldException("Main class is missing.")
throw new SubmitRestMissingFieldException("Main class 'mainClass' is missing.")
}
val appArgs = Option(request.appArgs).getOrElse {
throw new SubmitRestMissingFieldException("Application arguments 'appArgs' are missing.")
}
val environmentVariables = Option(request.environmentVariables).getOrElse {
throw new SubmitRestMissingFieldException("Environment variables 'environmentVariables' " +
"are missing.")
}

// Optional fields
Expand All @@ -91,8 +98,6 @@ private[mesos] class MesosSubmitRequestServlet(
val superviseDriver = sparkProperties.get("spark.driver.supervise")
val driverMemory = sparkProperties.get("spark.driver.memory")
val driverCores = sparkProperties.get("spark.driver.cores")
val appArgs = request.appArgs
val environmentVariables = request.environmentVariables
val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)

// Construct driver description
Expand Down

0 comments on commit e96da06

Please sign in to comment.