Skip to content

Commit

Permalink
add HDFS token when security enbale
Browse files Browse the repository at this point in the history
  • Loading branch information
AngersZhuuuu committed Aug 8, 2019
1 parent 5135706 commit 76c563d
Showing 1 changed file with 19 additions and 19 deletions.
Expand Up @@ -18,20 +18,18 @@
package org.apache.spark.sql.hive.thriftserver

import scala.collection.JavaConverters._

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hive.service.cli.{HiveSQLException, SessionHandle}
import org.apache.hive.service.cli.session.{HiveSession, HiveSessionImpl, HiveSessionImplwithUGI, HiveSessionProxy, SessionManager}
import org.apache.hive.service.server.HiveServer2

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
import org.apache.spark.sql.hive.thriftserver.util.{ThriftServerHadoopUtils, ThriftServerHDFSDelegationTokenProvider}
import org.apache.spark.sql.hive.thriftserver.util.{ThriftServerHDFSDelegationTokenProvider, ThriftServerHadoopUtils}

private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext)
extends SessionManager(hiveServer)
Expand Down Expand Up @@ -69,24 +67,26 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
hiveConf,
ipAddress,
delegationToken)
val ugi = sessionWithUGI.getSessionUgi
val originalCreds = ugi.getCredentials
val creds = new Credentials()
ThriftServerHadoopUtils.doAs(ugi)(() => hadoopTokenProvider
.obtainDelegationTokens(creds, username))
if (UserGroupInformation.isSecurityEnabled) {
val ugi = sessionWithUGI.getSessionUgi
val originalCreds = ugi.getCredentials
val creds = new Credentials()
ThriftServerHadoopUtils.doAs(ugi)(() => hadoopTokenProvider
.obtainDelegationTokens(creds, username))

val tokens: String = creds.getAllTokens.asScala.map(token => {
token.encodeToUrlString()
}).mkString(SparkContext.SPARK_JOB_TOKEN_DELIMiTER)

ugi.addCredentials(creds)
val existing = ugi.getCredentials()
existing.mergeAll(originalCreds)
ugi.addCredentials(existing)
val tokens: String = creds.getAllTokens.asScala.map(token => {
token.encodeToUrlString()
}).mkString(SparkContext.SPARK_JOB_TOKEN_DELIMiTER)

ugi.addCredentials(creds)
val existing = ugi.getCredentials()
existing.mergeAll(originalCreds)
ugi.addCredentials(existing)
sparkSqlOperationManager.sessionToTokens.put(session.getSessionHandle, tokens)
}
session = HiveSessionProxy.getProxy(sessionWithUGI, sessionWithUGI.getSessionUgi)
sessionWithUGI.setProxySession(session)
sparkSqlOperationManager.sessionToTokens.put(session.getSessionHandle, tokens)

sparkSession = sparkSessionManager.getOrCreteSparkSession(sessionWithUGI, true)
} else {
session = new HiveSessionImpl(protocol, username, passwd, hiveConf, ipAddress)
Expand Down

0 comments on commit 76c563d

Please sign in to comment.