[SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0

## What changes were proposed in this pull request?

Update to Kafka 2.0.0 in the streaming-kafka module, and remove the Kafka version override in the Scala 2.12 profile; the module won't compile for Scala 2.12 otherwise.
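
The module's user-facing consumer API is not touched by this patch; for reference, a minimal, hypothetical sketch of a direct stream against a 0.10+ broker (the bootstrap server, topic, and group id below are made up):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("kafka-0-10-sketch").setMaster("local[2]"), Seconds(5))

    // Illustrative connection settings only; not part of this patch.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("example-topic"), kafkaParams))

    stream.map(r => (r.key, r.value)).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```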

## How was this patch tested?

Existing tests.

Author: Sean Owen <srowen@gmail.com>

Closes #21955 from srowen/SPARK-18057.2.
srowen committed Aug 3, 2018
1 parent 273b284 commit c32dbd6
Showing 6 changed files with 50 additions and 43 deletions.
10 changes: 1 addition & 9 deletions external/kafka-0-10-sql/pom.xml
@@ -29,6 +29,7 @@
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<properties>
<sbt.project.name>sql-kafka-0-10</sbt.project.name>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>2.0.0</kafka.version>
</properties>
<packaging>jar</packaging>
@@ -128,13 +129,4 @@
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

<profiles>
<profile>
<id>scala-2.12</id>
<properties>
<kafka.version>0.10.1.1</kafka.version>
</properties>
</profile>
</profiles>

</project>
26 changes: 16 additions & 10 deletions external/kafka-0-10/pom.xml
@@ -28,7 +28,8 @@
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<properties>
<sbt.project.name>streaming-kafka-0-10</sbt.project.name>
<kafka.version>0.10.0.1</kafka.version>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>2.0.0</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Spark Integration for Kafka 0.10</name>
@@ -58,6 +59,20 @@
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
@@ -93,13 +108,4 @@
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

<profiles>
<profile>
<id>scala-2.12</id>
<properties>
<kafka.version>0.10.1.1</kafka.version>
</properties>
</profile>
</profiles>

</project>
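
The Jackson exclusions added above presumably keep the kafka_2.11 test dependency from pulling its own jackson-core/databind/annotations onto the test classpath and conflicting with the Jackson version Spark manages. A purely illustrative way to check which Jackson actually gets resolved (not part of the build):

```scala
object JacksonVersionCheck {
  def main(args: Array[String]): Unit = {
    // Prints the jackson-core and jackson-databind versions found on the classpath;
    // with the exclusions in place these should be Spark's managed Jackson, not Kafka's.
    println(com.fasterxml.jackson.core.json.PackageVersion.VERSION)
    println(com.fasterxml.jackson.databind.cfg.PackageVersion.VERSION)
  }
}
```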
@@ -23,11 +23,11 @@ import java.io.File
import scala.collection.JavaConverters._
import scala.util.Random

import kafka.common.TopicAndPartition
import kafka.log._
import kafka.message._
import kafka.log.{CleanerConfig, Log, LogCleaner, LogConfig, ProducerStateManager}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.BeforeAndAfterAll

@@ -72,33 +72,39 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {

private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
val mockTime = new MockTime()
// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
val logs = new Pool[TopicAndPartition, Log]()
val logs = new Pool[TopicPartition, Log]()
val logDir = kafkaTestUtils.brokerLogDir
val dir = new File(logDir, topic + "-" + partition)
dir.mkdirs()
val logProps = new ju.Properties()
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
val logDirFailureChannel = new LogDirFailureChannel(1)
val topicPartition = new TopicPartition(topic, partition)
val log = new Log(
dir,
LogConfig(logProps),
0L,
0L,
mockTime.scheduler,
mockTime
new BrokerTopicStats(),
mockTime,
Int.MaxValue,
Int.MaxValue,
topicPartition,
new ProducerStateManager(topicPartition, dir),
logDirFailureChannel
)
messages.foreach { case (k, v) =>
val msg = new ByteBufferMessageSet(
NoCompressionCodec,
new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
log.append(msg)
val record = new SimpleRecord(k.getBytes, v.getBytes)
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), 0);
}
log.roll()
logs.put(TopicAndPartition(topic, partition), log)
logs.put(topicPartition, log)

val cleaner = new LogCleaner(CleanerConfig(), logDirs = Array(dir), logs = logs)
val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs, logDirFailureChannel)
cleaner.startup()
cleaner.awaitCleaned(topic, partition, log.activeSegment.baseOffset, 1000)
cleaner.awaitCleaned(new TopicPartition(topic, partition), log.activeSegment.baseOffset, 1000)

cleaner.shutdown()
mockTime.scheduler.shutdown()
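
For readers less familiar with the Kafka 2.x record classes used in the updated helper: messages are wrapped in SimpleRecord and batched into MemoryRecords, replacing the old ByteBufferMessageSet/Message constructors. A minimal, self-contained sketch (the payloads are arbitrary):

```scala
import scala.collection.JavaConverters._

import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

object RecordBatchSketch {
  def main(args: Array[String]): Unit = {
    // Build an in-memory batch of two uncompressed records, the same shape that
    // the appendAsLeader call in the test helper receives.
    val batch = MemoryRecords.withRecords(
      CompressionType.NONE,
      new SimpleRecord("k1".getBytes, "v1".getBytes),
      new SimpleRecord("k2".getBytes, "v2".getBytes))
    println(s"records: ${batch.records.asScala.size}, bytes: ${batch.sizeInBytes}")
  }
}
```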
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
brokerPort = server.boundPort()
brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
(server, brokerPort)
}, new SparkConf(), "KafkaBroker")

@@ -222,6 +222,8 @@
props.put("zookeeper.connect", zkAddress)
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props.put("offsets.topic.replication.factor", "1")
props.put("group.initial.rebalance.delay.ms", "10")
props
}
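
For context on the two added broker settings: the internal __consumer_offsets topic defaults to a replication factor of 3, which a single embedded broker cannot satisfy, and the default 3000 ms initial rebalance delay only slows short-lived tests down. A hedged sketch of the same overrides in isolation (not the test-utils code itself):

```scala
import java.util.Properties

object BrokerOverridesSketch {
  def minimalSingleBrokerOverrides(): Properties = {
    val props = new Properties()
    // __consumer_offsets defaults to replication.factor=3; one broker can only host 1.
    props.put("offsets.topic.replication.factor", "1")
    // Skip most of the default 3000 ms wait before the first consumer-group rebalance.
    props.put("group.initial.rebalance.delay.ms", "10")
    props
  }

  def main(args: Array[String]): Unit = println(minimalSingleBrokerOverrides())
}
```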

@@ -270,12 +272,10 @@ private[kafka010] class KafkaTestUtils extends Logging {
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr

val leader = partitionState.basePartitionState.leader
val isr = partitionState.basePartitionState.isr
zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
leaderAndInSyncReplicas.isr.nonEmpty

Request.isValidBrokerId(leader) && !isr.isEmpty
case _ =>
false
}
@@ -21,7 +21,8 @@ import java.util.concurrent.TimeUnit

import scala.collection.mutable.PriorityQueue

import kafka.utils.{Scheduler, Time}
import kafka.utils.Scheduler
import org.apache.kafka.common.utils.Time

/**
* A mock scheduler that executes tasks synchronously using a mock time instance.
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.kafka010.mocks

import java.util.concurrent._

import kafka.utils.Time
import org.apache.kafka.common.utils.Time

/**
* A class used for unit testing things which depend on the Time interface.
@@ -36,12 +36,14 @@ private[kafka010] class MockTime(@volatile private var currentMs: Long) extends

def this() = this(System.currentTimeMillis)

def milliseconds: Long = currentMs
override def milliseconds: Long = currentMs

def nanoseconds: Long =
override def hiResClockMs(): Long = milliseconds

override def nanoseconds: Long =
TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)

def sleep(ms: Long) {
override def sleep(ms: Long) {
this.currentMs += ms
scheduler.tick()
}
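
A brief, illustrative usage sketch of the reworked mock (MockTime is private[kafka010], so this only compiles inside that package; the values are arbitrary): tests advance the clock explicitly instead of sleeping on the wall clock, and sleep also ticks the mock scheduler.

```scala
import org.apache.spark.streaming.kafka010.mocks.MockTime

object MockTimeSketch {
  def main(args: Array[String]): Unit = {
    val time = new MockTime()
    val before = time.milliseconds
    time.sleep(5000)   // advances the mock clock by 5 seconds and runs any due scheduler tasks
    assert(time.milliseconds == before + 5000)
    assert(time.hiResClockMs() == time.milliseconds)
  }
}
```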
