
Recovery Strategies for Exactly-Once Sink to Kafka During Commit-Phase #224

jacksonrnewhouse opened this issue Jul 31, 2023 · 1 comment
Labels: enhancement, help wanted, question

We recently added exactly-once semantics to the Kafka sink. This works by writing data under a unique transactional ID for each (subtask, checkpoint interval) pair. On receiving a checkpoint barrier, the sink stops writing to that transaction_id but keeps the producer around. Once the commit message arrives, it calls commit_transaction().
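For concreteness, here is a minimal sketch of that flow using rust-rdkafka's transactional producer API (assuming a recent rdkafka where flush() returns a result). The transactional-ID naming scheme, broker address, and topic are placeholders, not Arroyo's actual implementation:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

/// Sketch of the per-(subtask, checkpoint interval) transactional produce flow.
fn checkpointed_produce(subtask: u32, epoch: u32) -> Result<(), KafkaError> {
    // One transactional ID per (subtask, checkpoint interval) pair; the naming is hypothetical.
    let txn_id = format!("arroyo-kafka-sink-{subtask}-{epoch}");
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("transactional.id", &txn_id)
        .create()?;

    producer.init_transactions(Duration::from_secs(10))?;
    producer.begin_transaction()?;

    // Normal processing: records are written into the open transaction.
    producer
        .send(BaseRecord::to("output-topic").key("key").payload("value"))
        .map_err(|(e, _)| e)?;

    // Checkpoint barrier: stop writing and flush, but keep the producer alive.
    producer.flush(Duration::from_secs(10))?;

    // ...the checkpoint is made durable; only then does the commit message arrive...

    // Commit message received: finalize the transaction.
    producer.commit_transaction(Duration::from_secs(30))?;
    Ok(())
}
```

The failure mode discussed below is a crash between the flush (checkpoint) and the commit_transaction() call.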

However, we face a challenge when an unplanned failure occurs between checkpointing the data and committing the transaction to Kafka. We need a recovery strategy that avoids data loss, but closing in-flight transactions and recovering uncommitted data is not straightforward.

Here are the options we're considering:

Query __transaction_state and Modify Kafka Client

Kafka tracks transactions in the internal __transaction_state topic. It uses an internal binary format and is not part of the public API, but the data can be read. This is the approach Flink takes. Additionally, Flink uses reflection to reinstantiate a client with the same internal state as the previous one, namely the same producer ID and epoch. This is all very much undocumented and unrecommended by Kafka, but it does seem to work.
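For reference, reading the raw records off __transaction_state only requires an ordinary consumer; the undocumented part is decoding the binary key/value format and resuming the original producer ID and epoch, which rust-rdkafka does not expose. A rough sketch of the read side, with placeholder config values:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaError;
use rdkafka::message::Message;
use std::time::Duration;

/// Scan the raw records in __transaction_state. Decoding them is the hard
/// (and undocumented) part and is not shown here.
fn scan_transaction_state() -> Result<(), KafkaError> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "txn-state-scan") // hypothetical group id
        .set("auto.offset.reset", "earliest")
        .set("enable.auto.commit", "false")
        .create()?;
    consumer.subscribe(&["__transaction_state"])?;

    while let Some(msg) = consumer.poll(Duration::from_secs(2)) {
        let msg = msg?;
        // The key encodes the transactional.id; the value encodes producer id,
        // epoch, and transaction status, all in Kafka's internal binary format.
        let _key: Option<&[u8]> = msg.key();
        let _value: Option<&[u8]> = msg.payload();
    }
    Ok(())
}
```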

Start New Transactions and Identify Messages for Rewrite

Another approach is to, upon checkpoint restoration, cancel all previous transactions and then compare committed and uncommitted messages using two Kafka consumers (one reading with read_committed isolation, one with read_uncommitted). Additionally, you'd store the transaction_id, partition_id, and offset of each message written to Kafka in the Arroyo state backend. For the Parquet backend this should compress very well, and the approach uses only public Kafka APIs. It does require being sure that all previous transactions have been cancelled, as otherwise the committed offsets will not have advanced yet.
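A rough sketch of the offset comparison with rust-rdkafka. It assumes the stale transactions have already been aborted (as noted above); the group IDs, broker address, and helper names are placeholders:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaError;
use rdkafka::message::Message;
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
use std::collections::HashSet;
use std::time::Duration;

/// Collect the offsets visible to a consumer with the given isolation level,
/// starting from the offset recorded at the last checkpoint.
fn visible_offsets(
    isolation: &str,
    topic: &str,
    partition: i32,
    from: i64,
) -> Result<HashSet<i64>, KafkaError> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "arroyo-recovery-scan") // throwaway group id
        .set("enable.auto.commit", "false")
        .set("isolation.level", isolation)
        .create()?;

    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset(topic, partition, Offset::Offset(from))?;
    consumer.assign(&tpl)?;

    let mut seen = HashSet::new();
    // A real implementation would bound this scan by the offsets recorded in
    // Arroyo state rather than by a poll timeout.
    while let Some(msg) = consumer.poll(Duration::from_secs(2)) {
        seen.insert(msg?.offset());
    }
    Ok(seen)
}

/// Offsets visible with read_uncommitted but not with read_committed were
/// written by transactions that never committed, so they must be rewritten.
fn offsets_to_rewrite(topic: &str, partition: i32, from: i64) -> Result<Vec<i64>, KafkaError> {
    let committed = visible_offsets("read_committed", topic, partition, from)?;
    let all = visible_offsets("read_uncommitted", topic, partition, from)?;
    Ok(all.difference(&committed).copied().collect())
}
```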

Maintain Separate State and Replay Transactions

Alternatively, we could write the messages themselves into Arroyo's state. This would still require correctly determining which transactions were and weren't committed, but wouldn't require reading data back out of Kafka.
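A sketch of what the replay path could look like, assuming a hypothetical BufferedMessage record restored from Arroyo state: for any epoch whose transaction was determined not to have committed, its messages are rewritten in a fresh transaction.

```rust
use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

/// Hypothetical shape of a message buffered in Arroyo state.
struct BufferedMessage {
    topic: String,
    key: Vec<u8>,
    value: Vec<u8>,
}

/// Replay the buffered messages of an epoch determined not to have committed.
/// Reusing that epoch's transactional ID means init_transactions() also fences
/// and aborts any in-flight transaction left over from before the failure.
fn replay_epoch(messages: &[BufferedMessage], txn_id: &str) -> Result<(), KafkaError> {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("transactional.id", txn_id)
        .create()?;

    producer.init_transactions(Duration::from_secs(10))?;
    producer.begin_transaction()?;
    for m in messages {
        producer
            .send(BaseRecord::to(&m.topic).key(&m.key).payload(&m.value))
            .map_err(|(e, _)| e)?;
    }
    producer.flush(Duration::from_secs(10))?;
    producer.commit_transaction(Duration::from_secs(30))?;
    Ok(())
}
```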

Abandon Exactly-Once Semantics During Commit Phase

This is the current state: after a failure during the commit phase we could either keep the existing behavior or simply proceed with the next epoch. This doesn't seem like the right call, but it's listed here for completeness.

I'd love to hear thoughts on the best way to proceed. In particular, how ill-advised is hacking into the Kafka internals? I'd also like a better understanding of the precise semantics of Kafka's transaction guarantees so that the second and third approaches could be implemented reliably.

@jacksonrnewhouse (Contributor, Author) commented:

This is a known issue with Kafka, and there's a draft proposal to address it by adding support for participating in two-phase commit to Kafka: https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
