
KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag #13178

Merged
merged 41 commits into apache:trunk on Feb 17, 2023

Conversation

gharris1727
Contributor

@gharris1727 gharris1727 commented Jan 31, 2023

The primary issue addressed here is the incorrect translation of offsets, the title issue KAFKA-12468.
Additionally, this PR addresses KAFKA-13659 by preventing MirrorCheckpointTasks that restart from emitting checkpoints before they have read to the end of the offset syncs topic.
Additionally, this PR stabilizes the MirrorMaker2 integration tests, which were too flaky to properly verify this fix.

The MM2 KIP does not discuss the offset translation mechanism in detail, so I'll summarize the mechanism as it currently exists on trunk:

  1. Records are mirrored from source topic-partition to target topic-partition by the MirrorSourceTask
  2. MirrorSourceTask will (occasionally) emit OffsetSync messages to an Offset Syncs topic. Offset syncs contain the upstream and downstream offset of an emitted data record.
  3. The MirrorCheckpointTask will consume from the offset syncs topic, and maintain an in-memory copy of the latest offset sync seen for each topic-partition (in OffsetSyncStore)
  4. Periodically, the MirrorCheckpointTask polls consumer group offsets for the source topic and uses its in-memory copy of the latest offset sync to translate upstream offsets to downstream offsets.
  5. This is done by measuring the 'distance' between the MM2 offset sync and the upstream consumer group, and then assuming that the same distance applies in the downstream topic.
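To make step (5) concrete, the delta-based translation on trunk amounts to the following (a minimal sketch with hypothetical names, not the actual OffsetSyncStore code):

```java
// Minimal sketch of the delta-based translation described above (hypothetical names).
final class NaiveOffsetTranslation {
    // latestSyncUpstream/latestSyncDownstream are the offsets of the same record recorded in an offset sync.
    // Assumes every upstream offset past the sync also exists in the downstream topic.
    static long translate(long latestSyncUpstream, long latestSyncDownstream, long upstreamGroupOffset) {
        long distance = upstreamGroupOffset - latestSyncUpstream;
        return latestSyncDownstream + distance;
    }
}
```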

Step (5) is correct when assuming that every offset from the source topic has already been reproduced in the downstream topic. However, this assumption is violated whenever some upstream offsets are missing from the downstream topic, which can happen for a variety of reasons, including:

  1. Transaction markers take an offset but will never be emitted as records from the consumer
  2. Records are dropped by SMTs and will never be emitted to the target topic
  3. The source topic has been compacted and some offsets will never be emitted by the consumer
  4. MM2 replication is lagging behind an upstream consumer group and some records have not been replicated yet

In any of these conditions, an upstream offset may be translated to a downstream offset which is beyond the corresponding record in the downstream topic. Consider the following concrete example of situation (4) resulting in negative lag:

  1. Source topic A has 1000 records, all with contiguous offsets
  2. An upstream consumer group cg is at the end of the log, offset 1000.
  3. MM2 begins replicating the topic, writes the first 500 upstream records to the target topic target.A, and writes offset syncs, the latest of which correlates (A, 500) with (target.A, 500).
  4. MM2 checkpoint reads cg offset 1000, translates the offset to 500 + (1000-500) = 1000, and writes to target.cg
  5. Someone checks the target.cg offset for target.A and observes that the group offset is 1000, the topic end offset is 500, and the lag is -500.
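Plugging the numbers from this example into the delta-based translation sketched above (illustrative arithmetic only):

```java
// Situation (4): MM2 is lagging behind the upstream consumer group.
final class NegativeLagExample {
    public static void main(String[] args) {
        long syncUpstream = 500, syncDownstream = 500; // offset sync correlating (A, 500) with (target.A, 500)
        long upstreamGroupOffset = 1000;               // cg is at the end of the upstream log
        long translated = syncDownstream + (upstreamGroupOffset - syncUpstream); // 500 + (1000 - 500) = 1000
        long downstreamEndOffset = 500;                // only 500 records replicated to target.A so far
        long lag = downstreamEndOffset - translated;   // 500 - 1000 = -500 -> negative downstream lag
        System.out.println("translated=" + translated + " lag=" + lag);
    }
}
```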

And the following concrete example of situation (1) resulting in undelivered data.

  1. Source topic A has 1000 records, all emitted with a transactional producer.
  2. The 1000 records are interleaved with 1000 commit markers at every other offset.
  3. An upstream consumer group cg is in the middle of the topic, at offset 1000.
  4. MM2 begins replicating the topic, writes all 1000 records to the target topic target.A, and, among other offset syncs, writes one correlating (A, 500) with (target.A, 250).
  5. MM2 checkpoint reads the cg offset 1000, translates the offset to 250 + (1000 - 500) = 750, and writes to target.cg
  6. A system fails over from cg to target.cg, and someone notices that the cg application read the records at downstream offsets 0-500, the target.cg application reads downstream offsets 750-1000, but no consumer ever received the records at downstream offsets 500-750.
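The gap here comes from transaction markers occupying upstream offsets that never become downstream records; roughly (illustrative arithmetic, assuming records at even upstream offsets and markers at odd ones):

```java
// Situation (1): every other upstream offset is a transaction marker, so the downstream
// topic is "denser" than the upstream topic and the delta assumption over-shoots.
final class TransactionMarkerExample {
    public static void main(String[] args) {
        long syncUpstream = 500, syncDownstream = 250; // offset sync correlating (A, 500) with (target.A, 250)
        long upstreamGroupOffset = 1000;               // cg is mid-topic, having consumed 500 records
        long translated = syncDownstream + (upstreamGroupOffset - syncUpstream); // 250 + 500 = 750
        long actualDownstream = upstreamGroupOffset / 2; // the record at upstream offset 1000 lives at downstream offset 500
        System.out.println("records at downstream offsets " + actualDownstream + ".." + (translated - 1)
                + " are never read by cg or target.cg");
    }
}
```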

This PR adds a test that replicates transactional data, as in situation (1). It asserts that whenever an offset is translated, it does not pass the end of the downstream topic, and so cannot cause negative lag. In addition, the tests are strengthened to require that offset syncs are emitted up to the end of the topic, which requires a fix for the offset-syncs topic starvation issue. This also exposed a number of mistakes and flakiness in the existing tests, so this PR also stabilizes them to make them useful for validating the negative-offsets fix.
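The invariant the new test enforces can be expressed roughly as follows (an illustrative check, not the literal test code):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Every offset synced to the downstream consumer group must be at or before the end
// of the corresponding downstream topic-partition; otherwise downstream lag would go negative.
final class TranslationInvariant {
    static void assertNoNegativeLag(Map<TopicPartition, OffsetAndMetadata> syncedGroupOffsets,
                                    Map<TopicPartition, Long> downstreamEndOffsets) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : syncedGroupOffsets.entrySet()) {
            long translated = entry.getValue().offset();
            long endOffset = downstreamEndOffsets.get(entry.getKey());
            if (translated > endOffset) {
                throw new AssertionError("Offset " + translated + " for " + entry.getKey()
                        + " is past the end of the downstream topic (" + endOffset + ")");
            }
        }
    }
}
```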

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@gharris1727
Contributor Author

After #13181 is merged, I'll rebase and remove my fairness patch.

@gharris1727 changed the title from "KAFKA-12468, KAFKA-14663, KAFKA-12566: Fix MM2 causing negative downstream lag" to "KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag" on Feb 6, 2023
Contributor

@C0urante C0urante left a comment


Thanks @gharris1727. I had to read a few parts of this several times over to grok things, but your description and the changes here make sense now.

I believe that the proposed solution is reasonable since the current approach is based on a flawed assumption and we have seen several real-life cases where users are affected negatively by this. Eliminating reliance on this assumption will eliminate cases of negative consumer lag and potential data loss by downstream consumers, at the cost of making duplicate message consumption on failover to downstream clusters more likely. I think this is a tradeoff worth making.

It does seem that there's a chance this could lead to a regression in behavior in cases where the assumption about offsets of upstream records aligning with offsets of downstream records (or rather, of the delta between the two) holds. I notice that we don't do a read-to-end of the offset syncs topic in MirrorCheckpointTask before we begin syncing consumer group offsets, and we begin reading that topic from the beginning. This may cause us to sync offsets based on stale checkpoints if there are later checkpoints available in the topic that we haven't consumed yet. Do you think it might make sense to add a read-to-end for the offset syncs topic before we begin syncing consumer group offsets in the checkpoint connector? (If so, this should probably be handled in a follow-up PR; I only bring it up now because it seems like the impact of syncing from stale checkpoints may be exacerbated by this change.)
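(For context, a read-to-end of the offset syncs topic with the plain consumer API would look roughly like the sketch below; the method name and structure are illustrative, not the actual Connect or MM2 utilities.)

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

final class ReadToEnd {
    // Block until the consumer's position reaches the end offsets captured at the start of the call.
    static <K, V> void readToEnd(Consumer<K, V> consumer, Collection<TopicPartition> partitions) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        while (partitions.stream().anyMatch(tp -> consumer.position(tp) < endOffsets.get(tp))) {
            for (ConsumerRecord<K, V> record : consumer.poll(Duration.ofMillis(100))) {
                // apply each offset sync record to the in-memory OffsetSyncStore here
            }
        }
    }
}
```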

@C0urante
Contributor

C0urante commented Feb 8, 2023

Oh, and the Jenkins build seems to be consistently failing on the IdentityReplicationIntegrationTest::testNoCheckpointsIfNoRecordsAreMirrored test case. Probably want to look into that before we merge 😄

@gharris1727
Contributor Author

I notice that we don't do a read-to-end of the offset syncs topic in MirrorCheckpointTask before we begin syncing consumer group offsets, and we begin reading that topic from the beginning. This may cause us to sync offsets based on stale checkpoints if there are later checkpoints available in the topic that we haven't consumed yet. Do you think it might make sense to add a read-to-end for the offset syncs topic before we begin syncing consumer group offsets in the checkpoint connector?

Ouch, yeah that is certainly an issue that gets worse with my change.
Before the checkpoints would be non-monotonic for transactional/compacted topics, and after it's non-monotonic for everyone.
I think addressing this in a follow-up is a smart idea, this change is already messy enough.

@gharris1727
Contributor Author

Oh, and the Jenkins build seems to be consistently failing on the IdentityReplicationIntegrationTest::testNoCheckpointsIfNoRecordsAreMirrored test case. Probably want to look into that before we merge 😄

It looks like this was failing due to a typo in my offset.flush.interval.ms overrides. This should be fixed now.

@C0urante
Contributor

C0urante commented Feb 8, 2023

Ouch, yeah that is certainly an issue that gets worse with my change.
Before the checkpoints would be non-monotonic for transactional/compacted topics, and after it's non-monotonic for everyone.
I think addressing this in a follow-up is a smart idea, this change is already messy enough.

And funnily enough, someone's already filed a ticket for that exact issue! 🎉

Out of an abundance of caution, what do you think about targeting your mm2-negative-offsets branch with a new PR to address KAFKA-13659, which can be reviewed separately but then merged to trunk in tandem with this PR?

This has the potential to be a fairly large change in behavior, and I'd like to do everything we can to minimize the chances that it breaks users' setups. Ensuring that this PR is merged if and only if a fix for KAFKA-13659 lands alongside it would help on that front.

@C0urante
Contributor

C0urante commented Feb 9, 2023

@mimaison This is a moderately large change in behavior and if possible, it'd be nice to get another set of eyes on it before merging. We don't need another reviewer for the PR changes (although comments are always welcome); instead, I'd just like confirmation that this change is safe to make as a bug fix.

TL;DR: If an upstream consumer group is ahead of the upstream offset for the latest-emitted checkpoint, we will only sync offsets for that consumer group to the downstream cluster based on the offset pair for that checkpoint, instead of adding the delta of (upstream offset for consumer group - upstream offset in checkpoint), since there is no guarantee that that delta will be accurate in cases where the upstream topic is compacted, has transaction markers, or has some records filtered out via SMT.
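In code, the changed behavior is roughly the following (a simplified sketch of the idea; the real OffsetSyncStore logic differs in detail):

```java
// Simplified sketch: when the upstream group offset is at or past the latest offset sync,
// the translated offset never advances more than one past the sync's downstream offset,
// so it cannot pass data that MM2 has actually replicated.
final class ClampedOffsetTranslation {
    static long translate(long syncUpstream, long syncDownstream, long upstreamGroupOffset) {
        if (upstreamGroupOffset <= syncUpstream) {
            // Group has not consumed past the synced record; reuse the sync's downstream offset.
            return syncDownstream;
        }
        // Group is ahead of the sync: the synced record is fully consumed upstream, but nothing
        // beyond its downstream counterpart is known to exist downstream yet.
        return syncDownstream + 1;
    }
}
```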

Contributor

@C0urante C0urante left a comment


LGTM, thanks Greg! This was much trickier than expected but I'm really happy with the quality of this fix and am optimistic it's going to make our users much happier.

Will merge pending CI build.

@C0urante
Contributor

Hmmm... there appear to be some integration test failures. I've reproduced some of them locally too, which makes flakiness an unlikely cause. Can you look into the integration test failures and see if we can get a green run before merging this?

@gharris1727
Contributor Author

Unfortunately those test failures only appear in the EOS test and appear to be caused by EOS mode.
This is because, in EOS mode, MM2 doesn't get the periodic background commits that the offset-syncs starvation fix relies on, and these assertions rely on that fix to ensure that all the syncs are emitted.

I added a tweak to firePendingOffsetSyncs which drains the offset syncs map on each commit. Do you think we can use the same blocking drain for EOS and normal mode, or should this behavior only be enabled for EOS mode?

@C0urante
Contributor

Hmmm... we make an effort to periodically invoke SourceTask::commit in non-EOS mode, even if there have been no new records written and there are no offsets to commit. I think we should do the same in EOS mode, instead of adjusting MM2 to accommodate this unexpected change in behavior.
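(For reference, SourceTask::commit is the existing Connect hook being discussed; a task that piggybacks periodic work on it looks roughly like this sketch, which is illustrative rather than the actual MirrorSourceTask code.)

```java
import org.apache.kafka.connect.source.SourceTask;

// Illustrative sketch: flush pending offset syncs from the framework's periodic commit() callback,
// even during periods where no new records are produced.
public abstract class PeriodicallyCommittingTask extends SourceTask {
    @Override
    public void commit() throws InterruptedException {
        firePendingOffsetSyncs(); // helper that drains the pending offset syncs map, as described above
    }

    protected abstract void firePendingOffsetSyncs() throws InterruptedException;
}
```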

@gharris1727
Contributor Author

I've split the periodic commit fix into a separate PR (#13262), as it is related only to the test changes here, not to the main changes. This PR depends on that change landing first, and I'll update this branch once the other PR is merged.

…en no records are produced"

This reverts commit aaf3e16.
…over minimum offsets

This de-flakes the MirrorConnectorsIntegrationExactlyOnce test that had this
condition exit too early, causing a later assertion to fail.

Contributor

@C0urante C0urante left a comment


Test failures are all unrelated. LGTM!

@C0urante C0urante merged commit a54a34a into apache:trunk Feb 17, 2023
gharris1727 added a commit to aiven/kafka that referenced this pull request Feb 21, 2023
…tream lag, syncing stale offsets, and flaky integration tests (apache#13178)

KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2

KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2

KAFKA-12566: Fix flaky MirrorMaker 2 integration tests

Reviewers: Chris Egerton <chrise@aiven.io>
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 22, 2023
…tream lag, syncing stale offsets, and flaky integration tests (apache#13178)

KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2

KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2

KAFKA-12566: Fix flaky MirrorMaker 2 integration tests

Reviewers: Chris Egerton <chrise@aiven.io>
C0urante pushed a commit that referenced this pull request Feb 23, 2023
…tream lag, syncing stale offsets, and flaky integration tests (#13178)

KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2

KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2

KAFKA-12566: Fix flaky MirrorMaker 2 integration tests

Reviewers: Chris Egerton <chrise@aiven.io>
jjaakola-aiven added a commit to aiven/kafka that referenced this pull request Mar 15, 2023
…ve downstream lag, syncing stale offsets, and flaky integration tests (apache#13178)"

This reverts commit b2b9f18.
jjaakola-aiven added a commit to aiven/kafka that referenced this pull request Mar 15, 2023
…ve downstream lag, syncing stale offsets, and flaky integration tests (apache#13178)"

This reverts commit ead8dfb.
@gkousouris

gkousouris commented Sep 19, 2023

Hi @gharris1727, if I understand this PR correctly, this will (almost always) cause duplication of data. Our problem is this:

  1. We have a service reading from a Kafka topic and committing offsets for its consumer group.
  2. We use MirrorMaker to replicate the topic to a different cluster.
  3. We pause the service and check the current offsets for the 2 streams (the one in the old cluster and the one in the new cluster).

In step 3, these offsets will be different: specifically, the offset in the old cluster will be that of the last message the service managed to commit, while the new topic's consumer group will have the offset last_checkpoint_before_upstream_offset + 1.

Thus, when we restart the service (and make it consume from the new topic), it will re-process all messages from last_checkpoint_before_upstream_offset + 1 until downstream_offset. Isn't this a problem considering that MirrorMaker is providing exactly-once semantics for committing messages?

This behaviour was verified by looking at the output of the kafka-consumer-groups script.

@gharris1727
Contributor Author

Hi @gkousouris Thanks for asking!

Thus, when we restart the service (and make it consume from the new topic), it will re-process all messages from last_checkpoint_before_upstream_offset + 1 until downstream_offset. Isn't this a problem considering that MirrorMaker is providing exactly-once semantics for committing messages?

Your understanding of the offset translation (post-KAFKA-12468) is correct, and I would expect re-processing of messages downstream after a fail-over. I also understand that this doesn't satisfy "exactly once semantics" for some definition, because it allows for re-delivery of the same message to the same "application", when that application uses multiple Kafka clusters.

MirrorMaker2 currently provides "exactly once semantics" for replicating data, but not for offsets. I believe this is captured by the "MirrorSourceConnector" declaring that it supports EOS, but the "MirrorCheckpointConnector" does not. This means that when you replicate a topic with EOS mode, and use read_committed on the downstream topic from the beginning, EOS would mean that you read each record present in the upstream topic exactly once. When you instead start reading at the committed downstream offset, you may have records delivered downstream that have already been committed upstream.
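(For illustration, "using read_committed on the downstream topic" means configuring the consumer roughly as in the sketch below; the bootstrap address and topic name are placeholders.)

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

final class ReadCommittedExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "target-cluster:9092"); // placeholder address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "target.cg");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // skip aborted transactional records
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");     // read from the beginning, as described above
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("target.A"));
            // poll loop omitted
        }
    }
}
```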

This is not just caused by the offset translation that this PR implements; it's a limitation of the asynchronous offset translation that MirrorMaker2 uses. Consider this sequence:

  1. MirrorCheckpointTask syncs offsets
  2. MirrorCheckpointTask sleeps
  3. MirrorSourceTask replicates some records
  4. The upstream consumer group consumes upstream records and commits offsets to the upstream group
  5. The downstream consumer group starts reading the topic with the stale offsets in the downstream group

Thanks for doing your due diligence on the claims of "exactly once semantics", and I hope that you can still make MirrorMaker2 work for your use-case. I suspect that EOS semantics across multiple Kafka clusters is a much larger effort than just changing the offset translation logic :) If you have a Jira account, please consider opening a ticket about this shortcoming.

Thanks!

@gkousouris

Thanks a lot for your reply! I will look into creating a Jira account and creating a ticket for this.

I should have mentioned that we were planning on using MirrorMaker to migrate the topic a service reads from, from cluster A to cluster B. So we would not be limited by the asynchronous offset translation that MirrorMaker uses, since we would be:

  1. Mirroring data from old topic in cluster A to new topic in cluster B.
  2. Stopping the service and waiting for the consumer offset of the last message on cluster A to be replicated to the new topic on cluster B.
  3. Restarting the service to read from the new topic.

We would have hoped that the offset would be translated exactly at some point, letting us seamlessly start consuming from the same point where the service last stopped.

MirrorMaker seems like a great fit for our use case, but this might be a bit of a blocker. Using the old offset translation behavior from before this PR could perhaps work if we were to disable EOS (to get rid of the transactional messages).

Otherwise, the only solution I can think of is the hacky approach of reading the offsets and trying to decipher what message to read on the application-side, which seems brittle.

Would you perhaps recommend a different approach to avoid re-processing a message twice?

@gharris1727
Contributor Author

@gkousouris Thanks for sharing your use-case. I think you are right to look towards MM2 for this sort of translation, and I think it's unfortunate that it isn't straightforward. Due to memory limitations, the current offset translation doesn't "converge" for consumer groups which are inactive, and for a single-shot migration use-case that's not good enough.

Are you able to stop the producers to the upstream topic, and let the consumers commit offsets at the end of the topic before performing the migration? If you set offset.lag.max very low, MM2 should be able to translate offsets at the end of the topic.
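(For reference, offset.lag.max is an existing MirrorSourceConnector setting; in an mm2.properties file the override would look roughly like this, with the cluster aliases and topic as placeholders.)

```properties
# mm2.properties fragment (cluster aliases "source" and "target" are placeholders)
clusters = source, target
source->target.enabled = true
source->target.topics = A
# Emit an offset sync for (nearly) every replicated record so that translation near the
# end of the topic is as precise as possible. The default is 100.
source->target.offset.lag.max = 0
```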

Otherwise, the only solution I can think of is the hacky approach of reading the offsets and trying to decipher what message to read on the application-side, which seems brittle.

Yeah, if you want to get a 100% precise translation away from the end of the topic and don't want to modify MM2, you're going to need to "synchronize" the two topics and figure out which messages line up. Between offset.lag.max, the syncs topic throttle semaphore, and the OffsetSyncStore, a lot of intermediate offset syncs get discarded and the precision of the translation decreases significantly. If you let MirrorMaker2 perform a rough translation that you later refine with a custom script, you probably only need to compare a few hundred record checksums for each topic-partition-consumer-group. This would also allow you to compensate for the skipped offsets that EOS mode produces.

I think you could make such a script reliable enough for a one-off migration, with some manual spot-checking to make sure it doesn't do anything too incorrect.

If you're willing to hack on the MirrorMaker connectors, you could disable the throttling semaphore and the offset.lag.max parameter, and implement a full-depth OffsetSyncStore to get perfect translation. I don't think we could add those to mainline MM2 without a configuration, but you are certainly welcome to temporarily fork MM2 to get the job done.
