[SPARK-18373][SPARK-18529][SS][Kafka]Make failOnDataLoss=false work with Spark jobs #15820
Conversation
```scala
  }
}).start()

val testTime = 1.minutes
```
I changed this to 20 minutes and test it locally. It passed.
Use https://github.com/apache/spark/pull/15820/files?w=1 to review the PR with whitespace changes ignored.
Test build #68380 has finished for PR 15820 at commit
In general, getAndIgnoreLostData is hard to read due to the length and early returns, and I'm pretty sure I've missed something.
I know the actual underlying cases are complicated to get right, but is it possible to refactor it?
```scala
  logWarning(s"Buffer miss for $groupId $topicPartition [$offset, ${record.offset})")
}
nextOffsetInFetchedData = record.offset + 1
return record
```
Are these two early returns actually necessary?
```scala
}

private def reset(): Unit = {
  nextOffsetInFetchedData = -2
```
This use of -2 as a magic number here and earlier in the file is a little misleading, since the new consumer won't actually let you seek to -2 as a means of indicating earliest
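A named sentinel makes the intent clearer than a raw -2. Here is a minimal sketch of the idea (the name `UNKNOWN_OFFSET` matches what a later revision of this PR adopted; the object and `needsSeek` helper are illustrative stand-ins, not the PR's code):

```scala
object ConsumerPositionSketch {
  // Sentinel meaning "we don't know the next offset in the fetched data".
  // Unlike a bare -2, the name cannot be confused with a broker-side
  // "earliest" magic value, which the new consumer API won't accept in
  // seek() anyway.
  val UNKNOWN_OFFSET: Long = -2L

  var nextOffsetInFetchedData: Long = UNKNOWN_OFFSET

  def reset(): Unit = {
    nextOffsetInFetchedData = UNKNOWN_OFFSET
  }

  // A fresh seek is needed whenever the buffered position does not match
  // the requested offset (including right after reset()).
  def needsSeek(requested: Long): Boolean =
    nextOffsetInFetchedData != requested
}
```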
```scala
}
} else if (!fetchedData.hasNext()) {
  // The last pre-fetched data has been drained.
  seek(offset)
```
I don't think it's necessary to seek every time the fetched data is empty; in normal operation the poll should return the next offset, right?
```scala
}

logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
var outOfOffset = false
```
Can this var be eliminated by just using a single try around the if / else? It's the same catch condition in either case
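The var can indeed go away by hoisting a single try around both branches. A hedged sketch of the shape (the exception class here is a stand-in for Kafka's `OffsetOutOfRangeException`, and `fetch` abstracts whatever each branch actually does):

```scala
// Stand-in for org.apache.kafka.clients.consumer.OffsetOutOfRangeException.
class OffsetOutOfRange(msg: String) extends RuntimeException(msg)

// One try around the if/else replaces the `outOfOffset` var: both branches
// share a single catch, and recovery happens in exactly one place.
def getRecord(offset: Long, inBuffer: Boolean)(fetch: Long => Long): Long =
  try {
    if (inBuffer) {
      fetch(offset) // read from the already-fetched data
    } else {
      // in the real consumer: seek(offset); poll(pollTimeoutMs)
      fetch(offset)
    }
  } catch {
    case _: OffsetOutOfRange =>
      // recover, e.g. by restarting from the earliest available offset
      -1L
  }
```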
```scala
  return null
} else {
  // Case 4 or 5
  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
```
- Why isn't this an early return?
- Unless I'm misreading, this is a recursive call without changing the arguments. Why is it guaranteed to terminate?
Wow, looks like the new GitHub comment interface did all kinds of weird things; apologies about that.
@koeninger Thanks for reviewing. Refactored the code to avoid using early returns and addressed your comments.
Test build #68421 has finished for PR 15820 at commit
Thanks, I think that version is easier to read, and hopefully in normal operation it won't be recursing much, so the loss of @tailrec won't be an efficiency hit.
I'm mostly concerned about clarifying the timeout situation at this point.
```scala
  null
} else {
  // beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1)
  getAndIgnoreLostData(offset, untilOffset, pollTimeoutMs)
```
I'm clearer on why this terminates, but I think it's worth a comment, since it's a mutually recursive call without changing arguments.
+1
Especially since, with the loss of @tailrec, this must now be shown to terminate within a limited stack size, and should do so under most stack size configurations.
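One way to make termination evident without @tailrec is to phrase the retry as a loop whose cursor strictly increases. A hypothetical pure sketch (`available` stands in for "this offset still exists in Kafka"; this is not the PR's code):

```scala
// Find the earliest still-available offset in [offset, untilOffset),
// or -1 if there is none. The cursor only moves forward, so the loop
// runs at most (untilOffset - offset) times; termination is obvious and
// no stack is consumed, unlike the mutually recursive formulation.
def earliestAvailable(offset: Long, untilOffset: Long)(available: Long => Boolean): Long = {
  var cursor = offset
  while (cursor < untilOffset) {
    if (available(cursor)) return cursor
    cursor += 1
  }
  -1L
}
```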
```scala
  getRecordFromFetchedData(offset, untilOffset)
} catch {
  case e: OffsetOutOfRangeException =>
    logWarning(s"Cannot fetch offset $offset, try to recover from the beginning offset", e)
```
I think it's worth the warning explicitly stating that data has been lost
```scala
if (!fetchedData.hasNext()) {
  // We cannot fetch anything after `poll`. Two possible cases:
  // - `beginningOffset` is `offset` but there is nothing for `beginningOffset` right now.
  // - Cannot fetch any data before timeout.
```
As a user, I'm not sure that setting failOnDataLoss=false would make me know that a timeout would cause me to miss data in my spark job (that might otherwise still be in kafka)
+1
If throwing an exception here, test("access offset 0 in Spark job but the topic has been deleted") will fail. It seems a reasonable case.
I don't think topic deletion is anywhere near as common as timeouts, and topic deletion is something user initiated. As a user, is skipping the rest of the offsets in a timeout really what you would want to happen (it isn't for me)? If so, does this need a separate configuration?
Test build #68508 has finished for PR 15820 at commit
```scala
    untilOffset: Long,
    pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
  // scalastyle:off
  // When `failOnDataLoss` is `false`, we need to handle the following cases (note: untilOffset and latestOffset are exclusive):
```
I don't get this: there is no reference to `latestOffset` in this method. What does it refer to?
`latestOffset` means the latest offset that we get from Kafka.
Also, is beginningOffset the earliest offset available in Kafka? Then let's rename this to earliestOffset; "earliest" and "latest" are the more popular terms in the context of Kafka.
Yes. Will update the terms.
Okay, I can guess that you used beginningOffset because of consumer.seekToBeginning. But then we should be consistent with seekToEnd as well by calling the latest offset endOffset. Rather, let's be consistent with the more well-known names "earliest" and "latest".
Two main high-level points:
- Should not have separate code paths for failOnDataLoss = true/false.
  - Merge the if conditions in getRecordFromFetchedData to reduce it from 4 cases to 3.
- Needs better inline documentation, as the code is not easy to understand. Sometimes it's better to document using ASCII art ;)
```scala
// 3. The topic is deleted.
//    There is nothing to fetch, return null.
// 4. The topic is deleted and recreated, and `beginningOffset <= offset <= untilOffset - 1 <= latestOffset - 1`.
//    We cannot detect this case. We can still fetch data like nothing happens.
```
nit: nothing happened.
```scala
/**
 * Get the earliest record in [offset, untilOffset) from the fetched data. If there is no such
 * record, returns null. Must be called after `poll`.
```
nit: return null. Note that this method must be ...
This is not true. It sounds like poll() has to be called immediately before this method is called, which is not the case in the above usage, where poll is called only if the conditions match. Rather say: "This must be called after some data has already been fetched using poll."
```scala
  null
} else {
  val record = fetchedData.next()
  if (record.offset >= untilOffset) {
```
Can you document in what cases this can happen?
```scala
} else {
  val record = fetchedData.next()
  if (record.offset >= untilOffset) {
    logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $untilOffset)")
```
Please improve the log message based on when this can happen, something like: "There may have been some data loss because some data may have been aged out in Kafka and is therefore unavailable for processing. If you want your streaming query to fail in such cases, set option ...... "
```scala
  logInfo(s"Initial fetch for $topicPartition $offset")
  seek(offset)
  poll(pollTimeoutMs)
} else if (!fetchedData.hasNext()) {
```
can these two conditions be merged as

```scala
if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
  seek(offset)
  poll(pollTimeoutMs)
}
```
```scala
// beginningOffset <= offset <= min(latestOffset - 1, untilOffset - 1)
//
// This will happen when a topic is deleted and recreated, and new data are pushed very fast
// , then we will see `offset` disappears first then appears again. Although the parameters
```
nit: comma after new line
```scala
    untilOffset: Long,
    pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
  val beginningOffset = getBeginningOffset()
  if (beginningOffset <= offset) {
```
nit: it's easier to think about when "offset" comes first in the condition, i.e. offset >= beginningOffset
```scala
  }
} else {
  if (beginningOffset >= untilOffset) {
    // offset <= untilOffset - 1 < beginningOffset
```
Improve the docs by adding a plain-word explanation, e.g. "the required offset is earlier than the available offsets".
Also improve the printed warning; I gave an example in another comment.
```scala
  null
} else {
  // offset < beginningOffset <= untilOffset - 1
  logWarning(s"Buffer miss for $groupId $topicPartition [$offset, $beginningOffset)")
```
improve docs and warning.
Test build #68731 has finished for PR 15820 at commit
Test build #68733 has finished for PR 15820 at commit
Test build #68735 has finished for PR 15820 at commit
As we spoke offline, this can be further simplified. I am posting my detailed comments anyway, though not all of them are applicable if you are going to refactor the code.
```scala
    untilOffset: Long,
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
  require(offset < untilOffset, s"offset: $offset, untilOffset: $untilOffset")
```
Give a better error message, saying "offset must always be less than untilOffset".
```scala
// The last pre-fetched data has been drained.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
poll(pollTimeoutMs)
```
Why haven't these two cases been merged?
```scala
  logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
  fetchedData = r.iterator
}

private def getCurrentOffsetRange(): (Long, Long) = {
```
getCurrentOffsetRange -> getValidOffsetRange or getAvailableOffsetRange, to make it clearer what this is. And add docs.
```scala
/**
 * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
 * Sequential forward access will use buffers, but random access will be horribly inefficient.
 *
 * If `failOnDataLoss` is `false`, it will try to get the earliest record in
```
nit: earliest available record
```scala
/**
 * Get the earliest record in [offset, untilOffset) from the fetched data. If there is no such
 * record, return null. Note that this must be called after some data has already been fetched
 * using poll.
```
... if there is no such record, return null and clear the fetched data.
```scala
/**
 * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
 * Sequential forward access will use buffers, but random access will be horribly inefficient.
 *
 * If `failOnDataLoss` is `false`, it will try to get the earliest record in
 * `[offset, untilOffset)` when some illegal state happens. Otherwise, an `IllegalStateException`
```
"Otherwise" means when failOnDataLoss is false but there is no illegal state? It's confusing. Rather, just rewrite it as:

When `failOnDataLoss` is `true`, this will either return the record at offset if available, or throw an exception.

When `failOnDataLoss` is `false`, this will either return the record at offset if available, or return the next earliest available record < untilOffset, or null. It will not throw any exception.
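That contract can be modeled as a small pure function over a map standing in for the partition's surviving records (illustrative only; `log` and this helper are not the PR's code):

```scala
// failOnDataLoss = true:  record at `offset`, or an exception.
// failOnDataLoss = false: record at `offset`, else the next available
// record before `untilOffset`, else null; never an exception.
def get(
    offset: Long,
    untilOffset: Long,
    failOnDataLoss: Boolean,
    log: Map[Long, String]): String =
  log.get(offset).getOrElse {
    if (failOnDataLoss) {
      throw new IllegalStateException(s"Cannot fetch offset $offset: data was lost")
    } else {
      ((offset + 1) until untilOffset)
        .collectFirst { case o if log.contains(o) => log(o) }
        .orNull
    }
  }
```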
```scala
if (record.offset >= untilOffset) {
  // This may happen when records are aged out.
  val message =
    if (failOnDataLoss) {
```
There is already an if statement on failOnDataLoss in reportDataLoss; having another one looks too complicated. I think it's best to not have an if statement here, and the same for other places; this is too much complication just for documentation.
Test build #68811 has finished for PR 15820 at commit
```scala
 * [offset, untilOffset) are invalid (e.g., the topic is deleted and recreated), it will return
 * `UNKNOWN_OFFSET`.
 */
private def getNextEarliestOffset(offset: Long, untilOffset: Long): Long = {
```
nit: rename to getEarliestAvailableOffsetBetween(...)
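Under the suggested name, the helper's contract is: the earliest offset in [offset, untilOffset) that still falls inside the available range, or UNKNOWN_OFFSET if the whole window is invalid. A pure sketch against an (earliest, latest) range (simplified; the real method consults the consumer):

```scala
val UNKNOWN_OFFSET: Long = -2L

// earliest/latest describe the range still available in Kafka
// (latest exclusive, matching the PR's convention).
def getEarliestAvailableOffsetBetween(
    offset: Long,
    untilOffset: Long,
    earliest: Long,
    latest: Long): Long = {
  // Move the requested start forward to the first available offset...
  val candidate = math.max(offset, earliest)
  // ...and keep it only if it is still inside both windows.
  if (candidate < untilOffset && candidate < latest) candidate else UNKNOWN_OFFSET
}
```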
```diff
 /** Iterator to the already fetch data */
 private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-private var nextOffsetInFetchedData = -2L
+private var nextOffsetInFetchedData = UNKNOWN_OFFSET

 /**
  * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
```
nit: update the docs to clarify earlier that this may not return the record at offset. For example: "Get the record for the given offset if available. Otherwise it will either throw an error (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset]." Use @param to explain pollTimeoutMs and the others in more detail.
```scala
 */
private def fetchData(
    offset: Long,
    untilOffset: Long,
```
untilOffset and failOnDataLoss are not really needed.
```scala
  val topic = newTopic()
  topics += topic
  testUtils.createTopic(topic, partitions = 1)
case 1 =>
```
Update the test to recreate the same topics.
```scala
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
KafkaSourceSuite.globalTestUtils = testUtils
val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
```
add explanation.
Move this test lower, after the basic tests.
Test build #68878 has finished for PR 15820 at commit
Because the comment made by me and +1'ed by marmbrus is hidden at this point, I just want to re-iterate that this patch should not skip the rest of the partition in the case that a timeout happens.
```scala
// `OffsetOutOfRangeException` to let the caller handle it.
// - Cannot fetch any data before timeout. TimeoutException will be thrown.
val (earliestOffset, latestOffset) = getAvailableOffsetRange()
if (offset < earliestOffset || offset >= latestOffset) {
```
@koeninger Just updated the timeout logic. It will check the current available offset range and use it to distinguish these two cases.
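The disambiguation reduces to a range check against the available offsets. A sketch of the decision (the types here are illustrative, not the PR's):

```scala
sealed trait EmptyPollReason
case object DataLost extends EmptyPollReason // offset outside [earliest, latest): data loss
case object TimedOut extends EmptyPollReason // offset valid; poll just returned nothing in time

// earliest/latest come from a getAvailableOffsetRange()-style helper;
// latest is exclusive, matching the PR's convention.
def diagnoseEmptyPoll(offset: Long, earliest: Long, latest: Long): EmptyPollReason =
  if (offset < earliest || offset >= latest) DataLost else TimedOut
```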
Test build #68948 has finished for PR 15820 at commit
Test build #68949 has finished for PR 15820 at commit
Test build #68952 has finished for PR 15820 at commit
```scala
seek(offset)
poll(pollTimeoutMs)
// The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
// `false`, firstly, we will try to fetch the record at `offset`. If no such record, then we
```
nit:
- firstly --> first,
- "no such record" --> "no such record exists"

Overall +1, thanks for this explanation.
LGTM, just a few documentation improvements. This is much cleaner and easier to understand now.
```diff
  */
-def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+def get(
```
Can you also document that it can return null.
```scala
case e: OffsetOutOfRangeException =>
  // When there is some error thrown, it's better to use a new consumer to drop all cached
  // states in the old consumer. We don't need to worry about the performance because this
  // is not a normal path.
```
normal --> common
```scala
val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
  """
    |There may have been some data loss because some data may have been aged out in Kafka or
    | the topic has been deleted and is therefore unavailable for processing. If you want your
```
nit: better grammar
Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed.
Similarly change below.
```scala
  )
}

test("Delete a topic when a Spark job is running") {
```
nit: D -> d, for consistency.
Test build #68969 has finished for PR 15820 at commit
@tdas I did some changes to make the stress test stable and ran it for 20 minutes without errors. I also confirmed the warning logs did appear in the unit test logs.
Test build #68978 has finished for PR 15820 at commit
Test build #3431 has started for PR 15820 at commit |
@zsxwing should the title include SPARK-18529 per your comment on the JIRA?
Thanks for pointing that out. Updated.
Test build #3432 has finished for PR 15820 at commit
LGTM. Merging this in master and 2.1 |
…with Spark jobs ## What changes were proposed in this pull request? This PR adds `CachedKafkaConsumer.getAndIgnoreLostData` to handle corner cases of `failOnDataLoss=false`. It also resolves [SPARK-18529](https://issues.apache.org/jira/browse/SPARK-18529) after refactoring codes: Timeout will throw a TimeoutException. ## How was this patch tested? Because I cannot find any way to manually control the Kafka server to clean up logs, it's impossible to write unit tests for each corner case. Therefore, I just created `test("stress test for failOnDataLoss=false")` which should cover most of corner cases. I also modified some existing tests to test for both `failOnDataLoss=false` and `failOnDataLoss=true` to make sure it doesn't break existing logic. Author: Shixiong Zhu <shixiong@databricks.com> Closes #15820 from zsxwing/failOnDataLoss. (cherry picked from commit 2fd101b) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
What changes were proposed in this pull request?

This PR adds `CachedKafkaConsumer.getAndIgnoreLostData` to handle corner cases of `failOnDataLoss=false`. It also resolves SPARK-18529 after refactoring the code: a timeout will throw a TimeoutException.

How was this patch tested?

Because I cannot find any way to manually control the Kafka server to clean up logs, it's impossible to write unit tests for each corner case. Therefore, I just created `test("stress test for failOnDataLoss=false")`, which should cover most of the corner cases. I also modified some existing tests to test for both `failOnDataLoss=false` and `failOnDataLoss=true` to make sure this doesn't break existing logic.