[SPARK-23200] Reset Kubernetes-specific config on Checkpoint restore
Several configuration parameters related to Kubernetes need to be
reset, as they are changed with each invocation of spark-submit and
thus prevent recovery of Spark Streaming tasks.

## What changes were proposed in this pull request?

When using the Kubernetes cluster manager to run a Streaming workload, many spark.kubernetes.* properties that are generated by spark-submit must be reset, because restoring a Checkpoint would otherwise overwrite them with the values from the previous submission. The spark-submit code path creates Kubernetes resources, such as a ConfigMap and a Secret, along with other values that have autogenerated names, and the names recorded in the checkpoint no longer resolve.

In short, this change enables checkpoint restoration for Spark Streaming workloads on Kubernetes. Previously, such a workload could not be restored from a checkpoint after it went down.
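
The restore behaviour described above can be sketched roughly as follows. This is a simplified model rather than the actual `Checkpoint` code: plain `Map`s stand in for `SparkConf`, the `CheckpointConfSketch` object and its `restore` helper are hypothetical names, and the key lists are only a subset of the real ones.

```scala
// Simplified model of checkpoint restore; plain Maps stand in for SparkConf.
// CheckpointConfSketch and restore are hypothetical names used for illustration.
object CheckpointConfSketch {
  // The two Kubernetes keys this commit adds to both lists.
  val kubernetesKeys: Seq[String] = Seq(
    "spark.kubernetes.driver.pod.name",
    "spark.kubernetes.executor.podNamePrefix")

  // Keys dropped from the checkpointed configuration (subset of the real list).
  val keysToRemove: Seq[String] = Seq(
    "spark.driver.host",
    "spark.driver.bindAddress",
    "spark.driver.port") ++ kubernetesKeys

  // Keys re-read from the current submission (subset of the real list).
  val propertiesToReload: Seq[String] = Seq(
    "spark.driver.bindAddress",
    "spark.driver.port",
    "spark.master") ++ kubernetesKeys

  def restore(
      checkpointed: Map[String, String],
      current: Map[String, String]): Map[String, String] = {
    // Step 1: discard stale, submission-specific values from the checkpoint.
    val cleaned = checkpointed -- keysToRemove
    // Step 2: copy over whatever the current submission defines for the reload keys.
    propertiesToReload.foldLeft(cleaned) { (conf, key) =>
      current.get(key).fold(conf)(value => conf.updated(key, value))
    }
  }

  def main(args: Array[String]): Unit = {
    val checkpointed = Map(
      "spark.app.name" -> "streaming-job",
      "spark.driver.port" -> "7078",
      "spark.kubernetes.driver.pod.name" -> "driver-old")
    val current = Map("spark.kubernetes.driver.pod.name" -> "driver-new")
    println(restore(checkpointed, current))
  }
}
```

In this model the stale pod name from the checkpoint never survives: it is either replaced by the value from the current submission or dropped entirely, while unrelated settings such as `spark.app.name` are kept as checkpointed.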

## How was this patch tested?

This patch would benefit from testing in different k8s clusters.

This is similar to the YARN-related code for resetting a Spark Streaming workload, but for the Kubernetes scheduler. This PR drops the initcontainers properties that existed in the earlier version of this work, because they have since been removed in master.

For a previous discussion, see the non-rebased work at: apache-spark-on-k8s#516

Closes #22392 from ssaavedra/fix-checkpointing-master.

Authored-by: Santiago Saavedra <santiagosaavedra@gmail.com>
Signed-off-by: Yinan Li <ynli@google.com>
ssaavedra authored and liyinan926 committed Sep 19, 2018
1 parent a6f37b0 commit 497f00f
Showing 1 changed file with 4 additions and 0 deletions.
@@ -54,6 +54,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       "spark.driver.bindAddress",
       "spark.driver.port",
       "spark.master",
+      "spark.kubernetes.driver.pod.name",
+      "spark.kubernetes.executor.podNamePrefix",
       "spark.yarn.jars",
       "spark.yarn.keytab",
       "spark.yarn.principal",
@@ -64,6 +66,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       .remove("spark.driver.host")
       .remove("spark.driver.bindAddress")
       .remove("spark.driver.port")
+      .remove("spark.kubernetes.driver.pod.name")
+      .remove("spark.kubernetes.executor.podNamePrefix")
     val newReloadConf = new SparkConf(loadDefaults = true)
     propertiesToReload.foreach { prop =>
       newReloadConf.getOption(prop).foreach { value =>
