[SPARK-23980][K8S] Resilient Spark driver on Kubernetes #21067

Closed
wants to merge 2 commits from banzaicloud:SPARK-23980

Conversation

9 participants
@baluchicken commented Apr 13, 2018

What changes were proposed in this pull request?

The current implementation of the Spark driver on Kubernetes is not resilient to node failures, because the driver is implemented as a bare Pod. In case of a node failure, Kubernetes terminates the pods that were running on that node and does not reschedule them to any other node of the cluster.
If the driver is instead implemented as a Kubernetes Job, it will be rescheduled to another node.
When the driver is terminated, its executors (which may run on other nodes) are terminated by Kubernetes garbage collection with some delay.
This can lead to concurrency issues, where the re-spawned driver tries to create new executors with the same names as executors that are still being cleaned up by Kubernetes garbage collection.
To solve this issue, the executor name must be made unique for each driver instance.
For example:
networkwordcount-1519301591265-usmj-exec-1
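
A minimal sketch of the naming scheme described above (the object and method names are illustrative, not the exact code in this PR):

import scala.util.Random

object ExecutorNaming {
  // A short random salt, regenerated for every driver instance (e.g. "usmj").
  private val executorNameSalt: String =
    Random.alphanumeric.take(4).mkString.toLowerCase

  // e.g. executorPodName("networkwordcount-1519301591265", 1)
  //   => "networkwordcount-1519301591265-usmj-exec-1"
  def executorPodName(appNamePrefix: String, executorId: Int): String =
    s"$appNamePrefix-$executorNameSalt-exec-$executorId"
}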

How was this patch tested?

This patch was tested manually.
Submitted a Spark application to a cluster with three nodes:

kubectl get jobs
NAME                                    DESIRED   SUCCESSFUL   AGE
networkwordcount-1519301591265-driver   1         0            3m
kubectl get pods
NAME                                          READY     STATUS    RESTARTS   AGE
networkwordcount-1519301591265-driver-mszl2   1/1       Running   0          3m
networkwordcount-1519301591265-usmj-exec-1    1/1       Running   0          1m
networkwordcount-1519301591265-usmj-exec-2    1/1       Running   0          1m

The Spark driver networkwordcount-1519301591265-driver is a Kubernetes Job that manages the networkwordcount-1519301591265-driver-mszl2 pod.

Shut down the node where the driver pod was running:

kubectl get pods
NAME                                          READY     STATUS     RESTARTS   AGE
networkwordcount-1519301591265-driver-dwvkf   1/1       Running   0          3m
networkwordcount-1519301591265-rmes-exec-1    1/1       Running   0          1m
networkwordcount-1519301591265-rmes-exec-2    1/1       Running   0          1m

The Spark driver Kubernetes Job rescheduled the driver pod as networkwordcount-1519301591265-driver-dwvkf.
Please review http://spark.apache.org/contributing.html before opening a pull request.

@mccheah (Contributor) commented Apr 13, 2018

Looks like there's a lot of conflicts from the refactor that was just merged.

In general though I don't think this buys us too much. The problem is that when the driver fails, you'll lose any and all state of progress done so far. We don't have a solid story for checkpointing streaming computation right now, and even if we did, you'll certainly lose all progress from batch jobs.

Also, restarting the driver might not be the right thing to do in all cases. This assumes that it's always ok to have the driver re-launch itself automatically. But whether or not the driver should be relaunchable should be determined by the application submitter, and not necessarily done all the time. Can we make this behavior configurable?

...rc/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala Outdated
@@ -59,15 +59,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)

   private val kubernetesDriverPodName = conf
-    .get(KUBERNETES_DRIVER_POD_NAME)
+    .get(KUBERNETES_DRIVER_JOB_NAME)

@mccheah (Contributor) commented Apr 13, 2018

You set the job name here but for the driver pod you really want the pod name. It also seems very difficult to pass through the pod name in the driver config since you only know the pod's name derived from the job after the job has started. But we can probably use a unique label to look up the driver pod. Is the label mapped to job-name guaranteed to be unique?

...bernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala Outdated
// create new executors with the same name, but that will fail
// and hang indefinitely because a terminating executor blocks
// the creation of the new ones, so to avoid that apply a salt
private val executorNameSalt = Random.alphanumeric.take(4).mkString("").toLowerCase

@mccheah (Contributor) commented Apr 13, 2018

Should guarantee uniqueness by using UUID. Use labels to make it easy to group all executors tied to this specific job.
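
A minimal sketch of that suggestion (identifiers here are illustrative, not the PR's code): a UUID-derived salt for uniqueness, plus a label that ties every executor back to this specific driver/job instance:

import java.util.UUID

// Per-driver-instance identifier with far more entropy than 4 random characters.
val driverInstanceId: String = UUID.randomUUID().toString

// Keep the pod name short, but record the full ID as a label for grouping.
val executorNameSalt: String = driverInstanceId.replace("-", "").take(8)
val executorLabels: Map[String, String] = Map("spark-driver-instance" -> driverInstanceId)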

@mccheah (Contributor) commented Apr 13, 2018

We don't have a solid story for checkpointing streaming computation right now, and even if we did, you'll certainly lose all progress from batch jobs.

Should probably clarify re: streaming - we don't do any Kubernetes-specific actions (e.g. Persistent Volumes) to do Streaming checkpointing. But anything built-in to Spark should work, such as DFS checkpointing - barring anything that requires using the pod's local disk.
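
A sketch of the built-in mechanism referred to here: Spark Streaming checkpointing to a DFS path (the path and app name are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://namenode:8020/checkpoints/networkwordcount"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)  // driver metadata and DStream state go to DFS, not local disk
  // ... define the streaming computation here ...
  ssc
}

// On (re)start, recover from the checkpoint if present, otherwise build a fresh context.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)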

@stoader commented Apr 14, 2018

@mccheah

But whether or not the driver should be relaunchable should be determined by the application submitter, and not necessarily done all the time. Can we make this behavior configurable?

This should be easy by configuring the Job's pod backoff failure policy (backoffLimit) so that it executes the pod only once.
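
For example (a fabric8-client sketch of that idea, not code from this PR; the job name is a placeholder), backoffLimit controls how many times the Job controller retries the pod:

import io.fabric8.kubernetes.api.model.batch.JobBuilder

val driverJob = new JobBuilder()
  .withNewMetadata()
    .withName("networkwordcount-1519301591265-driver")
  .endMetadata()
  .withNewSpec()
    .withBackoffLimit(0)  // 0 retries: the driver pod is executed at most once
  .endSpec()
  .build()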

We don't have a solid story for checkpointing streaming computation right now

We've done work on this to store checkpoints on a persistent volume, but we thought that should be a separate PR as it's not strictly linked to this change.

you'll certainly lose all progress from batch jobs

Agree that the batch job would be rerun from scratch. Still, I think there is value in being able to run a batch job unattended and not intervene in case of machine failure, as the batch job will be rescheduled to another node.

@baluchicken baluchicken force-pushed the banzaicloud:SPARK-23980 branch May 7, 2018

@baluchicken (Author) commented May 7, 2018

@mccheah Rebased to master and added support for a configurable backoffLimit.

@baluchicken baluchicken force-pushed the banzaicloud:SPARK-23980 branch May 13, 2018

@baluchicken (Author) commented May 13, 2018

Rebased again to master.

@baluchicken baluchicken force-pushed the banzaicloud:SPARK-23980 branch May 14, 2018

@felixcheung (Member) commented May 20, 2018

Jenkins, ok to test

@SparkQA commented May 20, 2018

Test build #90868 has finished for PR 21067 at commit f19bf1a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@baluchicken (Author) commented May 21, 2018

@felixcheung fixed the Scala style violations, sorry.

@SparkQA commented May 21, 2018

Test build #90895 has finished for PR 21067 at commit 95f6886.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@baluchicken baluchicken force-pushed the banzaicloud:SPARK-23980 branch May 21, 2018

@SparkQA commented May 21, 2018

Test build #90898 has finished for PR 21067 at commit 2b1de38.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liyinan926 (Contributor) commented May 21, 2018

@foxish on concerns about the lack of exactly-once semantics.

...s/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala Outdated
* }
* </pre>
*/
def configureExecutorPod(pod: SparkExecutorPod): SparkExecutorPod = SparkExecutorPod.initialPod()

@mccheah (Contributor) commented May 25, 2018

This is misnamed since it can configure the pod that's used on the driver as well, if the step is to be shared between the driver and the executor. Aside from that, I think this isn't the right abstraction. The feature steps should only be used to configure the driver/executor pod spec. Then, we can have KubernetesClientApplication or KubernetesDriverBuilder wrap the driver pod spec in a Job object. Therefore I don't think configuring the Job object should be done by these steps, but by some external wrapping step after all the features have been applied.

@mccheah (Contributor) commented Jun 11, 2018

Ping on this?

@baluchicken (Author) commented Jun 12, 2018

The driver Pod spec cannot be wrapped into the Job spec because it expects only a Container spec.

@mccheah (Contributor) commented Jun 12, 2018

The job has a pod template though right? We should just inject the pod into the job as the pod template.

@mccheah (Contributor) commented Jun 12, 2018

Ok looking a bit more closely at the API - I think what we want is for all feature steps to be configuring a SparkPod, where:

case class SparkPod(podTemplate: PodTemplateSpec, container: Container)

Then the KubernetesDriverBuilder adapts the pod template into a Job, but the KubernetesExecutorBuilder adapts the pod template into just a Pod. For the job, the adapting part is trivial (JobBuilder.withNewJobSpec().withSpec(...)). We can translate a pod template spec into a pod as well:

  • inject PodTemplateSpec.getMetadata() into PodBuilder.withMetadata(),
  • inject PodTemplateSpec.getSpec() into PodBuilder.withSpec(), and
  • PodBuilder.editMetadata().withName(<name>).
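
A rough sketch of that adaptation (SparkPod is the abstraction proposed above; the wiring and helper names are illustrative, not the final PR code):

import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder, PodTemplateSpec}
import io.fabric8.kubernetes.api.model.batch.{Job, JobBuilder}

case class SparkPod(podTemplate: PodTemplateSpec, container: Container)

// KubernetesDriverBuilder side: wrap the pod template in a Job.
def asDriverJob(jobName: String, sparkPod: SparkPod): Job =
  new JobBuilder()
    .withNewMetadata().withName(jobName).endMetadata()
    .withNewSpec()
      .withTemplate(sparkPod.podTemplate)
    .endSpec()
    .build()

// KubernetesExecutorBuilder side: translate the pod template into a plain Pod.
def asExecutorPod(podName: String, sparkPod: SparkPod): Pod =
  new PodBuilder()
    .withMetadata(sparkPod.podTemplate.getMetadata)
    .withSpec(sparkPod.podTemplate.getSpec)
    .editMetadata().withName(podName).endMetadata()
    .build()
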
...etes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala Outdated
// create new executors with the same name, but that will fail
// and hang indefinitely because a terminating executor blocks
// the creation of the new ones, so to avoid that apply a salt
private val executorNameSalt = Random.alphanumeric.take(4).mkString("").toLowerCase

@mccheah (Contributor) commented May 25, 2018

Can't you just use KubernetesClusterSchedulerBackend#applicationId? Believe that's actually newly generated every time, albeit confusingly since that app ID isn't tied to any fields on the driver Kubernetes objects itself.

@baluchicken (Author) commented Jun 11, 2018

I think it is only generated when a new Spark application is submitted. We need a random value that is regenerated for every new driver pod, not for every new application. Also, it is too long, because Kubernetes only allows pod names no longer than 64 characters.

@mccheah (Contributor) commented Jun 11, 2018

I think it is only generated when a new Spark application is submitted. We need a random value that is regenerated for every new driver pod, not for every new application. Also, it is too long, because Kubernetes only allows pod names no longer than 64 characters.

The application ID is generated when the JVM launches - see SchedulerBackend.scala. Note this application ID isn't populated by spark-submit itself.
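
For reference, the default in SchedulerBackend.scala is roughly the following (paraphrased), which is why a fresh value appears on every driver launch:

// Generated once per JVM start, e.g. "spark-application-1528815922371".
private val appId = "spark-application-" + System.currentTimeMillis

def applicationId(): String = appId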

@baluchicken (Author) commented Jun 12, 2018

If we use the applicationId as a salt, the executor pod name will exceed the 64-character length limit if the application name is long.

override def configureExecutorPod(pod: SparkExecutorPod): SparkExecutorPod = {
  val name = s"$executorPodNamePrefix-$applicationID" +
    s"-exec-${kubernetesConf.roleSpecificConf.executorId}"

For example, if the application name is networkwordcount, this results in an executor pod name like: networkwordcount-1519234651100-spark-application-1528815922371-exec-10

@mccheah (Contributor) commented Jun 12, 2018

We should just have the executor pod name be s"$applicationId-exec-$executorId" then. Don't think the pod name prefix has to strictly be tied to the application name. The application name should be applied to a label so the executor pod can be located using that when using the dashboard and kubectl, etc.
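
A sketch of that scheme (the label keys are illustrative; applicationId, executorId and appName are assumed to be in scope):

// Short, per-launch-unique pod name; the human-readable app name moves into labels.
val executorPodName = s"$applicationId-exec-$executorId"
val executorLabels = Map(
  "spark-app-name"     -> appName,        // for kubectl / dashboard lookups
  "spark-app-selector" -> applicationId   // groups all executors of this launch
)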

@mccheah (Contributor) commented Jun 12, 2018

4 digits does not have enough entropy to be a reliable salt, which is why I suggest this to avoid collisions.

...rc/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala Outdated
.get()
private val driverPod: Pod = {
val pods = kubernetesClient.pods()
.inNamespace(kubernetesNamespace).withLabel("job-name", kubernetesDriverJobName).list()

@mccheah (Contributor) commented May 25, 2018

In between restarts, you could potentially have two pods with the same job name but you need to pick the right one.

@mccheah (Contributor) commented May 25, 2018

Actually would like @foxish or @liyinan926 to confirm if that is the case?

@baluchicken (Author) commented Jun 11, 2018

I don't think this can happen; I can think of two scenarios:

  • Job fails: No one restarts the Job; the user needs to use spark-submit again (all Job-related pods will be deleted because of the OwnerReference).
  • Pod fails: The Job will create a new driver pod to replace the failed one. There will be only one driver pod because the failed one will be removed by the Kubernetes garbage collector.

Can you please elaborate on what you mean by restart here?

@mccheah (Contributor) commented Jun 11, 2018

Pod fails: The Job will create a new driver pod to replace the failed one. There will be only one driver pod because the failed one will be removed by the Kubernetes garbage collector.

For this one I'm not sure if the GC is done immediately. If that's part of the Kubernetes contract, then we're fine; if not, we can't make any assumptions.

@baluchicken (Author) commented Jun 12, 2018

Okay, I will update the filter with an additional check that the found pod is in the Running state.
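
Something like this (a sketch against the fabric8 client, assuming kubernetesClient, kubernetesNamespace and kubernetesDriverJobName from the surrounding code):

import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.Pod
import org.apache.spark.SparkException

private val driverPod: Pod = kubernetesClient.pods()
  .inNamespace(kubernetesNamespace)
  .withLabel("job-name", kubernetesDriverJobName)
  .list()
  .getItems.asScala
  .find(_.getStatus.getPhase == "Running")   // skip a predecessor that is still terminating
  .getOrElse(throw new SparkException("No running driver pod found for the driver job"))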

...ernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingJobStatusWatcher.scala Outdated
* @param kubernetesClient kubernetes client.
*/
private[k8s] class LoggingJobStatusWatcherImpl(
appId: String,

@mccheah (Contributor) commented May 25, 2018

Indentation

appName: String,
watcher: LoggingPodStatusWatcher,
kubernetesResourceNamePrefix: String) extends Logging {
builder: KubernetesDriverBuilder,

@mccheah (Contributor) commented May 25, 2018

Indentation.

@felixcheung (Member) commented Jun 11, 2018

any update?

@baluchicken baluchicken force-pushed the banzaicloud:SPARK-23980 branch Jun 11, 2018

@SparkQA commented Jun 11, 2018

Test build #91660 has finished for PR 21067 at commit 00a149a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@baluchicken (Author) commented Jun 11, 2018

@felixcheung rebased to master and fixed failing unit tests

@skonto (Contributor) commented Jul 3, 2018

@baluchicken @foxish any update on this? HA story is pretty critical for production in many cases.
@baluchicken is your team planning to upstream the local PVs related stuff?

@baluchicken (Author) commented Jul 4, 2018

@skonto sorry, I have a couple of other things to do, but I am trying to update this as my time allows.
Yes, we are planning to create a PR for the PV-related stuff as soon as this one goes in.

@baluchicken baluchicken force-pushed the banzaicloud:SPARK-23980 branch Jul 5, 2018

@baluchicken (Author) commented Jul 5, 2018

@mccheah rebased to master and updated the PR; now the KubernetesDriverBuilder creates the driver Job instead of the configuration steps.

@SparkQA commented Jul 5, 2018

Test build #92650 has finished for PR 21067 at commit 4e0b3b0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@skonto (Contributor) commented Jul 6, 2018

@baluchicken probably this is covered here: #21260. I kind of missed that, as I thought it was only for hostpaths but it also covers PVs.

@baluchicken baluchicken force-pushed the banzaicloud:SPARK-23980 branch Jul 6, 2018

@baluchicken (Author) commented Jul 6, 2018

@skonto thanks, I am going to check it.

@SparkQA commented Jul 6, 2018

Test build #92685 has finished for PR 21067 at commit 0f280f4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@foxish (Contributor) commented Jul 6, 2018

I don't think this current approach will suffice. Correctness is important here, especially for folks using spark streaming. I understand that you're proposing the use of backoff limits but there is no guarantee that a job controller won't spin up 2 driver pods when we ask for 1. That by definition is how the job controller works, by being greedy and working towards desired completions. For example, in the case of a network partition, the job controller logic in the Kubernetes master will not differentiate between:

  1. Losing contact with the driver pod temporarily
  2. Finding no driver pod and starting a new one

This has been the reason why in the past I've proposed using a StatefulSet. However, getting termination semantics with a StatefulSet will be more work. I don't think we should sacrifice correctness in this layer as it would surprise the application author who now has to reason about whether the operation they are performing is idempotent.

The key issue here is that there are two sources of truth, the job itself (k8s API Server) and the Spark driver (JVM), which are at different levels and not talking with each other. Can we have a proposal and understand all the subtleties before trying to change this behavior? For example, if we end up with more than one driver for a single job, I'd like to ensure that only one of them is making progress (for ex. by using a lease in ZK).
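
For illustration only (not part of this PR): the kind of ZK lease mentioned here could be built with Apache Curator's leader latch; the connection string and path are placeholders:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry

val zk = CuratorFrameworkFactory.newClient(
  "zk-0.zk:2181", new ExponentialBackoffRetry(1000, 3))
zk.start()

// Only the driver instance holding the latch may make progress.
val latch = new LeaderLatch(zk, "/spark/networkwordcount/driver-leader")
latch.start()
latch.await()   // blocks until this instance acquires leadership
// ... start scheduling tasks only after this point ...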

@liyinan926 (Contributor) commented Jul 6, 2018

+1 on what @foxish said. I would also like to see a detailed discussion on the semantic differences this brings to the table before committing to this approach.

@baluchicken (Author) commented Jul 12, 2018

@foxish I just checked on a Google Kubernetes cluster with Kubernetes version 1.10.4-gke.2. I created a two-node cluster and emulated a "network partition" with iptables rules (the node running the Spark driver became NotReady). After a short, configurable delay the driver pod state changed to Unknown and the Job controller started a new Spark driver. After that I removed the iptables rules preventing the kubelet from speaking to the master (the NotReady node became Ready again). The driver pod in the Unknown state got terminated, along with all of its executors. In this case there are no parallel running Spark drivers, so I think we are not sacrificing correctness. Am I missing something?

@promiseofcake commented Jul 12, 2018

@baluchicken, did that test involve using checkpointing in a shared location?

@foxish (Contributor) commented Jul 12, 2018

@baluchicken baluchicken force-pushed the banzaicloud:SPARK-23980 branch to c04179b Jul 17, 2018

@SparkQA commented Jul 17, 2018

Test build #93201 has finished for PR 21067 at commit c04179b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@baluchicken (Author) commented Jul 17, 2018

I ran some more tests about this. I think we can say that this change adds resiliency to Spark batch jobs: just like in the case of YARN, Spark will retry the job from the beginning if an error happens.

It can also add resiliency to Spark Streaming apps. I fully understand your concerns, but anyone submitting a resilient Spark Streaming app will use Spark's checkpointing feature. For checkpointing they should use some kind of PersistentVolume, otherwise all information saved to that directory will be lost in case of a node failure. For the PVC, the accessMode should be ReadWriteOnce because, for this amount of data, it is much faster than the ReadWriteMany options.

My new tests used the same approach described above with one modification: I enabled a checkpointing dir backed by a ReadWriteOnce PVC. ReadWriteOnce storage can only be attached to one node. I thought Kubernetes would detach this volume once the node became NotReady, but something else happened. Kubernetes did not detach the volume from the unknown node, so although the Job controller created a new driver pod to replace the Unknown one, it remained in the Pending state because the required PVC was still attached to a different node. Once the partitioned node became available again, the old driver pod in the Unknown state got terminated, the volume got detached and reattached to the new driver pod, whose state then changed from Pending to Running.

So I think there is no problem with correctness here. We could add a warning to the documentation that correctness issues may arise if someone uses a ReadWriteMany-backed checkpoint dir, but otherwise, unless I am still missing something, I think there won't be any.

@skonto (Contributor) commented Jul 19, 2018

Once the partitioned node became available again, the old driver pod in the Unknown state got terminated, the volume got detached and reattached to the new driver pod, whose state then changed from Pending to Running.

What if the node never becomes available again?

@baluchicken (Author) commented Jul 19, 2018

@skonto if the node never becomes available again, the new driver will stay in the Pending state until, as @foxish said, "the user explicitly force-kills the old driver".

@skonto (Contributor) commented Jul 19, 2018

@baluchicken yeah I thought of that but I was hoping for more automation.

@foxish (Contributor) commented Jul 19, 2018

ReadWriteOnce storage can only be attached to one node.

This is well known. Using the RWO volume for fencing here would work - but this is not representative of all users. This breaks down if you include checkpointing to object storage (s3) or HDFS or into ReadWriteMany volumes like NFS. In all of those cases, there will be a problem with correctness.

For folks that need it right away, the same restarts feature can be realized safely using an approach like the spark-operator without any of this hassle, so why are we trying to fit this into Spark with caveats around how volumes should be used to ensure fencing? This seems more error-prone and harder to explain, and I can't see the gain from it. One way forward is proposing to the k8s community a new option for Jobs that allows us to get fencing from the k8s apiserver through deterministic names. I think that would be a good way forward.

@liyinan926 (Contributor) commented Jul 19, 2018

+1 on what @foxish said. If using a Job is the right way to go ultimately, it's good to open discussion with sig-apps on adding an option to the Job API & controller to use deterministic pod names as well as to offer the exactly-once semantic. Spark probably is not the only use case needing such a semantic guarantee.

@baluchicken (Author) commented Jul 27, 2018

Thanks for the responses, I learned a lot from this :) I am going to close this PR for now, and maybe collaborate on the Kubernetes ticket raised by this PR. Thanks.
