Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Split ConsumerCoordinator#testCommitOffsetMetadata onto two test cases testing commitSync and commitAsync #13665

Merged
merged 2 commits into from Jun 24, 2023

Conversation

machi1990
Copy link
Contributor

@machi1990 machi1990 commented May 3, 2023

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@machi1990
Copy link
Contributor Author

machi1990 commented May 10, 2023

@ableegoldman @showuon can you've a look at this draft PR once you've sometime? I'll love to get your early feedback Thanks

@showuon
Copy link
Contributor

showuon commented May 15, 2023

I'll try to have a look this week. Thanks.

@machi1990
Copy link
Contributor Author

I'll try to have a look this week. Thanks.

Thanks @showuon I appreciate it.

Copy link
Contributor

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@machi1990 , thanks for the patch. Left some comments.

@machi1990
Copy link
Contributor Author

Thank you for the review @showuon I'll address the comments by pushing code change in the next while.

@machi1990 machi1990 force-pushed the KAFKA-12485 branch 2 times, most recently from 401fb41 to a4fe7dd Compare May 19, 2023 08:28
@machi1990 machi1990 requested a review from showuon May 19, 2023 08:49
Copy link
Contributor

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Could we add tests to verify the committed offsets cache will be updated when the consumer committed some offests? Also, you could change to "non-draft" state when you're ready. Thanks.

@machi1990
Copy link
Contributor Author

Overall LGTM. Could we add tests to verify the committed offsets cache will be updated when the consumer committed some offests? Also, you could change to "non-draft" state when you're ready. Thanks.

Thank you @showuon for the review. I was away on public holiday yesterday, I am catching up today and I'l have a look on adding more tests tomorrow. Once that is done, I'll promote the PR and mark it ready for review and ping you then. Cheers!

@machi1990
Copy link
Contributor Author

Hey @showuon the PR should be ready for another round of review. Thank you.

Copy link
Contributor

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@machi1990 , thanks for the update, left some more comments.

@machi1990 machi1990 force-pushed the KAFKA-12485 branch 2 times, most recently from 5ca33ba to 76f3297 Compare June 15, 2023 10:19
@machi1990
Copy link
Contributor Author

@machi1990 , thanks for the update, left some more comments.

Thanks for the thorough review @showuon I've addressed all the comments. Please have another look when you've some time, thanks.

@machi1990 machi1990 requested a review from showuon June 15, 2023 10:20
Copy link
Contributor

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@machi1990 , thanks for the update. Left some more comments.

@machi1990
Copy link
Contributor Author

Thanks @showuon for your review on this. I've addressed the comments.

@machi1990 machi1990 requested a review from showuon June 15, 2023 12:17
Copy link
Contributor

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Just one minor comment. After addressing it, I'll check the CI build result tomorrow. Thank you for the patch!

@machi1990
Copy link
Contributor Author

LGTM! Just one minor comment. After addressing it, I'll check the CI build result tomorrow. Thank you for the patch!

Thank you so much for the review and help on this @showuon

@showuon
Copy link
Contributor

showuon commented Jun 19, 2023

Re-triggering CI build

@showuon
Copy link
Contributor

showuon commented Jun 19, 2023

@machi1990 , looks like this change breaks some tests. Could you take a look?

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13665/12

@machi1990
Copy link
Contributor Author

@machi1990 , looks like this change breaks some tests. Could you take a look?

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13665/12

Thank you @showuon I am re-running the whole test suite on my machine again. I'll report back what I find. In the meanwhile, let me know if you prefer me to mark this PR as draft? I am okay either way.

@showuon
Copy link
Contributor

showuon commented Jun 19, 2023

No need to mark as "draft". No worries! :)

@machi1990
Copy link
Contributor Author

@showuon I was looking onto this and after several local runs, I managed to eliminate some flasky test and came up with the list of failures that are only caused by this change. The failure total number of failures that I've seen locally are:

        kafka.api.TransactionsBounceTest.testWithGroupId()
        kafka.api.TransactionsBounceTest.testWithGroupMetadata()
        kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String)[1]
        kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String)[2]
        kafka.api.TransactionsTest.testSendOffsetsWithGroupMetadata(String)[1]
        kafka.api.TransactionsTest.testSendOffsetsWithGroupMetadata(String)[2]
        kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[1]
        kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[2]
        kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[3]
        kafka.server.DynamicConfigChangeUnitTest.testIpHandlerUnresolvableAddress()
        kafka.zk.ZkMigrationIntegrationTest.testNewAndChangedTopicsInDualWrite(ClusterInstance)[1]
        kafka.admin.ConfigCommandTest.shouldFailIfInvalidHost()

And only the TransactionsTest and TransactionsBounceTest are the ones that I've identified to be related to this PR.
I've started to investigate these it so far my conclusion is that the failure there are related to reading of stale cache values because the cache item is stored only once when fetching the offset in [1]. The test neither calls commitSync, nor commitAsync which means that the cache is never updated in [2] after initially set in [1].

I was thinking of dropping off cache update when fetching committed offsets i.e in [1] and only perform cache update when during offset commit [2]

  1. https://github.com/apache/kafka/blob/76d25c94e2c8723eec31a3df64c752bc66c79b34/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1075
  2. https://github.com/apache/kafka/blob/76d25c94e2c8723eec31a3df64c752bc66c79b34/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1456

That would align to the comment you raised in #13665 (comment)
Let me know what you think.

@machi1990
Copy link
Contributor Author

@showuon I was looking onto this and after several local runs, I managed to eliminate some flasky test and came up with the list of failures that are only caused by this change. The failure total number of failures that I've seen locally are:

        kafka.api.TransactionsBounceTest.testWithGroupId()
        kafka.api.TransactionsBounceTest.testWithGroupMetadata()
        kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String)[1]
        kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String)[2]
        kafka.api.TransactionsTest.testSendOffsetsWithGroupMetadata(String)[1]
        kafka.api.TransactionsTest.testSendOffsetsWithGroupMetadata(String)[2]
        kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[1]
        kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[2]
        kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[3]
        kafka.server.DynamicConfigChangeUnitTest.testIpHandlerUnresolvableAddress()
        kafka.zk.ZkMigrationIntegrationTest.testNewAndChangedTopicsInDualWrite(ClusterInstance)[1]
        kafka.admin.ConfigCommandTest.shouldFailIfInvalidHost()

And only the TransactionsTest and TransactionsBounceTest are the ones that I've identified to be related to this PR. I've started to investigate these it so far my conclusion is that the failure there are related to reading of stale cache values because the cache item is stored only once when fetching the offset in [1]. The test neither calls commitSync, nor commitAsync which means that the cache is never updated in [2] after initially set in [1].

I was thinking of dropping off cache update when fetching committed offsets i.e in [1] and only perform cache update when during offset commit [2]

  1. https://github.com/apache/kafka/blob/76d25c94e2c8723eec31a3df64c752bc66c79b34/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1075
  2. https://github.com/apache/kafka/blob/76d25c94e2c8723eec31a3df64c752bc66c79b34/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1456

That would align to the comment you raised in #13665 (comment) Let me know what you think.

I've pushed this change in 9539c55

@showuon
Copy link
Contributor

showuon commented Jun 20, 2023

@machi1990 , could you explain more about this:

The test neither calls commitSync, nor commitAsync which means that the cache is never updated in [2] after initially set in [1].

If it don't commit anything, then the expected committed value should be 0, right? And if the expected value are all greater than 0, there should be somewhere doing offset commit, right? It could be auto commit in the consumer side. (maybe?)

@machi1990
Copy link
Contributor Author

@machi1990 , could you explain more about this:

The test neither calls commitSync, nor commitAsync which means that the cache is never updated in [2] after initially set in [1].

If it don't commit anything, then the expected committed value should be 0, right? And if the expected value are all greater than 0, there should be somewhere doing offset commit, right? It could be auto commit in the consumer side. (maybe?)

Sorry @showuon , I should have clarified: There is no manual sync/async commit of offsets.
The offset are all greater than 0. The consumer doesn't do auto commit as can be seen here in the initialization https://github.com/apache/kafka/blob/9539c559a782aba8ce95c9b8b48831c6879821d2/core/src/test/scala/integration/kafka/api/TransactionsTest.scala#L774
offsets commits are happening somewhere else and that's via the producer.sendOffsetsToTransaction(..) e.g https://github.com/apache/kafka/blob/9539c559a782aba8ce95c9b8b48831c6879821d2/core/src/test/scala/integration/kafka/api/TransactionsTest.scala#L247

I didn't think of this earlier on and it changes a few thing, making me think that I might need to revisit the caching logic for committed offsets e.g re-use consumer#position(tp).

I am keen to know what you think?

@showuon
Copy link
Contributor

showuon commented Jun 21, 2023

offsets commits are happening somewhere else and that's via the producer.sendOffsetsToTransaction(..) e.g

Oh, EOS case! I didn't consider it, sorry!
Hmm... if there is EOS case to consider, the original cache mechanism will not work since the offset commit is not via consumer, the consumer has no idea which offset has committed.
I think we should close this PR and JIRA ticket as "invalid" and add comment into the JIRA ticket. WDYT?

@machi1990 machi1990 changed the title KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions MINOR: Split ConsumerCoordinator#testCommitOffsetMetadata onto two test cases testing commitSync and commitAsync Jun 21, 2023
@machi1990
Copy link
Contributor Author

offsets commits are happening somewhere else and that's via the producer.sendOffsetsToTransaction(..) e.g

Oh, EOS case! I didn't consider it, sorry! Hmm... if there is EOS case to consider, the original cache mechanism will not work since the offset commit is not via consumer, the consumer has no idea which offset has committed. I think we should close this PR and JIRA ticket as "invalid" and add comment into the JIRA ticket. WDYT?

Thanks @showuon I've marked the JIRA as invalid.
As for the PR, I've repurposed it to only keep the things we can keep from this change.
Please have a look when you can.

Copy link
Contributor

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks for the work. Let's wait for the CI build completed.

@showuon
Copy link
Contributor

showuon commented Jun 24, 2023

Failed tests are unrelated:

    Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWriteScram, MetadataVersion=3.5-IV2, Security=PLAINTEXT
    Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testSyncTopicConfigs()
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication()
    Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
    Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslVersionsTransportLayerTest.tlsServerProtocol = [TLSv1.3, TLSv1.2], tlsClientProtocol = [TLSv1.2, TLSv1.3]
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testRestartReplication()
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
    Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()

@showuon showuon merged commit c5889fc into apache:trunk Jun 24, 2023
1 check failed
@machi1990 machi1990 deleted the KAFKA-12485 branch June 26, 2023 08:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants