[SPARK-34187][SS] Use available offset range obtained during polling when checking offset validation #31275

Closed · wants to merge 4 commits

Changes from 2 commits
@@ -170,11 +170,14 @@ private[consumer] object FetchedDataPool {
 }
 
 private object CachedFetchedData {
+  import KafkaDataConsumer.AvailableOffsetRange
 
   def empty(): CachedFetchedData = {
     val emptyData = FetchedData(
       ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
       UNKNOWN_OFFSET,
-      UNKNOWN_OFFSET)
+      UNKNOWN_OFFSET,
+      AvailableOffsetRange(UNKNOWN_OFFSET, UNKNOWN_OFFSET))
 
     CachedFetchedData(emptyData)
   }

(viirya marked this conversation as resolved.)
@@ -71,7 +71,7 @@ private[kafka010] class InternalKafkaConsumer(
    * consumer polls nothing before timeout.
    */
   def fetch(offset: Long, pollTimeoutMs: Long):
-      (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+      (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long, AvailableOffsetRange) = {
 
     // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
     seek(offset)
@@ -80,15 +80,15 @@
     logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
     val offsetAfterPoll = consumer.position(topicPartition)
     logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
-    val fetchedData = (r, offsetAfterPoll)
+    val range = getAvailableOffsetRange()
+    val fetchedData = (r, offsetAfterPoll, range)
     if (r.isEmpty) {
       // We cannot fetch anything after `poll`. Three possible cases:
       // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
       //   be thrown.
       // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
       // - Fetched something but all of them are not visible. This is a valid case and we let the
       //   caller handle it.
-      val range = getAvailableOffsetRange()
       if (offset < range.earliest || offset >= range.latest) {
         throw new OffsetOutOfRangeException(
           Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
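The essential move in this hunk: `getAvailableOffsetRange()` is now invoked in the same cycle as `poll`, and the range travels with the fetched records instead of being looked up again later. A minimal, self-contained sketch of that contract against a plain Kafka consumer (illustrative only, not the PR's exact code; `fetchSketch` and the local `AvailableOffsetRange` are stand-ins):

import java.{util => ju}
import java.time.Duration
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}
import org.apache.kafka.common.TopicPartition

case class AvailableOffsetRange(earliest: Long, latest: Long)

def fetchSketch(
    consumer: Consumer[Array[Byte], Array[Byte]],
    tp: TopicPartition,
    offset: Long,
    pollTimeoutMs: Long)
    : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long, AvailableOffsetRange) = {
  consumer.seek(tp, offset)
  val records = consumer.poll(Duration.ofMillis(pollTimeoutMs)).records(tp)
  val offsetAfterPoll = consumer.position(tp)
  // Capture the range in the same cycle as the poll, so any later validity
  // check sees offsets consistent with the records just received.
  val earliest: Long = consumer.beginningOffsets(ju.Collections.singleton(tp)).get(tp)
  val latest: Long = consumer.endOffsets(ju.Collections.singleton(tp)).get(tp)
  (records, offsetAfterPoll, AvailableOffsetRange(earliest, latest))
}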
@@ -142,18 +142,22 @@
  * should check if the pre-fetched data is still valid.
  * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
  *                         poll when `records` is drained.
+ * @param _availableOffsetRange the available offset range in Kafka when polling the records.
  */
 private[consumer] case class FetchedData(
     private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
     private var _nextOffsetInFetchedData: Long,
-    private var _offsetAfterPoll: Long) {
+    private var _offsetAfterPoll: Long,
+    private var _availableOffsetRange: AvailableOffsetRange) {
 
   def withNewPoll(
       records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
-      offsetAfterPoll: Long): FetchedData = {
+      offsetAfterPoll: Long,
+      availableOffsetRange: AvailableOffsetRange): FetchedData = {
     this._records = records
     this._nextOffsetInFetchedData = UNKNOWN_OFFSET
     this._offsetAfterPoll = offsetAfterPoll
+    this._availableOffsetRange = availableOffsetRange
     this
   }

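A note on the design being extended here: `FetchedData` is a case class whose fields are private vars, and `withNewPoll` mutates them in place and returns `this`, so the fetched-data pool can refresh a cached instance rather than allocate a new one per poll. A hedged usage sketch (`records`, `offsetAfterPoll`, and `range` stand for the values produced by `fetch` above):

// The pooled instance is refreshed in place; no new FetchedData is created.
val refreshed = fetchedData.withNewPoll(records.listIterator, offsetAfterPoll, range)
assert(refreshed eq fetchedData)  // same object, new state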
@@ -180,6 +184,7 @@ private[consumer] case class FetchedData(
     _records = ju.Collections.emptyListIterator()
     _nextOffsetInFetchedData = UNKNOWN_OFFSET
     _offsetAfterPoll = UNKNOWN_OFFSET
+    _availableOffsetRange = AvailableOffsetRange(UNKNOWN_OFFSET, UNKNOWN_OFFSET)
   }
 
   /**
@@ -192,6 +197,13 @@
    * Returns the next offset to poll after draining the pre-fetched records.
    */
   def offsetAfterPoll: Long = _offsetAfterPoll
+
+  /**
+   * Returns the tuple of earliest and latest offsets that is the available offset range when
+   * polling the records.
+   */
+  def availableOffsetRange: (Long, Long) =
+    (_availableOffsetRange.earliest, _availableOffsetRange.latest)
 }
 
 /**

Review thread on the new doc comment:

Contributor: nit: Use @returns annotation. Sorry for being late with this, but I've just noticed it.

Member (Author): Hm, I think this just follows other methods above and below.
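With the new accessor, a caller can test a requested offset against the range observed at poll time. A small sketch of such a check (hypothetical helper, not part of the PR; assumes `latest` is exclusive, matching the `offset >= range.latest` test in `fetch` above):

// True if `offset` fell inside the partition's available range when the
// current FetchedData was polled.
def offsetWasAvailable(fetchedRange: (Long, Long), offset: Long): Boolean = {
  val (earliest, latest) = fetchedRange
  offset >= earliest && offset < latest
}

// e.g. offsetWasAvailable(fetchedData.availableOffsetRange, offset)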
@@ -474,8 +486,8 @@ private[kafka010] class KafkaDataConsumer(
     // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
     // available offset. Hence we need to handle offset mismatch.
     if (record.offset > offset) {
-      val range = consumer.getAvailableOffsetRange()
-      if (range.earliest <= offset) {
+      val (earliestOffset, _) = fetchedData.availableOffsetRange
+      if (earliestOffset <= offset) {
         // `offset` is still valid but the corresponding message is invisible. We should skip it
         // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
         // `fetchRecord` can just return `record` directly.
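This hunk is the heart of the fix. Previously, the validity check issued a fresh `consumer.getAvailableOffsetRange()`, a second lookup that could race with retention: if the earliest offset advanced past `offset` between the poll and the check, a benign gap of invisible records would be misread as missing data. Reusing the range captured at poll time keeps the check consistent with the records actually fetched. A sketch of the decision (hypothetical helper names):

// Given the first record actually returned and the range seen at poll time,
// decide whether the gap before `recordOffset` is safe to skip.
def gapIsInvisibleRecords(
    recordOffset: Long,
    requestedOffset: Long,
    rangeAtPollTime: (Long, Long)): Boolean = {
  val (earliestAtPollTime, _) = rangeAtPollTime
  // The requested offset existed when we polled, so the skipped offsets are
  // invisible records (e.g. aborted transactional messages), not data loss.
  recordOffset > requestedOffset && earliestAtPollTime <= requestedOffset
}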
@@ -524,8 +536,8 @@
       fetchedData: FetchedData,
       offset: Long,
       pollTimeoutMs: Long): Unit = {
-    val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
-    fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+    val (records, offsetAfterPoll, range) = consumer.fetch(offset, pollTimeoutMs)
+    fetchedData.withNewPoll(records.listIterator, offsetAfterPoll, range)
   }
 
   private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
@@ -30,7 +30,7 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
-import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.ManualClock
@@ -69,7 +69,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
     assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10)
 
     dataList.map { case (_, data) =>
-      data.withNewPoll(testRecords(0, 5).listIterator, 5)
+      data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
     }
 
     dataList.foreach { case (key, data) =>
@@ -91,7 +91,7 @@
     val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
 
     val data = dataPool.acquire(cacheKey, 0)
-    data.withNewPoll(testRecords(0, 5).listIterator, 5)
+    data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
 
     (0 to 3).foreach { _ => data.next() }

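The test updates all follow one pattern: each `withNewPoll` call now supplies an `AvailableOffsetRange` matching the synthesized records. Presumably `testRecords(startOffset, count)` yields offsets `startOffset` through `startOffset + count - 1`, so the range the tests pass could be derived by a hypothetical helper:

// Range matching testRecords(startOffset, count) under the convention used in
// this suite, e.g. rangeFor(0, 5) == AvailableOffsetRange(0, 4).
def rangeFor(startOffset: Long, count: Int): AvailableOffsetRange =
  AvailableOffsetRange(startOffset, startOffset + count - 1)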
@@ -130,14 +130,14 @@
     assert(getCache(dataPool)(cacheKey)(1).inUse)
 
     // reading from task 1
-    dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5)
+    dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
 
     (0 to 3).foreach { _ => dataFromTask1.next() }
 
     dataPool.release(cacheKey, dataFromTask1)
 
     // reading from task 2
-    dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30)
+    dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30, AvailableOffsetRange(0, 29))
 
     (0 to 5).foreach { _ => dataFromTask2.next() }
@@ -189,7 +189,7 @@
     assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10)
 
     dataList.map { case (_, data) =>
-      data.withNewPoll(testRecords(0, 5).listIterator, 5)
+      data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
     }
 
     val dataToEvict = dataList.take(3)