Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16312, KAFKA-16185: Local epochs in reconciliation #15511

Merged
merged 11 commits into from Mar 18, 2024

Conversation

lucasbru
Copy link
Member

The goal of this PR is to change the following internals of the reconciliation:

  • Introduce a "local epoch" to the local target assignment. When a new target is received by the server, we compare it with the current value. If it is the same, no change. Otherwise, we bump the local epoch and store the new target assignment. Then, on the reconciliation, we also store the epoch in the reconciled assignment and keep using target != current to trigger the reconciliation.
  • When we are not in a group (we have not received an assignment), we use null to represent the local target assignment instead of an empty list, to avoid confusions with an empty assignment received by the server. Similarly, we use null to represent the current assignment, when we haven't reconciled the assignment yet.
  • We also carry the new epoch into the request builder to ensure that we report the owned partitions for the last local epoch.
  • To address KAFKA-16312 (call onPartitionsAssigned on empty assignments after joining), we apply the initial assignment returned by the group coordinator (whether empty or not) as a normal reconciliation. This avoids introducing another code path to trigger rebalance listeners - reconciliation is the only way to transition to STABLE. The unneeded parts of reconciliation (autocommit, revocation) will be skipped in the existing. Since a lot of unit tests assumed that not reconciliation behavior is invoked when joining the group with an empty assignment, this required a lot of the changes in the unit tests.

Testing

These changes allow the new consumer to pass the first 10 system tests. We piggy-back a minor change to the HeartbeatManager that are required for those system tests as well: always send rebalanceTimoutMs, subscriptions when (re-)joining.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lucasbru
Copy link
Member Author

@lianetm @dajac Could you please have a look?

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lucasbru Thanks for the patch. I left a few initial comments.

boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions);

if (sameAssignmentReceived) {
if (resolvedAssignment.equals(currentAssignment)) {
Copy link
Contributor

@lianetm lianetm Mar 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is short-circuiting the reconciliation if the same assignment is received (same epoch, same partitions). But we also need to consider the case that we get the same partitions assigned but in a different epoch. In that case, we should not carry on with the full reconciliation process (there is truly nothing to reconcile), but we should send an ack to the broker, so I would expect we need a similar short-circuit for if sameAssignmentInDiffEpoch => transitionToAck & return;.

It's mainly thinking about the case:

  • member owns t1-1 epoch 3
  • receives new assignment [t1-1, t2-1] epoch 4, stuck reconciling, ex. missing t2 metadata
  • receives new assignment [t1-1] epoch 5 (ex. broker realized t2 has been deleted)
  • member does not need to reconcile t1-1, but should send an ack to the broker with t1-1 that it received on a newer epoch
    Makes sense? Not sure if I may be missing something regarding the expectations

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we could do it, but do we have to, and is it the best thing we should do? It seems like a corner case to me, and maybe the easiest and cleanest behavior is to just run a full reconciliation, because the assignment changed in the meantime, even if our client never managed to reconcile the intermediate assignment. It seems users are using the ConsumerRebalanceListener to detect rebalances (as do our system tests). So the case you are describing, sounds like a rebalance event to me, so I think we should call the listener.

Copy link
Contributor

@lianetm lianetm Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well I was expecting we only need to trigger the callbacks if the assignment changed (could be to empty, but something needs to change), and that's not the case if the member ends up with t1-1 again, that it already owns.

By running a full reconciliation when the the resolved assignment is the same as the current but received later, we end up with a client reconciling the exact same assignment it already owns :S It would turn out noisy I expect, accounting for 2 rebalances in cases probably much more common, where a new topic assigned is temporarily not in metadata and then discovered: member owns [t1-1], receives assignment [t1-1, t2-1] with missing metadata for t2 (t2 discovered shortly after). Wouldn't we generate 2 rebalances??? (a 1st one with no changes in assignment, a 2nd one with the added topic once discovered) when truly things only changed once)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, my understanding is the following:

  1. Start with [A] at local epoch 10;
  2. Go to [A, B] at local epoch 11;
  3. B is not resolvable yet so we end up with [A] at local epoch 11 to reconcile;
  4. [A] is effectively the current assignment so we would trigger the callback with [];
  5. When B is finally available, we get [A, B] at local epoch 11 and reconcile.

@lianetm I think that you're saying that step 4. is not needed, right? My concern with your suggestion is that it defeats a bit the purpose of the patch. However, I do agree with you that in this particular case, we don't have to reconcile at all. Perhaps, a better way to phrase this would be to no trigger the reconciliation if all the newly partitions are waiting on the metadata and there is no revocation. Would something like this work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I will short-cut the reconciliation if the resolvable assignment did not change and if the local target epoch either did not change or was bumped once (the latter being your example). In the latter case, I'll still bump the local current epoch.

It's a bit more special casing than I was hoping for, but I see that if these "delayed topic names" are common, I'd be annoying to get two reconciliations every time.


// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection
if (localEpoch != NONE_EPOCH) {
// check if the new assignment is different from the current target assignment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: here we are checking if the new assignment is "the same as" the current (not diff)....even maybe just remove this comment, as the one above seems enough?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -683,7 +681,10 @@ public void testDelayedMetadataUsedToCompleteAssignment() {
receiveAssignment(newAssignment, membershipManager);
membershipManager.poll(time.milliseconds());

verifyReconciliationNotTriggered(membershipManager);
// We bumped the local epoch, so new reconciliation is triggered
Copy link
Contributor

@lianetm lianetm Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new reconciliation triggered is an example of what I was referring to in the comment above, that seems to complicate the flow with 2 rebalances instead of 1. I wouldn't expect a member rebalancing/reconciling at this point (no assignment change for him yet, t2 not in metadata), and then reconciling/rebalance again once it discovers it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

if (!this.subscriptions.hasPatternSubscription()) {
// SubscribedTopicNames - only sent if has changed since the last heartbeat
// SubscribedTopicNames - only sent when joining or if has changed since the last heartbeat
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inherited, but now that we're here: "or if it has changed..."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
// Should reset epoch to leave the group and release the assignment (right away because
// there is no onPartitionsLost callback defined)
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
assertTrue(membershipManager.currentAssignment().isEmpty());
assertTrue(membershipManager.currentAssignment().isNone());
assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
}

@Test
public void testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot find any test for one of the 2 issues we're after with this PR: ensure that we are triggering the callbacks when joining and getting empty assignment. Could we add it? I expect it should be very similar to this one, but just providing a CounterConsumerRebalanceListener and asserting that it called the onPartitionsAssigned

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could also add a test for the case described in KAFKA-16185. Is it possible?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the two tests.

@lianetm
Copy link
Contributor

lianetm commented Mar 13, 2024

hey @lucasbru, I took another full pass, left a few comments. Thanks for the changes!

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lucasbru Thanks for the update. I left some comments for consideration.

// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
// RebalanceTimeoutMs - only sent when joining
if (membershipManager.memberEpoch() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are here, we may be able to do the same for serverAssignor field as it never changes during the runtime.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a second look at all this logic and I actually wonder if it is better to keep the previous logic for rebalanceTimeoutMs and serverAssignor. My concern is that we should actually send out a full request when reset has been called and with the change it is not the case anymore. The main was the send a full request when joining or on any errors (e.g. request timeout).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of slack discussion: We need to resend a full request after a failure, to let the GC know that it needs to send a full response (including repeating the assignment). The reason is that if you had a request timeout for a heartbeat request, that heartbeat could have been the one with a new assignment for you. If you don't send a full request next, the server will assume that you've receive that last one so it won't deliver the new assignment again. So sending a full request is an implicit "force full response".

I reverted that part of the changes (so I brought back the rebalanceMs and serverAssignor in sentFields). We could consider only using reset on the heartbeat state (resetting when transitioning to JOINING may be enough). However, that seems more brittle, it’s quite easy to introduce a code path where we forget to reset the heartbeat state. Having a contract like “always send when epoch==0” for the joining case is easier in my eyes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this was indepndently also implemented in #15401 which I had seen but forgotten about. So I will rebase and epoch == 0 conditions will come from the other PR (in the form of an equivalent state == JOINING condition).

*/
interface LocalAssignment {

Map<Uuid, SortedSet<Integer>> getPartitions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We usually don't prefix getters with get.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

boolean sameAssignmentReceived = assignedTopicPartitions.equals(ownedPartitions);

if (sameAssignmentReceived) {
if (resolvedAssignment.equals(currentAssignment)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, my understanding is the following:

  1. Start with [A] at local epoch 10;
  2. Go to [A, B] at local epoch 11;
  3. B is not resolvable yet so we end up with [A] at local epoch 11 to reconcile;
  4. [A] is effectively the current assignment so we would trigger the callback with [];
  5. When B is finally available, we get [A, B] at local epoch 11 and reconcile.

@lianetm I think that you're saying that step 4. is not needed, right? My concern with your suggestion is that it defeats a bit the purpose of the patch. However, I do agree with you that in this particular case, we don't have to reconcile at all. Perhaps, a better way to phrase this would be to no trigger the reconciliation if all the newly partitions are waiting on the metadata and there is no revocation. Would something like this work?


@Override
public String toString() {
return "{" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we add LocalAssignmentImpl too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 1579 to 1580
partitions.get(tp.topicId()).size() == tp.partitions().size() &&
partitions.get(tp.topicId()).containsAll(tp.partitions()))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I wonder if we could use equals here. Would it work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that's comparing a list and a set and wouldn't work

@@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
// Should reset epoch to leave the group and release the assignment (right away because
// there is no onPartitionsLost callback defined)
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
assertTrue(membershipManager.currentAssignment().isEmpty());
assertTrue(membershipManager.currentAssignment().isNone());
assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
}

@Test
public void testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could also add a test for the case described in KAFKA-16185. Is it possible?

// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
LocalAssignment local = membershipManager.currentAssignment();
if (local == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could use isNone.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reviews! I updated the PR @lianetm @dajac


// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection
if (localEpoch != NONE_EPOCH) {
// check if the new assignment is different from the current target assignment
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


@Override
public String toString() {
return "{" +
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
// RebalanceTimeoutMs - only sent when joining
if (membershipManager.memberEpoch() == 0) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

if (!this.subscriptions.hasPatternSubscription()) {
// SubscribedTopicNames - only sent if has changed since the last heartbeat
// SubscribedTopicNames - only sent when joining or if has changed since the last heartbeat
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
LocalAssignment local = membershipManager.currentAssignment();
if (local == null) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
interface LocalAssignment {

Map<Uuid, SortedSet<Integer>> getPartitions();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -683,7 +681,10 @@ public void testDelayedMetadataUsedToCompleteAssignment() {
receiveAssignment(newAssignment, membershipManager);
membershipManager.poll(time.milliseconds());

verifyReconciliationNotTriggered(membershipManager);
// We bumped the local epoch, so new reconciliation is triggered
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 1579 to 1580
partitions.get(tp.topicId()).size() == tp.partitions().size() &&
partitions.get(tp.topicId()).containsAll(tp.partitions()))) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that's comparing a list and a set and wouldn't work

@@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
// Should reset epoch to leave the group and release the assignment (right away because
// there is no onPartitionsLost callback defined)
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
assertTrue(membershipManager.currentAssignment().isEmpty());
assertTrue(membershipManager.currentAssignment().isNone());
assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
}

@Test
public void testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the two tests.

if (currentAssignment != LocalAssignmentImpl.NONE &&
resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to consider maybe simplifying the log and clarify the situation: isn't the message here simply that we're ignoring the reconciliation because resolved target is equals to the current assignment? I get the point about intermediate assignments, but an intermediate one would have updated the current assignment so it wouldn't be equals to the resolved target (or leave a reconciliation in progress so it wouldn't even make it to this check)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to talk about the local epoch here, since it's more of implementation detail how to detect intermediate assignments. But then I should log the local epoch I supposed. Updated it accordingly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac and I discussed various options to implement the short-cutting. In the end, the outcome was to only skip calling the callbacks but not skipping the entire reconciliation if the resolved assignment is equal to the current assignment but with a different epoch. We still need to update the current assignment with the resolved assignment and the new epoch in order to trigger the "ack", and transition to sending ack as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree in the need to update the assignment. I was exactly pushing for doing only only what's needed (vs doing all that the reconciliation does), so this sounds good to me. Thanks!

@@ -2279,22 +2277,23 @@ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscri
return mockJoinAndReceiveAssignment(expectSubscriptionUpdated, createAssignment(expectSubscriptionUpdated));
}

private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscriptionUpdated,
private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean triggerReconciliation,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with the renamed param, but could we update it also in the same overloaded method above, just for consistency

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -1028,9 +1028,9 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {

verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.onHeartbeatRequestSent();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest we add the check that a reconciliation was triggered here, just adding
verifyReconciliationTriggeredAndCompleted(membershipManager, Collections.emptyList()); right after poll. It's part of what this PR is introducing and it completes the pic of what's happening when getting the first (empty) assignment that can be reconciled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@lianetm
Copy link
Contributor

lianetm commented Mar 14, 2024

Thanks for the updates @lucasbru, left some minor comments.

Copy link
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @lianetm . Addressed the second round of comments.

@@ -2279,22 +2277,23 @@ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscri
return mockJoinAndReceiveAssignment(expectSubscriptionUpdated, createAssignment(expectSubscriptionUpdated));
}

private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscriptionUpdated,
private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean triggerReconciliation,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -1028,9 +1028,9 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {

verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
membershipManager.onHeartbeatRequestSent();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (currentAssignment != LocalAssignmentImpl.NONE &&
resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to talk about the local epoch here, since it's more of implementation detail how to detect intermediate assignments. But then I should log the local epoch I supposed. Updated it accordingly.

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lucasbru Thanks for the update. Overall, it looks pretty good to me. I left a few minor comments.

// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
LocalAssignment local = membershipManager.currentAssignment();
if (!local.equals(sentFields.localAssignment)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need membershipManager.memberEpoch() == 0 here too? I suppose that it works because the current assignment is reset in the membership manager but it may be better to add it here for consistency.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);

if (!currentAssignment.isNone() &&
resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we bring this one on the previous line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


if (!currentAssignment.isNone() &&
resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
log.debug("There are unresolved partitions, and the resolvable fragment of the target assignment {} is equal to the current "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: There is an extra space before target.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -970,7 +973,11 @@ void maybeReconcile() {
log.debug("Auto-commit before reconciling new assignment completed successfully.");
}

revokeAndAssign(assignedTopicIdPartitions, revokedPartitions, addedPartitions);
revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
}).whenComplete((__, error) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we use exceptionally?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - but it requires a return null :/

Comment on lines 90 to 93
private final long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;
private final int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
private final int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS;
private final long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If there are final, could we remove them and directly use the constants where we need to?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


ConsumerGroupHeartbeatRequest heartbeatRequest3 = getHeartbeatRequest(heartbeatRequestManager, version);
assertEquals(Collections.singletonList(expectedTopicPartitions), heartbeatRequest3.data().topicPartitions());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could remove this empty line.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -29,6 +29,8 @@
import org.apache.kafka.common.TopicPartition;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could add the example that we discussed offline as a test:

0: [T1, T2] -- T2 unresolved (only T1 is reconciled)
1: [T1, T2, T3] -- T2 unresolved (skipped, since reconciliation in progress)
2: [T1, T2] -- T2 unresolved

What do you think?

Copy link
Contributor

@lianetm lianetm Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree it's not covered. (Happy to take that separately myself, right after this if it helps @lucasbru...knowing you're getting busy very soon)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done (slighly simplified, but same idea)

}

Optional<LocalAssignment> updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we remove this empty line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@lianetm
Copy link
Contributor

lianetm commented Mar 15, 2024

Thanks for the updates @lucasbru, took another look, LGTM considering the latests nits.

Just for the record, I will follow-up with KAFKA-16375 for how ongoing reconciliations are discarded after rejoining.

Copy link
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, all comments addressed

}

Optional<LocalAssignment> updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);

if (!currentAssignment.isNone() &&
resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


if (!currentAssignment.isNone() &&
resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
log.debug("There are unresolved partitions, and the resolvable fragment of the target assignment {} is equal to the current "
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -970,7 +973,11 @@ void maybeReconcile() {
log.debug("Auto-commit before reconciling new assignment completed successfully.");
}

revokeAndAssign(assignedTopicIdPartitions, revokedPartitions, addedPartitions);
revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
}).whenComplete((__, error) -> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - but it requires a return null :/

Comment on lines 90 to 93
private final long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;
private final int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
private final int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS;
private final long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


ConsumerGroupHeartbeatRequest heartbeatRequest3 = getHeartbeatRequest(heartbeatRequestManager, version);
assertEquals(Collections.singletonList(expectedTopicPartitions), heartbeatRequest3.data().topicPartitions());

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -29,6 +29,8 @@
import org.apache.kafka.common.TopicPartition;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done (slighly simplified, but same idea)

// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
LocalAssignment local = membershipManager.currentAssignment();
if (!local.equals(sentFields.localAssignment)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Receive extended assignment - assignment received but no reconciliation triggered
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment2).data());
assertEquals(MemberState.RECONCILING, membershipManager.state());
verifyReconciliationNotTriggered(membershipManager);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to double check, it is intentional here that you're not calling poll right? (that's why a reconciliation is not triggered here, otherwise I would expect we do trigger a reconciliation for t1-1)...I guess this is what you refer to as "intermediate" assignment (an assignment received and removed before a call to poll)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly

@lianetm
Copy link
Contributor

lianetm commented Mar 15, 2024

Went over the latest updates, LGTM. Thanks!

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @lucasbru!

@lucasbru lucasbru merged commit 5c92987 into apache:trunk Mar 18, 2024
1 check failed
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
The goal of this commit is to change the following internals of the reconciliation:

- Introduce a "local epoch" to the local target assignment. When a new target is received by the server, we compare it with the current value. If it is the same, no change. Otherwise, we bump the local epoch and store the new target assignment. Then, on the reconciliation, we also store the epoch in the reconciled assignment and keep using target != current to trigger the reconciliation.
- When we are not in a group (we have not received an assignment), we use null to represent the local target assignment instead of an empty list, to avoid confusions with an empty assignment received by the server. Similarly, we use null to represent the current assignment, when we haven't reconciled the assignment yet.
We also carry the new epoch into the request builder to ensure that we report the owned partitions for the last local epoch.
- To address KAFKA-16312 (call onPartitionsAssigned on empty assignments after joining), we apply the initial assignment returned by the group coordinator (whether empty or not) as a normal reconciliation. This avoids introducing another code path to trigger rebalance listeners - reconciliation is the only way to transition to STABLE. The unneeded parts of reconciliation (autocommit, revocation) will be skipped in the existing. Since a lot of unit tests assumed that not reconciliation behavior is invoked when joining the group with an empty assignment, this required a lot of the changes in the unit tests.

Reviewers: Lianet Magrans <lianetmr@gmail.com>, David Jacot <djacot@confluent.io>
apoorvmittal10 added a commit to confluentinc/kafka that referenced this pull request Apr 11, 2024
* KAFKA-14589 [2/4] Tests of ConsoleGroupCommand rewritten in java (#15363)

This PR is part of #14471
It contains some of ConsoleGroupCommand tests rewritten in java.
Intention of separate PR is to reduce changes and simplify review.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16246: Cleanups in ConsoleConsumer (#15457)


Reviewers: Mickael Maison <mickael.maison@gmail.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>

* KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 (#15261)

The previous pull request in this series was #15112.

This pull request continues the migration of the consumer mock in TaskManagerTest test by test for easier reviews.

I envision there will be at least 1 more pull request to clean things up. For example, all calls to taskManager.setMainConsumer should be removed.

Reviewer: Bruno Cadonna <cadonna@apache.org>

* KAFKA-16100: Add timeout to all the CompletableApplicationEvents (#15455)

This is part of the larger task of enforcing the timeouts for application events, per KAFKA-15974.

This takes a first step by adding a Timer to all of the CompletableApplicationEvent subclasses. For the few classes that already included a timeout, this refactors them to use the Timer mechanism instead.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Bruno Cadonna <cadonna@apache.org>

* MINOR: Add 3.7.0 to core and client's upgrade compatibility tests (#15452)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16319: Divide DeleteTopics requests by leader node (#15479)


Reviewers: Reviewers: Mickael Maison <mickael.maison@gmail.com>, Kirk True <kirk@kirktrue.pro>, Daniel Gospodinow <dgospodinov@confluent.io>

* MINOR: Add read/write all operation (#15462)

There are a few cases in the group coordinator service where we want to read from or write to each of the known coordinators (each of __consumer_offsets partitions). The current implementation needs to get the list of the known coordinators then schedules the operation and finally aggregate the results. This patch is an attempt to streamline this by adding multi read/write to the runtime.

Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-15964: fix flaky StreamsAssignmentScaleTest (#15485)

This PR bumps some timeouts due to slow Jenkins builds.

Reviewers: Bruno Cadonna <bruno@confluent.io>

* MINOR: Use INFO logging for tools tests (#15487)

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16202 Extra dot in error message in producer (#15296)

The author of KAFKA-16202 noticed that there is an extra dot in the error message for KafkaStorageException message.

Looking into org.apache.kafka.clients.producer.internals.Sender, it turns out that the string for the message to be sent in completeBatch() added an extra dot. I think that the formatted component (error.exception(response.errorMessage).toString())) of the error message already has a dot in the end of its string. Thus the dot after the "{}" sign caused the extra dot.

Reviewers: "Gyeongwon, Do" <dct012@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16325 Add missing producer metrics to documentatio (#15466)

Add `buffer-exhausted-rate`, `buffer-exhausted-total`, `bufferpool-wait-ratio` and `metadata-wait-time-ns-total`

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Reduce memory allocation in ClientTelemetryReporter (#15402)

Reviewers: Divij Vaidya <diviv@amazon.com>

* KAFKA-10892: Shared Readonly State Stores ( revisited ) (#12742)

Implements KIP-813.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@confluent.io>

* KAFKA-14589 [4/4] Tests of ConsoleGroupCommand rewritten in java (#15465)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* TRIVIAL: fix typo

* HOTFIX: fix html markup

* MINOR: Fix incorrect syntax for config (#15500)

Fix incorrect syntax for config.

Reviewers: Matthias J. Sax <matthias@confluent.io>

* MINOR: remove the copy constructor of LogSegment (#15488)

In the LogSegment, the copy constructor is only used in LogLoaderTest

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit (#15289)

- Scala TestUtils now delegates to the function in JTestUtils
- The function is modified such that we delete the rootDir on JVM exit if it didn't exist prior to the function being invoked.

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: change "inter.broker.protocol version" to inter.broker.protocol.version (#15504)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16146: Checkpoint log-start-offset for remote log enabled topics (#15201)

The log-start-offset was not getting flushed to the checkpoint file due to the check where we compare the log-start-offset with the localLog first segment base offset only. This change makes sure that tiered storage enabled topics will always try to add their entries in the log-start-offset checkpoint file.

Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>

* KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 3 (#15497)

The previous pull request in this series was #15261.

This pull request continues the migration of the consumer mock in TaskManagerTest test by test for easier reviews.

The next pull request in the series will be #15254 which ought to complete the Mockito migration for the TaskManagerTest class

Reviewer: Bruno Cadonna <cadonna@apache.org>

* KAFKA-16227: Avoid IllegalStateException during fetch initialization (#15491)

The AsyncKafkaConsumer might throw an IllegalStateException during
the initialization of a new fetch. The exception is caused by
 the partition being unassigned by the background thread before
the subscription state is accessed during initialisation.

This commit avoids the IllegalStateException by verifying that
the partition was not unassigned each time the subscription state
is accessed.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>

* MINOR: Tweak streams config doc (#15518)

Reviewers: Matthias J. Sax <matthias@confluent.io>

* MINOR: Resolve SSLContextFactory.getNeedClientAuth deprecation (#15468)

Reviewers: Mickael Maison <mickael.maison@gmail.com>

* MINOR; Make string from array (#15526)

If toString is called on an array it returns the string representing the object reference.  Use mkString instead to print the content of the array.

Reviewers: Luke Chen <showuon@gmail.com>, Justine Olshan <jolshan@confluent.io>, Lingnan Liu <liliu@confluent.io>

* MINOR: simplify consumer logic (#15519)

For static member, the `group.instance.id` cannot change.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lianetmr@gmail.com>, David Jacot <david.jacot@gmail.com>

* MINOR: Kafka Streams docs fixes (#15517)

- add missing section to TOC
- add default value for client.id

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna <bruno@confluent.io>

* KAFKA-16249; Improve reconciliation state machine (#15364)

This patch re-work the reconciliation state machine on the server side with the goal to fix a few issues that we have recently discovered.
* When a member acknowledges the revocation of partitions (by not reporting them in the heartbeat), the current implementation may miss it. The issue is that the current implementation re-compute the assignment of a member whenever there is a new target assignment installed. When it happens, it does not consider the reported owned partitions at all. As the member is supposed to only report its own partitions when they change, the member is stuck.
* Similarly, as the current assignment is re-computed whenever there is a new target assignment, the rebalance timeout, as it is currently implemented, becomes useless. The issue is that the rebalance timeout is reset whenever the member enters the revocation state. In other words, in the current implementation, the timer is reset when there are no target available even if the previous revocation is not completed yet.

The patch fixes these two issues by not automatically recomputing the assignment of a member when a new target assignment is available. When the member must revoke partitions, the coordinator waits. Otherwise, it recomputes the next assignment. In other words, revoking is really blocking now.

The patch also proposes to include an explicit state in the record. It makes the implementation cleaner and it also makes it more extensible in the future.

The patch also changes the record format. This is a non-backward compatible change. I think that we should do this change to cleanup the record. As KIP-848 is only in early access in 3.7 and that we clearly state that we don't plane to support upgrade from it, this is acceptable in my opinion.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>

* KAFKA-13922: Adjustments for jacoco, coverage reporting (#11982)

Jacoco and scoverage reporting hasn't been working for a while. This commit fixes report generation. After this PR only subproject level reports are generated as Jenkins and Sonar only cares about that.
This PR doesn't change Kafka's Jenkinsfile.

Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>

* MINOR: AddPartitionsToTxnManager performance optimizations (#15454)


Reviewers: Mickael Maison <mickael.maison@gmail.com>, Justine Olshan <jolshan@confluent.io>

* KAFKA-14683 Cleanup WorkerSinkTaskTest (#15506)

1) Rename WorkerSinkTaskMockitoTest back to WorkerSinkTaskTest
2) Tidy up the code a bit
3) rewrite "fail" by "assertThrow"

Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16342 fix getOffsetByMaxTimestamp for compressed records (#15474)

Fix getOffsetByMaxTimestamp for compressed records.

This PR adds:

1) For inPlaceAssignment case, compute the correct offset for maxTimestamp when traversing the batch records, and set to ValidationResult in the end, instead of setting to last offset always.

2) For not inPlaceAssignment, set the offsetOfMaxTimestamp for the log create time, like non-compressed, and inPlaceAssignment cases, instead of setting to last offset always.

3) Add tests to verify the fix.

Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test (#15523)

It is possible that due to resource constraint, ShutdownableThread#run might be called later than the ShutdownableThread#close method.

Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>

* MINOR: Update javadocs and exception string in "deprecated" ProcessorRecordContext#hashcode (#15508)

This PR updates the javadocs for the "deprecated" hashCode() method of ProcessorRecordContext, as well as the UnsupportedOperationException thrown in its implementation, to actually explain why the class is mutable and therefore unsafe for use in hash collections. They now point out the mutable field in the class (namely the Headers)

Reviewers: Matthias Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>

* KAFKA-16358: Sort transformations by name in documentation; add missing transformations to documentation; add hyperlinks (#15499)

Reviewers: Yash Mayya <yash.mayya@gmail.com>

* MINOR: Only enable replay methods to modify timeline data structure (#15528)

The patch prevents the main method (the method generating records) from modifying the timeline data structure `groups`  by calling `getOrMaybeCreateConsumerGroup` in kip-848 new group coordinator. Only replay methods are able to add the newly created group to `groups`.

Reviewers: David Jacot <djacot@confluent.io>

* KAFKA-16231: Update consumer_test.py to support KIP-848’s group protocol config (#15330)

Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

* MINOR: Cleanup BoundedList to Make Constructors More Safe (#15507)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16267: Update consumer_group_command_test.py to support KIP-848’s group protocol config (#15537)

* KAFKA-16267: Update consumer_group_command_test.py to support KIP-848’s group protocol config

Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢

Note: this requires #15330.

* Update consumer_group_command_test.py

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-16268: Update fetch_from_follower_test.py to support KIP-848’s group protocol config (#15539)

Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks 😢

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-16269: Update reassign_partitions_test.py to support KIP-848’s group protocol config (#15540)

Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks 😢

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-16270: Update snapshot_test.py to support KIP-848’s group protocol config (#15538)

Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks 😢

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-16190: Member should send full heartbeat when rejoining (#15401)

When the consumer rejoins, heartbeat request builder make sure that all fields are sent in the heartbeat request.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

* MINOR: fix flaky EosIntegrationTest (#15494)

Bumping some timeout due to slow Jenkins build.

Reviewers: Bruno Cadonna <bruno@confluent.io>

* MINOR: Remove unused client side assignor fields/classes (#15545)

In https://github.com/apache/kafka/pull/15364, we introduced, thoughtfully, a non-backward compatible record change for the new consumer group protocol. So it is a good opportunity for cleaning unused fields, mainly related to the client side assignor logic which is not implemented yet. It is better to introduce them when we need them and more importantly when we implement it.

Note that starting from 3.8, we won't make such changes anymore. Non-backward compatible changes are still acceptable now because we clearly said that upgrade won't be supported from the KIP-848 EA.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16369: Broker may not shut down when SocketServer fails to bind as Address already in use (#15530)

* KAFKA-16369: wait on enableRequestProcessingFuture

Add a Wait in in KafkaServer (ZK mode) for all the SocketServer ports
to be open, and the Acceptors to be started

The BrokerServer (KRaft mode) had such a wait,
which was missing from the KafkaServer (ZK mode).

Add unit test.

* KAFKA-16312, KAFKA-16185: Local epochs in reconciliation (#15511)

The goal of this commit is to change the following internals of the reconciliation:

- Introduce a "local epoch" to the local target assignment. When a new target is received by the server, we compare it with the current value. If it is the same, no change. Otherwise, we bump the local epoch and store the new target assignment. Then, on the reconciliation, we also store the epoch in the reconciled assignment and keep using target != current to trigger the reconciliation.
- When we are not in a group (we have not received an assignment), we use null to represent the local target assignment instead of an empty list, to avoid confusions with an empty assignment received by the server. Similarly, we use null to represent the current assignment, when we haven't reconciled the assignment yet.
We also carry the new epoch into the request builder to ensure that we report the owned partitions for the last local epoch.
- To address KAFKA-16312 (call onPartitionsAssigned on empty assignments after joining), we apply the initial assignment returned by the group coordinator (whether empty or not) as a normal reconciliation. This avoids introducing another code path to trigger rebalance listeners - reconciliation is the only way to transition to STABLE. The unneeded parts of reconciliation (autocommit, revocation) will be skipped in the existing. Since a lot of unit tests assumed that not reconciliation behavior is invoked when joining the group with an empty assignment, this required a lot of the changes in the unit tests.

Reviewers: Lianet Magrans <lianetmr@gmail.com>, David Jacot <djacot@confluent.io>

* MINOR; Log reason for deleting a kraft snapshot (#15478)

There are three reasons why KRaft would delete a snapshot. One, it is older than the retention time. Two, the total number of bytes between the log and the snapshot excess the configuration. Three, the latest snapshot is newer than the log.

This change allows KRaft to log the exact reason why a snapshot is getting deleted.

Reviewers: David Arthur <mumrah@gmail.com>, Hailey Ni <hni@confluent.io>

* KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort state (#15524)

Now the removal of entries from the transactionsWithPendingMarkers map
checks the value and all pending marker operations keep the value along
with the operation state.  This way, the pending marker operation can
only delete the state it created and wouldn't accidentally delete the
state from a different epoch (which could lead to "stuck" transactions).

Reviewers: Justine Olshan <jolshan@confluent.io>

* KAFKA-16341 fix the LogValidator for non-compressed type (#15476)

- Fix the verifying logic. If it's LOG_APPEND_TIME, we choose the offset of the first record. Else, we choose the record with the maxTimeStamp.
- rename the shallowOffsetOfMaxTimestamp to offsetOfMaxTimestamp

Reviewers: Jun Rao <junrao@gmail.com>, Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received (#15533)

This patch fixes a bug in the logic which decides when a full ConsumerGroupHeartbeat response must be returned to the client. Prior to it, the logic only relies on the `ownedTopicPartitions` field to check whether the response was a full response. This is not enough because `ownedTopicPartitions` is also set in different situations. This patch changes the logic to check `ownedTopicPartitions`, `subscribedTopicNames` and `rebalanceTimeoutMs` as they are the only three non optional fields.

Reviewers: Lianet Magrans <lianetmr@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>

* KAFKA-12187 replace assertTrue(obj instanceof X) with assertInstanceOf (#15512)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Update upgrade docs to refer 3.6.2 version

* KAFKA-16222: desanitize entity name when migrate client quotas (#15481)

The entity name is sanitized when it's in Zk mode.
We didn't desanitize it when we migrate client quotas. Add Sanitizer.desanitize to fix it.

Reviewers: Luke Chen <showuon@gmail.com>

* KAFKA-14589 ConsumerGroupCommand rewritten in java (#14471)

This PR contains changes to rewrite ConsumerGroupCommand in java and transfer it to tools module

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16313: Offline group protocol migration (#15546)

This patch enables an empty classic group to be automatically converted to a new consumer group and vice versa.

Reviewers: David Jacot <djacot@confluent.io>

* KAFKA-16392: Stop emitting warning log message when parsing source connector offsets with null partitions (#15562)

Reviewers: Yash Mayya <yash.mayya@gmail.com>

* MINOR : Removed the depreciated information about Zk to Kraft migration. (#15552)

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Update muckrake version mapping for 3.7 (#1118)

* MINOR: Bump master to 7.8.0-0 (#1107)

* KAFKA-16318 : add javafoc for kafka metric (#15483)

Add the javadoc for KafkaMetric

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16206: Fix unnecessary topic config deletion during ZK migration (#14206)


Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ron Dagostino <rndgstn@gmail.com>

* KAFKA-16273: Update consumer_bench_test.py to use consumer group protocol (#15548)

Adding this as part of the greater effort to modify the system tests to incorporate the use of consumer group protocol from KIP-848. Following is the test results and the tests using protocol = consumer are expected to fail:

================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.11.4
session_id:       2024-03-16--002
run time:         76 minutes 36.150 seconds
tests run:        28
passed:           25
flaky:            0
failed:           3
ignored:          0
================================================================================

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Kirk True <ktrue@confluent.io>

* MINOR: KRaft upgrade tests should only use latest stable mv (#15566)

This should help us avoid testing MVs before they are usable (stable).
We revert back from testing 3.8 in this case since 3.7 is the current stable version.

Reviewers: Proven Provenzano <pprovenzano@confluent.io>, Justine Olshan <jolshan@confluent.io>

* KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito (#15254)

This pull requests migrates the StateDirectory mock in TaskManagerTest from EasyMock to Mockito.
The change is restricted to a single mock to minimize the scope and make it easier for review.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Bruno Cadonna <cadonna@apache.org>

* KAFKA-16271: Upgrade consumer_rolling_upgrade_test.py (#15578)

Upgrading the test to use the consumer group protocol. The two tests are failing due to Mismatch Assignment

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-16274: Update replica_scale_test.py to support KIP-848’s group protocol config (#15577)

Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-16276: Update transactions_test.py to support KIP-848’s group protocol config (#15567)

Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-16314: Introducing the AbortableTransactionException (#15486)

As a part of KIP-890, we are introducing a new class of Exceptions which when encountered shall lead to Aborting the ongoing Transaction. The following PR introduces the same with client side handling and server side changes.

On client Side, the code attempts to handle the exception as an Abortable error and ensure that it doesn't take the producer to a fatal state. For each of the Transactional APIs, we have added the appropriate handling. For the produce request, we have verified that the exception transitions the state to Aborted.
On the server side, we have bumped the ProduceRequest, ProduceResponse, TxnOffestCommitRequest and TxnOffsetCommitResponse Version. The appropriate handling on the server side has been added to ensure that the new error case is sent back only for the new clients. The older clients will continue to get the old Invalid_txn_state exception to maintain backward compatibility.

Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>

* KAFKA-16381 use volatile to guarantee KafkaMetric#config visibility across threads (#15550)

Reviewers: vamossagar12 <sagarmeansocean@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Tuple2 replaced with Map.Entry (#15560)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16388 add production-ready test of 3.3 - 3.6 release to MetadataVersionTest.testFromVersionString (#15563)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16408  kafka-get-offsets / GetOffsetShell doesn't handle --version or --help (#15583)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16410  kafka-leader-election / LeaderElectionCommand doesn't set exit code on error (#15591)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16374; High watermark updates should have a higher priority (#15534)

When the group coordinator is under heavy load, the current mechanism to release pending events based on updated high watermark, which consist in pushing an event at the end of the queue, is bad because pending events pay the cost of the queue twice. A first time for the handling of the first event and a second time for the handling of the hwm update. This patch changes this logic to push the hwm update event to the front of the queue in order to release pending events as soon as as possible.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>

* KAFKA-15882: Add nightly docker image scan job (#15013)


Reviewers: Mickael Maison <mickael.maison@gmail.com>

* KAFKA-16375: Fix for rejoin while reconciling (#15579)

This PR includes a fix to properly identify a reconciliation that should be interrupted and not applied because the member has rejoined. It does so simply based on a flag (not epochs, server or local). If the member has rejoined while reconciling, the reconciliation will be interrupted.

This also ensures that the check to abort the reconciliation is performed on all the 3 stages of the reconciliation that could be delayed: commit, onPartitionsRevoked, onPartitionsAssigned.

Reviewers: David Jacot <djacot@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-16406: Splitting consumer integration test (#15535)

Splitting consumer integration tests to allow for parallelization and reduce build times. This PR is only extracting tests from PlainTextConsumerTest into separate files, no changes in logic. Grouping tests by the feature they relate to so that they can be easily found

Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-15950: Serialize heartbeat requests (#14903)

In between HeartbeatRequest being sent and the response being handled,
i.e. while a HeartbeatRequest is in flight, an extra request may be
immediately scheduled if propagateDirectoryFailure, setReadyToUnfence,
or beginControlledShutdown is called.

To prevent the extra request, we can avoid the extra requests by checking
whether a request is in flight, and delay the scheduling if necessary.

Some of the tests in BrokerLifecycleManagerTest are also improved to
remove race conditions and reduce flakiness.

Reviewers: Colin McCabe <colin@cmccabe.xyz>, Ron Dagostino <rdagostino@confluent.io>, Jun Rao <junrao@gmail.com>

* KAFKA-16224: Do not retry committing if topic or partition deleted (#15581)

Current logic for auto-committing offsets when partitions are revoked
will retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION,
leading to the member not completing the revocation in time.

This commit considers error UNKNOWN_TOPIC_OR_PARTITION to be fatal
in the context of an auto-commit of offsets before a revocation,
even though the error is defined as retriable. This ensures that
the revocation can finish in time.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>, Lianet Magrans <lianetmr@gmail.com>

* KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification (#15559)

KIP-890 Part 1 introduced verification of transactions with the
transaction coordinator on the `Produce` and `TxnOffsetCommit` paths.
This introduced the possibility of new errors when responding to those
requests. For backwards compatibility with older clients, a choice was
made to convert some of the new retriable errors to existing errors that
are expected and retried correctly by older clients.

`NETWORK_EXCEPTION` was forgotten about and not converted, but can occur
if, for example, the transaction coordinator is temporarily refusing
connections. Now, we convert it to:
 * `NOT_ENOUGH_REPLICAS` on the `Produce` path, just like the other
   retriable errors that can arise from transaction verification.
 * `COORDINATOR_LOAD_IN_PROGRESS` on the `TxnOffsetCommit` path. This
   error does not force coordinator lookup on clients, unlike
   `COORDINATOR_NOT_AVAILABLE`. Note that this deviates from KIP-890,
   which says that retriable errors should be converted to
   `COORDINATOR_NOT_AVAILABLE`.

Reviewers: Artem Livshits <alivshits@confluent.io>, David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>

* KAFKA-16409: DeleteRecordsCommand should use standard exception handling (#15586)

DeleteRecordsCommand should use standard exception handling

Reviewers: Luke Chen <showuon@gmail.com>

* KAFKA-16415 Fix handling of "--version" option in ConsumerGroupCommand (#15592)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* update tags and added seamphore tags as part of semaphore migration (#1102)

* added new names

* fixed URL

* added semaphorebuildurl for tracking which url job spin up that vagrant worker

* fixed syntax

* Added new SemaphoreWorkflowUrl and SemaphoreJobId in ec2 tags

* KAFKA-15949: Unify metadata.version format in log and error message (#15505)

There were different words for metadata.version like metadata version or metadataVersion. Unify format as metadata.version.

Reviewers: Luke Chen <showuon@gmail.com>

* KAFKA-16416 Use NetworkClientTest to replace RequestResponseTest to be the example of log4j output (#15596)

Reviewers: Kirk True <kirk@kirktrue.pro>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16084: Simplify and deduplicate standalone herder test mocking (#15389)

Reviewers: Greg Harris <greg.harris@aiven.io>

* KAFKA-16411: Correctly migrate default client quota entities (#15584)

KAFKA-16222 fixed a bug whereby we didn't undo the name sanitization used on client quota entity names
stored in ZooKeeper. However, it incorrectly claimed to fix the handling of default client quota
entities. It also failed to correctly re-sanitize when syncronizing the data back to ZooKeeper.

This PR fixes ZkConfigMigrationClient to do the sanitization correctly on both the read and write
paths. We do de-sanitization before invoking the visitors, since after all it does not make sense to
do the same de-sanitization step in each and every visitor.

Additionally, this PR fixes a bug causing default entities to be converted incorrectly. For example,
ClientQuotaEntity(user -> null) is stored under the /config/users/<default> znode in ZooKeeper. In
KRaft it appears as a ClientQuotaRecord with EntityData(entityType=users, entityName=null).
Prior to this PR, this was being converted to a ClientQuotaRecord with EntityData(entityType=users,
entityName=""). That represents a quota on the user whose name is the empty string (yes, we allow
users to name themselves with the empty string, sadly.)

The confusion appears to have arisen because for TOPIC and BROKER configurations, the default
ConfigResource is indeed the one named with the empty (not null) string. For example, the default
topic configuration resource is ConfigResource(name="", type=TOPIC).  However, things are different
for client quotas. Default client quota entities in KRaft (and also in AdminClient) are represented
by maps with null values. For example, the default User entity is represented by Map("user" ->
null).  In retrospect, using a map with null values was a poor choice; a Map<String,
Optional<String>> would have made more sense. However, this is the way the API currently is and we
have to convert correctly.

There was an additional level of confusion present in KAFKA-16222 where someone thought that using
the ZooKeeper placeholder string "<default>" in the AdminClient API would yield a default client
quota entity. Thise seems to have been suggested by the ConfigEntityName class that was created
recently. In fact, <default> is not part of any public API in Kafka. Accordingly, this PR also
renames ConfigEntityName.DEFAULT to ZooKeeperInternals.DEFAULT_STRING, to make it clear that the
string <default> is just a detail of the ZooKeeper implementation.  It is not used in the Kafka API
to indicate defaults. Hopefully this will avoid confusion in the future.

Finally, the PR also creates KRaftClusterTest.testDefaultClientQuotas to get extra test coverage of
setting default client quotas.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Igor Soarez <soarez@apple.com>

* KAFKA-16385 Enhance documentation for retention.ms and retention.bytes configurations (#15588)

Reviewers: Igor Soarez <soarez@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: add docker usage documentation link in README.md (#15600)

We have a detail usage guide for running Kafka in docker. However, it might be easy for people to miss it if they only scan through the README.

This PR add a basic example command for quick start and link directing users to the detailed usage guide

Reviewers: Luke Chen <showuon@gmail.com>

* KAFKA-16391: remove .lock file when FileLock#destroy (#15568)

Currently, server adds a .lock file to each log folder. The file is useless after server is down.

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Add retry mechanism to EOS example (#15561)



In the initial EOS example, a retry logic was implemented within the resetToLastCommittedPositions method. During refactoring, this logic was removed becasue a poison pill prevented the example from reaching the final phase of consuming from the output topic.

In this change, I suggest to add it back, but with a retry limit defined as MAX_RETRIES. Once this limit is reached, the problematic batch will be logged and skipped, allowing the processor to move on and process remaining records. If some records are skipped, the example will still hit the hard timeout (2 minutes), but after consuming all processed records.

Reviewers: Luke Chen <showuon@gmail.com>

* KAFKA-16353: Offline protocol migration integration tests (#15492)

This patch adds integration tests for offline protocol migration.

Reviewers: David Jacot <djacot@confluent.io>

* MINOR: Preventing running the :core tests twice when testing with coverage (#15580)

reportScoverage task (which was used previously as dependency of the registered coverage task)
creates a task for each Test task and executes them. There's unitTest, integrationTest and
the test tasks (which is just for executing both unit and integration), so reportScoverage
executes all three with their corresponding scoverage task, hence running all tests twice.
Solution is just to use the reportTestScoverage task as dependency.

Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>

* KAFKA-16272: Update connect_distributed_test.py to support KIP-848’s group protocol config (#15576)

Update connect_distributed_test.py to support KIP-848’s group protocol config. Not all tests are updated because only a subset of it is using the consumer directly.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Kirk True <ktrue@confluent.io>

* KAFKA-14588 ZK configuration moved to ZkConfig (#15075)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Renaming the `Abortable_Transaction` error to `Transaction_Abortable` (#15609)

This is a follow-up to this PR (https://github.com/apache/kafka/pull/15486) which introduced the new ABORTABLE_TRANSACTION error as a part of KIP-890 efforts. However on further discussion, we seem to gain consensus that the error should be rather named as TRANSACTION_ABORTABLE.

This PR aims to address the same. There are no changes in the code apart from that.

Reviewers: Justine Olshan <jolshan@confluent.io>, Igor Soarez <soarez@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16428: Fix bug where config change notification znode may not get created during migration (#15608)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Luke Chen <showuon@gmail.com>

* KAFKA-16232: kafka hangs forever in the starting process if the authorizer future is not returned (#15549)

add logs before and after future waiting, to allow admin to know we're waiting for the authorizer future.

Reviewers: Luke Chen <showuon@gmail.com>

* MINOR: Metadata image test improvements (#15373)


Reviewers: Mickael Maison <mickael.maison@gmail.com>

* KAFKA-16406 [2] : Split consumer commit tests (#15612)

Follow-up to #15535, splitting consumer integration tests defined in the long-running PlainTextConsumerTest. This PR extracts the tests that directly relate to committing offsets. No changes in logic.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

* MINOR: Use CONFIG suffix in ZkConfigs (#15614)


Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>
Co-authored-by: n.izhikov <n.izhikov@vk.team>

* KAFKA-7663: Reprocessing on user added global stores restore (#15414)

When custom processors are added via StreamBuilder#addGlobalStore they will now reprocess all records through the custom transformer instead of loading directly.

We do this so that users that transform the records will not get improperly formatted records down stream.

Reviewers: Matthias J. Sax <matthias@confluent.io>

* MINOR: AbstractConfig cleanup (#15597)

Signed-off-by: Greg Harris <greg.harris@aiven.io>

Reviewers: Chris Egerton <chrise@aiven.io>, Mickael Maison <mickael.maison@gmail.com>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Matthias J. Sax <matthias@confluent.io>

* KAFKA-16349: Prevent race conditions in Exit class from stopping test JVM (#15484)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Chris Egerton <chrise@aiven.io>

* KAFKA-16397 Use ByteBufferOutputStream to avoid array copy (#15589)

Reviewers: Apoorv Mittal <amittal@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16303: Add upgrade notes to 3.5.0, 3.5.2, and 3.7.0 about MM2 offset translation (#15423)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>

* KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (1/3) (#15520)

Reviewers: Greg Harris <greg.harris@aiven.io>

* KAFKA-16217: Stop the abort transaction try loop when closing producers (#15541)

This is a mitigation fix for the https://issues.apache.org/jira/browse/KAFKA-16217. Exceptions should not block closing the producers.
This PR reverts a part of the change #13591

Reviewers: Kirk True <ktrue@confluent.io>, Justine Olshan <jolshan@confluent.io>

* KAFKA-15899 [1/2] Move kafka.security package from core to server module (#15572)

1) This PR moves kafka.security classes from core to server module.
2) AclAuthorizer not moved, because it has heavy dependencies on core classes that not rewrited from scala at the moment.
3) AclAuthorizer will be deleted as part of ZK removal

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16447: Fix failed ReplicaManagerTest (#15630)

The change in https://github.com/apache/kafka/pull/15373/files#r1544335647 updated broker port from 9093 to 9094. Some of test cases check broker endpoint with fixed string 9093. I change test cases to get endpoint by broker id, so these cases will not fail if someone change the port again in the future.

Reviewers: Luke Chen <showuon@gmail.com>

* MINOR: Remove redundant ApiVersionsResponse#filterApis (#15611)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric (#15463)

The variable of metrics item (kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec) is singleton object and it could be removed by other tests which are running in same JVM (and it is not recreated). Hence, verifying the metrics value is not stable to this test case.

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>

* KAFKA-15823: disconnect from controller on AuthenticationException (#14760)

This PR changes the handling of authenticationException on a request from the node to the controller.

We disconnect controller connection and invalidate the cache so that the next run of the thread will establish a connection with the (potentially) updated controller.

Reviewers: Luke Chen <showuon@gmail.com>, Igor Soarez <soarez@apple.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>

* MINOR: Fix doc of Admin#createDelegationToken (#15632)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16435 Add test for KAFKA-16428 (#15635)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16161: Avoid empty remote metadata snapshot file in partition dir (#15636)

Avoid empty remote metadata snapshot file in partition dir

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Satish Duggana <satishd@apache.org>

* KAFKA-16365: AssignmentsManager callback handling issues (#15521)

When moving replicas between directories in the same broker, future replica promotion hinges on acknowledgment from the controller of a change in the directory assignment.

ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion notification of the directory assignment change.

In its current form, under certain assignment scheduling, AssignmentsManager both miss completion notifications, or prematurely trigger them.

Reviewers: Luke Chen <showuon@gmail.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>

* MINOR: enhance kafka-reassign-partitions command output (#15610)

Currently, when we using kafka-reassign-partitions to move the log directory, the output only indicates which replica's movement has successfully started.

This PR propose to show more detailed information, helping end users understand that the operation is proceeding as expected.

Reviewers: Luke Chen <showuon@gmail.com>, Andrew Schofield <aschofield@confluent.io>

* KAFKA-16148: Implement GroupMetadataManager#onUnloaded (#15446)

This patch completes all awaiting futures when a group is unloaded.

Reviewers: David Jacot <djacot@confluent.io>

* KAFKA-15417: move outerJoinBreak-flags out of the loop (#15510)

Follow up PR for https://github.com/apache/kafka/pull/14426 to fix a bug introduced by the previous PR.

Cf https://github.com/apache/kafka/pull/14426#discussion_r1518681146

Reviewers: Matthias J. Sax <matthias@confluent.io>

* KAFKA-16457 Useless import class `org.apache.kafka.common.config.ConfigDef.Type` (#15646)

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: AbstractConfig cleanup Part 2 (#15639)

Reviewers:  Manikumar Reddy <anikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16413 add FileLockTest (#15624)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config (#15626)

Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢

Reviewers: Walker Carlson <wcarlson@apache.org>

* KAFKA-16438: Update consumer_test.py’s static tests to support KIP-848’s group protocol config (#15627)

Migrated the following tests for the new consumer:

- test_fencing_static_consumer
- test_static_consumer_bounce
- test_static_consumer_persisted_after_rejoin

Reviewers: Walker Carlson <wcarlson@apache.org>

* KAFKA-16440: Update security_test.py to support KIP-848’s group protocol config (#15628)

Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢

Reviewers: Walker Carlson <wcarlson@apache.org>

* KAFKA-16439: Update replication_replica_failure_test.py to support KIP-848’s group protocol config (#15629)

Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢

Reviewers: Walker Carlson <wcarlson@apache.org>

* KAFKA-16359: Corrected manifest file for kafka-clients (#15532)

The issue KAFKA-16359 reported inclusion of kafka-clients runtime dependencies in MANIFEST.MF Class-Path.

The root cause is the issue defined here with the usage of shadow plugin.

Looking into the specifics of plugin and documentation, specifies that any dependency marked as shadow will be treated as following by the shadow plugin:

    1. Adds the dependency as runtime dependency in resultant pom.xml - code here
    2. Adds the dependency as Class-Path in MANIFEST.MF as well - code here

Resolution

We do need the runtime dependencies available in the pom (1 above) but not on manifest (2 above). Also there is no clear way to separate the behaviour as both above tasks relies on shadow configuration.

To fix, I have defined another custom configuration named shadowed which is later used to populate the correct pom (the code is similar to what shadow plugin does to populate pom for runtime dependencies).

Though this might seem like a workaround, but I think that's the only way to fix the issue. I have checked other SDKs which suffered with same issue and went with similar route to populate pom.

Reviewers: Luke Chen <showuon@gmail.com>, Reviewers: Mickael Maison <mickael.maison@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>

* KAFKA-15586: Clean shutdown detection - server side (#14706)

If the broker registers with the same broker epoch as the previous session, it is recognized as a clean shutdown. Otherwise, it is an unclean shutdown. This replica will be removed from any ELR.

Reviewers: Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>

* Minor changes according to latest trunk

* Updated Tuple2->Entry changes

* Resolving merge conflicts

* Resolving merge conflicts

* Fixing build

* Fixing build

---------

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Co-authored-by: Nikolay <nizhikov@apache.org>
Co-authored-by: Confluent Jenkins Bot <jenkins@confluent.io>
Co-authored-by: Dmitry Werner <grimekillah@gmail.com>
Co-authored-by: Christo Lolov <lolovc@amazon.com>
Co-authored-by: Kirk True <kirk@kirktrue.pro>
Co-authored-by: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
Co-authored-by: Andrew Schofield <aschofield@confluent.io>
Co-authored-by: David Jacot <djacot@confluent.io>
Co-authored-by: Matthias J. Sax <matthias@confluent.io>
Co-authored-by: PoAn Yang <payang@apache.org>
Co-authored-by: Dung Ha <60119105+infantlikesprogramming@users.noreply.github.com>
Co-authored-by: Owen Leung <owen.leung2@gmail.com>
Co-authored-by: testn <test1@doramail.com>
Co-authored-by: Daan Gerits <daan.gerits@gmail.com>
Co-authored-by: Joel Hamill <11722533+joel-hamill@users.noreply.github.com>
Co-authored-by: Johnny Hsu <44309740+johnnychhsu@users.noreply.github.com>
Co-authored-by: Gaurav Narula <gaurav_narula2@apple.com>
Co-authored-by: Kamal Chandraprakash <kchandraprakash@uber.com>
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Co-authored-by: Cheryl Simmons <csimmons@confluent.io>
Co-authored-by: Greg Harris <greg.harris@aiven.io>
Co-authored-by: José Armando García Sancio <jsancio@users.noreply.github.com>
Co-authored-by: Andras Katona <41361962+akatona84@users.noreply.github.com>
Co-authored-by: David Mao <47232755+splett2@users.noreply.github.com>
Co-authored-by: Hector Geraldino <hgeraldino@gmail.com>
Co-authored-by: Luke Chen <showuon@gmail.com>
Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
Co-authored-by: Manikumar Reddy <manikumar.reddy@gmail.com>
Co-authored-by: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com>
Co-authored-by: Chris Holland <41524756+ChrisAHolland@users.noreply.github.com>
Co-authored-by: TapDang <89607407+phong260702@users.noreply.github.com>
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Lucas Brutschy <lbrutschy@confluent.io>
Co-authored-by: Artem Livshits <84364232+artemlivshits@users.noreply.github.com>
Co-authored-by: Kuan-Po (Cooper) Tseng <brandboat@gmail.com>
Co-authored-by: Chris Egerton <chrise@aiven.io>
Co-authored-by: John Yu <54207775+chiacyu@users.noreply.github.com>
Co-authored-by: Alyssa Huang <ahuang@confluent.io>
Co-authored-by: Philip Nee <pnee@confluent.io>
Co-authored-by: Sanskar Jhajharia <122860866+sjhajharia@users.noreply.github.com>
Co-authored-by: Vedarth Sharma <142404391+VedarthConfluent@users.noreply.github.com>
Co-authored-by: Lianet Magrans <98415067+lianetm@users.noreply.github.com>
Co-authored-by: Igor Soarez <i@soarez.me>
Co-authored-by: Sean Quah <squah@confluent.io>
Co-authored-by: Avneesh Kumar Singhal <116718699+avn-confluent@users.noreply.github.com>
Co-authored-by: Cheng-Kai, Zhang <kai821104@gmail.com>
Co-authored-by: Ahmed Sobeh <ahmed.sobeh@aiven.io>
Co-authored-by: Colin Patrick McCabe <cmccabe@apache.org>
Co-authored-by: Federico Valeri <fedevaleri@gmail.com>
Co-authored-by: n.izhikov <n.izhikov@vk.team>
Co-authored-by: Walker Carlson <18128741+wcarlson5@users.noreply.github.com>
Co-authored-by: John Yu <yujuan476@gmail.com>
Co-authored-by: Calvin Liu <83986057+CalvinConfluent@users.noreply.github.com>
Co-authored-by: Ori Hoch <ori@uumpa.com>
Co-authored-by: Jeff Kim <kimkb2011@gmail.com>
Co-authored-by: Victor van den Hoven <victor.vanden.hoven@alliander.com>
Co-authored-by: rykovsi <45871243+rykovsi@users.noreply.github.com>
Co-authored-by: Apoorv Mittal <amittal@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants