
Reset defense for intermittently 'lost' offsets with low-watermark #1286

Closed
jyates opened this issue Dec 14, 2020 · 3 comments
jyates commented Dec 14, 2020

Short description

There are a number of JIRAs where Kafka brokers can 'mess up' and claim that an offset is out-of-range, often around when a log segment is rolled (see below). This causes the consumer to reset offsets based on its `auto.offset.reset` configuration: `latest`, `earliest`, or `none`. `latest` will skip data (but that is true of any use of it, so it is out of scope here). `earliest` and `none`, however, can have very bad impacts on stream processing, likely waking someone up to fix the issue by force-setting the offsets to the right place.

We should detect this offset drop/reset and handle it more gracefully.
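For context, the three reset policies mentioned above map directly onto the standard Kafka consumer `auto.offset.reset` setting. A minimal sketch using plain `java.util.Properties` (the broker address and group id below are placeholder values, not from this issue):

```java
import java.util.Properties;

public class ResetConfigExample {
    public static Properties consumerProps(String resetPolicy) {
        Properties props = new Properties();
        // Placeholder connection settings; keys are the stock Kafka config names.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");
        // "latest" skips ahead to the end of the log (drops data),
        // "earliest" rewinds to the start of the log (replays data),
        // "none" surfaces the error to the application (consumer hangs/fails).
        props.setProperty("auto.offset.reset", resetPolicy);
        return props;
    }
}
```

The failure modes discussed in this issue follow from the `earliest` and `none` branches of that single setting.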

Details

<pitch>
This issue seems to crop up quite frequently, with different root causes across versions. Users often do not have an easy path to upgrading their brokers (unwilling/unable to maintain a fork, externally provided cluster, etc.), while upgrading the client is more within their control. This also provides an opportunity for alpakka-kafka to provide a superior, delightful experience compared to other clients.

An 'earliest' rewind when offsets are dropped can cause days or weeks of data to be consumed again. This can swamp downstream systems and will likely trigger pages, as lag jumps from minutes back to that earliest point once the consumer begins committing from there.

Similarly, a 'none' reset policy will hang the consumer, eventually paging someone. This is better in that it does not disturb downstream systems, but it is likely rarely used, as it makes it harder to deploy new consumers using defaults. While we can't do anything for `none` (that is handled at a lower level in the Kafka client), we can help the reset-to-earliest case.
</pitch>

How might something like this work? We already filter out offsets from partitions that are no longer assigned in the SourceBuffer stage. However, that does not take into account the latest committed offsets. We do track the latest committed offsets when doing the periodic offset commits. That maintains a map and doesn't seem to add much overhead (I do not have numbers on hand, but updating a couple of maps is cheap compared to reading/writing lots of data across the network).

It seems like we can combine these two components to identify when a rewind has occurred and then trigger a seek on the consumer back to the latest committed offset, the low-watermark of the partition.
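The detection logic described above can be sketched without any Kafka dependency. This is a hypothetical illustration, not the actual alpakka-kafka `ConsumerProgressTracking` API; the class and method names are invented for this sketch:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of the low-watermark defense.
public class LowWatermarkGuard {
    // Latest committed offset per partition, updated on each successful commit.
    private final Map<String, Long> committed = new HashMap<>();

    public void onCommit(String partition, long offset) {
        // Keep the highest committed offset as the partition's low-watermark.
        committed.merge(partition, offset, Math::max);
    }

    // Called with the consumer's current fetch position. If the position has
    // jumped back before what we already committed, the broker "lost" the
    // offset and reset it; return the offset to seek back to.
    public OptionalLong checkForRewind(String partition, long fetchPosition) {
        Long lowWatermark = committed.get(partition);
        if (lowWatermark != null && fetchPosition < lowWatermark) {
            return OptionalLong.of(lowWatermark); // seek target
        }
        return OptionalLong.empty();
    }

    // On partition revocation (rebalance), drop the watermark: the next owner
    // resumes from the committed offset anyway.
    public void onRevoke(String partition) {
        committed.remove(partition);
    }
}
```

In a real consumer, a non-empty result from `checkForRewind` would translate into a `seek` on the underlying Kafka consumer rather than letting the `earliest` reset proceed.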

The obvious concern here is when someone intentionally seeks a consumer. In that case, the seeking consumer needs to be part of the latest epoch, so either it's running in the current set of consumers or the seek is triggered externally.

For the latter, the current consumer group members can no longer be part of the latest epoch, so they will be subject to a rebalance. Thus, we would drop the partition's low-watermark in favor of the next commit (this works because a rebalance seeks to the latest committed offset anyway, so it is safe; if we hit the lost-offset problem there, there is not much we can safely do).
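The rebalance interaction just described can be sketched on its own. The callback names below mirror Kafka's `ConsumerRebalanceListener` (`onPartitionsRevoked`), but this is a dependency-free illustration, not the real listener interface:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: on rebalance, tracked low-watermarks are dropped so
// that stale watermarks from a previous epoch cannot trigger a spurious seek.
public class RebalanceWatermarkReset {
    private final Map<String, Long> lowWatermarks = new HashMap<>();

    public void track(String partition, long committedOffset) {
        lowWatermarks.put(partition, committedOffset);
    }

    // Mirrors ConsumerRebalanceListener#onPartitionsRevoked: the next epoch's
    // owner starts from the committed offset, so forget our watermarks.
    public void onPartitionsRevoked(Iterable<String> partitions) {
        for (String p : partitions) {
            lowWatermarks.remove(p);
        }
    }

    public boolean hasWatermark(String partition) {
        return lowWatermarks.containsKey(partition);
    }
}
```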

For the former, intentionally seeking a running consumer is quite an advanced technique, so users can be advised about this edge case: either remove the low-watermark protection before making offset changes, or update their ops practices.

Reported issues

jyates commented Dec 17, 2020

Basic sketch (looking for thoughts on design/approach) here: jyates@438d9d6

ennru commented Dec 22, 2020

Thank you for a great write-up of this scenario.

I looked briefly at your sketch and I believe this is valuable to push further.
Splitting out some of the CommitRefreshing duties into ConsumerProgressTracking looks promising.

(I prefer my Noop-style to Option-wrapping, though.)

jyates added a commit to jyates/alpakka-kafka that referenced this issue Dec 28, 2020
Addresses akka#1286

Includes some refactoring to pull out the common commit progress tracking
that is useful for both reset tracking and commit refreshing. Did not
want to keep state twice, so you get this little bit of twisted design
with the progress tracker, but keeps the logic contained.

Continues to keep the same amount of overhead (that is, very little)
for when commit refreshing and reset tracking are not enabled. At the
same time, avoids double work when both are enabled
jyates added further commits to jyates/alpakka-kafka that referenced this issue on Dec 28, 2020, Jan 29, 2021, and Feb 12, 2021, each with the same message as above (referencing akka#1286).
seglo commented Mar 10, 2021

Implemented with #1299

@seglo seglo closed this as completed Mar 10, 2021
@seglo seglo added this to the 2.1.0-RC1 milestone Mar 10, 2021