Merge pull request #1027 from saLeox/dev-1.0.3
Support proxy user with kerberos
peacewong committed Oct 13, 2021
2 parents 509009f + c6e177b commit b8e2017
Showing 13 changed files with 42 additions and 21 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -2,7 +2,6 @@
 .idea
 .DS_Store
 
-
 out/
 linkis.ipr
 linkis.iws
@@ -28,6 +28,10 @@ object HadoopConf {
 
   val KEYTAB_HOST_ENABLED = CommonVars("wds.linkis.keytab.host.enabled", false)
 
+  val KEYTAB_PROXYUSER_ENABLED = CommonVars("wds.linkis.keytab.proxyuser.enable", false)
+
+  val KEYTAB_PROXYUSER_SUPERUSER = CommonVars("wds.linkis.keytab.proxyuser.superuser", "hadoop")
+
   val hadoopConfDir = CommonVars("hadoop.config.dir", CommonVars("HADOOP_CONF_DIR", "").getValue).getValue
 
   val HADOOP_EXTERNAL_CONF_DIR_PREFIX = CommonVars("wds.linkis.hadoop.external.conf.dir.prefix", "/appcom/config/external-conf/hadoop")
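
The two new CommonVars follow the existing naming pattern: wds.linkis.keytab.proxyuser.enable switches proxy-user mode on, and wds.linkis.keytab.proxyuser.superuser names the Kerberos superuser whose keytab is used for the single login. A minimal, illustrative linkis.properties fragment (Kerberos must already be enabled through the existing keytab settings, and a <superuser>.keytab file must exist under the configured keytab directory):

    wds.linkis.keytab.proxyuser.enable=true
    wds.linkis.keytab.proxyuser.superuser=hadoop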
@@ -130,15 +130,23 @@ object HDFSUtils extends Logging {
     }
   }
 
   def getUserGroupInformation(userName: String): UserGroupInformation = {
     if (KERBEROS_ENABLE.getValue) {
-      val path = new File(KEYTAB_FILE.getValue, userName + ".keytab").getPath
-      val user = getKerberosUser(userName)
-      UserGroupInformation.setConfiguration(getConfiguration(userName))
-      UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path)
-    } else {
-      UserGroupInformation.createRemoteUser(userName)
-    }
+      if (!KEYTAB_PROXYUSER_ENABLED.getValue) {
+        val path = new File(KEYTAB_FILE.getValue, userName + ".keytab").getPath
+        val user = getKerberosUser(userName)
+        UserGroupInformation.setConfiguration(getConfiguration(userName))
+        UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path)
+      } else {
+        val superUser = KEYTAB_PROXYUSER_SUPERUSER.getValue
+        val path = new File(KEYTAB_FILE.getValue, superUser + ".keytab").getPath
+        val user = getKerberosUser(superUser)
+        UserGroupInformation.setConfiguration(getConfiguration(superUser))
+        UserGroupInformation.createProxyUser(userName, UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path))
+      }
+    } else {
+      UserGroupInformation.createRemoteUser(userName)
+    }
   }
 
   def getKerberosUser(userName: String): String = {
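
With proxy-user mode enabled, getUserGroupInformation logs in once with the superuser's keytab (<superuser>.keytab under the keytab directory) and wraps the requesting user in UserGroupInformation.createProxyUser, so per-user keytabs are no longer required. For this to work, the Hadoop cluster must also allow the superuser to impersonate other users; a hypothetical core-site.xml fragment for the default superuser hadoop (the wildcards are placeholders to tighten for a real cluster):

    <property>
      <name>hadoop.proxyuser.hadoop.hosts</name>
      <value>*</value>
    </property>
    <property>
      <name>hadoop.proxyuser.hadoop.groups</name>
      <value>*</value>
    </property>

Callers can then run HDFS actions as the proxied user via ugi.doAs, exactly as with the per-user keytab path.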
@@ -27,5 +27,3 @@ wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.engine
 wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.JarUdfEngineHook
 
 wds.linkis.engineconn.executor.manager.class=com.webank.wedatasphere.linkis.engineconnplugin.flink.executormanager.FlinkExecutorManager
-
-
@@ -26,4 +26,4 @@ wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.engine
 
 wds.linkis.bdp.hive.init.sql.enable=true
 
-wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.JarUdfEngineHook
+wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.JarUdfEngineHook
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import com.webank.wedatasphere.linkis.hadoop.common.conf.HadoopConf
 
 class HiveEngineConnExecutor(id: Int,
                              sessionState: SessionState,
@@ -90,6 +91,9 @@ class HiveEngineConnExecutor(id: Int,
 
   override def init(): Unit = {
     LOG.info(s"Ready to change engine state!")
+    if (HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) {
+      System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
+    }
     setCodeParser(new SQLCodeParser)
     super.init()
   }
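
Setting javax.security.auth.useSubjectCredsOnly to false lets the JDK's GSS-API/Kerberos layer acquire credentials on its own (for example from a ticket cache or a keytab login) instead of requiring them to already be attached to the current JAAS Subject, which is presumably what the Hive engine needs once it runs as a proxied user. The same setting can be passed as a JVM option:

    -Djavax.security.auth.useSubjectCredsOnly=false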
@@ -30,4 +30,4 @@ wds.linkis.engineconn.io.version=1
 wds.linkis.engineconn.support.parallelism=true
 
 wds.linkis.engineconn.max.free.time=0
-wds.linkis.engine.push.log.enable=false
+wds.linkis.engine.push.log.enable=false
@@ -26,4 +26,3 @@ wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.manage
 #wds.linkis.engine.io.opts=" -Dfile.encoding=UTF-8 -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=49100 "
 
 wds.linkis.engineconn.support.parallelism=true
-
@@ -26,4 +26,3 @@ wds.linkis.engineconn.debug.enable=true
 wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.manager.engineplugin.pipeline.PipelineEngineConnPlugin
 
 wds.linkis.engineconn.max.free.time=5m
-
@@ -25,4 +25,4 @@ wds.linkis.engineconn.debug.enable=true
 
 wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.manager.engineplugin.python.PythonEngineConnPlugin
 
-wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.PyFunctionEngineHook
+wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.PyFunctionEngineHook
@@ -27,4 +27,3 @@ wds.linkis.engineconn.debug.enable=true
 wds.linkis.engineconn.plugin.default.class=com.webank.wedatasphere.linkis.engineplugin.spark.SparkEngineConnPlugin
 
 wds.linkis.engine.connector.hooks=com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.PyFunctionEngineHook,com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ScalaFunctionEngineHook
-
@@ -79,7 +79,6 @@ object SparkConfiguration extends Logging {
 
   val IS_VIEWFS_ENV = CommonVars("wds.linkis.spark.engine.is.viewfs.env", true)
 
-
   private def getMainJarName(): String = {
     val somePath = ClassUtils.jarOfClass(classOf[SparkEngineConnFactory])
     if (somePath.isDefined) {
@@ -32,9 +32,11 @@ import com.webank.wedatasphere.linkis.manager.label.entity.Label
 import com.webank.wedatasphere.linkis.manager.label.entity.engine.UserCreatorLabel
 import com.webank.wedatasphere.linkis.protocol.UserWithCreator
 import org.apache.commons.lang.StringUtils
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import com.webank.wedatasphere.linkis.hadoop.common.conf.HadoopConf
+
 
 /**
  *
@@ -159,10 +161,14 @@ class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngin
       }
     }
 
     addOpt("--master", _master)
     addOpt("--deploy-mode", _deployMode)
     addOpt("--name", _name)
 
+    if (HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue && _proxyUser.nonEmpty) {
+      addOpt("--proxy-user", _proxyUser)
+    }
+
     //addOpt("--jars",Some(ENGINEMANAGER_JAR.getValue))
     // info("No need to add jars for " + _jars.map(fromPath).exists(x => x.equals("hdfs:///")).toString())
     _jars = _jars.filter(_.isNotBlankPath())
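
When proxy-user mode is on and a proxy user is present, the builder now emits --proxy-user alongside the existing options, so the generated command looks roughly like the sketch below (master, deploy mode, application name and user are illustrative placeholders, not values from this commit):

    spark-submit --master yarn --deploy-mode client --name linkis-spark-engine --proxy-user alice ... <engine jar and arguments>

Spark then impersonates alice against the cluster while the launching process authenticates with its own Kerberos login.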
@@ -364,7 +370,6 @@ class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngin
         val file = new java.io.File(x.path)
         file.isFile
       }).foreach(jar)
-      proxyUser(getValueAndRemove(properties, "proxyUser", ""))
       if (null != darResource) {
         this.queue(darResource.yarnResource.queueName)
       } else {
@@ -387,6 +392,13 @@ class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngin
         }
       }
     }
+
+    if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) {
+      this.proxyUser(getValueAndRemove(properties, "proxyUser", ""))
+    } else {
+      this.proxyUser(this._userWithCreator.user)
+    }
+
     //deal spark conf and spark.hadoop.*
     val iterator = properties.entrySet().iterator()
     val sparkConfKeys = ArrayBuffer[String]()
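
In proxy-user mode the builder no longer reads the request-level "proxyUser" property; it always proxies as the actual submitting user (this._userWithCreator.user), which is the identity the superuser keytab then impersonates.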
