[KYUUBI #6368] Support impersonation mode for flink sql engine
wForget committed May 10, 2024
1 parent b28d879 commit 66f94ce
Showing 3 changed files with 40 additions and 4 deletions.
1 change: 1 addition & 0 deletions docs/configuration/settings.md
@@ -149,6 +149,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.flink.initialize.sql | SHOW DATABASES | The initialize SQL for the Flink engine. It falls back to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 |
| kyuubi.engine.flink.java.options | <undefined> | The extra Java options for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 |
| kyuubi.engine.flink.memory | 1g | The heap memory for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 |
| kyuubi.engine.flink.proxy.user.enabled | false | Whether to use a Hadoop proxy user to run the Flink engine. Only takes effect in a Kerberos environment and when `kyuubi.engine.doAs.enabled` is set to `true`. | boolean | 1.10.0 |
| kyuubi.engine.hive.deploy.mode | LOCAL | Configures the Hive engine deploy mode. The value can be 'local' or 'yarn'. In local mode, the engine operates on the same node as the KyuubiServer. In YARN mode, the engine runs within the Application Master (AM) container of YARN. | string | 1.9.0 |
| kyuubi.engine.hive.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul> | seq | 1.7.0 |
| kyuubi.engine.hive.extra.classpath | <undefined> | The extra classpath for the Hive query engine, for configuring the location of the Hadoop client jars, etc. | string | 1.6.0 |
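For orientation, the new switch works in tandem with the existing `kyuubi.engine.doAs.enabled` option. Below is a minimal illustrative sketch (not part of this commit) of turning both on programmatically via the standard `KyuubiConf#set(ConfigEntry, value)` builder; in a real deployment the two keys would simply go into the server's Kyuubi defaults file instead.

```scala
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_DO_AS_ENABLED, ENGINE_FLINK_PROXY_USER_ENABLED}

// Illustrative only: both switches must be on, and the cluster must be Kerberized,
// before the Flink engine is launched under a proxy (impersonated) user.
val conf = KyuubiConf()
  .set(ENGINE_DO_AS_ENABLED, true)
  .set(ENGINE_FLINK_PROXY_USER_ENABLED, true)
```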
@@ -2932,6 +2932,14 @@ object KyuubiConf {
      .version("1.8.1")
      .fallbackConf(ENGINE_INITIALIZE_SQL)

  val ENGINE_FLINK_PROXY_USER_ENABLED: ConfigEntry[Boolean] =
    buildConf("kyuubi.engine.flink.proxy.user.enabled")
      .doc("Whether to use a Hadoop proxy user to run the Flink engine. Only takes effect" +
        s" in a Kerberos environment and when `${ENGINE_DO_AS_ENABLED.key}` is set to `true`.")
      .version("1.10.0")
      .booleanConf
      .createWithDefault(false)

  val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
    buildConf("kyuubi.server.limit.connections.per.user")
      .doc("Maximum kyuubi server connections per user." +
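One detail worth noting before the engine-side changes: unlike `kyuubi.engine.flink.initialize.sql` above, which is declared with `.fallbackConf(ENGINE_INITIALIZE_SQL)`, the new entry uses `.createWithDefault(false)`, so reading it never consults another key. A small illustrative sketch of that read path (not part of the commit, and assuming the key is not already set via system properties):

```scala
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_FLINK_PROXY_USER_ENABLED

val conf = KyuubiConf()
// With createWithDefault(false), the lookup returns false until the key is set explicitly;
// there is no fallback to a broader engine-level option.
assert(!conf.get(ENGINE_FLINK_PROXY_USER_ENABLED))
conf.set(ENGINE_FLINK_PROXY_USER_ENABLED, true)
assert(conf.get(ENGINE_FLINK_PROXY_USER_ENABLED))
```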
@@ -58,14 +58,31 @@ class FlinkProcessBuilder(
  // flink.execution.target is required in Kyuubi conf currently
  val executionTarget: Option[String] = conf.getOption("flink.execution.target")

  // Impersonation only applies when engine doAs and the Flink proxy-user switch are both on,
  // and the user has not supplied their own Kerberos keytab/principal or enabled
  // Flink-managed delegation tokens.
  private lazy val proxyUserEnable: Boolean = {
    doAsEnabled && conf.get(ENGINE_FLINK_PROXY_USER_ENABLED) &&
      conf.getOption(s"flink.$FLINK_SECURITY_KEYTAB_KEY").isEmpty &&
      conf.getOption(s"flink.$FLINK_SECURITY_PRINCIPAL_KEY").isEmpty &&
      !conf.getOption(s"flink.$FLINK_SECURITY_DELEGATION_TOKENS_ENABLED_KEY").exists(_.toBoolean)
  }

  override protected def module: String = "kyuubi-flink-sql-engine"

  override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"

-  override def env: Map[String, String] = conf.getEnvs +
-    ("FLINK_CONF_DIR" -> conf.getEnvs.getOrElse(
-      "FLINK_CONF_DIR",
-      s"$flinkHome${File.separator}conf"))
+  override def env: Map[String, String] = {
+    val flinkExtraEnvs = if (proxyUserEnable) {
+      Map(
+        "FLINK_CONF_DIR" -> conf.getEnvs.getOrElse(
+          "FLINK_CONF_DIR",
+          s"$flinkHome${File.separator}conf"),
+        FLINK_PROXY_USER_KEY -> proxyUser)
+    } else {
+      Map("FLINK_CONF_DIR" -> conf.getEnvs.getOrElse(
+        "FLINK_CONF_DIR",
+        s"$flinkHome${File.separator}conf"))
+    }
+    conf.getEnvs ++ flinkExtraEnvs
+  }

  override def clusterManager(): Option[String] = {
    executionTarget match {
@@ -121,6 +138,13 @@
buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}"
buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
if (proxyUserEnable && conf.getOption(
s"flink.$FLINK_SECURITY_DELEGATION_TOKENS_ENABLED_KEY").isEmpty) {
// FLINK-31109: Flink only supports hadoop proxy user only when delegation tokens fetch
// is managed outside. So we need to disable delegation tokens of flink and rely on kyuubi
// server to maintain engine's tokens.
buffer += s"-D$FLINK_SECURITY_DELEGATION_TOKENS_ENABLED_KEY=false"
}

hiveConfDirOpt.foreach { _ =>
buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=."
@@ -217,4 +241,7 @@ object FlinkProcessBuilder {
  final val YARN_TAG_KEY = "yarn.tags"
  final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
  final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
  final val FLINK_SECURITY_KEYTAB_KEY = "security.kerberos.login.keytab"
  final val FLINK_SECURITY_PRINCIPAL_KEY = "security.kerberos.login.principal"
  final val FLINK_SECURITY_DELEGATION_TOKENS_ENABLED_KEY = "security.delegation.tokens.enabled"
}
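Putting the pieces together: when `proxyUserEnable` holds, the builder exports `HADOOP_PROXY_USER` alongside `FLINK_CONF_DIR` and, unless the user configured it themselves, appends the delegation-token opt-out to the yarn-application arguments. A rough sketch of the intended outcome, with hypothetical values for the Flink home and session user:

```scala
// Hypothetical values, for illustration only.
val flinkHome = "/opt/flink"
val sessionUser = "alice"

// Environment contributed by FlinkProcessBuilder#env when impersonation is active:
val expectedEnv = Map(
  "FLINK_CONF_DIR" -> s"$flinkHome/conf",
  "HADOOP_PROXY_USER" -> sessionUser)

// Extra JVM option appended to the yarn-application command line when the user has not
// set security.delegation.tokens.enabled explicitly:
val expectedExtraOption = "-Dsecurity.delegation.tokens.enabled=false"
```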
