[SPARK-22962][K8S] Fail fast if submission client local files are used #20320

Closed · wants to merge 3 commits

Changes from 1 commit
docs/running-on-kubernetes.md (5 changes: 4 additions & 1 deletion)
@@ -117,7 +117,10 @@
 This URI is the location of the example jar that is already in the Docker image.
 If your application's dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to
 by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images.
 Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the
-`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles.
+`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to
Contributor: The point above already covers that `local://` is needed with custom-built images.

Contributor Author: Yes, but that's about adding to the classpath. I wanted to make it more specific and clearer.

+dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies local to the submission
+client is currently not yet supported.
Contributor: "application dependencies from the local file system"?

Contributor Author: Done.



 ### Using Remote Dependencies
 When there are application dependencies hosted in remote locations like HDFS or HTTP servers, the driver and executor pods
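
A minimal sketch of what the documented rule means for a submission configuration; the master URL, image name, and all paths below are hypothetical:

```scala
import org.apache.spark.SparkConf

// A sketch only: image name, master URL, and all paths are hypothetical.
val conf = new SparkConf()
  .setMaster("k8s://https://k8s-apiserver.example.com:6443")
  .set("spark.kubernetes.container.image", "my-org/spark:2.3.0")
  // Allowed: jar pre-mounted into the image, referenced via local://
  .set("spark.jars", "local:///opt/spark/jars/my-dep.jar")
  // Allowed: remote dependency that the pods can download
  .set("spark.files", "hdfs://namenode:8020/conf/app.properties")

// Not supported after this change: a submission-client local path such as
// "file:///home/me/my-dep.jar" or a bare "/home/me/my-dep.jar" in
// spark.jars / spark.files now fails fast with a SparkException.
```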
DriverConfigOrchestrator.scala
@@ -20,7 +20,7 @@ import java.util.UUID

 import com.google.common.primitives.Longs

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -117,6 +117,12 @@ private[spark] class DriverConfigOrchestrator(
       .map(_.split(","))
       .getOrElse(Array.empty[String])

+    if (existSubmissionLocalFiles(sparkJars) || existSubmissionLocalFiles(sparkFiles)) {
Contributor: We can add a TODO here if this is planned to be supported in the future.

+      throw new SparkException("The Kubernetes mode does not yet support application " +
Contributor: I'd shorten this to just "Kubernetes mode does not support referencing application dependencies in the local file system".

Contributor Author: Done.

"dependencies local to the submission client. It currently only allows application" +
Contributor: nit: extra space at the end of the line.

Contributor Author: Done.

"dependencies locally in the container or that can be downloaded remotely.")
}

val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
Seq(new DependencyResolutionStep(
sparkJars,
@@ -162,6 +168,12 @@
       initContainerBootstrapStep
     }

+  private def existSubmissionLocalFiles(files: Seq[String]): Boolean = {
+    files.exists { uri =>
+      Utils.resolveURI(uri).getScheme == "file"
+    }
+  }
+
   private def existNonContainerLocalFiles(files: Seq[String]): Boolean = {
     files.exists { uri =>
       Utils.resolveURI(uri).getScheme != "local"
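
For reference, a self-contained sketch of the scheme logic both helpers rely on; `resolveScheme` below is a hypothetical stand-in for Spark's `Utils.resolveURI(...).getScheme`, on the assumption that a path without an explicit scheme resolves to `file`:

```scala
import java.net.URI

// Hypothetical stand-in for Utils.resolveURI(...).getScheme: treat a path
// with no explicit scheme as a submission-client local file.
def resolveScheme(path: String): String =
  Option(new URI(path).getScheme).getOrElse("file")

// Mirrors existSubmissionLocalFiles above: true if any URI points at the
// submission client's local file system.
def existSubmissionLocalFiles(files: Seq[String]): Boolean =
  files.exists(uri => resolveScheme(uri) == "file")

existSubmissionLocalFiles(Seq("file:///var/apps/jars/main.jar")) // true: rejected
existSubmissionLocalFiles(Seq("/path/to/file1"))                 // true: rejected
existSubmissionLocalFiles(Seq("local:///opt/jars/dep.jar"))      // false: container-local
existSubmissionLocalFiles(Seq("hdfs://nn:8020/jars/app.jar"))    // false: remote
```

This is why the bare paths passed to `spark.files` in the test below trigger the new `SparkException` just like an explicit `file://` URI.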
DriverConfigOrchestratorSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.deploy.k8s.submit

-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.submit.steps._

@@ -117,6 +117,35 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {
       classOf[DriverMountSecretsStep])
   }

+  test("Submission using client local dependencies") {
+    val sparkConf = new SparkConf(false)
+      .set(CONTAINER_IMAGE, DRIVER_IMAGE)
+    var orchestrator = new DriverConfigOrchestrator(
+      APP_ID,
+      LAUNCH_TIME,
+      Some(JavaMainAppResource("file:///var/apps/jars/main.jar")),
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      sparkConf)
+    assertThrows[SparkException] {
+      orchestrator.getAllConfigurationSteps
+    }
+
+    sparkConf.set("spark.files", "/path/to/file1,/path/to/file2")
+    orchestrator = new DriverConfigOrchestrator(
+      APP_ID,
+      LAUNCH_TIME,
+      Some(JavaMainAppResource("local:///var/apps/jars/main.jar")),
+      APP_NAME,
+      MAIN_CLASS,
+      APP_ARGS,
+      sparkConf)
+    assertThrows[SparkException] {
+      orchestrator.getAllConfigurationSteps
+    }
+  }
+
   private def validateStepTypes(
       orchestrator: DriverConfigOrchestrator,
       types: Class[_ <: DriverConfigurationStep]*): Unit = {