
Commit

Address TD's comments
zsxwing committed Nov 19, 2016
1 parent 6eb1cb9 commit 2fc98cd
Showing 3 changed files with 89 additions and 137 deletions.
@@ -54,15 +54,17 @@ private[kafka010] case class CachedKafkaConsumer private(
private var nextOffsetInFetchedData = UNKNOWN_OFFSET

/**
* Get the record for the given offset, waiting up to timeout ms if IO is necessary.
* Sequential forward access will use buffers, but random access will be horribly inefficient.
* Get the record for the given offset if available. Otherwise it will either throw an error
* (if failOnDataLoss = true), or return the record at the next available offset within [offset, untilOffset).
*
* When `failOnDataLoss` is `true`, this will either return record at offset if available, or
* throw exception.
*
* When `failOnDataLoss` is `false`, this will either return record at offset if available, or
* return the next earliest available record less than untilOffset, or null. It will not throw
* any exception.
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
* @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
* @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return the record
* at `offset` if available, or throw an exception. When `failOnDataLoss` is `false`, this method
* will either return the record at `offset` if available, or return the next earliest available
* record less than `untilOffset`, or null. It will not throw any exception.
*/
def get(
offset: Long,
@@ -75,8 +77,21 @@ private[kafka010] case class CachedKafkaConsumer private(
var toFetchOffset = offset
while (toFetchOffset != UNKNOWN_OFFSET) {
try {
val record = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
val record = fetchData(toFetchOffset, pollTimeoutMs)
if (record == null) {
// We cannot fetch anything after `poll`. Two possible cases:
// - `earliestOffset` is `offset` but there is nothing for `earliestOffset` right now.
// - Cannot fetch any data before timeout.
// Because there is no way to distinguish these cases, just skip the remaining offsets in the
// current partition.
val message =
if (failOnDataLoss) {
s"Cannot fetch record for offset $offset"
} else {
s"Cannot fetch record for offset $offset. " +
s"Records in [$offset, $untilOffset) will be skipped"
}
reportDataLoss(failOnDataLoss, message)
reset()
}
return record
@@ -90,7 +105,7 @@ private[kafka010] case class CachedKafkaConsumer private(
"Recovering from the earliest offset"
}
reportDataLoss(failOnDataLoss, message, e)
toFetchOffset = getNextEarliestOffset(toFetchOffset, untilOffset)
toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
}
}
reset()
@@ -102,7 +117,7 @@ private[kafka010] case class CachedKafkaConsumer private(
* [offset, untilOffset) are invalid (e.g., the topic is deleted and recreated), it will return
* `UNKNOWN_OFFSET`.
*/
private def getNextEarliestOffset(offset: Long, untilOffset: Long): Long = {
private def getEarliestAvailableOffsetBetween(offset: Long, untilOffset: Long): Long = {
val (earliestOffset, latestOffset) = getAvailableOffsetRange()
if (offset >= latestOffset || earliestOffset >= untilOffset) {
// [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap,
@@ -154,14 +169,11 @@ private[kafka010] case class CachedKafkaConsumer private(
}

/**
* Get the earliest record in [offset, untilOffset). If there is no such record, return null and
* clear the fetched data.
* Get the record at `offset`. If there is no such record, return null.
*/
private def fetchData(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
// This is the first fetch, or the last pre-fetched data has been drained.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
@@ -170,19 +182,6 @@
}

if (!fetchedData.hasNext()) {
// We cannot fetch anything after `poll`. Two possible cases:
// - `earliestOffset` is `offset` but there is nothing for `earliestOffset` right now.
// - Cannot fetch any data before timeout.
// Because there is no way to distinguish these cases, just skip the remaining offsets in the
// current partition.
val message =
if (failOnDataLoss) {
s"Cannot fetch record for offset $offset"
} else {
s"Cannot fetch record for offset $offset. " +
s"Records in [$offset, $untilOffset) will be skipped"
}
reportDataLoss(failOnDataLoss, message)
null
} else {
val record = fetchedData.next()
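
To make the control flow introduced above easier to follow, here is a condensed, self-contained sketch of the retry loop in `get`. It is not the actual Spark class: `fetchRecord` and `earliestAvailableOffsetBetween` are hypothetical stubs standing in for `fetchData` and the Kafka offset queries, and the two failure paths (an empty poll and an `OffsetOutOfRangeException`) are collapsed into one for brevity.

object GetLoopSketch {
  private val UNKNOWN_OFFSET = -2L

  // Stubbed "partition": only offsets 5 to 9 still hold data; earlier records were lost.
  private val available: Map[Long, String] = (5L until 10L).map(o => o -> s"record-$o").toMap

  private def fetchRecord(offset: Long): Option[String] = available.get(offset)

  // Next offset in [offset, untilOffset) that still exists, or UNKNOWN_OFFSET if none does.
  private def earliestAvailableOffsetBetween(offset: Long, untilOffset: Long): Long = {
    val (earliest, latest) = (5L, 10L)
    if (offset >= latest || earliest >= untilOffset) UNKNOWN_OFFSET else math.max(offset, earliest)
  }

  // Return the record at `offset` if it exists; otherwise either fail (failOnDataLoss = true)
  // or skip forward to the next available offset below `untilOffset`, returning null when the
  // whole remaining range has been lost.
  def get(offset: Long, untilOffset: Long, failOnDataLoss: Boolean): String = {
    var toFetchOffset = offset
    while (toFetchOffset != UNKNOWN_OFFSET) {
      fetchRecord(toFetchOffset) match {
        case Some(record) => return record
        case None =>
          if (failOnDataLoss) {
            throw new IllegalStateException(s"Cannot fetch record for offset $toFetchOffset")
          }
          // Data loss: move on to the earliest offset that is still available, if any.
          toFetchOffset = earliestAvailableOffsetBetween(toFetchOffset, untilOffset)
      }
    }
    null
  }

  def main(args: Array[String]): Unit = {
    println(get(0L, 10L, failOnDataLoss = false))  // record-5: offsets 0-4 were lost and skipped
    println(get(12L, 20L, failOnDataLoss = false)) // null: nothing left in the requested range
  }
}
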
@@ -228,105 +228,6 @@ class KafkaSourceSuite extends KafkaSourceTest {
)
}

test("access offset 0 in Spark job but the topic has been deleted") {
KafkaSourceSuite.collectedData.clear()

val topic = newTopic()
testUtils.createTopic(topic, partitions = 1)
testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)

val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("subscribe", topic)
// If a topic is deleted and we try to poll data starting from offset 0,
// the Kafka consumer will just block until timeout and return an empty result.
// So set the timeout to 1 second to make this test fast.
.option("kafkaConsumer.pollTimeoutMs", "1000")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
KafkaSourceSuite.globalTestUtils = testUtils
val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
override def open(partitionId: Long, version: Long): Boolean = {
KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
true
}

override def process(value: Int): Unit = {
KafkaSourceSuite.collectedData.add(value)
}

override def close(errorOrNull: Throwable): Unit = {}
}).start()
query.processAllAvailable()
assert(KafkaSourceSuite.collectedData.isEmpty)
query.stop()
// Since `failOnDataLoss` is `false`, the query should not fail
assert(query.exception.isEmpty)
}

test("access non-zero offset in Spark job but the topic has been deleted") {
KafkaSourceSuite.collectedData.clear()

val topic = newTopic()
testUtils.createTopic(topic, partitions = 1)
testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)

val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
KafkaSourceSuite.globalTestUtils = testUtils
val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {

private var currentVersion: Long = -1

override def open(partitionId: Long, version: Long): Boolean = {
if (version != 0) {
KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
}
currentVersion = version
true
}

override def process(value: Int): Unit = {
KafkaSourceSuite.collectedData.add(value)
}

override def close(errorOrNull: Throwable): Unit = {
if (currentVersion > 0) {
KafkaSourceSuite.collectedData.add(-1)
} else {
// Let the source create the Spark job
KafkaSourceSuite.globalTestUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray)
}
}
}).start()
eventually(timeout(streamingTimeout)) {
// Make sure the first two batches are done
assert(KafkaSourceSuite.collectedData.asScala.lastOption === Some(-1))
}
// Because `sendMessages` and `deleteTopic` are not atomic, the KafkaConsumer may prefetch some
// data between these two operations, which makes the results indeterminate. So we only
// check the first 10 messages.
assert(KafkaSourceSuite.collectedData.asScala.take(10) === (1 to 10))
query.stop()
// Since `failOnDataLoss` is `false`, the query should not fail
assert(query.exception.isEmpty)
}

for (failOnDataLoss <- Seq(true, false)) {
test(s"assign from latest offsets (failOnDataLoss: $failOnDataLoss)") {
val topic = newTopic()
@@ -550,6 +451,49 @@ class KafkaSourceSuite extends KafkaSourceTest {
)
}

test("Delete a topic when a Spark job is running") {
KafkaSourceSuite.collectedData.clear()

val topic = newTopic()
testUtils.createTopic(topic, partitions = 1)
testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)

val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("subscribe", topic)
// If a topic is deleted and we try to poll data starting from offset 0,
// the Kafka consumer will just block until timeout and return an empty result.
// So set the timeout to 1 second to make this test fast.
.option("kafkaConsumer.pollTimeoutMs", "1000")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
KafkaSourceSuite.globalTestUtils = testUtils
// The following ForeachWriter will delete the topic before fetching data from Kafka
// in executors.
val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
override def open(partitionId: Long, version: Long): Boolean = {
KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
true
}

override def process(value: Int): Unit = {
KafkaSourceSuite.collectedData.add(value)
}

override def close(errorOrNull: Throwable): Unit = {}
}).start()
query.processAllAvailable()
query.stop()
// Since `failOnDataLoss` is `false`, the query should not fail
assert(query.exception.isEmpty)
}

private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"

private def assignString(topic: String, partitions: Iterable[Int]): String = {
@@ -844,21 +788,28 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared

val testTime = 1.minutes
val startTime = System.currentTimeMillis()
// Track the topics that currently exist
val topics = mutable.ArrayBuffer[String]()
// Track topics that have been deleted
val deletedTopics = mutable.Set[String]()
while (System.currentTimeMillis() - testTime.toMillis < startTime) {
Random.nextInt(6) match {
case 0 =>
case 0 => // Create a new topic
val topic = newTopic()
topics += topic
testUtils.createTopic(topic, partitions = 1)
case 1 =>
if (topics.nonEmpty) {
val topic = topics.remove(Random.nextInt(topics.size))
testUtils.deleteTopic(topic)
}
case 2 =>
case 1 if topics.nonEmpty => // Delete an existing topic
val topic = topics.remove(Random.nextInt(topics.size))
testUtils.deleteTopic(topic)
deletedTopics += topic
case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted.
val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size))
deletedTopics -= topic
topics += topic
testUtils.createTopic(topic, partitions = 1)
case 3 =>
Thread.sleep(100)
case _ =>
case _ => // Push random messages
for (topic <- topics) {
val size = Random.nextInt(10)
for (_ <- 0 until size) {
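
One detail of the stress test's dispatch worth calling out: a guarded case such as `case 1 if topics.nonEmpty` does not match when its guard is false, so the match falls through to `case _` and that iteration simply pushes random messages instead. A tiny standalone illustration follows; the `action` helper is hypothetical and only mirrors the shape of the `Random.nextInt(6) match` above, it is not part of the suite.

object GuardFallThroughSketch {
  // Guarded cases skip to the default branch whenever their precondition does not hold.
  def action(n: Int, topicsNonEmpty: Boolean, deletedNonEmpty: Boolean): String = n match {
    case 0 => "create a new topic"
    case 1 if topicsNonEmpty => "delete an existing topic"
    case 2 if deletedNonEmpty => "recreate a deleted topic"
    case 3 => "sleep"
    case _ => "push random messages" // also chosen for n == 1 or n == 2 when the guard fails
  }

  def main(args: Array[String]): Unit = {
    assert(action(1, topicsNonEmpty = false, deletedNonEmpty = false) == "push random messages")
    assert(action(2, topicsNonEmpty = true, deletedNonEmpty = false) == "push random messages")
    println("guards fall through to the default case as expected")
  }
}
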
@@ -302,7 +302,9 @@ class KafkaTestUtils extends Logging {
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
})
deletePath && topicPath && replicaManager && logManager && cleaner
// ensure the topic is gone
val deleted = !zkUtils.getAllTopics().contains(topic)
deletePath && topicPath && replicaManager && logManager && cleaner && deleted
}
eventually(timeout(10.seconds)) {
assert(isDeleted, s"$topic not deleted after timeout")
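
The deletion check above works by retrying until every sub-condition holds, including the new "topic is no longer listed" check. As a rough standalone sketch of that polling pattern (this is not the scalatest `eventually` that the test utility actually uses, and `isTopicFullyDeleted` is a made-up stand-in for the combined broker checks):

import scala.concurrent.duration._

object EventuallySketch {
  // Poll a condition until it holds or the deadline passes, then fail loudly.
  def eventuallyTrue(timeout: FiniteDuration, interval: FiniteDuration = 100.millis)
                    (condition: => Boolean): Unit = {
    val deadline = timeout.fromNow
    while (!condition) {
      if (deadline.isOverdue()) {
        throw new AssertionError(s"condition not satisfied within $timeout")
      }
      Thread.sleep(interval.toMillis)
    }
  }

  def main(args: Array[String]): Unit = {
    var checksRemaining = 3 // pretend the broker needs a few polls before everything is cleaned up
    def isTopicFullyDeleted: Boolean = { checksRemaining -= 1; checksRemaining <= 0 }
    eventuallyTrue(10.seconds)(isTopicFullyDeleted)
    println("topic reported as fully deleted")
  }
}
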
