Skip to content

Commit

Permalink
[SPARK-39530][SS][TESTS] Fix KafkaTestUtils to support IPv6
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR aims to fix `KafkaTestUtils` to support IPv6.

### Why are the changes needed?

Currently, the test suite is using a hard-coded `127.0.0.1` like the following.
```
props.put("listeners", "SASL_PLAINTEXT://127.0.0.1:0")
props.put("advertised.listeners", "SASL_PLAINTEXT://127.0.0.1:0")
```

### Does this PR introduce _any_ user-facing change?

No. This is a test-only change.

### How was this patch tested?

Pass the CIs.

Closes #36923 from dongjoon-hyun/SPARK-39530.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
  • Loading branch information
dongjoon-hyun authored and wangyum committed Jun 20, 2022
1 parent 540e695 commit 550f5fe
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.kafka010

import java.io.{File, IOException}
import java.net.{InetAddress, InetSocketAddress}
import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets
import java.util.{Collections, Properties, UUID}
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -68,13 +68,13 @@ class KafkaTestUtils(

private val JAVA_AUTH_CONFIG = "java.security.auth.login.config"

private val localCanonicalHostName = InetAddress.getLoopbackAddress().getCanonicalHostName()
logInfo(s"Local host name is $localCanonicalHostName")
private val localHostNameForURI = Utils.localHostNameForURI()
logInfo(s"Local host name is $localHostNameForURI")

private var kdc: MiniKdc = _

// Zookeeper related configurations
private val zkHost = localCanonicalHostName
private val zkHost = localHostNameForURI
private var zkPort: Int = 0
private val zkConnectionTimeout = 60000
private val zkSessionTimeout = 10000
Expand All @@ -83,12 +83,12 @@ class KafkaTestUtils(
private var zkClient: KafkaZkClient = _

// Kafka broker related configurations
private val brokerHost = localCanonicalHostName
private val brokerHost = localHostNameForURI
private var brokerPort = 0
private var brokerConf: KafkaConfig = _

private val brokerServiceName = "kafka"
private val clientUser = s"client/$localCanonicalHostName"
private val clientUser = s"client/$localHostNameForURI"
private var clientKeytabFile: File = _

// Kafka broker server
Expand Down Expand Up @@ -202,17 +202,17 @@ class KafkaTestUtils(
assert(kdcReady, "KDC should be set up beforehand")
val baseDir = Utils.createTempDir()

val zkServerUser = s"zookeeper/$localCanonicalHostName"
val zkServerUser = s"zookeeper/$localHostNameForURI"
val zkServerKeytabFile = new File(baseDir, "zookeeper.keytab")
kdc.createPrincipal(zkServerKeytabFile, zkServerUser)
logDebug(s"Created keytab file: ${zkServerKeytabFile.getAbsolutePath()}")

val zkClientUser = s"zkclient/$localCanonicalHostName"
val zkClientUser = s"zkclient/$localHostNameForURI"
val zkClientKeytabFile = new File(baseDir, "zkclient.keytab")
kdc.createPrincipal(zkClientKeytabFile, zkClientUser)
logDebug(s"Created keytab file: ${zkClientKeytabFile.getAbsolutePath()}")

val kafkaServerUser = s"kafka/$localCanonicalHostName"
val kafkaServerUser = s"kafka/$localHostNameForURI"
val kafkaServerKeytabFile = new File(baseDir, "kafka.keytab")
kdc.createPrincipal(kafkaServerKeytabFile, kafkaServerUser)
logDebug(s"Created keytab file: ${kafkaServerKeytabFile.getAbsolutePath()}")
Expand Down Expand Up @@ -489,7 +489,7 @@ class KafkaTestUtils(
protected def brokerConfiguration: Properties = {
val props = new Properties()
props.put("broker.id", "0")
props.put("listeners", s"PLAINTEXT://127.0.0.1:$brokerPort")
props.put("listeners", s"PLAINTEXT://$localHostNameForURI:$brokerPort")
props.put("log.dir", Utils.createTempDir().getAbsolutePath)
props.put("zookeeper.connect", zkAddress)
props.put("zookeeper.connection.timeout.ms", "60000")
Expand All @@ -505,8 +505,8 @@ class KafkaTestUtils(
props.put("transaction.state.log.min.isr", "1")

if (secure) {
props.put("listeners", "SASL_PLAINTEXT://127.0.0.1:0")
props.put("advertised.listeners", "SASL_PLAINTEXT://127.0.0.1:0")
props.put("listeners", s"SASL_PLAINTEXT://$localHostNameForURI:0")
props.put("advertised.listeners", s"SASL_PLAINTEXT://$localHostNameForURI:0")
props.put("inter.broker.listener.name", "SASL_PLAINTEXT")
props.put("delegation.token.master.key", UUID.randomUUID().toString)
props.put("sasl.enabled.mechanisms", "GSSAPI,SCRAM-SHA-512")
Expand Down Expand Up @@ -648,7 +648,8 @@ class KafkaTestUtils(
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
val (ip, port) = {
val splits = zkConnect.split(":")
(splits(0), splits(1).toInt)
val port = splits(splits.length - 1)
(zkConnect.substring(0, zkConnect.length - port.length - 1), port.toInt)
}
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress(ip, port), 16)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
* The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
*/
private[kafka010] class KafkaTestUtils extends Logging {
private val localHostNameForURI = Utils.localHostNameForURI()

// Zookeeper related configurations
private val zkHost = "127.0.0.1"
private val zkHost = localHostNameForURI
private var zkPort: Int = 0
private val zkConnectionTimeout = 60000
private val zkSessionTimeout = 10000
Expand All @@ -63,7 +64,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
private var admClient: AdminZkClient = _

// Kafka broker related configurations
private val brokerHost = "127.0.0.1"
private val brokerHost = localHostNameForURI
private var brokerPort = 0
private var brokerConf: KafkaConfig = _

Expand Down Expand Up @@ -239,8 +240,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
private def brokerConfiguration: Properties = {
val props = new Properties()
props.put("broker.id", "0")
props.put("host.name", "127.0.0.1")
props.put("advertised.host.name", "127.0.0.1")
props.put("host.name", localHostNameForURI)
props.put("advertised.host.name", localHostNameForURI)
props.put("port", brokerPort.toString)
props.put("log.dir", brokerLogDir)
props.put("zookeeper.connect", zkAddress)
Expand Down Expand Up @@ -319,7 +320,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
val (ip, port) = {
val splits = zkConnect.split(":")
(splits(0), splits(1).toInt)
val port = splits(splits.length - 1)
(zkConnect.substring(0, zkConnect.length - port.length - 1), port.toInt)
}
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress(ip, port), 16)
Expand Down

0 comments on commit 550f5fe

Please sign in to comment.