From 7e07efe701cf9dffaaf8411b108bdd2b3ca99f91 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Mon, 2 Nov 2015 20:30:35 +0800
Subject: [PATCH 1/2] Fix Spark Streaming checkpoint with Yarn-cluster
 configuration recovery issue

---
 .../scala/org/apache/spark/streaming/Checkpoint.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b7de6dde61c63..2bcc0fc400485 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -55,7 +55,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       "spark.driver.port",
       "spark.master",
       "spark.yarn.keytab",
-      "spark.yarn.principal")
+      "spark.yarn.principal",
+      "spark.ui.filters")
 
     val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")
@@ -66,6 +67,14 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
         newSparkConf.set(prop, value)
       }
     }
+
+    // Add Yarn proxy filter specific configurations to the recovered SparkConf
+    val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    val filterPrefix = s"spark.$filter.param."
+    newReloadConf.getAll
+      .filter { case (k, v) => k.startsWith(filterPrefix) && k.length > filterPrefix.length }
+      .foreach { case (k, v) => newSparkConf.set(k, v) }
+
     newSparkConf
   }
 

From c4a9cb93c3d01ee99546b1d0e5e21559a6ea8be1 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Tue, 3 Nov 2015 20:39:46 +0800
Subject: [PATCH 2/2] Address the comments

---
 .../scala/org/apache/spark/streaming/Checkpoint.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 2bcc0fc400485..0cd55d9aec2cd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -71,9 +71,11 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
     // Add Yarn proxy filter specific configurations to the recovered SparkConf
     val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     val filterPrefix = s"spark.$filter.param."
-    newReloadConf.getAll
-      .filter { case (k, v) => k.startsWith(filterPrefix) && k.length > filterPrefix.length }
-      .foreach { case (k, v) => newSparkConf.set(k, v) }
+    newReloadConf.getAll.foreach { case (k, v) =>
+      if (k.startsWith(filterPrefix) && k.length > filterPrefix.length) {
+        newSparkConf.set(k, v)
+      }
+    }
 
     newSparkConf
   }
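
Note (not part of the patches above): a minimal standalone sketch of the recovery behavior these changes implement: rebuild a SparkConf from checkpointed key/value pairs, overwrite the environment-specific properties with values from a freshly loaded configuration, and copy over any AmIpFilter parameters so the recovered streaming UI stays reachable behind the new application's YARN AM proxy. The names AmIpFilterRecoverySketch, recoverConf and checkpointedPairs are hypothetical, and the reload list below is abridged to the properties visible in the diff; only SparkConf and its setAll/remove/getOption/getAll/set calls come from Spark itself.

import org.apache.spark.SparkConf

// Hypothetical helper, written only to illustrate the technique used in the patch.
object AmIpFilterRecoverySketch {

  // Properties that must come from the current run, never from the checkpoint
  // (abridged; mirrors propertiesToReload in Checkpoint.scala, now including "spark.ui.filters").
  private val propertiesToReload = Seq(
    "spark.driver.host",
    "spark.driver.port",
    "spark.master",
    "spark.yarn.keytab",
    "spark.yarn.principal",
    "spark.ui.filters")

  def recoverConf(checkpointedPairs: Seq[(String, String)]): SparkConf = {
    // Configuration of the newly launched (re-submitted) application.
    val reloadedConf = new SparkConf(loadDefaults = true)

    // Start from the checkpointed pairs, dropping the old driver endpoint.
    val recoveredConf = new SparkConf(loadDefaults = false)
      .setAll(checkpointedPairs)
      .remove("spark.driver.host")
      .remove("spark.driver.port")

    // Overwrite environment-specific properties with the current run's values.
    propertiesToReload.foreach { prop =>
      reloadedConf.getOption(prop).foreach(value => recoveredConf.set(prop, value))
    }

    // Copy the YARN AM proxy filter parameters so the recovered UI is served
    // correctly behind the new application's proxy.
    val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    val filterPrefix = s"spark.$filter.param."
    reloadedConf.getAll.foreach { case (k, v) =>
      if (k.startsWith(filterPrefix) && k.length > filterPrefix.length) {
        recoveredConf.set(k, v)
      }
    }

    recoveredConf
  }
}

The second patch only inlines the filter/foreach pair into a single foreach with an if guard, as requested in review; the behavior is unchanged.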