Transactions: Add offsets to be committed directly after `producer.send` #752

Merged
ennru merged 10 commits into akka:master from 2m:wip-restarting-transaction-2m Apr 3, 2019

4 participants
@2m
Member

commented Mar 20, 2019

Purpose

The transactional flow could produce duplicates, as the included test case shows. Adding the offsets to be committed directly after `producer.send`, instead of in the send callback, solves this.
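
One way to picture why the timing matters is a toy model with no Kafka involved. It is only a sketch, and it assumes that a commit can be triggered while a send callback is still outstanding. Every name in it (`OffsetTimingSketch`, `ToyTransaction`, `Sent`, `recordOffsetInCallback`) is hypothetical and not taken from the Alpakka Kafka code.

```scala
object OffsetTimingSketch {
  // Hypothetical message: the source offset it was read from plus its payload.
  final case class Sent(offset: Long, value: String)

  // Toy stand-in for a transactional producer stage; not the real stage logic.
  final class ToyTransaction(recordOffsetInCallback: Boolean) {
    private var pendingCallbacks = Vector.empty[() => Unit] // send callbacks that have not run yet
    private var offsetToCommit: Option[Long] = None         // offset that would go into the commit
    private var produced = Vector.empty[String]             // messages in the open transaction

    def send(msg: Sent): Unit = {
      produced = produced :+ msg.value
      if (recordOffsetInCallback)
        pendingCallbacks = pendingCallbacks :+ (() => offsetToCommit = Some(msg.offset))
      else
        offsetToCommit = Some(msg.offset) // recorded directly after send
    }

    // Simulates all outstanding sends being acknowledged.
    def runCallbacks(): Unit = {
      pendingCallbacks.foreach(callback => callback())
      pendingCallbacks = Vector.empty
    }

    // Commits the produced messages together with whatever offset is known right now.
    def commit(): (Vector[String], Option[Long]) = {
      val result = (produced, offsetToCommit)
      produced = Vector.empty
      result
    }
  }

  def committedOffset(recordOffsetInCallback: Boolean): Option[Long] = {
    val tx = new ToyTransaction(recordOffsetInCallback)
    tx.send(Sent(offset = 0L, value = "a"))
    tx.runCallbacks()                       // the first callback has completed
    tx.send(Sent(offset = 1L, value = "b")) // the second callback is still outstanding
    val (messages, offset) = tx.commit()    // the commit happens before that callback runs
    assert(messages == Vector("a", "b"))    // both messages are part of the transaction
    offset
  }
}
```

In this model, `committedOffset(recordOffsetInCallback = true)` returns `Some(0)` even though both messages were committed, so a restarted flow would read offset 1 again and produce "b" a second time. With `false` it returns `Some(1)`, which matches the committed messages.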

Background context

The included test case restarts multiple transactional source-sink flows while they move messages from one topic to another. Before this change, the test case produced duplicates.

@2m

Member Author

commented Mar 21, 2019

The restarted transactional producers should be fine. They provide a transactional id, which is then used to retrieve the producer id. This, together with calling producer.initTransactions() on producer stage startup, makes sure that any existing transaction is aborted. If a transaction is in the middle of being committed, the call waits until the commit succeeds.
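
For readers less familiar with the plain Kafka client, the startup sequence described here can be sketched as follows. This assumes `kafka-clients` is on the classpath and, for `startProducer`, a reachable broker; the object and helper names are hypothetical, and this is not the Alpakka Kafka stage code.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

object RestartedProducerSketch {
  // Hypothetical helper: a restarted producer must reuse the same transactional id
  // so that the broker can recognise it as a new incarnation of the old producer.
  def transactionalProps(bootstrapServers: String, transactionalId: String): Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
    props
  }

  // Requires a running broker: initTransactions() blocks until the transaction
  // coordinator has responded (or the client-side timeout expires).
  def startProducer(props: Properties): KafkaProducer[String, String] = {
    val producer =
      new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
    producer.initTransactions()
    producer
  }
}
```

Calling `startProducer` again with the same transactional id after a restart is the situation described above.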

Still investigating the root cause. Currently looking into the effects of rebalancing on the transactional consumer, where the duplicates are coming from.

@2m 2m force-pushed the 2m:wip-restarting-transaction-2m branch from d328007 to 70f76d5 Mar 22, 2019

@2m

Member Author

commented Mar 22, 2019

Added a commit where the consumer actor is only initialized when the consumer stage gets demand. This makes sure that initTransactions() from TransactionalProducerStageLogic.preStart() is called before the consumer is initialized. initTransactions() aborts transactions that were started by another incarnation of the producer with the same transactional id (producer id).

However, I still noticed some duplicates (though fewer) after this change.

valuesSource(settings, topic)
.runWith(TestSink.probe)

private def valuesSource(settings: ConsumerSettings[String, String],

@seglo

seglo Mar 22, 2019

Contributor

Do the probe consumer properties set isolation.level to read_committed? This ensures that only successfully committed messages are consumed.

@2m

2m Mar 22, 2019

Author Member

Yes, the consumer has that in its settings:

.withProperties(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> "read_committed")

@2m

Member Author

commented Mar 22, 2019

Here is another commit that builds on this: it checks for duplicate messages in the producer. This would catch any duplicates arriving at the same producer instance. However, there do not seem to be any when running the test case, meaning that the duplication occurs between different producer instances.

2m@bf1ff44
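
The referenced commit is not reproduced here. As a minimal sketch of the idea only, a per-instance duplicate check could look like the following; `DuplicateCheckSketch` and `DuplicateDetector` are hypothetical names, not taken from that commit.

```scala
import scala.collection.mutable

object DuplicateCheckSketch {
  // Remembers every value seen by ONE producer instance (hypothetical helper).
  final class DuplicateDetector[A] {
    private val seen = mutable.Set.empty[A]

    // Returns true when this instance has already seen the value.
    // mutable.Set#add returns false if the element was already present.
    def isDuplicate(value: A): Boolean = !seen.add(value)
  }
}
```

Because each detector keeps its own state, a value that reaches two different producer instances is not flagged by either of them, which is consistent with the check finding nothing in the test case.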

@2m 2m force-pushed the 2m:wip-restarting-transaction-2m branch from 70f76d5 to d381b4f Mar 29, 2019

@2m

Member Author

commented Mar 29, 2019

@seglo @ennru The offset update change fixes this test case for me, and I do not see any duplicates anymore. I have removed the initialization order fix, as it no longer seems to be needed.

@2m

Member Author

commented Mar 29, 2019

I also published jars with this fix for anyone who would like to try it out: 1.0.1+11-d381b4f4

You will need to add the Alpakka Kafka snapshot repository to your build to resolve it: https://doc.akka.io/docs/alpakka-kafka/current/snapshots.html

//cc @JustinPihony

@ennru
Member

left a comment

Quite surprising that this makes all the difference.

@JustinPihony

left a comment

This did not fix the rebalance issue I have been seeing.

2m added some commits Apr 2, 2019

@2m

Member Author

commented Apr 2, 2019

Addressed review feedback.

@ennru

ennru approved these changes Apr 2, 2019

Member

left a comment

LGTM.

@ennru ennru changed the title from "Add a testcase where transactional source+sink is being restarted" to "Transactions: Add offsets to be committed directly after `producer.send`" Apr 2, 2019

@ennru ennru added this to the 1.0.2 milestone Apr 2, 2019

2m added some commits Apr 2, 2019

@ennru ennru merged commit 80e91d6 into akka:master Apr 3, 2019

1 of 2 checks passed

continuous-integration/travis-ci/pr The Travis CI build failed
typesafe-cla-validator All users have signed the CLA

@2m 2m deleted the 2m:wip-restarting-transaction-2m branch Apr 3, 2019

@ennru ennru referenced this pull request Apr 11, 2019

Closed

Sprint plan Alpakka Team 2019-03-11 #97

2 of 13 tasks complete