[FLINK-24131][datastream/kafka] Reliably shutdown producer #17152
Conversation
XComp
left a comment
Thanks @AHeise for your contribution. I added some minor comments below.
+ '\''
+ ", inTransaction="
Suggested change:
+ "', inTransaction="
nit
Strange - it's autogenerated.
} catch (ProducerFencedException e) {
    // initTransaction has been called on this transaction before
    LOG.error(
            "Transactions {} timed out or was overridden and data has been potentially lost.",
| "Transactions {} timed out or was overridden and data has been potentially lost.", | |
| "Transaction {} timed out or was overridden and data has been potentially lost.", |
nit
| "Transaction {} was previously canceled and data has been lost.", | ||
| committable, | ||
| e); | ||
| } catch (Exception e) { |
Just to be sure we don't miss anything here: Why don't we handle Throwable here anymore?
This was an unintended change, but now I intend to keep it. I don't think we can or should handle Errors here (think of OOM).
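To make the distinction concrete, a tiny illustrative sketch (the class and method names are made up, not the actual commit path): Exception covers checked and runtime failures, while Errors such as OutOfMemoryError are left to propagate and fail the task.

```java
class ErrorHandlingSketch {

    /** Hypothetical commit wrapper illustrating why only Exception is caught. */
    void commitSafely(Runnable commitAction) {
        try {
            commitAction.run();
        } catch (Exception e) {
            // Checked and runtime failures can be logged, retried, or rethrown in a controlled way.
        }
        // Errors (e.g. OutOfMemoryError) are deliberately not caught here:
        // the JVM is likely in an unusable state, so they should propagate and fail the task.
    }
}
```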
  this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
- this.lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(-1);
+ this.lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(0);
We change it to 0 because the checkpoint ID counting starts at 1. That knowledge is a bit hidden in StandaloneCheckpointIDCounter:34, ZooKeeperCheckpointIDCounter:86, or KubernetesCheckpointIDCounter:167. Can't we move this value into a single place like CheckpointConfig.DEFAULT_INITIAL_CHECKPOINT_ID and use it here as CheckpointConfig.DEFAULT_INITIAL_CHECKPOINT_ID - 1? That would make the intention of the literal 0 more obvious.
Or should we use CheckpointConfig.DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA instead? 🤔
It's a good idea to introduce a constant, but CheckpointConfig is probably not the correct place.
I added it to CheckpointIDCounter, which is non-public, but we should probably expose it in some public class if we intend to commit to the default of 1 (which I guess quite a few users rely on).
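A minimal sketch of that idea, assuming the constant ends up as CheckpointIDCounter.INITIAL_CHECKPOINT_ID (name and location taken from the discussion above, not verified against the final code):

```java
import java.util.OptionalLong;

// Assumption: the constant lives in CheckpointIDCounter and checkpoint IDs start at 1.
interface CheckpointIDCounter {
    long INITIAL_CHECKPOINT_ID = 1;
}

class KafkaWriterInitSketch {
    private final long lastCheckpointId;

    KafkaWriterInitSketch(OptionalLong restoredCheckpointId) {
        // A writer that was never checkpointed pretends its last checkpoint was the one
        // before the first real ID, which is what the literal 0 in the diff above encodes.
        this.lastCheckpointId =
                restoredCheckpointId.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
    }
}
```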
- if (exception != null && producerAsyncException == null) {
-     producerAsyncException = exception;
+ if (exception != null) {
+     mailboxExecutor.execute(
Can you briefly explain what throwing a RuntimeException on the mailbox executor would trigger?
This throws the exception in the main task thread, leading to a regular failover. The callback potentially runs in a different thread.
Thanks for the clarification
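For readers unfamiliar with the pattern, a rough sketch (the class and callback shape are simplified assumptions, not the actual KafkaWriter code): the producer callback may fire on a Kafka I/O thread, so the exception is enqueued as a mail and rethrown on the main task thread, where it triggers the normal failover path.

```java
import java.util.concurrent.Executor;

class AsyncFailureSketch {
    private final Executor mailboxExecutor; // runs enqueued actions on the main task thread

    AsyncFailureSketch(Executor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    /** Called by the Kafka producer's send callback, potentially on a different thread. */
    void onSendCompleted(Exception exception) {
        if (exception != null) {
            // Rethrowing inside the mailbox surfaces the failure in the main task thread,
            // which fails the task and triggers a regular failover.
            mailboxExecutor.execute(
                    () -> {
                        throw new RuntimeException("Kafka producer send failed", exception);
                    });
        }
    }
}
```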
            committable,
            e);
}
recyclable.ifPresent(Recyclable::close);
Why can we close the producer if we might want to retry committing the Committables? 🤔
Could we add a test for the closing to KafkaCommitterTest?
Why can we close the producer if we might want to retry committing the Committables? 🤔

We are not retrying them at this point. The retriable path is short-circuited with continue.
True. IMHO, the code is hard to understand here because of the little continue being hidden in the catch blocks. But I cannot come up with a quick fix. ¯\_(ツ)_/¯
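To make the control flow easier to follow, a rough sketch of the shape being discussed (names and exception types are simplified placeholders, not the actual KafkaCommitter code): retriable failures continue to the next committable and keep the producer alive, while everything else falls through to the close at the end of the loop body.

```java
import java.util.Map;

class CommitterLoopSketch {

    interface RecyclableProducer extends AutoCloseable {
        @Override
        void close(); // returns the producer to the pool; no checked exception
    }

    static class RetriableCommitException extends RuntimeException {}

    /** Each committable carries the producer that owns its transaction. */
    void commitAll(Map<String, RecyclableProducer> committables) {
        for (Map.Entry<String, RecyclableProducer> entry : committables.entrySet()) {
            try {
                commit(entry.getKey());
            } catch (RetriableCommitException e) {
                // Retriable path: skip the close below so the producer stays usable for a retry.
                continue;
            } catch (Exception e) {
                // Fatal path: the committable will not be retried, so the producer is not needed.
            }
            // Reached on success or on a fatal error, but never on the retriable path.
            entry.getValue().close();
        }
    }

    private void commit(String committable) {
        // placeholder for the actual transaction commit
    }
}
```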
/** Causes a permanent error by misconfiguration. */
@Test
public void testRetryCommittableOnFatalError() throws IOException {
    final KafkaCommitter committer = new KafkaCommitter(new Properties());
Suggested change:
@Test
public void testRetryCommittableOnFatalError() throws IOException {
    // Causes a permanent error by misconfiguration.
    final KafkaCommitter committer = new KafkaCommitter(new Properties());
nit: the comment actually describes the behaviour caused by the new Properties() parameter value. It doesn't describe the test itself, which is totally fine because the test method name is descriptive enough.
final KafkaCommitter committer = new KafkaCommitter(new Properties());
final short epoch = 0;
public void testRetryCommittableOnRetriableError() throws IOException {
    final KafkaCommitter committer = new KafkaCommitter(getProperties());
Suggested change:
// causes a network error by inactive broker
final KafkaCommitter committer = new KafkaCommitter(getProperties());
nit: I would remove the test method comment; the method name is descriptive enough to show that we're testing the retry here. The explanation of why the configuration returned by getProperties() causes the expected behavior is valuable, though, and should be added to the respective line.
@Parameterized.Parameter public int run;

@Parameterized.Parameters
public static Set<Integer> getConfigurations() {
Would it make sense to add a comment here describing the intention of run? run is never used in any of the methods. I guess the intention is that we want to loop over the tests for some reason? getConfigurations is quite a generic term to describe the intention. Maybe there's a better name (like getTestRunCounter()) that describes what this method returns?
You should really start reviewing by commit :p This is just a tmp commit for testing.
fair point 👍
}

@Test
public void testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints() throws Exception {
I would have expected this test to fail without your changes, considering that we ran into issues when doing the release testing in FLINK-23850, which resulted in FLINK-24151. But reverting all your changes and only applying the KafkaSinkITCase diff didn't result in any failure on my local machine after 80 runs of testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints.
Is this expected? 🤔
Yes, I ran all tests 100 times before and they didn't fail once. You just get different timings on a completely overloaded AZP.
Running the FLINK-23850 job on this PR's codebase results in the following error with constant data being added to the Kafka input topic:
FLINK-23850 job:
Here is a tar archive containing the
XComp
left a comment
Nothing else to add. LGTM 👍
The local test using the FLINK-23850 setup is also passing. An upgrade of the Kafka server to 2.7.1 was necessary. It appears that we ran into issues with Kafka Server 2.4.1 due to some bug on the Kafka side.
… called twice for final commit. With FLIP-147 (final checkpoint) and the respective opt-in option, a sink would invoke prepareCommit twice for the final commit.
…ectly even with Interruptions.
…ed correctly even with Interruptions.
This also aligns the transaction ids with checkpoint ids starting at 1.
…in KafkaCommitter.
Removed pending records as it doesn't add anything to KafkaWriter#flush (same post-condition as per JavaDoc) but introduces instabilities because of concurrency.
The metric can only be registered once and should simply take the currentProducer to calculate.
What is the purpose of the change
The producer can leak if there are interruptions during cancellation while the producer tries to abort transactions in an orderly fashion.
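A minimal sketch of the kind of hardening this refers to (class shape and field names are assumptions, not the actual KafkaWriter code): even if the orderly abort fails or is interrupted, the underlying producer is still closed and cannot leak.

```java
import java.time.Duration;
import org.apache.kafka.clients.producer.Producer;

class ReliableShutdownSketch {
    private final Producer<byte[], byte[]> producer;
    private final boolean inTransaction;

    ReliableShutdownSketch(Producer<byte[], byte[]> producer, boolean inTransaction) {
        this.producer = producer;
        this.inTransaction = inTransaction;
    }

    /** Closes the producer even if the orderly transaction abort is interrupted. */
    void close() {
        try {
            if (inTransaction) {
                producer.abortTransaction();
            }
        } catch (RuntimeException e) {
            // Interruption or fencing during the abort must not prevent the close below.
        } finally {
            // Duration.ZERO: do not block on outstanding requests during cancellation.
            producer.close(Duration.ZERO);
        }
    }
}
```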
Brief change log
SinkWriter#prepareCommit)
Verifying this change
Added test with concurrent checkpoints to KafkaWriterITCase.
Most fixes cover issues exposed by existing tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation