JDBCOptions.scala
@@ -190,6 +190,15 @@ class JDBCOptions(

// An option to allow/disallow pushing down predicate into JDBC data source
val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean

// ------------------------------------------------------------
// Optional parameters for Kerberos
// ------------------------------------------------------------
// The HDFS path of the user's keytab file, which is assumed to be pre-uploaded
// to HDFS by the user and distributed to executors via the --files option of
// spark-submit
val keytab = parameters.getOrElse(JDBC_KEYTAB, null)
// The principal name in the user's keytab file
val principal = parameters.getOrElse(JDBC_PRINCIPAL, null)
}

class JdbcOptionsInWrite(
@@ -242,4 +251,6 @@ object JDBCOptions {
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
val JDBC_KEYTAB = newOption("keytab")
val JDBC_PRINCIPAL = newOption("principal")
}
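
For orientation, here is a minimal sketch of how the new options might be supplied from the reader side. The URL, table name, principal, and keytab file name below are illustrative, and the keytab is assumed to have already been shipped to executors (e.g. via spark-submit --files):

```scala
// Hypothetical usage of the options added in this diff; all values are illustrative.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db.example.com:5432/mydb")
  .option("dbtable", "mytable")
  // New options from this PR: Kerberos principal and keytab.
  .option("principal", "user@EXAMPLE.COM")
  .option("keytab", "user.keytab") // file name as distributed via --files
  .load()
```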
JdbcUtils.scala
@@ -17,13 +17,17 @@

package org.apache.spark.sql.execution.datasources.jdbc

import java.security.PrivilegedAction
import java.sql.{Connection, Driver, DriverManager, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
import java.util.Locale

import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.control.NonFatal

import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.TaskContext
import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.Logging
@@ -61,7 +65,23 @@ object JdbcUtils extends Logging {
throw new IllegalStateException(
s"Did not find registered driver with class $driverClass")
}
val connection: Connection = driver.connect(options.url, options.asConnectionProperties)

val connection: Connection = {
if (options.keytab != null && options.principal != null) {
// Log in from the keytab distributed to this executor and open the
// connection inside the Kerberos-authenticated context.
val keytabFileName = FilenameUtils.getName(options.keytab)
UserGroupInformation
.loginUserFromKeytabAndReturnUGI(options.principal, keytabFileName)
.doAs(new PrivilegedAction[Connection] {
override def run(): Connection = {
driver.connect(options.url, options.asConnectionProperties)
}
})
} else {
Contributor (@misutoth):

It seems there is no option for a TGT or a delegation token. The latter may be useful for Hadoop components, though. Also, distributing the keytab is not the most secure approach, and the KDC may be overloaded.

Reply:

Hi @misutoth,

1. Distributing the keytab is based on the assumption of a secure HDFS. Each executor requires the keytab to do a kinit, so we have to distribute it across the executors. If HDFS is secured, the keytab should be safe.

2. The user is required to pre-upload the keytab file, e.g. to /spark/admin.keytab.

3. Since the SQL server does not support delegation tokens, we cannot use them. If the SQL server supports delegation tokens in the future, we can use a more efficient way to do the kinit. For now we need to use a keytab (or a password) for the kinit; this limitation comes from the SQL server side, not the HDFS side.

Comment (@dasbh):

I see an issue with UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, keytabFileName): it replaces static fields on UGI for the keytab and principal, so further UGI calls made for Hadoop may not work.

I think a better approach would be not to rely on UGI; instead, do the login using LoginContext and wrap the call with Subject.doAs(subject, ...).

Author (replying to the above):

Good suggestion. I will give it a try, and if it works I will update this PR.
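
For reference, a minimal sketch of the LoginContext-based alternative suggested above, using an in-memory JAAS configuration with Krb5LoginModule so that UGI's static state is left untouched. The helper name, the entry name "JDBCDriver", and the wiring to the surrounding JdbcUtils code are assumptions for illustration, not part of this PR:

```scala
import java.security.PrivilegedAction
import java.sql.{Connection, Driver}
import java.util.Properties

import javax.security.auth.Subject
import javax.security.auth.login.{AppConfigurationEntry, Configuration, LoginContext}

import scala.collection.JavaConverters._

// Hypothetical helper (not in this PR): logs in via JAAS/Krb5LoginModule and
// opens the JDBC connection as that Subject, without touching UGI.
def connectWithKerberos(
    driver: Driver,
    url: String,
    props: Properties,
    principal: String,
    keytabPath: String): Connection = {
  // In-memory JAAS configuration for a keytab-based Kerberos login,
  // so no external jaas.conf file is required.
  val jaasConf = new Configuration {
    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
      val opts = Map(
        "useKeyTab" -> "true",
        "keyTab" -> keytabPath,
        "principal" -> principal,
        "doNotPrompt" -> "true")
      Array(new AppConfigurationEntry(
        "com.sun.security.auth.module.Krb5LoginModule",
        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
        opts.asJava))
    }
  }
  // The entry name "JDBCDriver" is arbitrary here; the in-memory Configuration
  // returns the same entry regardless of the name it is asked for.
  val loginContext = new LoginContext("JDBCDriver", null, null, jaasConf)
  loginContext.login()
  // Open the connection as the freshly logged-in subject.
  Subject.doAs(loginContext.getSubject, new PrivilegedAction[Connection] {
    override def run(): Connection = driver.connect(url, props)
  })
}
```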

Contributor:

@misutoth @dasbh, the keytab is only for the JDBC connection and is not intended for later use by other Hadoop calls, for now, since these databases do not support delegation tokens yet. Usually a customer should use their own principal in a keytab file stored in secure HDFS to log in to the SQL database, the same credential they use to submit the Spark application. This is really a workaround for databases that do not support delegation tokens.

driver.connect(options.url, options.asConnectionProperties)
}
}

require(connection != null,
s"The driver could not open a JDBC connection. Check the URL: ${options.url}")
