KAFKA-15771: fix ProduceRequest#partitionSizes() to make it an atomic operation #14674

Merged
merged 1 commit into from Nov 7, 2023

@fxbing fxbing commented Oct 31, 2023

I encountered a concurrency issue in ProduceRequest#partitionSizes() while developing with Kafka. When Thread 1 and Thread 2 call ProduceRequest#partitionSizes() concurrently, Thread 2 may receive an incomplete or empty result if Thread 1 is still initializing partitionSizes. That intermediate state is incorrect. This PR changes the code so that Thread 2 obtains the final state rather than an intermediate one.
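
To illustrate the kind of race described above, here is a minimal Java sketch. It is not the Kafka source: the class `LazyPartitionSizes`, its `partitions` input, and both method names are hypothetical stand-ins, and the exact shape of the real fix is an assumption. The racy variant assigns the shared field before filling the map, so a thread that skips the lock can observe an empty or partial map. The other variant fills a local map and publishes it with a single write.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a request object; not the actual Kafka class.
class LazyPartitionSizes {
    private final List<String> partitions;        // hypothetical input: one entry per record batch
    private volatile Map<String, Integer> sizes;  // lazily computed, shared across threads

    LazyPartitionSizes(List<String> partitions) {
        this.partitions = partitions;
    }

    // Racy variant: the field becomes non-null before the map is filled,
    // so a second thread that skips the lock can see an empty or partial map.
    Map<String, Integer> sizesRacy() {
        if (sizes == null) {
            synchronized (this) {
                if (sizes == null) {
                    sizes = new HashMap<>();
                    for (String p : partitions) {
                        sizes.merge(p, 1, Integer::sum);
                    }
                }
            }
        }
        return sizes;
    }

    // Safer variant: fill a local map first, then publish it with one write.
    Map<String, Integer> sizesAtomic() {
        Map<String, Integer> result = sizes;
        if (result == null) {
            synchronized (this) {
                result = sizes;
                if (result == null) {
                    Map<String, Integer> tmp = new HashMap<>();
                    for (String p : partitions) {
                        tmp.merge(p, 1, Integer::sum);
                    }
                    sizes = tmp;   // other threads see either null or the complete map
                    result = tmp;
                }
            }
        }
        return result;
    }
}
```

In the second variant, a reader outside the lock sees either `null`, in which case it waits for the lock, or the fully populated map, never a half-built one.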

Collaborator

@vamossagar12 vamossagar12 left a comment

Thanks for the PR @fxbing . I am not totally clear about what we are trying to solve using this though. Could you please elaborate in the PR description section? Is there a race condition that you have identified?

@fxbing
Contributor Author

fxbing commented Oct 31, 2023

> Thanks for the PR @fxbing . I am not totally clear about what we are trying to solve using this though. Could you please elaborate in the PR description section? Is there a race condition that you have identified?

@vamossagar12 I added an explanation to the PR description. Please let me know if you have any other questions.

Collaborator

@hudeqi hudeqi left a comment

Thanks for this PR! @fxbing. I think this deserves a jira to track. And can you add a unit test?

@hudeqi hudeqi added the producer label Nov 1, 2023
@fxbing fxbing changed the title MINOR: fix ProduceRequest#partitionSizes() to make it an atomic operation KAFKA-15771: fix ProduceRequest#partitionSizes() to make it an atomic operation Nov 1, 2023
@fxbing
Contributor Author

fxbing commented Nov 1, 2023

> Thanks for this PR! @fxbing. I think this deserves a jira to track. And can you add a unit test?

@hudeqi Thanks for the comments; I've added the JIRA issue.
As for unit testing, it is hard to test this accurately without intrusive changes. The only approach I can think of is to have many threads call the method concurrently and check that the results are consistent, but even the problematic version will pass such a test with some probability.
Do you want me to add such a unit test? Or do you have a better suggestion?
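
The many-threads approach mentioned above could look roughly like the sketch below. Everything in it is hypothetical (`SizesHolder`, `ConcurrentSizesCheck`, `countInconsistent`, and the thread and round counts); it is not a test from the Kafka code base. It releases several threads at once against a fresh lazily initialized object and counts how many calls observe a map of the wrong size. As noted in the comment, a check like this is probabilistic: a racy implementation may still pass on a given run.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical object under test: builds its map locally, then publishes it.
class SizesHolder {
    private final int keys;
    private volatile Map<Integer, Integer> sizes;

    SizesHolder(int keys) {
        this.keys = keys;
    }

    Map<Integer, Integer> sizes() {
        Map<Integer, Integer> result = sizes;
        if (result == null) {
            synchronized (this) {
                result = sizes;
                if (result == null) {
                    Map<Integer, Integer> tmp = new HashMap<>();
                    for (int k = 0; k < keys; k++) {
                        tmp.put(k, 1);
                    }
                    sizes = tmp;
                    result = tmp;
                }
            }
        }
        return result;
    }
}

class ConcurrentSizesCheck {
    // Returns how many calls saw a map whose size differed from the expected one.
    static int countInconsistent(int threads, int rounds, int keys) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger bad = new AtomicInteger();
        try {
            for (int r = 0; r < rounds; r++) {
                SizesHolder holder = new SizesHolder(keys);
                CountDownLatch start = new CountDownLatch(1);
                CountDownLatch done = new CountDownLatch(threads);
                for (int t = 0; t < threads; t++) {
                    pool.execute(() -> {
                        try {
                            start.await();  // line all threads up before the first call
                            if (holder.sizes().size() != keys) {
                                bad.incrementAndGet();
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            done.countDown();
                        }
                    });
                }
                start.countDown();
                done.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } finally {
            pool.shutdownNow();
        }
        return bad.get();
    }
}
```

A call such as `ConcurrentSizesCheck.countInconsistent(8, 200, 50)` should return 0 for the holder shown here. A non-zero count would indicate that some thread saw an intermediate state.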

Collaborator

@vamossagar12 vamossagar12 left a comment

Thanks for the explanation @fxbing and for creating the JIRA ticket. It makes sense now. LGTM. Regarding tests, I agree it might be difficult to write a test which will simulate this race condition always. I am ok to not have such a test.

@vamossagar12
Collaborator

@chia7712, do you mind taking a look at this one, please?

@fxbing
Contributor Author

fxbing commented Nov 6, 2023

@vamossagar12 @hudeqi Can this PR be merged?

@vamossagar12
Collaborator

@fxbing, neither @hudeqi nor I have access to merge PRs. We will have to wait for somebody who does. I have pinged @chia7712, who has merge access and expertise in this part of the code.

Contributor

@divijvaidya divijvaidya left a comment

Thank you for the change. Looks good! I will backport it to 3.5 and 3.6 as well.

@divijvaidya
Contributor

The test failures and the build failure are unrelated:

[Build / JDK 21 and Scala 2.13 / org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14674/3/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_21_and_Scala_2_13___testAlterSinkConnectorOffsetsZombieSinkTasks/)
[Build / JDK 21 and Scala 2.13 / kafka.coordinator.transaction.ProducerIdManagerTest.[1] error=UNKNOWN_SERVER_ERROR](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14674/3/testReport/junit/kafka.coordinator.transaction/ProducerIdManagerTest/Build___JDK_21_and_Scala_2_13____1__error_UNKNOWN_SERVER_ERROR/)
[Build / JDK 21 and Scala 2.13 / kafka.log.remote.RemoteIndexCacheTest.testClose()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14674/3/testReport/junit/kafka.log.remote/RemoteIndexCacheTest/Build___JDK_21_and_Scala_2_13___testClose__/)
[Build / JDK 8 and Scala 2.12 / kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14674/3/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/Build___JDK_8_and_Scala_2_12___testMetricsDuringTopicCreateDelete_String__quorum_zk/)
[Build / JDK 8 and Scala 2.12 / kafka.server.MetadataRequestBetweenDifferentIbpTest.testUnknownTopicId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14674/3/testReport/junit/kafka.server/MetadataRequestBetweenDifferentIbpTest/Build___JDK_8_and_Scala_2_12___testUnknownTopicId__/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.shouldAddStreamThread()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14674/3/testReport/junit/org.apache.kafka.streams.integration/AdjustStreamThreadCountTest/Build___JDK_8_and_Scala_2_12___shouldAddStreamThread__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14674/3/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testGetSinkConnectorOffsets/)
[Build / JDK 17 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14674/3/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_17_and_Scala_2_13___testRackAwareRangeAssignor_String__quorum_zk/)
[Build / JDK 17 and Scala 2.13 / kafka.api.ConsumerBounceTest.testClose()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14674/3/testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_17_and_Scala_2_13___testClose__/)
[Build / JDK 17 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testProduceConsumeViaSubscribe(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14674/3/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_17_and_Scala_2_13___testProduceConsumeViaSubscribe_String__quorum_kraft/)

@divijvaidya divijvaidya added the backport-candidate This pull request is a candidate to be backported to previous versions label Nov 7, 2023
@divijvaidya divijvaidya merged commit 2d07e57 into apache:trunk Nov 7, 2023
1 check failed
divijvaidya pushed a commit that referenced this pull request Nov 7, 2023
…14674)

This commit fixes a bug in ProduceRequest#partitionSizes() that may cause the method to incorrectly return an empty or incomplete result to one thread while another thread is still initialising it.

Reviewers: Divij Vaidya <diviv@amazon.com>, hudeqi <1217150961@qq.com>, vamossagar12 <sagarmeansocean@gmail.com>

--------------------------------
Co-authored-by: fangxiaobing <fangxiaobing@kuaishou.com>
mjsax pushed a commit to confluentinc/kafka that referenced this pull request Nov 22, 2023
rreddy-22 pushed a commit to rreddy-22/kafka-rreddy that referenced this pull request Jan 2, 2024
anurag-harness pushed a commit to anurag-harness/kafka that referenced this pull request Feb 9, 2024
anurag-harness added a commit to anurag-harness/kafka that referenced this pull request Feb 9, 2024
…pache#14674) (#218)

Co-authored-by: Xiaobing Fang <bingxf@qq.com>
Co-authored-by: fangxiaobing <fangxiaobing@kuaishou.com>
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024