
[FLINK-31363] Do not checkpoint a KafkaCommittable if the transaction was empty #15

Closed
wants to merge 5 commits into from

Conversation


@tzulitai tzulitai commented Mar 23, 2023

This PR fixes FLINK-31363 by changing how we handle empty transactions on KafkaWriter#prepareCommit().

Previously, regardless of whether the current transaction is empty or non-empty, we always emit a KafkaCommittable for it to be checkpointed by the CommitterOperator. The issue: on restore, when we resume the transaction and commit it, we recreate a FlinkKafkaInternalProducer that always has the internal transactionStarted flag set to true, which means that an EndTxnRequest will be sent to the brokers for committing the transaction. This results in an InvalidTxnState error since on the broker side the transaction hasn't actually been started yet (transactions are lazily started on brokers on the first record sent).
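
The failure mode described above can be modeled with a small self-contained sketch (illustrative only, not actual connector code; the broker's lazy transaction start is reduced to a single boolean):

```java
// Models the broken restore path: the recreated producer always believes the
// transaction was started, but the broker only starts a transaction lazily on
// the first record sent. Committing an empty resumed transaction therefore
// sends an EndTxnRequest for a transaction the broker never saw.
class RestoreBugSketch {
    static class Broker {
        boolean txnStartedOnBroker; // set lazily, on the first record sent

        void endTxn() {
            if (!txnStartedOnBroker) {
                throw new IllegalStateException("InvalidTxnState");
            }
        }
    }

    static class ResumedProducer {
        final Broker broker;
        final boolean transactionStarted = true; // always true after restore

        ResumedProducer(Broker broker) {
            this.broker = broker;
        }

        void commitTransaction() {
            if (transactionStarted) {
                broker.endTxn(); // throws if the transaction was empty
            }
        }
    }
}
```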

I've considered two possible ways to address this:

  1. Store the transactionStarted flag in a KafkaCommittable alongside other txn metadata. Then, on restore, on the recreated producer, we set the internal transactionStarted accordingly to what the checkpoint says.
  2. Never checkpoint a KafkaCommittable if the transaction is empty. In this case, any KafkaCommittable restored from a checkpoint always corresponds to a transaction with data in it, and therefore it is correct to always set the internal transactionStarted flag to true on the recreated producer.

This PR chooses to go with approach 2.

On prepareCommit, if the current ongoing transaction is empty, then:

  1. Do NOT emit a KafkaCommittable (which will be checkpointed by the downstream CommitterOperator)
  2. Immediately commit the empty transaction (this will be a no-op since the producer would not issue an EndTxnRequest at all)
  3. Recycle the producer back into the idle pool for future reuse.
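
The three steps above can be sketched with plain Java and no Kafka dependency (a rough sketch, not the actual KafkaWriter source; the hasRecordsInTransaction flag and the pool-recycling behavior are taken from the PR description, everything else is illustrative):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Self-contained sketch of the prepareCommit() decision described above.
// "Producer" stands in for FlinkKafkaInternalProducer; only the flags
// relevant to FLINK-31363 are modeled.
class EmptyTxnSketch {
    static class Producer {
        boolean hasRecordsInTransaction;
        boolean committed;

        void commitTransaction() {
            // For an empty transaction this is effectively a no-op on the
            // broker side: no EndTxnRequest needs to be issued because no
            // record ever started the transaction there.
            committed = true;
            hasRecordsInTransaction = false;
        }
    }

    final Deque<Producer> idlePool = new ArrayDeque<>();

    // Returns a committable (modeled here as the producer itself) only when
    // the current transaction actually wrote data; otherwise commits the
    // empty transaction immediately and recycles the producer.
    Optional<Producer> prepareCommit(Producer current) {
        if (current.hasRecordsInTransaction) {
            return Optional.of(current); // checkpointed by the CommitterOperator
        }
        current.commitTransaction();
        idlePool.add(current);
        return Optional.empty();
    }
}
```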

…ucer

This new flag tracks whether or not data was actually written to the current transaction
…nsaction

Tests that if the KafkaWriter is asked to prepareCommit but its current transaction is empty,
it should:
- NOT emit a KafkaCommittable
- Immediately commit the empty transaction, and
- Recycle the producer

boring-cyborg bot commented Mar 23, 2023

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@tzulitai

cc @RamanVerma @Gerrrr for review

resumedProducer.resumeTransaction(
        snapshottedCommittable.getProducerId(), snapshottedCommittable.getEpoch());

try {
    resumedProducer.commitTransaction();


You can use assertThatThrownBy here

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    if (inTransaction) {
        hasRecordsInTransaction = true;
    }
    return super.send(record, callback);
}


Should this boolean be set in the callback, in the successful send scenario?


@tzulitai tzulitai Mar 28, 2023


hmm, that's a good point. I think the question to ask is: is it incorrect to set this flag (to allow a KafkaCommittable to be generated for the txn at pre-commit time) preemptively, instead of only setting it when data has actually been successfully written?

I think the answer is that it is not incorrect, so it is ok to leave this as is. Reasoning is as follows:

  • At pre-commit time, when performing the flush, if some data failed to be flushed, the pre-commit fails and a KafkaCommittable will not be checkpointed for the txn anyway. In this scenario, the hasRecordsInTransaction flag is irrelevant no matter its value.

  • If all records are correctly flushed, then good; a KafkaCommittable should be generated for the txn. We're good here because we've already preemptively set the hasRecordsInTransaction flag.
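
The two cases above can be made concrete with a small sketch (illustrative only; PreemptiveFlagSketch and pendingSendFailed are hypothetical names, not connector code):

```java
import java.util.Optional;

// Sketch of why setting hasRecordsInTransaction at send() time, before the
// broker acknowledges the record, is safe: a failed flush aborts pre-commit
// before any committable is built, so the flag's value never matters in the
// failure case.
class PreemptiveFlagSketch {
    boolean hasRecordsInTransaction;
    boolean pendingSendFailed; // models an async send error surfaced at flush

    void send(Object record) {
        hasRecordsInTransaction = true; // set preemptively, before the ack
        // ... record is handed to the producer asynchronously ...
    }

    // Models prepareCommit(): flush first, then decide on a committable.
    Optional<String> prepareCommit() {
        if (pendingSendFailed) {
            // Flush surfaces the async failure; pre-commit fails and no
            // committable is checkpointed for this transaction.
            throw new IllegalStateException("flush failed");
        }
        return hasRecordsInTransaction
                ? Optional.of("committable")
                : Optional.empty();
    }
}
```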

tzulitai (Contributor Author)


Speaking of which, rebasing this PR branch on top of the latest changes (to include the fix that adds checkAsyncExceptions) might give a more complete picture of this interaction.

resumedProducer.resumeTransaction(
snapshottedCommittable.getProducerId(), snapshottedCommittable.getEpoch());

assertThatThrownBy(resumedProducer::commitTransaction);


nit: Can we also check the exception type, using isInstanceOf(&lt;exception type&gt;.class)?


@RamanVerma RamanVerma left a comment


thanks for the PR, @tzulitai
LGTM

@tzulitai

thanks for the review @RamanVerma, merging this now!

@tzulitai tzulitai closed this in 150eaf8 Mar 29, 2023
tzulitai added a commit that referenced this pull request Apr 4, 2023
tzulitai added a commit that referenced this pull request Apr 4, 2023
mas-chen pushed a commit to mas-chen/flink-connector-kafka that referenced this pull request Apr 25, 2023