
[SPARK-23146][WIP] Support client mode for Kubernetes in Out-Cluster mode #20451

Closed
echarles wants to merge 17 commits

Conversation

echarles
Member

What changes were proposed in this pull request?

These changes add support for the Kubernetes resource manager in client mode (on top of the existing cluster mode).

How was this patch tested?

The initial changes were done on the latest commits in the spark-k8s fork (https://github.com/apache-spark-on-k8s/spark) and have been tested on AWS with real data processing.

In an effort to merge the latest features back into Apache master, I am opening these as-yet-untested changes here for feedback and discussion.

Documentation will be updated once the code has been discussed, but in the meantime there is an (admittedly dense) design document that can be read to learn more about the changes. In-cluster and out-cluster considerations, as well as dependencies and HDFS access, are discussed there.

Given the current design and implementation, an open point I have is how we want to configure the path to the Kubernetes config in OutCluster mode. The options are:

  1. Force the user to specify the path and fail if this property is not given.
  2. If /var/run/secrets/kubernetes.io/serviceaccount/token is absent (it is present for InCluster), fall back automatically to the given property or, if no property has been given, fall back to $HOME/.kube/config (in this latter case there is no separate cacert or keyfile; those details are all bundled in the single $HOME/.kube/config file). See the sketch after this list.
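To make option 2 concrete, here is a minimal sketch of that fallback logic. The spark.kubernetes.kubeConfig property name and the helper itself are hypothetical, purely for illustration, not part of this PR:

import java.io.File

import org.apache.spark.SparkConf

object KubeConfigResolution {
  private val ServiceAccountToken = new File("/var/run/secrets/kubernetes.io/serviceaccount/token")

  def resolveKubeConfig(conf: SparkConf): Option[File] = {
    if (ServiceAccountToken.exists()) {
      None // InCluster: use the mounted service account, no kubeconfig file needed
    } else {
      conf.getOption("spark.kubernetes.kubeConfig")   // explicit property, if given
        .map(new File(_))
        .orElse(Some(new File(s"${System.getProperty("user.home")}/.kube/config"))) // default kubeconfig
        .filter(_.exists())
    }
  }
}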

The tests so far have been done with separate config, cacert and key files (I expect the single config file should not cause any issue).

A last important point is how we move forward with the merge. For better client-mode coverage, it would also be interesting to bring in downstream apache-spark-on-k8s#540, which covers not only Kerberos but also the Hadoop steps needed to mount the Hadoop conf so that the driver/executors can connect to HDFS.

Also, to avoid confusion in future merges, here are the differences I had to deal with when applying the patch to the Apache master repo:

  • submitsteps package is steps
  • no OptionRequirements class (used in SparkKubernetesClientFactory)
  • no ExecutorLocalDirVolumeProvider in ExecutorPodFactory
  • no APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX in config.scala

@liyinan926
Contributor

/cc

Contributor

@squito left a comment

I am just getting started with K8s, so apologies for a basic question: can you explain how you run in-cluster in client mode? Sorry, I didn't see it described in the linked doc (the original SPIP doc says "The driver may run outside the cluster (client mode) or within (cluster mode)"). But feel free to point me at some other docs I should look at.


private[spark] object OptionRequirements {

def requireBothOrNeitherDefined(
Contributor

This is not used, and if it were, it would perhaps be clearer as:

(opt1, opt2) match {
  case (Some(val1), None) =>
    ...
  case (None, Some(val2)) =>
    ...
  case _ => // ok
}
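For what it's worth, a self-contained sketch of how the existing helper could look if rewritten with that pattern; parameter names and failure behavior are illustrative, not what the fork actually does:

private[spark] object OptionRequirements {

  // Fail fast unless both options are defined or both are empty.
  def requireBothOrNeitherDefined(
      opt1: Option[_],
      opt2: Option[_],
      errMessageWhenFirstIsMissing: String,
      errMessageWhenSecondIsMissing: String): Unit = {
    (opt1, opt2) match {
      case (Some(_), None) => throw new IllegalArgumentException(errMessageWhenSecondIsMissing)
      case (None, Some(_)) => throw new IllegalArgumentException(errMessageWhenFirstIsMissing)
      case _ => // both or neither defined: ok
    }
  }
}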

Member Author

This is part of unused code that could be fixed in another PR (e.g. #21462). I will double-check after merging master, but normally client mode does not impact this.

@SparkQA

SparkQA commented Mar 26, 2018

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/1735/

@SparkQA

SparkQA commented Mar 26, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/1735/

@SparkQA

SparkQA commented Mar 27, 2018

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/1756/

@SparkQA

SparkQA commented Mar 27, 2018

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/1756/

@echarles
Member Author

Current state works for out-cluster client mode.

For in-cluster, we are missing the DriverServiceBootstrapStep from the fork, which "allows the driver to be reachable by executor pods through a headless service. The service's ports should correspond to the ports that the executor will reach the pod at for RPC."

Running in-cluster gives this exception:

Caused by: java.io.IOException: Failed to connect to spark-pod:43957
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
Caused by: java.net.UnknownHostException: spark-pod
...
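For context, that bootstrap step essentially creates a headless Kubernetes Service in front of the driver pod so executors can resolve the driver by DNS. A rough sketch with the fabric8 builder API; the service name, selector label and port are illustrative, not the fork's actual values:

import io.fabric8.kubernetes.api.model.ServiceBuilder

// Headless service (clusterIP: None) selecting the driver pod; executors then
// reach the driver at <service-name>.<namespace>.svc on the advertised RPC port.
val driverService = new ServiceBuilder()
  .withNewMetadata()
    .withName("spark-driver-svc")
  .endMetadata()
  .withNewSpec()
    .withClusterIP("None")
    .addToSelector("spark-app-selector", "spark-application-123")
    .addNewPort()
      .withName("driver-rpc-port")
      .withPort(7078)
      .withNewTargetPort(7078)
    .endPort()
  .endSpec()
  .build()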

@SparkQA

SparkQA commented Mar 27, 2018

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/1761/

@SparkQA

SparkQA commented Mar 27, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/1761/

@inetfuture

May I ask, what's the release plan for this?

@echarles
Member Author

Now that #20910 has been merged, I will update this PR to take the refactoring into account. @inetfuture Once these changes are pushed, the review process still needs to happen, so it is difficult to give you a timeline.

@echarles
Member Author

echarles commented May 4, 2018

I have updated this branch and successfully tested client mode OutCluster.

Happy to get confirmation of this from anyone here (cc/ @foxish)

For InCluster (which was working fine before #20910), the executor starts but fails with an UnknownHostException due to the lack of a DriverServiceBootstrapStep (see previous comments).

k logs -f spark-exec-1
++ id -u
+ myuid=0
++ id -g
+ mygid=0
+ set +e
++ getent passwd 0
+ uidentry=root:x:0:0:root:/root:/bin/ash
+ set -e
+ '[' -z root:x:0:0:root:/root:/bin/ash ']'
+ SPARK_K8S_CMD=executor
+ '[' -z executor ']'
+ shift 1
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sort -t_ -k4 -n
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_JAVA_OPTS
+ '[' -n '' ']'
+ '[' -n '' ']'
+ case "$SPARK_K8S_CMD" in
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /sbin/tini -s -- /usr/lib/jvm/java-1.8-openjdk/bin/java -Xms1g -Xmx1g -cp ':/opt/spark/jars/*' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@spark-pod:33913 --executor-id 1 --cores 1 --app-id spark-application-1525426093472 --hostname 172.17.0.9
2018-05-04 09:28:16 INFO  CoarseGrainedExecutorBackend:2502 - Started daemon with process name: 15@spark-exec-1
2018-05-04 09:28:16 INFO  SignalUtils:54 - Registered signal handler for TERM
2018-05-04 09:28:16 INFO  SignalUtils:54 - Registered signal handler for HUP
2018-05-04 09:28:16 INFO  SignalUtils:54 - Registered signal handler for INT
2018-05-04 09:28:17 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-05-04 09:28:17 INFO  SecurityManager:54 - Changing view acls to: root
2018-05-04 09:28:17 INFO  SecurityManager:54 - Changing modify acls to: root
2018-05-04 09:28:17 INFO  SecurityManager:54 - Changing view acls groups to: 
2018-05-04 09:28:17 INFO  SecurityManager:54 - Changing modify acls groups to: 
2018-05-04 09:28:17 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1904)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:65)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:281)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
	... 4 more
Caused by: java.io.IOException: Failed to connect to spark-pod:33913
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: spark-pod
	at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
	at java.net.InetAddress.getAllByName(InetAddress.java:1192)
	at java.net.InetAddress.getAllByName(InetAddress.java:1126)
	at java.net.InetAddress.getByName(InetAddress.java:1076)
	at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
	at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
	at java.security.AccessController.doPrivileged(Native Method)
	at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)
	at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)
	at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)
	at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)
	at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)
	at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)
	at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)
	at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)
	at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)
	at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188)
	at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:978)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:512)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:423)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:482)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	... 1 more

@SparkQA

SparkQA commented May 4, 2018

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/2828/

@SparkQA

SparkQA commented May 4, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/2828/

@liyinan926
Contributor

@echarles I'm wondering if you have time to sync this with master and get it into a shape that's ready for review? Thanks!

@rayburgemeestre

Just wanted to say this PR is really great; with it you can, for example, use Jupyter with Spark on K8s:

[screenshot: Jupyter notebook running Spark on Kubernetes]

@echarles
Member Author

@rayburgemeestre This is cool!

Apache Toree configuration for K8S is on my todo list, but I would be happy to copy your conf...
Any gist?

IMHO both client and cluster mode should be fine with Toree. Did you try cluster mode?

@rayburgemeestre

rayburgemeestre commented Jun 27, 2018

Yes, absolutely, though I made those changes in a hardcoded way just to try it out, so I'm not sure how helpful it is.

Near the end of this file: /usr/local/share/jupyter/kernels/apache_toree_pyspark/bin/run.sh (which is invoked by the pyspark kernel)

Added these options:

    --master k8s://https://rb-spark:6443 \
    --conf spark.executor.instances=3 \
    --conf spark.kubernetes.container.image=node001:5000/brightcomputing/spark-py:v2.3.0 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \

So the exec call looks like this:

eval exec \
     "${SPARK_HOME}/bin/spark-submit" \
     --name "'Apache Toree'" \
     "${SPARK_OPTS}" \
     --master k8s://https://rb-spark:6443 \
     --conf spark.executor.instances=3 \
     --conf spark.kubernetes.container.image=node001:5000/brightcomputing/spark-py:v2.3.0 \
     --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
     --class org.apache.toree.Main \
     "${TOREE_ASSEMBLY}" \
     "${TOREE_OPTS}" \
     "$@"

A few notes:

  • node001:5000 is my docker registry
  • I merged some changes from master into your PR before manually building the 2.4.0-SNAPSHOT jar file (changes that make an additional "spark-py" image available). The PR for that was: [SPARK-23984][K8S] Initial Python Bindings for PySpark on K8s #21092
  • The spark image in my shell script says "spark-py:v2.3.0" but it's actually a newer version v2.4.0-SNAPSHOT. I just called the docker_image_tool.sh script with the wrong version number in this case.

EDIT: To answer your question, I remember I tried --deploy-mode cluster, but I forget whether it worked or not; I will try tomorrow.
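Since the driver runs inside the notebook process in this setup, roughly the same configuration can also be expressed programmatically. This is a sketch only, reusing the values from the run.sh example above; they would need to match your own cluster, and the driver host must be reachable from the executor pods:

import org.apache.spark.sql.SparkSession

// Building the session in-process is inherently client mode; the k8s master
// URL, image and service account below mirror the shell example above.
val spark = SparkSession.builder()
  .appName("jupyter-client-mode")
  .master("k8s://https://rb-spark:6443")
  .config("spark.executor.instances", "3")
  .config("spark.kubernetes.container.image", "node001:5000/brightcomputing/spark-py:v2.3.0")
  .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
  .getOrCreate()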

.map(new File(_))
.orElse(maybeServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
OptionRequirements.requireNandDefined(
Contributor

Since it's only used once, I'm not sure it warrants a separate file/method for checking Options.
Also, it isn't quite clear to me from the method name what it does (especially the requireN part).
How about just the simple match that @squito suggested here?

Member Author

I did not introduce OptionRequirements.requireNandDefined but reused what was already in place.

@@ -88,6 +103,56 @@ private[spark] object SparkKubernetesClientFactory {
new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
}

def createOutClusterKubernetesClient(
master: String,
Contributor

Fix indentation.

Member Author

Done.

@@ -140,13 +140,6 @@ private[spark] class Client(
throw e
}

if (waitForAppCompletion) {
Contributor

Why is this not needed anymore? If we enable cluster mode, we still want the same behavior defined here, right?

Member Author

Hmm, I didn't change that behavior. Let's wait for the next push and see the diff; master is evolving, and the numerous refactorings do not make this PR easy to keep merged.

Member Author

Right, I have added back that section... Thx @tnachen

SparkKubernetesClientFactory.createKubernetesClient(
KUBERNETES_MASTER_INTERNAL_URL,
Some(sparkConf.get(KUBERNETES_NAMESPACE)),
APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
Contributor

Why do we need a separate conf prefix as well?

Member Author

I just took what was already there. I will double-check to make sure the latest merge does not diverge.

@echarles
Member Author

@liyinan926 I pushed the merge with master and tested OutCluster (I didn't test InCluster but as mentioned previously, we would need something similar to the DriverServiceBootstrapStep of the fork).

@squito @tnachen Thx for your reviews and comments, which make sense. My approach is to change only what is needed for client mode; I prefer leaving refactoring, code removal and other enhancements to other PRs.

@SparkQA

SparkQA commented Jul 10, 2018

@SparkQA

SparkQA commented Jul 10, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/820/

@SparkQA

SparkQA commented Jul 10, 2018

@SparkQA

SparkQA commented Jul 10, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/821/

@@ -0,0 +1,40 @@
/*
Contributor

We have KubernetesUtils that is supposed to be the place for all K8s-related utility methods.

Member Author

done

@@ -58,6 +58,8 @@ private[spark] object Config extends Logging {
"spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
Contributor

APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX was replaced by KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX. Please revert this addition.

Member Author

done

@@ -88,6 +103,60 @@ private[spark] object SparkKubernetesClientFactory {
new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
}

def createOutClusterKubernetesClient(
master: String,
Contributor

Wrong indentation.

SparkKubernetesClientFactory.createKubernetesClient(
sparkConf.get("spark.master").replace("k8s://", ""),
Some(sparkConf.get(KUBERNETES_NAMESPACE)),
KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
Contributor

The name of this prefix, spark.kubernetes.authenticate.driver.mounted, sounds weird in this case given that the client is running outside the cluster. BTW: can we alternatively use the config at $HOME/.kube/config to build a Kubernetes client instead? I think this is a common approach for building clients outside a cluster.

Member Author

The call to createKubernetesClient is now used in two different ways:

  • KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX is used in KubernetesClusterManager
  • KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX is used in KubernetesClientApplication

I would favor the second and remove the first.

Regarding the config location, I remember that the fabric8 K8s client also does some inspection to see whether it is running in- or out-cluster, and loads the config from the default place (depending on the case), with the possibility to specify other locations for the cert, token, etc. (this is what we expose as properties to the end user).
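For illustration, this is roughly what that auto-detection looks like with fabric8; a sketch only, not the code used in this PR, and the master URL is a placeholder:

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}

// ConfigBuilder picks up defaults by checking the in-cluster service account
// under /var/run/secrets/kubernetes.io/serviceaccount and, failing that,
// $HOME/.kube/config (or $KUBECONFIG); explicit setters override either source.
val config = new ConfigBuilder()
  .withMasterUrl("https://my-apiserver:6443")
  .build()
val client = new DefaultKubernetesClient(config)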

@liyinan926
Contributor

@mccheah who mentioned a plan to create a separate change for client mode support.

@echarles
Member Author

@liyinan926 Thx for your comments (see my answers). I have merged the latest changes, added a tiny doc section and pushed again.

@mccheah
Contributor

mccheah commented Jul 11, 2018

@echarles I'm planning to push a separate change that takes a different approach from this one.

@SparkQA

SparkQA commented Jul 11, 2018

@SparkQA

SparkQA commented Jul 11, 2018

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-make-spark-distribution-unified/856/

@echarles changed the title from "[SPARK-23146][WIP] Support client mode for Kubernetes cluster backend" to "[SPARK-23146][WIP] Support client mode for Kubernetes in Out-Cluster mode" on Jul 11, 2018
@mccheah
Contributor

mccheah commented Jul 11, 2018

See #21748

@echarles
Member Author

See #21748

@echarles closed this Jul 27, 2018