Skip to content

Commit

Permalink
[KYUUBI-167][FOLLOWUP]populate tokens via spark session cache mgr (#183)
Browse files Browse the repository at this point in the history
* Prepare releasing v0.6.0

* fix #KYUUBI-167 populate token via spark session cache mgr

* typo

* fix ut

* code cov
  • Loading branch information
yaooqinn committed May 7, 2019
1 parent 43c9547 commit 1f0bc74
Show file tree
Hide file tree
Showing 14 changed files with 52 additions and 224 deletions.
14 changes: 7 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ deploy:

jobs:
include:
- stage: spark2.4
language: scala
script: ./build/mvn clean install -Pspark-2.4 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.3
- stage: spark2.1
language: scala
script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V
script: ./build/mvn clean install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.2
language: scala
script: ./build/mvn clean install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.1
- stage: spark2.3
language: scala
script: ./build/mvn clean install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V
script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.4
language: scala
script: ./build/mvn clean install -Pspark-2.4 -Dmaven.javadoc.skip=true -B -V

after_success:
- bash <(curl -s https://codecov.io/bash)
1 change: 0 additions & 1 deletion docs/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ spark.kyuubi.<br />backend.session.idle.timeout|30min|SparkSession timeout.
spark.kyuubi.<br />backend.session.local.dir|KYUUBI_HOME/<br />local|Default value to set `spark.local.dir`. For YARN mode, this only affect the Kyuubi server side settings according to the rule of Spark treating `spark.local.dir`.
spark.kyuubi.<br />backend.session.long.cache|${UserGroupInformation.<br />isSecurityEnabled}|Whether to update the tokens of Spark's executor to support long caching SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is loadable. This is used towards kerberized hadoop clusters in case of `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time limit or SparkSession never idles.
spark.kyuubi.<br />backend.session.token.update.class|org.apache.spark.<br />scheduler.cluster.<br />CoarseGrainedClusterMessages$<br />UpdateDelegationTokens|`CoarseGrainedClusterMessages` for token update message from the driver of Spark to executors, it is loadable only by higher version Spark release(2.3 and later)
spark.kyuubi.<br />backend.session.token.renew.interval|2h|Interval for KyuubiServiceCredentialProvider to update tokens ,which is a ServiceCredentialProvider implementation of Apache Spark

#### Operation

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

7 changes: 0 additions & 7 deletions kyuubi-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>yaooqinn</groupId>
<artifactId>kyuubi-security</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${spark.group}</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -257,7 +251,6 @@
<artifactSet>
<includes>
<include>yaooqinn:kyuubi-common</include>
<include>yaooqinn:kyuubi-security</include>
</includes>
</artifactSet>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,22 +329,6 @@ object KyuubiSparkUtil extends Logging {
// ForkJoinPool which points to another calling context. Turn off parallel listing seems
// to be a solution to this issue.
conf.setIfMissing(RDD_PAR_LISTING, Int.MaxValue.toString)

val sparkTokenProviders = List("hdfs", "hadoopfs", "hive", "hbase")
val tokenProviderPattens = List(
"spark.yarn.security.tokens.%s.enabled",
"spark.yarn.security.credentials.%s.enabled",
"spark.security.credentials.%s.enabled")
// Set kyuubi credential renewer on, if we do not explicitly turn it off.
tokenProviderPattens.map(_.format("kyuubi")).foreach(conf.setIfMissing(_, "true"))
// Force to turn off Spark's internal token providers, because all useful works will be done
// in KyuubiServiceCredentialProvider, and those ones in Spark always have impersonation
// issue while renew tokens
sparkTokenProviders.foreach { service =>
tokenProviderPattens.map(_.format(service)).foreach {
conf.set(_, "false")
}
}
}

val kyuubiJar = Option(System.getenv("KYUUBI_JAR")).getOrElse("")
Expand All @@ -355,12 +339,6 @@ object KyuubiSparkUtil extends Logging {
}

conf.set(SPARK_YARN_DIST_JARS, distJars)
// We should obey our client side hadoop settings while running Kyuubi towards HDFS
// federations with maybe only on Yarn cluster
// see https://github.com/apache/spark/pull/24120
val hadoopConf = newConfiguration(conf)
Option(hadoopConf.get("fs.defaultFS"))
.foreach(conf.setIfMissing("spark.hadoop.fs.defaultFS", _))
}

@tailrec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.io.{ByteArrayOutputStream, DataOutputStream}
import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.KyuubiConf._
import org.apache.spark.SparkContext

Expand All @@ -43,13 +43,21 @@ object KyuubiSparkExecutorUtils extends Logging {
* @param user the UserGroupInformation associated with the current KyuubiSession
*/
def populateTokens(sc: SparkContext, user: UserGroupInformation): Unit = {
populateTokens(sc, user.getCredentials)
}

/**
* Populate the tokens contained in the current KyuubiSession's ugi to the all the alive
* executors associated with its own SparkContext.
*/
def populateTokens(sc: SparkContext, creds: Credentials): Unit = {
val schedulerBackend = sc.schedulerBackend
schedulerBackend match {
case backend: CoarseGrainedSchedulerBackend =>
try {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
user.getCredentials.writeTokenStorageToStream(dataStream)
creds.writeTokenStorageToStream(dataStream)
val tokens = byteStream.toByteArray
val executorField =
classOf[CoarseGrainedSchedulerBackend].getName.replace('.', '$') + "$$executorDataMap"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ private[security] object HDFSTokenCollector extends TokenCollector with Logging
tokenRenewer
}

override def obtainTokens(conf: SparkConf): Unit = {
override def obtainTokens(conf: SparkConf): Unit = try {
val hadoopConf = newConfiguration(conf)
val tokenRenewer = renewer(hadoopConf)
val creds = new Credentials()
hadoopFStoAccess(conf, hadoopConf).foreach { fs =>
fs.addDelegationTokens(tokenRenewer, creds)
}
UserGroupInformation.getCurrentUser.addCredentials(creds)
} catch {
case e: Exception =>
error("Failed to obtain HDFS tokens", e)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.security.Credentials
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.KyuubiConf._
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils
import org.apache.spark.sql.SparkSession

import yaooqinn.kyuubi.Logging
Expand All @@ -46,24 +48,34 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
private val userLatestLogout = new ConcurrentHashMap[String, Long]
private var idleTimeout: Long = _

private val userToCredentials = new ConcurrentHashMap[String, Credentials]
private var needPopulateToken: Boolean = _

private val sessionCleaner = new Runnable {
override def run(): Unit = {
userToSession.asScala.foreach {
case (user, (session, _)) if session.sparkContext.isStopped =>
warn(s"SparkSession for $user might already be stopped by forces outside Kyuubi," +
warn(s"SparkSession for $user might already be stopped outside Kyuubi," +
s" cleaning it..")
removeSparkSession(user)
case (user, (_, times)) if times.get() > 0 =>
case (user, (session, times)) if times.get() > 0 || !userLatestLogout.containsKey(user) =>
debug(s"There are $times active connection(s) bound to the SparkSession instance" +
s" of $user ")
case (user, (_, _)) if !userLatestLogout.containsKey(user) =>
if (needPopulateToken) {
val credentials = userToCredentials.getOrDefault(user, new Credentials)
KyuubiSparkExecutorUtils.populateTokens(session.sparkContext, credentials)
}
case (user, (session, _))
if userLatestLogout.get(user) + idleTimeout <= System.currentTimeMillis() =>
info(s"Stopping idle SparkSession for user [$user].")
removeSparkSession(user)
session.stop()
System.setProperty("SPARK_YARN_MODE", "true")
case _ =>
case (user, (session, _)) =>
if (needPopulateToken) {
val credentials = userToCredentials.getOrDefault(user, new Credentials)
KyuubiSparkExecutorUtils.populateTokens(session.sparkContext, credentials)
}
}
}
}
Expand Down Expand Up @@ -105,16 +117,21 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
}
}

def setupCredentials(user: String, creds: Credentials): Unit = {
userToCredentials.put(user, creds)
}

override def init(conf: SparkConf): Unit = {
idleTimeout = math.max(conf.getTimeAsMs(BACKEND_SESSION_IDLE_TIMEOUT.key), 60 * 1000)
needPopulateToken = conf.get(BACKEND_SESSION_LONG_CACHE).toBoolean &&
KyuubiSparkUtil.classIsLoadable(conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS))
super.init(conf)
}

/**
* Periodically close idle SparkSessions in 'spark.kyuubi.session.clean.interval(default 1min)'
*/
override def start(): Unit = {
// at least 1 minutes
val interval = math.max(conf.getTimeAsSeconds(BACKEND_SESSION_CHECK_INTERVAL.key), 1)
info(s"Scheduling SparkSession cache cleaning every $interval seconds")
cacheManager.scheduleAtFixedRate(sessionCleaner, interval, interval, TimeUnit.SECONDS)
Expand Down

0 comments on commit 1f0bc74

Please sign in to comment.