New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers #13704
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jeffkbkim Thanks for the patch. I left a few suggestions.
*/ | ||
public static Record newGroupMetadataRecord( | ||
GenericGroup group, | ||
Map<String, byte[]> memberAssignments, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why we can't get the assignments from the group?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the assignment comes directly from the sync group request, i don't think we need to store the assignment inside the group
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm... don't we store it in MemberMetadata in the current implementation? We set it in setAndPropagateAssignment
. I think that we need it because we need the ability to provide it the the member at any time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think you're referring to https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L623
the in-memory state is updated after we successfully append to the log which so we won't have this information when we generate the records to append, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's right. the generic group is a bit weird from this regard because it does not use timeline data structures internally so we could just update it as well (like we do for subscriptions for instance). the part which is weird here is that we take everything from the group but the assignment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think you're saying that ideally we want to create a record solely from the group i.e. update the group with the assignment and pass it into this method. However, we can't because we don't use timeline data structures that could revert if append fails. is my understanding correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is actually the other way around, the generic group is mutable so we could mutate it whenever we want here, i think.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
Outdated
Show resolved
Hide resolved
); | ||
|
||
assertEquals(expectedRecord, groupMetadataRecord); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add the following test:
- A unit test for the tombstone method;
- A unit test which triggers the IllegalStateException exceptions;
- Unit tests which verifies the various record versions? It could be a parameterised one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i updated testNewGroupMetadataRecord
for 3). can you take a look?
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
Show resolved
Hide resolved
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
Show resolved
Hide resolved
i'm seeing
where this suggests from https://checkstyle.sourceforge.io/apidocs/com/puppycrawl/tools/checkstyle/checks/metrics/ClassDataAbstractionCouplingCheck.html#:~:text=Generally%20speaking%2C%20any%20data%20type,the%20structure%20of%20the%20class
Can we suppress this error? |
It is the first time I see this. I see that we can suppress it. |
@dajac this is ready for review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jeffkbkim I left two small comments. We can merge it when they are addressed.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This PR enables the new group metadata manager to generate GroupMetadataKey/Value records.
Committer Checklist (excluded from commit message)