From 9ddd5b4bf5ca8c0759d411ac44e3ea02a578d1ba Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Wed, 1 Jul 2015 20:18:50 +0800 Subject: [PATCH 1/2] [SPARK-8755][Streaming]Login user before reading hdfs file. --- .../org/apache/spark/streaming/Checkpoint.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 5279331c9e122..3116ac4fad4bb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -23,6 +23,7 @@ import java.util.concurrent.RejectedExecutionException import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{SparkException, SparkConf, Logging} import org.apache.spark.io.CompressionCodec @@ -290,6 +291,7 @@ object CheckpointReader extends Logging { conf: SparkConf, hadoopConf: Configuration, ignoreReadError: Boolean = false): Option[Checkpoint] = { + loginUserIfConfExsis(conf) val checkpointPath = new Path(checkpointDir) // TODO(rxin): Why is this a def?! @@ -324,6 +326,17 @@ object CheckpointReader extends Logging { } None } + + /** + * If using the keytab to login user in security mode, this checkpoint application has to login + * before reading files from hdfs. + */ + private def loginUserIfConfExsis(conf: SparkConf): Unit = { + for(principal <- conf.getOption("spark.yarn.principal"); + keytab <- conf.getOption("spark.yarn.keytab")) { + UserGroupInformation.loginUserFromKeytab(principal, keytab) + } + } } private[streaming] From f2f1cb7fdcd011a593e09fd60970afeb288a7d7a Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Tue, 7 Jul 2015 17:16:25 +0800 Subject: [PATCH 2/2] Fix a typo --- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 3116ac4fad4bb..1b7dd8d953384 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -291,7 +291,7 @@ object CheckpointReader extends Logging { conf: SparkConf, hadoopConf: Configuration, ignoreReadError: Boolean = false): Option[Checkpoint] = { - loginUserIfConfExsis(conf) + loginUserIfConfExists(conf) val checkpointPath = new Path(checkpointDir) // TODO(rxin): Why is this a def?! @@ -331,7 +331,7 @@ object CheckpointReader extends Logging { * If using the keytab to login user in security mode, this checkpoint application has to login * before reading files from hdfs. */ - private def loginUserIfConfExsis(conf: SparkConf): Unit = { + private def loginUserIfConfExists(conf: SparkConf): Unit = { for(principal <- conf.getOption("spark.yarn.principal"); keytab <- conf.getOption("spark.yarn.keytab")) { UserGroupInformation.loginUserFromKeytab(principal, keytab)