From 652786dad5a65e997d6a5a38a0fa2bbb8a7f145f Mon Sep 17 00:00:00 2001
From: zixu
Date: Fri, 19 Apr 2019 19:07:37 -0700
Subject: [PATCH] Support Kerberos login in JDBC connector

---
 .../datasources/jdbc/JDBCOptions.scala | 11 ++++++++++
 .../datasources/jdbc/JdbcUtils.scala   | 22 ++++++++++++++++++-
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index b4469cb538fa6..96a6405df29d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -190,6 +190,15 @@ class JDBCOptions(
 
   // An option to allow/disallow pushing down predicate into JDBC data source
   val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean
+
+  // ------------------------------------------------------------
+  // Optional parameters for Kerberos
+  // ------------------------------------------------------------
+  // The HDFS path of the user's keytab file, which is assumed to be pre-uploaded to
+  // HDFS by the user and distributed to executors by the --files option of spark-submit
+  val keytab = parameters.getOrElse(JDBC_KEYTAB, null)
+  // The principal name of the user's keytab file
+  val principal = parameters.getOrElse(JDBC_PRINCIPAL, null)
 }
 
 class JdbcOptionsInWrite(
@@ -242,4 +251,6 @@ object JDBCOptions {
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
   val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
   val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
+  val JDBC_KEYTAB = newOption("keytab")
+  val JDBC_PRINCIPAL = newOption("principal")
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 86a27b5afc250..df934f813b967 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.security.PrivilegedAction
 import java.sql.{Connection, Driver, DriverManager, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
 import java.util.Locale
 
@@ -24,6 +25,9 @@ import scala.collection.JavaConverters._
 import scala.util.Try
 import scala.util.control.NonFatal
 
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.security.UserGroupInformation
+
 import org.apache.spark.TaskContext
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
@@ -61,7 +65,23 @@ object JdbcUtils extends Logging {
       throw new IllegalStateException(
         s"Did not find registered driver with class $driverClass")
     }
-    val connection: Connection = driver.connect(options.url, options.asConnectionProperties)
+
+    val connection: Connection = {
+      if (options.keytab != null && options.principal != null) {
+        val keytabFileName = FilenameUtils.getName(options.keytab)
+        UserGroupInformation
+          .loginUserFromKeytabAndReturnUGI(options.principal, keytabFileName)
+          .doAs(new PrivilegedAction[Connection] {
+            override def run(): Connection = {
+              driver.connect(options.url, options.asConnectionProperties)
+            }
+          })
+      }
+      else {
+        driver.connect(options.url, options.asConnectionProperties)
+      }
+    }
+
     require(connection != null,
       s"The driver could not open a JDBC connection. Check the URL: ${options.url}")
     connection
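
Note: the sketch below is a hypothetical usage example, not part of the patch. It assumes the patch above is applied; the application name, JDBC URL, table, keytab file name, and principal are all placeholder values. The keytab must first be shipped to every executor's working directory via spark-submit --files.

// Hypothetical usage of the new keytab/principal options (all values are placeholders).
// Distribute the keytab to the executors before running:
//   spark-submit --files /local/path/client.keytab ... <application jar>
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("KerberosJdbcExample").getOrCreate()

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db.example.com:5432/testdb")
  .option("dbtable", "test_table")
  .option("keytab", "client.keytab")        // base name of the file shipped via --files
  .option("principal", "user@EXAMPLE.COM")  // principal the keytab belongs to
  .load()

df.show()

This lines up with the patched JdbcUtils: FilenameUtils.getName(options.keytab) resolves only the keytab's base name, which is where --files places the file on each executor, and driver.connect runs inside doAs so the JDBC handshake is performed as the logged-in Kerberos principal.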