[SPARK-31272][SQL] Support DB2 Kerberos login in JDBC connector
### What changes were proposed in this pull request?
When loading DataFrames from a JDBC data source with Kerberos authentication, remote executors (yarn-client, yarn-cluster, and similar modes) fail to establish a connection because they lack a Kerberos ticket or the ability to generate one.

This is a real issue when trying to ingest data from kerberized data sources (SQL Server, Oracle) in an enterprise environment where exposing simple authentication access is not an option due to IT policy.

In this PR I've added DB2 support (other supported databases will come in later PRs).

What this PR contains:
* Added `DB2ConnectionProvider`
* Added `DB2ConnectionProviderSuite`
* Added `DB2KrbIntegrationSuite` docker integration test
* Changed the DB2 JDBC driver to the latest version (test scope only)
* Changed the test table data type to a type which is supported by all the databases
* Removed double connection creation on the test side
* Increased the connection timeout in docker tests because the DB2 docker image takes quite a while to start
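
The connection properties contributed by `DB2ConnectionProvider`, and the way `BasicConnectionProvider` merges them with the data source options, can be sketched roughly as follows. This is a standalone illustration using only `java.util.Properties`; the object name `Db2KerberosProperties`, its helper methods, and the principal strings are hypothetical and not part of the patch.

```scala
import java.util.Properties

// Hypothetical standalone sketch; not the classes added by this commit.
object Db2KerberosProperties {
  // Mirrors what DB2ConnectionProvider.getAdditionalProperties() puts in place.
  def additionalProperties(principal: String): Properties = {
    val result = new Properties()
    // 11 is the integer value for kerberos (as stated in the patch)
    result.setProperty("securityMechanism", "11")
    result.setProperty("KerberosServerPrincipal", principal)
    result
  }

  // Mirrors the merge order in BasicConnectionProvider.getConnection():
  // provider defaults first, then data source options overwrite them.
  def merge(defaults: Properties, dataSourceOptions: Properties): Properties = {
    val merged = new Properties()
    copyInto(merged, defaults)
    copyInto(merged, dataSourceOptions)
    merged
  }

  // Copies every string key/value pair from source into target.
  private def copyInto(target: Properties, source: Properties): Unit = {
    val names = source.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      target.setProperty(key, source.getProperty(key))
    }
  }

  def main(args: Array[String]): Unit = {
    val defaults = additionalProperties("db2/host@EXAMPLE.COM")
    val userOptions = new Properties()
    // Illustrative override supplied through the data source options.
    userOptions.setProperty("KerberosServerPrincipal", "db2/other-host@EXAMPLE.COM")
    val merged = merge(defaults, userOptions)
    println(merged.getProperty("securityMechanism"))
    println(merged.getProperty("KerberosServerPrincipal"))
  }
}
```

Because the data source options are applied last, a value supplied by the user replaces the provider's default for the same key, while keys the user does not set keep the provider's value.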

### Why are the changes needed?
JDBC Kerberos support is missing.

### Does this PR introduce any user-facing change?
Yes, users can now connect to DB2 using Kerberos.
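
From the user's side, a Kerberos-authenticated DB2 read might be configured along these lines. This is a minimal sketch: it assumes the JDBC data source option keys are `keytab` and `principal` (the patch only refers to them as `JDBCOptions.JDBC_KEYTAB` and `JDBCOptions.JDBC_PRINCIPAL`), and the object name `Db2KerberosReadOptions` and all argument values are hypothetical.

```scala
// Hypothetical helper; only the URL format and driver class come from the patch.
object Db2KerberosReadOptions {
  // Builds the option map for a Kerberos-authenticated DB2 read.
  // Assumption: the option keys for the keytab and principal are
  // "keytab" and "principal".
  def apply(
      host: String,
      port: Int,
      database: String,
      table: String,
      keytab: String,
      principal: String): Map[String, String] = Map(
    "url" -> s"jdbc:db2://$host:$port/$database",
    "driver" -> "com.ibm.db2.jcc.DB2Driver",
    "dbtable" -> table,
    "keytab" -> keytab,
    "principal" -> principal
  )
}
```

With a `SparkSession` named `spark`, such a map could then be passed as `spark.read.format("jdbc").options(opts).load()`.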

### How was this patch tested?
* Additional + existing unit tests
* Additional + existing integration tests
* Manual test on a cluster

Closes #28215 from gaborgsomogyi/SPARK-31272.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@apache.org>
gaborgsomogyi authored and vanzin committed Apr 23, 2020
1 parent 54b97b2 commit c619990
Showing 17 changed files with 281 additions and 33 deletions.
13 changes: 6 additions & 7 deletions external/docker-integration-tests/pom.xml
@@ -149,20 +149,19 @@
-Dmaven.repo.drivers=http://my.local.repo
2) have a copy of the DB2 JCC driver and run the following commands :
-mvn install:install-file -Dfile=${path to db2jcc4.jar} \
+mvn install:install-file -Dfile=${path to jcc.jar} \
-DgroupId=com.ibm.db2 \
--DartifactId=db2jcc4 \
--Dversion=10.5 \
+-DartifactId=jcc \
+-Dversion=11.5 \
-Dpackaging=jar
Note: IBM DB2 JCC driver is available for download at
http://www-01.ibm.com/support/docview.wss?uid=swg21363866
-->
<dependency>
-<groupId>com.ibm.db2.jcc</groupId>
-<artifactId>db2jcc4</artifactId>
-<version>10.5.0.5</version>
-<type>jar</type>
+<groupId>com.ibm.db2</groupId>
+<artifactId>jcc</artifactId>
+<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
@@ -0,0 +1,28 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

USERPROFILE=/database/config/db2inst1/sqllib/userprofile
echo "export DB2_KRB5_PRINCIPAL=db2/__IP_ADDRESS_REPLACE_ME__@EXAMPLE.COM" >> $USERPROFILE
echo "export KRB5_KTNAME=/var/custom/db2.keytab" >> $USERPROFILE
# This trick is needed because DB2 forwards environment variables automatically only if it's starting with DB2.
su - db2inst1 -c "db2set DB2ENVLIST=KRB5_KTNAME"

su - db2inst1 -c "db2 UPDATE DBM CFG USING SRVCON_GSSPLUGIN_LIST IBMkrb5 IMMEDIATE"
su - db2inst1 -c "db2 UPDATE DBM CFG USING SRVCON_AUTH KERBEROS IMMEDIATE"

su - db2inst1 -c "db2stop force; db2start"
@@ -27,7 +27,6 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{BooleanType, ByteType, ShortType, StructType}
import org.apache.spark.tags.DockerTest


@DockerTest
@Ignore // AMPLab Jenkins needs to be updated before shared memory works on docker
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import java.security.PrivilegedExceptionAction
import java.sql.Connection
import javax.security.auth.login.Configuration

import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.jdbc.connection.{DB2ConnectionProvider, SecureConnectionProvider}
import org.apache.spark.tags.DockerTest

@DockerTest
class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
override protected val userName = s"db2/$dockerIp"
override protected val keytabFileName = "db2.keytab"

override val db = new DatabaseOnDocker {
override val imageName = "ibmcom/db2:11.5.0.0a"
override val env = Map(
"DB2INST1_PASSWORD" -> "rootpass",
"LICENSE" -> "accept",
"DBNAME" -> "db2"
)
override val usesIpc = false
override val jdbcPort = 50000
override val privileged = true
override def getJdbcUrl(ip: String, port: Int): String = s"jdbc:db2://$ip:$port/db2"
override def getJdbcProperties() = {
val options = new JDBCOptions(Map[String, String](
JDBCOptions.JDBC_URL -> getJdbcUrl(dockerIp, externalPort),
JDBCOptions.JDBC_TABLE_NAME -> "bar",
JDBCOptions.JDBC_KEYTAB -> keytabFileName,
JDBCOptions.JDBC_PRINCIPAL -> principal
))
new DB2ConnectionProvider(null, options).getAdditionalProperties()
}

override def beforeContainerStart(
hostConfigBuilder: HostConfig.Builder,
containerConfigBuilder: ContainerConfig.Builder): Unit = {
copyExecutableResource("db2_krb_setup.sh", initDbDir, replaceIp)

hostConfigBuilder.appendBinds(
HostConfig.Bind.from(initDbDir.getAbsolutePath)
.to("/var/custom").readOnly(true).build()
)
}
}

override protected def setAuthentication(keytabFile: String, principal: String): Unit = {
val config = new SecureConnectionProvider.JDBCConfiguration(
Configuration.getConfiguration, "JaasClient", keytabFile, principal)
Configuration.setConfiguration(config)
}

override def getConnection(): Connection = {
val config = new org.apache.hadoop.conf.Configuration
SecurityUtil.setAuthenticationMethod(KERBEROS, config)
UserGroupInformation.setConfiguration(config)

UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFullPath).doAs(
new PrivilegedExceptionAction[Connection]() {
override def run(): Connection = {
DB2KrbIntegrationSuite.super.getConnection()
}
}
)
}
}
@@ -18,7 +18,8 @@
package org.apache.spark.sql.jdbc

import java.net.ServerSocket
-import java.sql.Connection
+import java.sql.{Connection, DriverManager}
+import java.util.Properties

import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -53,11 +54,21 @@ abstract class DatabaseOnDocker {
*/
val jdbcPort: Int

+/**
+* Parameter whether the container should run privileged.
+*/
+val privileged: Boolean = false
+
/**
* Return a JDBC URL that connects to the database running at the given IP address and port.
*/
def getJdbcUrl(ip: String, port: Int): String

+/**
+* Return the JDBC properties needed for the connection.
+*/
+def getJdbcProperties(): Properties = new Properties()
+
/**
* Optional entry point when container starts
*
@@ -118,6 +129,7 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
port
}
val hostConfigBuilder = HostConfig.builder()
+.privileged(db.privileged)
.networkMode("bridge")
.ipcMode(if (db.usesIpc) "host" else "")
.portBindings(
@@ -142,12 +154,11 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
// Start the container and wait until the database can accept JDBC connections:
docker.startContainer(containerId)
jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
-eventually(timeout(1.minute), interval(1.second)) {
-val conn = java.sql.DriverManager.getConnection(jdbcUrl)
-conn.close()
+var conn: Connection = null
+eventually(timeout(2.minutes), interval(1.second)) {
+conn = getConnection()
}
// Run any setup queries:
-val conn: Connection = java.sql.DriverManager.getConnection(jdbcUrl)
try {
dataPreparation(conn)
} finally {
@@ -183,6 +194,13 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
}
}

+/**
+* Return the JDBC connection.
+*/
+def getConnection(): Connection = {
+DriverManager.getConnection(jdbcUrl, db.getJdbcProperties())
+}
+
/**
* Prepare databases and tables for testing.
*/
@@ -77,6 +77,8 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite
}
}

+protected def replaceIp(s: String): String = s.replace("__IP_ADDRESS_REPLACE_ME__", dockerIp)
+
protected def copyExecutableResource(
fileName: String, dir: File, processLine: String => String = identity) = {
val newEntry = new File(dir.getAbsolutePath, fileName)
@@ -100,7 +102,7 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite
}

override def dataPreparation(conn: Connection): Unit = {
-conn.prepareStatement("CREATE TABLE bar (c0 text)").executeUpdate()
+conn.prepareStatement("CREATE TABLE bar (c0 VARCHAR(8))").executeUpdate()
conn.prepareStatement("INSERT INTO bar VALUES ('hello')").executeUpdate()
}

@@ -46,7 +46,6 @@ class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
override def beforeContainerStart(
hostConfigBuilder: HostConfig.Builder,
containerConfigBuilder: ContainerConfig.Builder): Unit = {
-def replaceIp(s: String): String = s.replace("__IP_ADDRESS_REPLACE_ME__", dockerIp)
copyExecutableResource("mariadb_docker_entrypoint.sh", entryPointDir, replaceIp)
copyExecutableResource("mariadb_krb_setup.sh", initDbDir, replaceIp)

@@ -43,7 +43,6 @@ class PostgresKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
override def beforeContainerStart(
hostConfigBuilder: HostConfig.Builder,
containerConfigBuilder: ContainerConfig.Builder): Unit = {
-def replaceIp(s: String): String = s.replace("__IP_ADDRESS_REPLACE_ME__", dockerIp)
copyExecutableResource("postgres_krb_setup.sh", initDbDir, replaceIp)

hostConfigBuilder.appendBinds(
6 changes: 6 additions & 0 deletions pom.xml
@@ -963,6 +963,12 @@
<version>42.2.6</version>
<scope>test</scope>
</dependency>
+<dependency>
+<groupId>com.ibm.db2</groupId>
+<artifactId>jcc</artifactId>
+<version>11.5.0.0</version>
+<scope>test</scope>
+</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
5 changes: 5 additions & 0 deletions sql/core/pom.xml
@@ -140,6 +140,11 @@
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
+<dependency>
+<groupId>com.ibm.db2</groupId>
+<artifactId>jcc</artifactId>
+<scope>test</scope>
+</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
@@ -19,11 +19,17 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.sql.{Connection, Driver}

+import scala.collection.JavaConverters._
+
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

private[jdbc] class BasicConnectionProvider(driver: Driver, options: JDBCOptions)
extends ConnectionProvider {
def getConnection(): Connection = {
-driver.connect(options.url, options.asConnectionProperties)
+val properties = getAdditionalProperties()
+options.asConnectionProperties.entrySet().asScala.foreach { e =>
+properties.put(e.getKey(), e.getValue())
+}
+driver.connect(options.url, properties)
}
}
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.sql.{Connection, Driver}
+import java.util.Properties

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
@@ -28,6 +29,11 @@ import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
* the parameters.
*/
private[jdbc] trait ConnectionProvider {
+/**
+* Additional properties for data connection (Data source property takes precedence).
+*/
+def getAdditionalProperties(): Properties = new Properties()
+
/**
* Opens connection toward the database.
*/
@@ -50,6 +56,10 @@ private[jdbc] object ConnectionProvider extends Logging {
logDebug("MariaDB connection provider found")
new MariaDBConnectionProvider(driver, options)

+case DB2ConnectionProvider.driverClass =>
+logDebug("DB2 connection provider found")
+new DB2ConnectionProvider(driver, options)
+
case _ =>
throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " +
"Kerberos authentication")
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.security.PrivilegedExceptionAction
import java.sql.{Connection, Driver}
import java.util.Properties

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

private[sql] class DB2ConnectionProvider(driver: Driver, options: JDBCOptions)
extends SecureConnectionProvider(driver, options) {
override val appEntry: String = "JaasClient"

override def getConnection(): Connection = {
setAuthenticationConfigIfNeeded()
UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs(
new PrivilegedExceptionAction[Connection]() {
override def run(): Connection = {
DB2ConnectionProvider.super.getConnection()
}
}
)
}

override def getAdditionalProperties(): Properties = {
val result = new Properties()
// 11 is the integer value for kerberos
result.put("securityMechanism", new String("11"))
result.put("KerberosServerPrincipal", options.principal)
result
}

override def setAuthenticationConfigIfNeeded(): Unit = {
val (parent, configEntry) = getConfigWithAppEntry()
if (configEntry == null || configEntry.isEmpty) {
setAuthenticationConfig(parent)
}
}
}

private[sql] object DB2ConnectionProvider {
val driverClass = "com.ibm.db2.jcc.DB2Driver"
}