Skip to content

Commit

Permalink
[SPARK-22962][K8S] Fail fast if submission client local files are used
Browse files Browse the repository at this point in the history
  • Loading branch information
liyinan926 committed Jan 18, 2018
1 parent bf34d66 commit df9cdfb
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 3 deletions.
5 changes: 4 additions & 1 deletion docs/running-on-kubernetes.md
Expand Up @@ -117,7 +117,10 @@ This URI is the location of the example jar that is already in the Docker image.
If your application's dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to
by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images.
Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the
`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles.
`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to
dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies local to the submission
client is not yet supported.


### Using Remote Dependencies
When there are application dependencies hosted in remote locations like HDFS or HTTP servers, the driver and executor pods
Expand Down
Expand Up @@ -20,7 +20,7 @@ import java.util.UUID

import com.google.common.primitives.Longs

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
Expand Down Expand Up @@ -117,6 +117,12 @@ private[spark] class DriverConfigOrchestrator(
.map(_.split(","))
.getOrElse(Array.empty[String])

// Fail fast: reject the submission as soon as any entry in sparkJars or sparkFiles is
// flagged by existSubmissionLocalFiles, instead of continuing to build configuration steps.
if (existSubmissionLocalFiles(sparkJars) || existSubmissionLocalFiles(sparkFiles)) {
  // Each fragment but the last ends with a space so the concatenated message reads
  // correctly (previously "application" and "dependencies" were joined without one).
  throw new SparkException("The Kubernetes mode does not yet support application " +
    "dependencies local to the submission client. It currently only allows application " +
    "dependencies locally in the container or that can be downloaded remotely.")
}

val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
Seq(new DependencyResolutionStep(
sparkJars,
Expand Down Expand Up @@ -162,6 +168,12 @@ private[spark] class DriverConfigOrchestrator(
initContainerBootstrapStep
}

/**
 * Checks whether at least one of the given URI strings has the `file` scheme once it has
 * been passed through `Utils.resolveURI`.
 *
 * @param files URI strings to inspect
 * @return true if any resolved URI reports `file` as its scheme, false otherwise
 */
private def existSubmissionLocalFiles(files: Seq[String]): Boolean = {
  // The iterator keeps evaluation lazy, so resolution stops at the first match.
  files.iterator
    .map(path => Utils.resolveURI(path).getScheme)
    .exists(scheme => scheme == "file")
}

private def existNonContainerLocalFiles(files: Seq[String]): Boolean = {
files.exists { uri =>
Utils.resolveURI(uri).getScheme != "local"
Expand Down
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.deploy.k8s.submit

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.submit.steps._

Expand Down Expand Up @@ -117,6 +117,35 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
classOf[DriverMountSecretsStep])
}

test("Submission using client local dependencies") {
  val conf = new SparkConf(false).set(CONTAINER_IMAGE, DRIVER_IMAGE)

  // Scenario 1: the main application resource is given as a file:// URI.
  val withFileSchemeMainJar = new DriverConfigOrchestrator(
    APP_ID,
    LAUNCH_TIME,
    Some(JavaMainAppResource("file:///var/apps/jars/main.jar")),
    APP_NAME,
    MAIN_CLASS,
    APP_ARGS,
    conf)
  assertThrows[SparkException] {
    withFileSchemeMainJar.getAllConfigurationSteps
  }

  // Scenario 2: the main resource uses local://, but spark.files lists paths with no scheme.
  // The conf is updated only after the first scenario has been checked.
  conf.set("spark.files", "/path/to/file1,/path/to/file2")
  val withSchemelessFiles = new DriverConfigOrchestrator(
    APP_ID,
    LAUNCH_TIME,
    Some(JavaMainAppResource("local:///var/apps/jars/main.jar")),
    APP_NAME,
    MAIN_CLASS,
    APP_ARGS,
    conf)
  assertThrows[SparkException] {
    withSchemelessFiles.getAllConfigurationSteps
  }
}

private def validateStepTypes(
orchestrator: DriverConfigOrchestrator,
types: Class[_ <: DriverConfigurationStep]*): Unit = {
Expand Down

0 comments on commit df9cdfb

Please sign in to comment.