This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Spark on Kubernetes - basic submission client #545

Closed
wants to merge 37 commits

Conversation

@liyinan926 (Member) commented Nov 7, 2017

Second draft of the upstreaming PR, containing the basic submission client implementation and unit tests. Branch spark-kubernetes-3-updated is a clone of spark-kubernetes-3 with the latest changes from upstream/master merged in. spark-kubernetes-4 includes all our changes from spark-kubernetes-3.

cc @foxish @mccheah @apache-spark-on-k8s/contributors

@foxish (Member) commented Nov 8, 2017

This will be the follow-up to apache#19468

@mccheah commented Nov 8, 2017

This seems like a large diff, but a quick scan shows that everything included is necessary. We need the driver service bootstrap because of changes to master. I think we can reduce the fanciness of the credentials step, but that doesn't reduce the complexity by a significant amount.

@foxish (Member) commented Nov 8, 2017

One TODO: Add the unit test in #542 to this PR

@liyinan926 (Member Author): Changes from #542 merged in.

@kimoonkim self-requested a review November 8, 2017 22:24
private[spark] object ClientArguments {
  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
    var mainAppResource: Option[MainAppResource] = None
    val otherPyFiles = Seq.empty[String]

Don't think we're using this here.

Member Author: Removed.

      addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
      kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
    }
  } catch {

Don't think we want to catch Throwable here - look into NonFatal.

Member Author: Changed to NonFatal(e).
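
For context, a minimal sketch of the pattern being asked for (the cleanup shown here is illustrative, not the PR's exact code): scala.util.control.NonFatal matches ordinary exceptions but lets genuinely fatal errors such as OutOfMemoryError propagate instead of being swallowed.

  import scala.util.control.NonFatal

  try {
    kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
  } catch {
    case NonFatal(e) =>
      // Clean up the driver pod that was already created, then rethrow.
      kubernetesClient.pods().delete(createdDriverPod)
      throw e
  }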

      .addAllToEnv(driverCustomEnvs.asJava)
      .addToEnv(driverExtraClasspathEnv.toSeq: _*)
      .addNewEnv()
        .withName(ENV_DRIVER_MEMORY)

These environment variable keys don't make much sense without the Dockerfile that describes the contract the submission client must fulfill.

It might make sense to include the Dockerfile for the driver and the executor in this PR. We shouldn't add the poms that build them - that would make this diff unnecessarily large.

Member Author: Should we still put them under docker-minimum-bundle?

Yes. Though, for a while now I've been thinking that there's probably a better name for this submodule =)

Member Author: Should we just call it docker?

Think that should be fine.

Member Author: Done.

@mccheah commented Nov 8, 2017

A few comments but otherwise this captures the spirit of what we want to have upstream.

COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY conf /opt/spark/conf
COPY dockerfiles/spark-base/entrypoint.sh /opt/
Member Author: @mccheah @foxish dockerfiles/spark-base doesn't make sense if the distribution does not include the spark-k8s bits. Does this matter?

Think we still want to include it to show the projected contents of the image.

@@ -0,0 +1,43 @@
#

Think we want src/main/dockerfiles as the top level directory.

Member Author: Done.

      masterWithoutK8sPrefix
    } else {
      val resolvedURL = s"https://$masterWithoutK8sPrefix"
      logDebug(s"No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +

Do not use the s"" interpolator on plain-text fragments (nothing is interpolated there).

perhaps logInfo? this sounds useful

Member Author: Done.
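
To illustrate the two comments above (the message text here paraphrases the original, it is not the PR's final wording): keep the interpolator only on the fragment that actually interpolates a variable, and log at info level.

  val resolvedURL = s"https://$masterWithoutK8sPrefix"
  logInfo("No scheme specified for kubernetes master URL, so defaulting to https. " +
    s"Resolved URL is $resolvedURL.")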

      submissionSparkConf,
      KUBERNETES_DRIVER_LABEL_PREFIX,
      "label")
    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +

Do not use the s"" interpolator on plain-text fragments (nothing is interpolated there).

Member Author: Done.

"label")
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
s" operations.")

Do not use the s"" interpolator on plain-text fragments (nothing is interpolated there).

Member Author: Done.

"annotation")
require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
s" Spark bookkeeping operations.")

Do not use the s"" interpolator on plain-text fragments (nothing is interpolated there), and align this with the previous line.

Member Author: Done.

    }
    val caCertDataBase64 = safeFileConfToBase64(
      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
      s"Driver CA cert file provided at %s does not exist or is not a file.")

Do not use the s"" interpolator on plain-text fragments (nothing is interpolated there).

Member Author: Done.

import scala.collection.mutable
import scala.util.control.NonFatal

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVar, EnvVarBuilder, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder}

More than 6 classes imported from the same package; use a wildcard import (._).

Member Author: Done.
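
For illustration, the fabric8 import shown above collapses to a wildcard once more than six members are pulled from the package:

  // Before: seven classes listed individually
  import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVar, EnvVarBuilder, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder}
  // After: wildcard import
  import io.fabric8.kubernetes.api.model._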

    val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
      .editSpec()
        .addToContainers(resolvedDriverContainer)
        .endSpec()

align these?

Member Author: The indentation probably makes sense as editSpec returns an object different from PodBuilder.

          .build())
        .endEnv()
      .withNewResources()
        .addToRequests("cpu", driverCpuQuantity)

What happens when we request more than what the node has, e.g. more cores than the number of cores on the node?

Member Author: This will cause the driver to not be scheduled by the k8s scheduler onto a node until a node with that many cores becomes available in the cluster.

In general YARN doesn't handle this case nicely either, but I wonder if in k8s we could do better if it is hooked up to an autoscaler or something?
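
For context, a rough sketch of how a CPU request like the one above is typically built with the fabric8 model (the variable names are illustrative); if the request exceeds what any node can offer, the pod simply stays Pending until capacity appears.

  import io.fabric8.kubernetes.api.model.QuantityBuilder

  // The scheduler must find a node with this much unreserved CPU before placing the pod.
  val driverCpuQuantity = new QuantityBuilder(false)
    .withAmount(driverCpuCores)   // e.g. "1" or "500m"
    .build()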

        .addToAnnotations(allDriverAnnotations.asJava)
        .endMetadata()
      .withNewSpec()
        .withRestartPolicy("Never")

this might be configurable in the future? some sort of driver HA in cluster mode?

Member Author: Good point. I think we can make it configurable.

Member: We should have an issue to track this. An HA driver would be useful in a streaming context in the future.

+1000x

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.steps.{DriverConfigurationStep, KubernetesDriverSpec}

private[spark] class ClientSuite extends SparkFunSuite with BeforeAndAfter {

Don't do private[spark] with the Suite classes - they won't be run by Jenkins.
see c052212

Member Author: Done.
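
A minimal before/after sketch of that change (the test body is a placeholder, not the PR's actual tests):

  import org.scalatest.BeforeAndAfter
  import org.apache.spark.SparkFunSuite

  // Before (not discovered by the Jenkins test runner):
  //   private[spark] class ClientSuite extends SparkFunSuite with BeforeAndAfter { ... }

  // After: public suite, discovered and run normally.
  class ClientSuite extends SparkFunSuite with BeforeAndAfter {
    test("placeholder") {
      assert(true)
    }
  }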

@liyinan926 (Member Author) commented Nov 9, 2017

If there's no objection, I will squash the commits and push to upstream for review by EOD today. @apache-spark-on-k8s/contributors

@foxish (Member) commented Nov 9, 2017

SGTM! We should see how we can make it less confusing for reviewers, since this PR also encompasses the changes from spark-kubernetes-3.

@liyinan926 (Member Author)

When pushing upstream, I'm going to remove the code from the first PR so this is less confusing.

@kimoonkim (Member) left a comment

Initial batch of comments, mostly about readability. I still need to look at the various submission/configuration steps.

      case (KUBERNETES, CLIENT) =>
        printErrorAndExit("Client mode is currently not supported for Kubernetes.")
      case (KUBERNETES, CLUSTER) if args.isPython =>
        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
Member: I wonder if this message could mislead users into thinking python client mode is supported. Users will try python client mode and will get the error in line 302, which is not a good experience. Is it possible to pattern match case (KUBERNETES, _) if args.isPython instead, before line 301? Then the error message can say "python is not supported for Kubernetes".

Member Author: Done.

      case (KUBERNETES, CLUSTER) if args.isPython =>
        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
          "applications on Kubernetes clusters.")
      case (KUBERNETES, CLUSTER) if args.isR =>
Member: Ditto. Wonder if we can match case (KUBERNETES, _) if args.isR before line 301 and say "R is not supported in Kubernetes".

Member Author: Done.

        childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
        childArgs ++= Array("--primary-java-resource", args.primaryResource)
        childArgs ++= Array("--main-class", args.mainClass)
        args.childArgs.foreach { arg =>
Member: I see a null check on args.childArgs is done at line 695 and a few others:

  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }

Maybe we should do the same?

Member Author: Done.
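
Presumably the Kubernetes branch ends up with the same guard, roughly as below (a sketch combining the two snippets above, not the exact diff):

  if (args.childArgs != null) {
    args.childArgs.foreach { arg =>
      childArgs ++= Array("--arg", arg)
    }
  }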

@@ -466,6 +473,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
      case USAGE_ERROR =>
        printUsageAndExit(1)

      case KUBERNETES_NAMESPACE =>
Member: Nit. I would expect a significant parameter like this to come before line 464. Lines 464-473 are mostly about help and usage errors, it seems.

Member Author: Done.

    mainClass: String,
    driverArgs: Array[String])

private[spark] object ClientArguments {
Member: I personally prefer one empty line between the class/object header and the body, but I don't know if this is standard.

Member Author: Done.


    val resolvedDriverJavaOpts = currentDriverSpec
      .driverSparkConf
      // We don't need this anymore since we just set the JVM options on the environment
Member: Where is the code that this comment is referring to?

Member Author: Good catch. This comment and the line below it should be removed. Cc @foxish @mccheah to confirm.

Member Author: Actually, it seems it was added in #365. I think we should keep the comment but rephrase it to make it clearer.

Comment still applies because the lines below take the driver JVM options and append them to the JVM options of the SparkConf. For example, we don't want SPARK_DRIVER_JAVA_OPTS to have a value of -Dspark.driver.extraJavaOptions=-XX:HeapDumpOnOutOfMemoryError - we just want the -XX:HeapDumpOn... to be set directly on the driver process.

Member: The comment should be rephrased to clarify the intent of removing extraJavaOptions, along the lines of what Matt said.

Member Author: Done.
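
A hedged sketch of the intent being discussed (the config key is the standard spark.driver.extraJavaOptions; the surrounding variable names are illustrative): the option is dropped from the conf serialized for the driver, and its raw value is appended directly to the driver's JVM options, so the driver JVM never sees a -Dspark.driver.extraJavaOptions=... system property.

  val driverJavaOptions = submissionSparkConf.getOption("spark.driver.extraJavaOptions")

  val resolvedDriverJavaOpts = currentDriverSpec
    .driverSparkConf
    // Drop it here so it is not also propagated as a -D system property below.
    .remove("spark.driver.extraJavaOptions")
    .getAll
    .map { case (confKey, confValue) => s"-D$confKey=$confValue" }
    .mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")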

}

private[spark] object Client {
  def run(sparkConf: SparkConf, clientArguments: ClientArguments): Unit = {
Member: Same comment about one empty line before the class body.

Member Author: Done.

    val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
    val master = getK8sMasterUrl(sparkConf.get("spark.master"))
    val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter( _ => waitForAppCompletion)
Member: Nit. Don't need a space before _?

Member Author: Done.

      clientArguments.driverArgs,
      sparkConf)

    Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
Member: This particular line does not read well because the word "KubernetesClient" appears here twice, meaning two different things. The reader may fail to distinguish the "Spark client" (SparkKubernetesClientFactory) from the "K8s API client" (createKubernetesClient).

Member Author: Renamed SparkKubernetesClientFactory to KubernetesClientFactory and renamed the method to create.

Member: I see. I myself misread. The two clients meant the same thing :-)

@kimoonkim (Member): The latest commit seems to address my comments so far. Thanks!

@liyinan926 force-pushed the spark-kubernetes-4 branch 3 times, most recently from 3ad6d7b to 37c7ad6 on November 10, 2017 00:31
@liyinan926 (Member Author): Squashed the commits and removed the scheduler backend code and relevant changes in YARN-related code.

@liyinan926 (Member Author): @kimoonkim any more comments on the submission steps?

@liyinan926 (Member Author): This is under review at apache#19717.

@kimoonkim (Member) left a comment

Submission steps look good to me. Left a few minor comments. PTAL.


import org.apache.spark.SparkConf

private[spark] object ConfigurationUtils {
Member: Put an empty line before the body?

Member Author: Done.

        kubernetesClient,
        waitForAppCompletion,
        appName,
        loggingPodStatusWatcher).run()
Member: I almost missed .run() at the end. Maybe we can use a val here:

  kubernetesClient =>
    val sparkClient = new Client(
      configurationStepsOrchestrator.getAllConfigurationSteps(),
      sparkConf,
      kubernetesClient,
      waitForAppCompletion,
      appName,
      loggingPodStatusWatcher)
    sparkClient.run()

Member Author: Done.

mgaido91 and others added 2 commits December 4, 2017 11:07
## What changes were proposed in this pull request?

apache#19696 replaced the deprecated usages of `Date` and `Waiter`, but a few methods were missed. This PR fixes the remaining deprecated usages.

## How was this patch tested?

existing UTs

Author: Marco Gaido <mgaido@hortonworks.com>

Closes apache#19875 from mgaido91/SPARK-22473_FOLLOWUP.
…n the RDD commit protocol

I have modified SparkHadoopWriter so that executors and the driver always use consistent JobIds during the hadoop commit. Before SPARK-18191, spark always used the rddId; it just incorrectly named the variable stageId. After SPARK-18191, it used the rddId as the jobId on the driver's side, and the stageId as the jobId on the executors' side. With this change, executors and the driver consistently use the rddId as the jobId. Also with this change, during the hadoop commit protocol spark uses the actual stageId to check whether a stage can be committed, unlike before, when it used the executors' jobId for this check.
In addition to the existing unit tests, a test has been added to check whether executors and the driver are using the same JobId. The test failed before this change and passed after applying this fix.

Author: Reza Safi <rezasafi@cloudera.com>

Closes apache#19848 from rezasafi/stagerddsimple.
Marcelo Vanzin and others added 9 commits December 4, 2017 11:05
The main goal of this change is to allow multiple cluster-mode
submissions from the same JVM, without having them end up with
mixed configuration. That is done by extending the SparkApplication
trait, and doing so was reasonably trivial for standalone and
mesos modes.

For YARN mode, there was a complication. YARN used a "SPARK_YARN_MODE"
system property to control behavior indirectly in a whole bunch of
places, mainly in the SparkHadoopUtil / YarnSparkHadoopUtil classes.
Most of the changes here are removing that.

Since we removed support for Hadoop 1.x, some methods that lived in
YarnSparkHadoopUtil can now live in SparkHadoopUtil. The remaining
methods don't need to be part of the class, and can be called directly
from the YarnSparkHadoopUtil object, so now there's a single
implementation of SparkHadoopUtil.

There were two places in the code that relied on  SPARK_YARN_MODE to
make decisions about YARN-specific functionality, and now explicitly check
the master from the configuration for that instead:

* fetching the external shuffle service port, which can come from the YARN
  configuration.

* propagation of the authentication secret using Hadoop credentials. This also
  was cleaned up a little to not need so many methods in `SparkHadoopUtil`.

With those out of the way, actually changing the YARN client
to extend SparkApplication was easy.

Tested with existing unit tests, and also by running YARN apps
with auth and kerberos both on and off in a real cluster.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#19631 from vanzin/SPARK-22372.
@liyinan926 force-pushed the spark-kubernetes-4 branch 3 times, most recently from 51844cc to 0936fbe on December 5, 2017 23:19
@liyinan926 (Member Author): The PR has been merged upstream. Closing this.

@liyinan926 closed this Dec 11, 2017
@foxish deleted the spark-kubernetes-4 branch December 13, 2017 16:48