[SPARK-23153][K8S] Support client dependencies with a Hadoop Compatible File System

## What changes were proposed in this pull request?
- Solves the current issue with `--packages` in cluster mode (there is no ticket for it). Also note some past [issues](https://issues.apache.org/jira/browse/SPARK-22657) when Hadoop libs are used at the spark-submit side.
- Supports `spark.jars`, `spark.files`, and the app jar.

It works as follows:
Spark submit uploads the dependencies to the HCFS. Then the driver serves the dependencies via the Spark file server, so no HCFS URIs are propagated to the executors.
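A minimal sketch of the submit-side upload step is shown below (illustrative only: the object and method names are made up for the example, while the actual logic lives in `KubernetesUtils.uploadFileUri` further down in this diff):

```
import java.net.URI
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object UploadSketch {
  // Copy a local dependency to the HCFS path configured via
  // spark.kubernetes.file.upload.path and return the remote URI that
  // spark-submit would place into spark.jars / spark.files for the driver.
  def uploadDep(localFile: String, uploadPath: String, hadoopConf: Configuration): String = {
    val fs = FileSystem.get(new URI(uploadPath), hadoopConf)
    // A random subdirectory keeps parallel submissions from overwriting each other.
    val targetDir = s"$uploadPath/spark-upload-${UUID.randomUUID()}"
    fs.mkdirs(new Path(targetDir))
    val target = s"$targetDir/${new Path(localFile).getName}"
    fs.copyFromLocalFile(false, true, new Path(localFile), new Path(target))
    target
  }
}
```
Inside the driver pod the uploaded URI is downloaded back to a local path (see `renameResourcesToLocalFS` in `SparkSubmit.scala`), so executors fetch the dependency from the Spark file server rather than from the HCFS.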

The related design document is [here](https://docs.google.com/document/d/1peg_qVhLaAl4weo5C51jQicPwLclApBsdR1To2fgc48/edit). The next option to add is the RSS, but it has to be improved given the past discussion about it (Spark 2.3).
## How was this patch tested?

- Run integration test suite.
- Run an example using S3:

```
 ./bin/spark-submit \
...
 --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6 \
 --deploy-mode cluster \
 --name spark-pi \
 --class org.apache.spark.examples.SparkPi \
 --conf spark.executor.memory=1G \
 --conf spark.kubernetes.namespace=spark \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
 --conf spark.driver.memory=1G \
 --conf spark.executor.instances=2 \
 --conf spark.sql.streaming.metricsEnabled=true \
 --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
 --conf spark.kubernetes.container.image.pullPolicy=Always \
 --conf spark.kubernetes.container.image=skonto/spark:k8s-3.0.0 \
 --conf spark.kubernetes.file.upload.path=s3a://fdp-stavros-test \
 --conf spark.hadoop.fs.s3a.access.key=... \
 --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
 --conf spark.hadoop.fs.s3a.fast.upload=true \
 --conf spark.kubernetes.executor.deleteOnTermination=false \
 --conf spark.hadoop.fs.s3a.secret.key=... \
 --conf spark.files=client:///...resolv.conf \
file:///my.jar
```
Added integration tests based on [Ceph nano](https://github.com/ceph/cn), which looks very [active](http://www.sebastien-han.fr/blog/2019/02/24/Ceph-nano-is-getting-better-and-better/).
Unfortunately, Minio needs Hadoop >= 2.8.

Closes #23546 from skonto/support-client-deps.

Authored-by: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>
Signed-off-by: Erik Erlandson <eerlands@redhat.com>
Stavros Kontopoulos authored and erikerlandson committed May 22, 2019
1 parent 6c5827c commit 5e74570
Showing 14 changed files with 545 additions and 75 deletions.
91 changes: 71 additions & 20 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -29,6 +29,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.{Properties, Try}

import org.apache.commons.io.FilenameUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -222,7 +223,7 @@ private[spark] class SparkSubmit extends Logging {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = new SparkConf()
val sparkConf = args.toSparkConf()
var childMainClass = ""

// Set the cluster manager
@@ -313,6 +314,9 @@ private[spark] class SparkSubmit extends Logging {
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
val isMesosClient = clusterManager == MESOS && deployMode == CLIENT

if (!isMesosCluster && !isStandAloneCluster) {
@@ -323,9 +327,25 @@
args.ivySettingsPath)

if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
// In K8s client mode, when in the driver, add resolved jars early as we might need
// them at submit time for artifact downloading.
// For example we might use the dependencies for downloading
// files from a Hadoop-compatible fs, e.g. S3. In this case the user might pass:
// --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
if (isKubernetesClusterModeDriver) {
val loader = getSubmitClassLoader(sparkConf)
for (jar <- resolvedMavenCoordinates.split(",")) {
addJarToClasspath(jar, loader)
}
} else if (isKubernetesCluster) {
// We need this in K8s cluster mode so that we can upload local deps
// via the k8s application, like in cluster mode driver
childClasspath ++= resolvedMavenCoordinates.split(",")
} else {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
}
}
}

@@ -380,6 +400,17 @@ private[spark] class SparkSubmit extends Logging {
localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull

if (isKubernetesClusterModeDriver) {
// Replace with the downloaded local jar path to avoid propagating Hadoop-compatible URIs.
// Executors will get the jars from the Spark file server.
// Explicitly download the related files here.
args.jars = renameResourcesToLocalFS(args.jars, localJars)
val localFiles = Option(args.files).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
args.files = renameResourcesToLocalFS(args.files, localFiles)
}
}

// When running in YARN, for some remote resources with scheme:
@@ -535,11 +566,13 @@ private[spark] class SparkSubmit extends Logging {
OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
confKey = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.packages"),
OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.excludes"),

// Yarn only
@@ -777,6 +810,21 @@ private[spark] class SparkSubmit extends Logging {
(childArgs, childClasspath, sparkConf, childMainClass)
}

/**
* Replace each entry in `resources` whose file name matches one in `localResources`
* (the already-downloaded local copies) with that local URI, leaving unmatched entries
* untouched, so that no Hadoop-compatible URIs are propagated further.
*/
private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
if (resources != null && localResources != null) {
val localResourcesSeq = Utils.stringToSeq(localResources)
Utils.stringToSeq(resources).map { resource =>
val filenameRemote = FilenameUtils.getName(new URI(resource).getPath)
localResourcesSeq.find { localUri =>
val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath)
filenameRemote == filenameLocal
}.getOrElse(resource)
}.mkString(",")
} else {
resources
}
}

// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes
// mode, we must trick it into thinking we're YARN.
@@ -787,6 +835,19 @@
sparkConf.set(key, shortUserName)
}

/**
* Build the class loader used to load the child main class and user jars, honoring
* spark.driver.userClassPathFirst, and install it as the thread's context class loader.
*/
private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
loader
}

/**
* Run the main method of the child class using the submit arguments.
*
@@ -814,17 +875,7 @@ private[spark] class SparkSubmit extends Logging {
logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
logInfo("\n")
}

val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)

val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
@@ -1325,12 +1325,12 @@ class SparkSubmitSuite
"--class", "Foo",
"app.jar")
val conf = new SparkSubmitArguments(clArgs).toSparkConf()
Seq(
testConf,
masterConf
).foreach { case (k, v) =>
conf.get(k) should be (v)
}
Seq(
testConf,
masterConf
).foreach { case (k, v) =>
conf.get(k) should be (v)
}
}
}

37 changes: 34 additions & 3 deletions docs/running-on-kubernetes.md
@@ -208,8 +208,31 @@ If your application's dependencies are all hosted in remote locations like HDFS
by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images.
Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the
`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to
dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission
client's local file system is currently not yet supported.
dependencies in custom-built Docker images in `spark-submit`. We support dependencies from the submission
client's local file system using the `file://` scheme or without a scheme (using a full path), where the destination should be a Hadoop compatible filesystem.
A typical example of this, using S3, is passing the following options:

```
...
--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
--conf spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path
--conf spark.hadoop.fs.s3a.access.key=...
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
--conf spark.hadoop.fs.s3a.fast.upload=true
--conf spark.hadoop.fs.s3a.secret.key=....
--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
file:///full/path/to/app.jar
```
The app jar file will be uploaded to S3, and when the driver is launched it will be downloaded
to the driver pod and added to its classpath. Spark will generate a subdirectory under the upload path with a random name
to avoid conflicts with Spark apps running in parallel. Users can manage the created subdirectories according to their needs.
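For illustration, with `spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path` the uploaded `app.jar` ends up at a location of the form `s3a://<s3-bucket>/path/spark-upload-<random-uuid>/app.jar`, where `<random-uuid>` stands for the randomly generated directory name.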

The client scheme is supported for the application jar, and for dependencies specified by the properties `spark.jars` and `spark.files`.
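For example, local dependencies could be passed together with the application jar as follows (the paths below are placeholders):

```
--conf spark.jars=file:///full/path/to/dep1.jar,/full/path/to/dep2.jar
--conf spark.files=file:///full/path/to/some.conf
file:///full/path/to/app.jar
```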

Important: all client-side dependencies will be uploaded to the given path with a flat directory structure, so
file names must be unique, otherwise files will be overwritten. Also make sure that the default Ivy directory in the
derived k8s image has the required access rights, or modify the settings as above. The latter is also important if you
use `--packages` in cluster mode.

## Secret Management
Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
@@ -455,7 +478,6 @@ There are several Spark on Kubernetes features that are currently being worked on
Some of these include:

* Dynamic Resource Allocation and External Shuffle Service
* Local File Dependency Management
* Job Queues and Resource Management

# Configuration
@@ -1078,6 +1100,15 @@ See the [configuration page](configuration.html) for information on Spark config
Specify the grace period in seconds when deleting a Spark application using spark-submit.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.file.upload.path</code></td>
<td>(none)</td>
<td>
Path to store files at the spark-submit side in cluster mode. For example:
<code>spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path</code>
Files should be specified as <code>file://path/to/file</code> or as an absolute path.
</td>
</tr>
</table>

#### Pod template properties
@@ -338,6 +338,13 @@ private[spark] object Config extends Logging {
.timeConf(TimeUnit.SECONDS)
.createOptional

val KUBERNETES_FILE_UPLOAD_PATH =
ConfigBuilder("spark.kubernetes.file.upload.path")
.doc("Hadoop compatible file system path where files from the local file system " +
"will be uploded to in cluster mode.")
.stringConf
.createOptional

val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
@@ -16,18 +16,25 @@
*/
package org.apache.spark.deploy.k8s

import java.io.File
import java.io.{File, IOException}
import java.net.URI
import java.security.SecureRandom
import java.util.UUID

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.Utils.getHadoopFileSystem

private[spark] object KubernetesUtils extends Logging {

@@ -209,4 +216,77 @@ private[spark] object KubernetesUtils extends Logging {
Hex.encodeHexString(random) + time
}

/**
* Upload files and modify their URIs
*/
def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
: Iterable[String] = {
fileUris.map { uri =>
uploadFileUri(uri, conf)
}
}

private def isLocalDependency(uri: URI): Boolean = {
uri.getScheme match {
case null | "file" => true
case _ => false
}
}

def isLocalAndResolvable(resource: String): Boolean = {
resource != SparkLauncher.NO_RESOURCE &&
isLocalDependency(Utils.resolveURI(resource))
}

def renameMainAppResource(resource: String, conf: SparkConf): String = {
if (isLocalAndResolvable(resource)) {
SparkLauncher.NO_RESOURCE
} else {
resource
}
}

def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
conf match {
case Some(sConf) =>
if (sConf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) {
val fileUri = Utils.resolveURI(uri)
try {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sConf)
val uploadPath = sConf.get(KUBERNETES_FILE_UPLOAD_PATH).get
val fs = getHadoopFileSystem(Utils.resolveURI(uploadPath), hadoopConf)
val randomDirName = s"spark-upload-${UUID.randomUUID()}"
fs.mkdirs(new Path(s"${uploadPath}/${randomDirName}"))
val targetUri = s"${uploadPath}/${randomDirName}/${fileUri.getPath.split("/").last}"
log.info(s"Uploading file: ${fileUri.getPath} to dest: $targetUri...")
uploadFileToHadoopCompatibleFS(new Path(fileUri.getPath), new Path(targetUri), fs)
targetUri
} catch {
case e: Exception =>
throw new SparkException(s"Uploading file ${fileUri.getPath} failed...", e)
}
} else {
throw new SparkException("Please specify " +
"spark.kubernetes.file.upload.path property.")
}
case _ => throw new SparkException("Spark configuration is missing...")
}
}

/**
* Upload a file to a Hadoop-compatible filesystem.
*/
private def uploadFileToHadoopCompatibleFS(
src: Path,
dest: Path,
fs: FileSystem,
delSrc : Boolean = false,
overwrite: Boolean = true): Unit = {
try {
fs.copyFromLocalFile(false, true, src, dest)
} catch {
case e: IOException =>
throw new SparkException(s"Error uploading file ${src.getName}", e)
}
}
}
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils

@@ -156,6 +155,15 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix,
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
// Try to upload local, resolvable files to a Hadoop-compatible file system.
Seq(JARS, FILES).foreach { key =>
val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
if (resolved.nonEmpty) {
additionalProps.put(key.key, resolved.mkString(","))
}
}
additionalProps.toMap
}
}

4 comments on commit 5e74570

@pacuna commented on 5e74570 Jun 3, 2019

@skonto any plans on adding support for pyspark using --py-files as well?

Thanks.

@erikerlandson (Contributor)

@pacuna I have filed a JIRA to track --py-files: https://issues.apache.org/jira/browse/SPARK-27936

@pacuna commented on 5e74570 Jun 3, 2019

@erikerlandson Thanks! Quick question: is it possible to submit remote --py-files? I don't see any mention of that in the k8s docs, and I haven't been able to make it work in cluster mode. It works fine in client mode with local py files.

@ejblanco (Contributor)

Will this be supported in Spark 2.X? I'm having issues with this right now...
