Skip to content

Commit

Permalink
[KYUUBI-116][Experimental] Support long cache spark session in kerber…
Browse files Browse the repository at this point in the history
…ized cluster (#117)

* fixes @116 Support long caching SparkSession/SparkContext for secured hadoop cluster

* handle sub classes of coarse grained scheduler backend

* fix ut for spark 2.2

* updating doc
  • Loading branch information
yaooqinn committed Feb 22, 2019
1 parent 6352c6a commit 72e664f
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 54 deletions.
71 changes: 37 additions & 34 deletions docs/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,71 +30,74 @@ $ bin/start-kyuubi.sh \

Name|Default|Description
---|---|---
spark.kyuubi.ha.enabled|false|Whether KyuubiServer supports dynamic service discovery for its clients. To support this, each instance of KyuubiServer currently uses ZooKeeper to register itself, when it is brought up. JDBC/ODBC clients should use the ZooKeeper ensemble: spark.kyuubi.ha.zk.quorum in their connection string.
spark.kyuubi.ha.mode|load-balance|High availability mode, one is load-balance which is used by default, another is failover as master-slave mode.
spark.kyuubi.ha.zk.quorum|none|Comma separated list of ZooKeeper servers to talk to, when KyuubiServer supports service discovery via Zookeeper.
spark.kyuubi.ha.zk.namespace|kyuubiserver|The parent node in ZooKeeper used by KyuubiServer when supporting dynamic service discovery.
spark.kyuubi.ha.zk.client.port|2181|The port of ZooKeeper servers to talk to. If the list of Zookeeper servers specified in spark.kyuubi.zookeeper.quorum does not contain port numbers, this value is used.
spark.kyuubi.ha.zk.session.timeout|1,200,000|ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, if a heartbeat is not sent in the timeout.
spark.kyuubi.ha.zk.connection.basesleeptime|1,000|Initial amount of time (in milliseconds) to wait between retries when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy.
spark.kyuubi.ha.zk.connection.max.retries|3|Max retry times for connecting to the zk server
spark.kyuubi.<br />ha.enabled|false|Whether KyuubiServer supports dynamic service discovery for its clients. To support this, each instance of KyuubiServer currently uses ZooKeeper to register itself, when it is brought up. JDBC/ODBC clients should use the ZooKeeper ensemble: spark.kyuubi.ha.zk.quorum in their connection string.
spark.kyuubi.<br />ha.mode|load-balance|High availability mode, one is load-balance which is used by default, another is failover as master-slave mode.
spark.kyuubi.<br />ha.zk.quorum|none|Comma separated list of ZooKeeper servers to talk to, when KyuubiServer supports service discovery via Zookeeper.
spark.kyuubi.<br />ha.zk.namespace|kyuubiserver|The parent node in ZooKeeper used by KyuubiServer when supporting dynamic service discovery.
spark.kyuubi.<br />ha.zk.client.port|2181|The port of ZooKeeper servers to talk to. If the list of Zookeeper servers specified in spark.kyuubi.zookeeper.quorum does not contain port numbers, this value is used.
spark.kyuubi.<br />ha.zk.session.timeout|1,200,000|ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, if a heartbeat is not sent in the timeout.
spark.kyuubi.<br />ha.zk.connection.basesleeptime|1,000|Initial amount of time (in milliseconds) to wait between retries when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy.
spark.kyuubi.<br />ha.zk.connection.max.retries|3|Max retry times for connecting to the zk server

#### Operation Log

Name|Default|Description
---|---|---
spark.kyuubi.logging.operation.enabled|true|When true, Kyuubi Server will save operation logs and make them available for clients
spark.kyuubi.logging.operation.log.dir|`KYUUBI_LOG_DIR` -> `java.io.tmpdir`/operation_logs|Top level directory where operation logs are stored if logging functionality is enabled
spark.kyuubi.<br />logging.operation.enabled|true|When true, Kyuubi Server will save operation logs and make them available for clients
spark.kyuubi.<br />logging.operation.log.dir|`KYUUBI_LOG_DIR` -> `java.io.tmpdir`/operation_logs|Top level directory where operation logs are stored if logging functionality is enabled

#### Frontend Service options

Name|Default|Description
---|---|---
spark.kyuubi.frontend.bind.host | localhost | Bind host on which to run the Kyuubi Frontend service.
spark.kyuubi.frontend.bind.port| 10009 | Port number of Kyuubi Frontend service. set 0 will get a random available one
spark.kyuubi.frontend.min.worker.threads| 50 | Minimum number of Thrift worker threads.
spark.kyuubi.frontend.max.worker.threads| 500 | Maximum number of Thrift worker threads
spark.kyuubi.frontend.worker.keepalive.time | 60s| Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.
spark.kyuubi.authentication | NONE | Client authentication types. NONE: no authentication check; NOSASL: no authentication check KERBEROS: Kerberos/GSSAPI authentication.
spark.kyuubi.frontend.allow.user.substitution | true | Allow alternate user to be specified as part of Kyuubi open connection request.
spark.kyuubi.frontend.enable.doAs | true | Set true to have Kyuubi execute SQL operations as the user making the calls to it.
spark.kyuubi.frontend.max.message.size | 104857600 | Maximum message size in bytes a Kyuubi server will accept.
spark.kyuubi.<br />frontend.bind.host | localhost | Bind host on which to run the Kyuubi Frontend service.
spark.kyuubi.<br />frontend.bind.port| 10009 | Port number of Kyuubi Frontend service. set 0 will get a random available one
spark.kyuubi.<br />frontend.min.worker.threads| 50 | Minimum number of Thrift worker threads.
spark.kyuubi.<br />frontend.max.worker.threads| 500 | Maximum number of Thrift worker threads
spark.kyuubi.<br />frontend.worker.keepalive.time | 60s| Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.
spark.kyuubi.<br />authentication | NONE | Client authentication types. NONE: no authentication check; NOSASL: no authentication check KERBEROS: Kerberos/GSSAPI authentication.
spark.kyuubi.<br />frontend.allow.user.substitution | true | Allow alternate user to be specified as part of Kyuubi open connection request.
spark.kyuubi.<br />frontend.enable.doAs | true | Set true to have Kyuubi execute SQL operations as the user making the calls to it.
spark.kyuubi.<br />frontend.max.message.size | 104857600 | Maximum message size in bytes a Kyuubi server will accept.

#### Background Execution Thread Pool

Name|Default|Description
---|---|---
spark.kyuubi.async.exec.threads|100|Number of threads in the async thread pool for KyuubiServer.
spark.kyuubi.async.exec.wait.queue.size|100|Size of the wait queue for async thread pool in KyuubiServer. After hitting this limit, the async thread pool will reject new requests.
spark.kyuubi.async.exec.keep.alive.time|10,000|Time (in milliseconds) that an idle KyuubiServer async thread (from the thread pool) will wait for a new task to arrive before terminating.
spark.kyuubi.async.exec.shutdown.timeout|10,000|How long KyuubiServer shutdown will wait for async threads to terminate.
spark.kyuubi.<br />async.exec.threads|100|Number of threads in the async thread pool for KyuubiServer.
spark.kyuubi.<br />async.exec.wait.queue.size|100|Size of the wait queue for async thread pool in KyuubiServer. After hitting this limit, the async thread pool will reject new requests.
spark.kyuubi.<br />async.exec.keep.alive.time|10,000|Time (in milliseconds) that an idle KyuubiServer async thread (from the thread pool) will wait for a new task to arrive before terminating.
spark.kyuubi.<br />async.exec.shutdown.timeout|10,000|How long KyuubiServer shutdown will wait for async threads to terminate.

#### Kyuubi Session

Name|Default|Description
---|---|---
spark.kyuubi.frontend.session.check.interval|6h|The check interval for frontend session/operation timeout, which can be disabled by setting to zero or negative value.
spark.kyuubi.frontend.session.timeout|8h|The check interval for session/operation timeout, which can be disabled by setting to zero or negative value.
spark.kyuubi.frontend.session.check.operation| true |Session will be considered to be idle only if there is no activity, and there is no pending operation. This setting takes effect only if session idle timeout `spark.kyuubi.frontend.session.timeout` and checking `spark.kyuubi.frontend.session.check.interval` are enabled.
spark.kyuubi.<br />frontend.session.check.interval|6h|The check interval for frontend session/operation timeout, which can be disabled by setting to zero or negative value.
spark.kyuubi.<br />frontend.session.timeout|8h|The check interval for session/operation timeout, which can be disabled by setting to zero or negative value.
spark.kyuubi.<br />frontend.session.check.operation| true |Session will be considered to be idle only if there is no activity, and there is no pending operation. This setting takes effect only if session idle timeout `spark.kyuubi.frontend.session.timeout` and checking `spark.kyuubi.frontend.session.check.interval` are enabled.

#### Spark Session

Name|Default|Description
---|---|---
spark.kyuubi.backend.session.wait.other.times | 60 | How many times to check when another session with the same user is initializing SparkContext. Total Time will be times by `spark.kyuubi.backend.session.wait.other.interval`.
spark.kyuubi.backend.session.wait.other.interval|1s|The interval for checking whether other thread with the same user has completed SparkContext instantiation.
spark.kyuubi.backend.session.init.timeout|60s|How long we suggest the server to give up instantiating SparkContext.
spark.kyuubi.backend.session.check.interval|5min|The check interval for backend session a.k.a SparkSession timeout.
spark.kyuubi.backend.session.idle.timeout|30min|SparkSession timeout.
spark.kyuubi.<br />backend.session.wait.other.times | 60 | How many times to check when another session with the same user is initializing SparkContext. Total Time will be times by `spark.kyuubi.backend.session.wait.other.interval`.
spark.kyuubi.<br />backend.session.wait.other.interval|1s|The interval for checking whether other thread with the same user has completed SparkContext instantiation.
spark.kyuubi.<br />backend.session.init.timeout|60s|How long we suggest the server to give up instantiating SparkContext.
spark.kyuubi.<br />backend.session.check.interval|5min|The check interval for backend session a.k.a SparkSession timeout.
spark.kyuubi.<br />backend.session.idle.timeout|30min|SparkSession timeout.
spark.kyuubi.<br />backend.session.local.dir|KYUUBI_HOME/local|Default value to set `spark.local.dir`. For YARN mode, this only affect the Kyuubi server side settings according to the rule of Spark treating `spark.local.dir`.
spark.kyuubi.<br />backend.session.long.cache|${UserGroupInformation.isSecurityEnabled}|Whether to update the tokens of Spark's executor to support long caching SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is loadable. This is used towards kerberized hadoop clusters in case of `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time limit or SparkSession never idles.
spark.kyuubi.<br />backend.session.token.update.class|org.apache.spark.<br />scheduler.cluster.<br />CoarseGrainedClusterMessages$<br />UpdateDelegationTokens|`CoarseGrainedClusterMessages` for token update message from the driver of Spark to executors, it is loadable only by higher version Spark release(2.3 and later)


#### Operation

Name|Default|Description
---|---|---
spark.kyuubi.operation.idle.timeout|6h|Operation will be closed when it's not accessed for this duration of time.
spark.kyuubi.operation.incremental.collect|false|Whether to use incremental result collection from Spark executor side to Kyuubi server side.
spark.kyuubi.operation.result.limit|-1|In non-incremental result collection mode, set this to a positive value to limit the size of result collected to driver side.
spark.kyuubi.<br />operation.idle.timeout|6h|Operation will be closed when it's not accessed for this duration of time.
spark.kyuubi.<br />operation.incremental.collect|false|Whether to use incremental result collection from Spark executor side to Kyuubi server side.
spark.kyuubi.<br />operation.result.limit|-1|In non-incremental result collection mode, set this to a positive value to limit the size of result collected to driver side.

---

Expand Down
30 changes: 25 additions & 5 deletions kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.spark

import java.io.File
import java.util.HashMap
import java.util.{HashMap => JMap}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.language.implicitConversions

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}

/**
Expand All @@ -32,15 +33,15 @@ import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
*/
object KyuubiConf {

private[this] val kyuubiConfEntries = new HashMap[String, ConfigEntry[_]]()
private val kyuubiConfEntries = new JMap[String, ConfigEntry[_]]()

def register(entry: ConfigEntry[_]): Unit = {
require(!kyuubiConfEntries.containsKey(entry.key),
s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
kyuubiConfEntries.put(entry.key, entry)
}

private[this] object KyuubiConfigBuilder {
private object KyuubiConfigBuilder {
def apply(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)
}

Expand Down Expand Up @@ -265,7 +266,7 @@ object KyuubiConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.SECONDS.toMillis(1L))

val BACKEND_SESSTION_INIT_TIMEOUT: ConfigEntry[Long] =
val BACKEND_SESSION_INIT_TIMEOUT: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.init.timeout")
.doc("How long we suggest the server to give up instantiating SparkContext")
.timeConf(TimeUnit.SECONDS)
Expand All @@ -285,12 +286,31 @@ object KyuubiConf {

val BACKEND_SESSION_LOCAL_DIR: ConfigEntry[String] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.local.dir")
.doc("Default value to set spark.local.dir")
.doc("Default value to set `spark.local.dir`, for YARN mode, this only affect the Kyuubi" +
" server side settings according to the rule of Spark treating `spark.local.dir`")
.stringConf
.createWithDefault(
s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}"
+ File.separator + "local")

val BACKEND_SESSION_LONG_CACHE: ConfigEntry[Boolean] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.long.cache")
.doc("Whether to update the tokens of Spark's executor to support long caching" +
" SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is" +
" loadable. This is used towards kerberized hadoop clusters in case of" +
" `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time" +
" limit or SparkSession never idles.")
.booleanConf
.createWithDefault(UserGroupInformation.isSecurityEnabled)

val BACKEND_SESSION_TOKEN_UPDATE_CLASS: ConfigEntry[String] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.token.update.class")
.doc("`CoarseGrainedClusterMessages` for token update message from the driver of Spark to" +
" executors, it is loadable only by higher version Spark release(2.3 and later)")
.stringConf
.createWithDefault(
"org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$UpdateDelegationTokens")

/////////////////////////////////////////////////////////////////////////////////////////////////
// Authentication //
/////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.net.URI

import scala.annotation.tailrec
import scala.collection.Map
import scala.util.Try
import scala.util.matching.Regex

import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -255,6 +256,11 @@ object KyuubiSparkUtil extends Logging {
loader
}

/** Determines whether the provided class is loadable in the current thread. */
def classIsLoadable(clazz: String): Boolean = {
Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
}

/**
* Generate proper configurations before server starts
* @param conf the default [[SparkConf]]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster

import java.io.{ByteArrayOutputStream, DataOutputStream}

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.KyuubiConf._
import org.apache.spark.SparkContext

import yaooqinn.kyuubi.utils.ReflectUtils._

/**
* Tool for methods used for Kyuubi to talking to Spark Executors
*/
object KyuubiSparkExecutorUtils {

/**
* Populate the tokens contained in the current KyuubiSession's ugi to the all the alive
* executors associated with its own SparkContext.
*
* @param sc The SparkContext with its runtime environment which contains all the executors,
* associated with the current KyuubiSession and UserGroupInformation.
* @param user the UserGroupInformation associated with the current KyuubiSession
*/
def populateTokens(sc: SparkContext, user: UserGroupInformation): Unit = {
val schedulerBackend = sc.schedulerBackend
schedulerBackend match {
case backend: CoarseGrainedSchedulerBackend =>
try {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
user.getCredentials.writeTokenStorageToStream(dataStream)
val tokens = byteStream.toByteArray
val executorField =
classOf[CoarseGrainedSchedulerBackend].getName.replace('.', '$') + "$$executorDataMap"
val executors = backend match {
case _: YarnClientSchedulerBackend | _: YarnClusterSchedulerBackend |
_: StandaloneSchedulerBackend =>
getAncestorField(backend, 2, executorField)
case _ => getFieldValue(backend, executorField)
}
val msg = newInstance(sc.conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS),
Seq(classOf[Array[Byte]]), Seq(tokens))
executors.asInstanceOf[mutable.HashMap[String, ExecutorData]]
.values.foreach(_.executorEndpoint.send(msg))
} catch {
case NonFatal(e) => warn(s"Failed to populate secured tokens to executors", e)
}
case _ =>
}
}
}

0 comments on commit 72e664f

Please sign in to comment.