Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer #13455

Merged
merged 7 commits into from May 26, 2023

Conversation

Scanteianu
Copy link
Contributor

@Scanteianu Scanteianu commented Mar 26, 2023

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Ensure the ConsumerRebalanceListener passed to the MockConsumer is notified when a rebalance happens. Addresses a TODO comment left in the code. Computes which partitions were revoked and which partitions were assigned, and makes the appropriate callbacks.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Unit test covers the cases where a consumer is assigned partitions, revoked all partitions, and partitions are switched.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@Scanteianu
Copy link
Contributor Author

@philipnee i'd be super grateful for a review if you get the chance

// prepare for rebalance callback
Set<TopicPartition> oldAssignmentSet = this.subscriptions.assignedPartitions();
Set<TopicPartition> newAssignmentSet = new HashSet<>(newAssignment);
List<TopicPartition> added = new ArrayList<>(newAssignment.size());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure you can do set.removeAll(stuff) - saves some code there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i changed it to streams and reduced it to one line per list

public synchronized void rebalance(Collection<TopicPartition> newAssignment) {
// TODO: Rebalance callbacks
// prepare for rebalance callback
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more like, computing the partitions to be added and removed, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks - updated

@philipnee
Copy link
Collaborator

Hey I left some comments, but it looks good afterall. @showuon @guozhangwang - Would you guys be interested in reviewing this PR?

@philipnee
Copy link
Collaborator

@kirktrue - do you want to take a look?

@Scanteianu
Copy link
Contributor Author

Hey I left some comments, but it looks good afterall. @showuon @guozhangwang - Would you guys be interested in reviewing this PR?

Thank you so much, addressed!

Copy link
Collaborator

@philipnee philipnee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vvcephei - Could you take a look at this please :)

Copy link
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks, @Scanteianu !

@vvcephei
Copy link
Contributor

vvcephei commented May 5, 2023

There were a lot of test failures. I just triggered it again.

It doesn't seem implausible that changing the MockConsumer could cause other tests to fail, but I didn't look at whether the failing tests actually use it.

@philipnee
Copy link
Collaborator

Thanks @vvcephei - Think this is the failure. ZK related so I think the PR is good to go.

Build / JDK 17 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 1m 51s

@Scanteianu
Copy link
Contributor Author

Thanks @vvcephei - Think this is the failure. ZK related so I think the PR is good to go.

Build / JDK 17 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 1m 51s

Thanks @vvcephei @philipnee! The ci failures for the various builds have all seemed independent of this change

@Scanteianu
Copy link
Contributor Author

in past few builds, at least one of the builds passed (in this one it was jdk 17 and scala 2.13). https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13455/9/pipeline/11/

Test failures look unrelated, and I'm unable to replicate locally (for instance, on jdk 8, scala 2.12 on latest build, failures were:
kafka.server.RaftClusterSnapshotTest.testSnapshotsGenerated() failed,
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary failed,
kafka.server.RaftClusterSnapshotTest.testSnapshotsGenerated() failed,)

In the previous build (only change was I pulled the latest trunk changes in, jdk 8 and scala 2.12 passed, and other configs failed https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13455/8/pipeline

It's likely safe to say that these are intermittent failures not related to the changes proposed in this PR.

@vvcephei @philipnee what are the next steps?

@philipnee philipnee added consumer tests Test fixes (including flaky tests) labels May 25, 2023
@vvcephei vvcephei merged commit 6d72c26 into apache:trunk May 26, 2023
1 check failed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
consumer tests Test fixes (including flaky tests)
Projects
None yet
3 participants