
Fix transaction offsets for transactional producer #742

Merged
merged 3 commits into akka:master from szymonm:szymon-fix-offsets-for-transaction on Mar 13, 2019

Conversation

@szymonm
Contributor

commented Mar 8, 2019

Pull Request Checklist

Purpose

Callbacks from producer.send are guaranteed to execute in order only for
a single output partition. That means we can have a race condition where
the callback for the input record with offset 1 executes before the
callback for the input record with offset 0. That causes
NonEmptyTransactionBatch to contain offset 0 when committing the
transaction, which leads to data duplication.

Background Context

Use maximal offset when committing transaction in
TransactionalProducerStage

This fix ensures that we only ever increase the offsets stored in the
TransactionBatch. Since awaitingConfirmation == 0 guarantees that all
consecutive offsets have been written to Kafka, it is enough to keep only
the maximal offset in the TransactionBatch.
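
A minimal Scala sketch of the idea; the class and method names here are illustrative only, not the actual TransactionalProducerStage internals:

```scala
// Illustrative sketch: keep only the highest confirmed offset per group/topic/partition,
// so a late callback for an older record can never shrink the offset that is committed
// together with the transaction.
final case class GroupTopicPartition(groupId: String, topic: String, partition: Int)

final case class TransactionBatch(offsets: Map[GroupTopicPartition, Long]) {
  def updated(gtp: GroupTopicPartition, offset: Long): TransactionBatch = {
    val previous = offsets.getOrElse(gtp, -1L)
    // Only ever move forward, regardless of the order in which send callbacks arrive.
    copy(offsets = offsets.updated(gtp, math.max(previous, offset)))
  }
}

object TransactionBatch {
  val empty: TransactionBatch = TransactionBatch(Map.empty)
}
```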

References

See #539 for some relevant context.


@szymonm

Contributor Author

commented Mar 8, 2019

I know about the failing tests... will fix them.

@ennru

Member

commented Mar 8, 2019

Thank you for digging down to the bottom of this -- great find!

@seglo

Contributor

commented Mar 9, 2019

Good work @szymonm. Thank you.

I'm curious how you can recreate this reliably in a test without some mechanism to force one callback to return before the other?

@szymonm

Contributor Author

commented Mar 11, 2019

@ennru I cannot easily force callbacks to return in a specific order.

However, the probability that the one with the highest offset returns last is 1 / (number of topic-partitions (ktps) per topic), so after increasing the number of partitions and the number of consumers I can get pretty good reproducibility of the failure. What do you think of this approach?

@seglo

Contributor

commented Mar 11, 2019

> @ennru I cannot easily force callbacks to return in a specific order.
>
> However, the probability that the one with the highest offset returns last is 1 / (number of topic-partitions (ktps) per topic), so after increasing the number of partitions and the number of consumers I can get pretty good reproducibility of the failure. What do you think of this approach?

@szymonm I assume this question was directed to me?

I think that's probably the best we can do right now without modifying the TransactionalProducerStage just for the sake of testing this. Instead of adding another test, how about we increase the number of partitions for all the tests running in TransactionsSpec? I think this is good enough because, had that already been done, this issue would have shown up as transient failures.
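
A hedged sketch of what that could look like; the helper name and the plain AdminClient wiring are illustrative, since TransactionsSpec has its own topic-creation utilities:

```scala
object CreateTestTopic {
  import java.util.Properties

  import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

  import scala.collection.JavaConverters._

  // Hypothetical helper: create the test topic with several partitions so that the
  // keyless (round-robin) produce in the transactional tests spreads records, and
  // therefore producer.send callbacks, across partitions, making the race observable.
  def createMultiPartitionTopic(bootstrapServers: String, topic: String, partitions: Int): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    val admin = AdminClient.create(props)
    try admin.createTopics(List(new NewTopic(topic, partitions, 1.toShort)).asJava).all().get()
    finally admin.close()
  }
}
```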

@szymonm

Contributor Author

commented Mar 11, 2019

You're right @seglo.

We need at least two consumers to test that offsets are stored in Kafka correctly (more if we want the test to be more reliable at finding errors).
We have that in the 3rd and 4th tests, so that should work. However, IIUC these tests focus on failure recovery. I would argue that having a specialised test for offset committing in the case of multiple consumers makes sense.

@seglo

Contributor

commented Mar 11, 2019

@szymonm I'm not sure I get it.

I'm attempting to understand what's happening in your test provided with this PR. It seems that it sequentially runs transactional streams over 5 batches of numbers, i.e.:

1..10
11..20
21..30
31..40
41..50

The source test data used in each transaction always comes from a single partition, but is produced to multiple partitions using the round-robin partitioning strategy because no key is defined in the ProducerRecord. Is the idea that one of these batches could potentially commit an earlier offset back to the group, and that the next transaction may then read a duplicate, already-processed message?
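
For reference, a rough sketch of the stream shape under discussion, assuming the Alpakka Kafka Transactional API; topic names, group id, bootstrap servers, and the transactional id are placeholders:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Transactional
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

object TransactionalCopyExample extends App {
  implicit val system: ActorSystem = ActorSystem("transactions-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("transactional-group")

  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  // Copy records from a single-partition source topic into a multi-partition sink topic.
  // The ProducerRecord has no key, so records are spread round-robin over the sink
  // partitions, and the producer.send callbacks can complete in a different order than
  // the input offsets.
  Transactional
    .source(consumerSettings, Subscriptions.topics("source-topic"))
    .map { msg =>
      ProducerMessage.single(
        new ProducerRecord[String, String]("sink-topic", msg.record.value),
        msg.partitionOffset
      )
    }
    .runWith(Transactional.sink(producerSettings, "example-transactional-id"))
}
```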

@szymonm

Contributor Author

commented Mar 11, 2019

That's right, @seglo.

One thing I would add is that we don't impose any order in which the streams process data (the runStream method is non-blocking). It's up to Kafka to assign the topic-partitions (ktps) to streams and to ensure that at most one stream is processing the data at a time. Each stream processes 10 elements and then finishes, so Kafka will assign the ktp with the remaining elements to another stream.
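
A hypothetical sketch of such a bounded, non-blocking per-batch stream, again with placeholder topic names and caller-supplied settings; the real runStream in the test may differ:

```scala
import scala.concurrent.Future

import akka.Done
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Transactional
import akka.stream.Materializer
import akka.stream.scaladsl.Keep
import org.apache.kafka.clients.producer.ProducerRecord

object BoundedTransactionalStream {
  // Hypothetical shape of a per-batch stream: it consumes exactly 10 elements inside one
  // transaction and then completes, releasing its partition so the consumer group can
  // rebalance it to the next stream. The method returns the completion Future right away
  // instead of blocking.
  def runStream(transactionalId: String,
                consumerSettings: ConsumerSettings[String, String],
                producerSettings: ProducerSettings[String, String])
               (implicit mat: Materializer): Future[Done] =
    Transactional
      .source(consumerSettings, Subscriptions.topics("source-topic"))
      .take(10)
      .map { msg =>
        ProducerMessage.single(
          new ProducerRecord[String, String]("sink-topic", msg.record.value),
          msg.partitionOffset
        )
      }
      .toMat(Transactional.sink(producerSettings, transactionalId))(Keep.right)
      .run()
}
```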

@szymonm szymonm force-pushed the szymonm:szymon-fix-offsets-for-transaction branch from 6411558 to ff46372 Mar 11, 2019

szymonm added some commits Mar 4, 2019

Use maximal offset when committing transaction in TransactionalProducerStage

Callbacks from `producer.send` are guaranteed to execute in order
only for a single output partition. That means we can have a race condition
where the callback for the input record with offset 1 executes before
the callback for the input record with offset 0. That causes
`NonEmptyTransactionBatch` to contain offset 0 when committing the
transaction, which leads to data duplication.

This fix ensures that we only ever increase the offsets stored in
`TransactionBatch`. Since `awaitingConfirmation == 0` guarantees that all
consecutive offsets have been written to Kafka, it is enough to keep only
the maximal offset in the `TransactionBatch`.

@szymonm szymonm force-pushed the szymonm:szymon-fix-offsets-for-transaction branch from ff46372 to 5d6de51 Mar 11, 2019

@szymonm

Contributor Author

commented Mar 11, 2019

FYI:

  • fixed the tests so that only the last one writes to multiple partitions,
  • deleted accidental changes to the logging config,
  • refactored the tests in TransactionsSpec to avoid code duplication.

Please review commit by commit.

@seglo

Contributor

commented Mar 11, 2019

@szymonm Got it, and thanks for DRYing up the tests. Would you mind adding some comments to your new test so it's clearer to others (and to myself in 6 months) what its purpose is?

@seglo

seglo approved these changes Mar 11, 2019

Contributor

left a comment

LGTM. @ennru I think the failing test is an unrelated transient error?

@ennru

ennru approved these changes Mar 13, 2019

Member

left a comment

As I said, great find!
LGTM.

@ennru

Member

commented Mar 13, 2019

Failure was #745

@ennru ennru added this to the 1.0.2 milestone Mar 13, 2019

@ennru ennru merged commit d4de2c3 into akka:master Mar 13, 2019

1 of 2 checks passed

continuous-integration/travis-ci/pr The Travis CI build failed
typesafe-cla-validator All users have signed the CLA
@ennru

Member

commented Mar 13, 2019

Thank you for digging this deep and fixing it!
