[SPARK-17147][STREAMING][KAFKA] address further feedback
koeninger committed Feb 27, 2018
1 parent 248b511 commit e3ae845
Showing 5 changed files with 46 additions and 45 deletions.
@@ -109,7 +109,9 @@ class CachedKafkaConsumer[K, V] private(
    * Assumes compactedStart has been called first, and ignores gaps.
    */
   def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
-    if (!buffer.hasNext()) { poll(timeout) }
+    if (!buffer.hasNext()) {
+      poll(timeout)
+    }
     require(buffer.hasNext(),
       s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout")
     val record = buffer.next()
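
The reformatted `compactedNext` keeps its contract: check the buffer, poll once on a miss, and fail loudly if the poll returned nothing. A minimal sketch of how a caller might drain a compacted offset range with it — `compactedNext` comes from this file, while the driver loop and its parameters are invented for illustration:

```scala
// Illustrative only: CachedKafkaConsumer is private[kafka010], so real callers
// live inside the connector. Assumes compactedStart has already positioned
// the consumer at the start of the range.
def drainCompacted(
    consumer: CachedKafkaConsumer[String, String],
    untilOffset: Long,
    pollTimeoutMs: Long): Unit = {
  var record = consumer.compactedNext(pollTimeoutMs)  // throws if polling yields nothing
  while (record.offset < untilOffset) {
    println(s"${record.key} -> ${record.value}")
    record = consumer.compactedNext(pollTimeoutMs)
  }
}
```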
@@ -117,31 +117,33 @@ private[spark] class KafkaRDD[K, V](
   override def take(num: Int): Array[ConsumerRecord[K, V]] =
     if (compacted) {
       super.take(num)
+    } else if (num < 1) {
+      Array.empty[ConsumerRecord[K, V]]
     } else {
       val nonEmptyPartitions = this.partitions
         .map(_.asInstanceOf[KafkaRDDPartition])
         .filter(_.count > 0)
 
-      if (num < 1 || nonEmptyPartitions.isEmpty) {
-        return new Array[ConsumerRecord[K, V]](0)
-      }
-
-      // Determine in advance how many messages need to be taken from each partition
-      val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
-        val remain = num - result.values.sum
-        if (remain > 0) {
-          val taken = Math.min(remain, part.count)
-          result + (part.index -> taken.toInt)
-        } else {
-          result
-        }
-      }
-
-      context.runJob(
-        this,
-        (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
-          it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
-      ).flatten
+      if (nonEmptyPartitions.isEmpty) {
+        Array.empty[ConsumerRecord[K, V]]
+      } else {
+        // Determine in advance how many messages need to be taken from each partition
+        val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+          val remain = num - result.values.sum
+          if (remain > 0) {
+            val taken = Math.min(remain, part.count)
+            result + (part.index -> taken.toInt)
+          } else {
+            result
+          }
+        }
+
+        context.runJob(
+          this,
+          (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
+            it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
+        ).flatten
+      }
     }
 
   private def executors(): Array[ExecutorCacheTaskLocation] = {
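
The restructuring above replaces an early `return` with nested `if/else` branches; the quota computation itself is unchanged. A self-contained sketch of that foldLeft, with made-up partition counts, shows how `num` records are allocated across partitions in index order:

```scala
// Standalone model of take()'s quota computation: partitions are visited in
// order, and each contributes records until the requested total is covered.
val num = 10
val partitionCounts = Seq(0 -> 4L, 1 -> 3L, 2 -> 8L)  // (partition index, records available)

val parts = partitionCounts.foldLeft(Map[Int, Int]()) { case (result, (index, count)) =>
  val remain = num - result.values.sum
  if (remain > 0) {
    result + (index -> math.min(remain.toLong, count).toInt)
  } else {
    result
  }
}

assert(parts == Map(0 -> 4, 1 -> 3, 2 -> 3))  // 4 + 3 + 3 = 10 records requested
```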
@@ -239,7 +241,7 @@ private class KafkaRDDIterator[K, V](
 
   val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  context.addTaskCompletionListener{ context => closeIfNeeded() }
+  context.addTaskCompletionListener(_ => closeIfNeeded())
 
   val consumer = if (useConsumerCache) {
     CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
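
The listener rewrite drops a lambda parameter named `context` that shadowed the enclosing `TaskContext` value. The pattern itself — closing a per-task resource when the task finishes, whether it succeeded or failed — looks roughly like this sketch, where the writer is a stand-in for the consumer that `closeIfNeeded()` releases:

```scala
import org.apache.spark.TaskContext

// Sketch: tie per-task resource cleanup to task completion. The StringWriter
// is invented for illustration; KafkaRDDIterator closes a Kafka consumer here.
def processPartition(context: TaskContext, it: Iterator[String]): Iterator[String] = {
  val resource = new java.io.StringWriter()
  context.addTaskCompletionListener(_ => resource.close())  // runs on success or failure
  it.map { line => resource.write(line); line }
}
```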
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.kafka010
 
 import java.{ util => ju }
+import java.io.File
 
 import scala.collection.JavaConverters._
 import scala.util.Random
@@ -74,11 +75,11 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
     val logs = new Pool[TopicAndPartition, Log]()
     val logDir = kafkaTestUtils.brokerLogDir
-    val dir = new java.io.File(logDir, topic + "-" + partition)
+    val dir = new File(logDir, topic + "-" + partition)
     dir.mkdirs()
     val logProps = new ju.Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
     val log = new Log(
       dir,
       LogConfig(logProps),
@@ -87,10 +88,10 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
       mockTime
     )
     messages.foreach { case (k, v) =>
-        val msg = new ByteBufferMessageSet(
-          NoCompressionCodec,
-          new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
-        log.append(msg)
+      val msg = new ByteBufferMessageSet(
+        NoCompressionCodec,
+        new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
+      log.append(msg)
     }
     log.roll()
     logs.put(TopicAndPartition(topic, partition), log)
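
The `Properties.put` fix above deserves a note: `java.util.Properties` inherits `put(Object, Object)` from `Hashtable`, and a Scala `Float` is not an `AnyRef`, so it must be boxed explicitly. The old code leaned on the implicit `Predef.float2Float` conversion via a type ascription; the new code spells the boxing out. A minimal sketch, with the Kafka config key written as a plain string:

```scala
import java.{ util => ju }

val logProps = new ju.Properties()

// Both lines box 0.1f to java.lang.Float; put() needs an AnyRef argument.
logProps.put("min.cleanable.dirty.ratio", 0.1f: java.lang.Float)         // implicit conversion
logProps.put("min.cleanable.dirty.ratio", java.lang.Float.valueOf(0.1f)) // explicit boxing
```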
@@ -172,12 +172,12 @@ private[kafka010] class KafkaTestUtils extends Logging {
 
   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
   def createTopic(topic: String, partitions: Int): Unit = {
-    createTopic(topic, partitions, new Properties)
+    createTopic(topic, partitions, new Properties())
   }
 
   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
   def createTopic(topic: String): Unit = {
-    createTopic(topic, 1, new Properties)
+    createTopic(topic, 1, new Properties())
   }
 
   /** Java-friendly function for sending messages to the Kafka broker */
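
Both overloads now call the three-argument `createTopic` with an explicit `new Properties()` rather than the bare `new Properties`. A hypothetical usage sketch of the three entry points — the topic names and the compaction override are invented; `setup()` starts the embedded broker:

```scala
// Hypothetical test-setup usage of the createTopic overloads shown above.
val kafkaTestUtils = new KafkaTestUtils()
kafkaTestUtils.setup()

kafkaTestUtils.createTopic("plain")    // 1 partition, default topic config
kafkaTestUtils.createTopic("wide", 4)  // 4 partitions, default topic config

val props = new java.util.Properties()
props.put("cleanup.policy", "compact") // per-topic override, e.g. for compaction tests
kafkaTestUtils.createTopic("compacted", 1, props)
```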
@@ -58,18 +58,16 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
    * If you are using the scheduler associated with a MockTime instance this call
    * will be triggered automatically.
    */
-  def tick() {
-    this synchronized {
-      val now = time.milliseconds
-      while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
-        /* pop and execute the task with the lowest next execution time */
-        val curr = tasks.dequeue
-        curr.fun()
-        /* if the task is periodic, reschedule it and re-enqueue */
-        if(curr.periodic) {
-          curr.nextExecution += curr.period
-          this.tasks += curr
-        }
-      }
+  def tick(): Unit = synchronized {
+    val now = time.milliseconds
+    while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+      /* pop and execute the task with the lowest next execution time */
+      val curr = tasks.dequeue
+      curr.fun()
+      /* if the task is periodic, reschedule it and re-enqueue */
+      if(curr.periodic) {
+        curr.nextExecution += curr.period
+        this.tasks += curr
+      }
     }
   }
@@ -79,11 +77,9 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
       fun: () => Unit,
       delay: Long = 0,
       period: Long = -1,
-      unit: TimeUnit = TimeUnit.MILLISECONDS) {
-    this synchronized {
-      tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
-      tick()
-    }
+      unit: TimeUnit = TimeUnit.MILLISECONDS): Unit = synchronized {
+    tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
+    tick()
   }
 
 }
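
Both `MockScheduler` methods were converted from procedure syntax wrapping a `this synchronized { ... }` block to `def m(...): Unit = synchronized { ... }`. The two forms lock the same monitor (`this`); the new one just adds the explicit `Unit` result type that current Scala style requires. A small standalone sketch of the equivalence:

```scala
// Both methods acquire the monitor of `this`; only the syntax differs.
class Counter {
  private var n = 0

  // old style: procedure syntax plus an explicit `this synchronized` block
  def incrementOld() { this synchronized { n += 1 } }

  // new style: explicit result type, entire body synchronized
  def increment(): Unit = synchronized { n += 1 }

  def get: Int = synchronized(n)
}
```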
