
KAFKA-3853 (KIP-88): Report offsets for empty groups in consumer group describe command (new consumer) #2074

Closed
wants to merge 1 commit

Conversation

vahidhashemian
Contributor

@vahidhashemian vahidhashemian commented Oct 28, 2016

When there are no active new consumers in a consumer group, report the offsets within the group instead of reporting that the group has no active members.

This PR also implements the API change proposed by KIP-88.

@vahidhashemian
Contributor Author

@hachikuji This PR also fixes another issue that was introduced by the fix for KAFKA-3144. I point to it inline. Let me know if you think that issue should be fixed and merged (more quickly) in a separate PR. Thanks.

      case Some(other) =>
        // the control should never reach here
        throw new KafkaException(s"Expected a valid consumer group state, but found '$other'.")
      case None =>
        // the control should never reach here
        throw new KafkaException("Expected a valid consumer group state, but none found.")
Contributor Author

@vahidhashemian commented Oct 28, 2016

This is where the bug was introduced. If state is None, it is possible that the group is an old-consumer-based group without any active members; so we need to check whether the new consumer is used or not, and then proceed accordingly.
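The fix described above can be sketched in miniature (hypothetical names only, not the actual Kafka classes): a None state no longer throws outright, but instead signals that the group may be an old-consumer group with no active members, so the caller can fall back to the old-consumer path.

```java
// Hypothetical sketch of the fix, assuming a caller that can fall back to the
// old-consumer (ZooKeeper-based) path; names are illustrative.
import java.util.Optional;

class GroupStateClassifier {
    enum Kind { NEW_CONSUMER_GROUP, POSSIBLY_OLD_CONSUMER_GROUP }

    // Previously, an empty state led straight to a KafkaException.
    static Kind classify(Optional<String> state) {
        return state.isPresent() ? Kind.NEW_CONSUMER_GROUP
                                 : Kind.POSSIBLY_OLD_CONSUMER_GROUP;
    }
}
```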

public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0};
public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0};
public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V1};
Contributor

Was there a KIP for this that I missed?

Contributor Author

Thanks for bringing this up. I wasn't totally sure what the process is for changing the protocol, and whether I'm actually correct to assume that the protocol has to be changed for this JIRA. I haven't opened a KIP yet; if you think one is eventually going to be needed, I'd be happy to create it. Thanks in advance for clarifying this.

Contributor

Yeah, protocol changes definitely need a KIP. Probably makes sense then to split the bug fix into a separate patch.

Contributor Author

Thanks. I'll submit the bug fix separately, and then work on creating a KIP.

@vahidhashemian vahidhashemian changed the title KAFKA-3853: Report offsets for empty groups in consumer group describe command (new consumer) KAFKA-3853 (WIP): Report offsets for empty groups in consumer group describe command (new consumer) Oct 28, 2016
@vahidhashemian vahidhashemian force-pushed the KAFKA-3853 branch 3 times, most recently from 3d733d8 to 5b1c5fc on November 4, 2016 22:59
props.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName)
props.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)
val consumer1Mock = EasyMock.createMockBuilder(classOf[NewShinyConsumer]).withConstructor(Some(topic), None, None, None, props, Long.MaxValue.asInstanceOf[Object]).createMock()
val consumer2Mock = EasyMock.createMockBuilder(classOf[NewShinyConsumer]).withConstructor(Some(topic), None, None, None, props, Long.MaxValue.asInstanceOf[Object]).createMock()
Contributor Author

@vahidhashemian commented Nov 4, 2016

@hachikuji I was hoping you could take a look at an issue I'm running into if and when you get a chance.
While KIP-88 is open for discussion, I spent some time creating some unit tests for this PR.

This particular unit test simply mocks two consumers that consume from the same 1-partition topic and belong to the same consumer group. A similar unit test on old consumers exists earlier in the same file and runs fine. There are also other unit tests above using the new consumer that run fine but they mock only one consumer.

The problem I'm running into is this line that mocks the second consumer: it takes a long time to run (I believe waiting for the consumer to join the group, which eventually times out) and the consumer group somehow gets corrupted. When I debug and check the status of the group down in the waitUntilTrue check, it is sometimes Empty, or Dead, or even Stable with only one of the consumers, and it never reaches the expected state (Stable with two members). Where I think it gets stuck in a loop is here (after trying to join the group).

I'm not sure if I'm doing something wrong with the unit test or if I'm hitting some bug. I thought you might know by just looking at it. Thanks.

@vahidhashemian vahidhashemian force-pushed the KAFKA-3853 branch 3 times, most recently from 0198627 to 74cae9f on November 15, 2016 21:26
@vahidhashemian vahidhashemian force-pushed the KAFKA-3853 branch 7 times, most recently from 5ef58b9 to 6a3e835 on November 30, 2016 03:30
@vahidhashemian vahidhashemian changed the title KAFKA-3853 (WIP): Report offsets for empty groups in consumer group describe command (new consumer) KAFKA-3853 (KIP-88): Report offsets for empty groups in consumer group describe command (new consumer) Dec 12, 2016
@asfbot

asfbot commented Dec 12, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/83/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Dec 12, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/81/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Dec 12, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/82/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/112/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/111/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/110/
Test FAILed (JDK 7 and Scala 2.10).

@vahidhashemian
Contributor Author

@hachikuji I've tried to address both KIP-88 and KAFKA-3853 in the same PR here. I'd appreciate your feedback when you get a chance to review this. Thanks.

@asfbot

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/827/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/825/
Test PASSed (JDK 7 and Scala 2.10).

@hachikuji
Contributor

@vahidhashemian Saw your question about why we are only checking 3 of the 5 error codes. Haha, I noticed the same thing when reviewing and opened KAFKA-4622.

@vahidhashemian
Contributor Author

Thanks for clarifying. Then I can leave that to be fixed as part of that JIRA.

Contributor

@hachikuji left a comment

This is getting close, but I think there's still a problem with the handler logic in KafkaApis.

@@ -47,6 +48,14 @@ public Builder(String groupId, List<TopicPartition> partitions) {
this.partitions = partitions;
}

public static Builder allTopicPartitions(String groupId) {
Contributor

Maybe we should enforce a minimum version number when querying all partitions? You can look at ListOffsetRequest for an example of this.
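A hedged sketch of what that builder-side enforcement could look like (simplified types, assumed names; the real code follows the ListOffsetRequest pattern): a null partition list stands for "fetch offsets for all partitions", which only protocol version 2 and later supports.

```java
// Illustrative stand-in for the request builder; not the actual Kafka API.
import java.util.List;

class OffsetFetchBuilderSketch {
    static String build(String groupId, List<String> partitions, short version) {
        // partitions == null means "all partitions", legal only from v2 on
        if (partitions == null && version < 2)
            throw new UnsupportedOperationException(
                "Cannot fetch all offsets for group '" + groupId + "' at version "
                    + version + "; version 2 or later is required");
        return "OffsetFetchRequest(v" + version + ")";
    }
}
```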

short versionId = version();

if (versionId < 2 && partitions != null) {
for (TopicPartition partition: partitions) {
Contributor

nit: add a space before the :.

this.error = topLevelError;
struct.set(ERROR_CODE_KEY_NAME, this.error.code());
} else
this.error = Errors.NONE;
Contributor

Wonder if there's any harm retaining the top-level error regardless of the version. Seems more consistent with how we handle the case of constructing from a Struct.

Contributor Author

I think it should be okay to do that. Will update.

responseData.put(new TopicPartition(topic, partition), partitionData);
}
}

this.error = struct.hasField(ERROR_CODE_KEY_NAME) ? Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)) : topLevelError;
Contributor

Probably worth a comment explaining why we do this.

@@ -66,60 +75,81 @@ public String toString() {
private final String groupId;
private final List<TopicPartition> partitions;

// v0 and v1 have the same fields.
public static OffsetFetchRequest forAllPartitions(String groupId, short version) {
Contributor

Related to above comment. This method only makes sense for version 2, so maybe we should remove version and use 2 directly.

partitions.map { topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
} else if (!isCoordinatorForGroup(groupId)) {
partitions: Seq[TopicPartition]): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
Contributor

Instead of using null as the sentinel, we could use an Option.
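The suggestion in miniature (the real code is Scala and would use scala.Option; this Java sketch with Optional and assumed names is only an illustration): a nullable list makes the "all partitions" convention invisible at the call site, while an optional makes it explicit in the signature.

```java
// Illustrative only: Optional.empty() explicitly means "all partitions",
// replacing a null sentinel that callers could forget to check.
import java.util.*;

class OffsetLookupSketch {
    static Map<String, Long> fetchOffsets(Optional<List<String>> partitions,
                                          Map<String, Long> committed) {
        if (!partitions.isPresent())
            return committed; // empty means "return offsets for all partitions"
        Map<String, Long> result = new HashMap<>();
        for (String tp : partitions.get())
            if (committed.containsKey(tp))
                result.put(tp, committed.get(tp));
        return result;
    }
}
```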

// return offsets blindly regardless of the current group state since the group may be using
// Kafka commit storage without automatic group management
groupManager.getOffsets(groupId, partitions)
(Errors.NONE, groupManager.getOffsets(groupId, partitions match {
Contributor

Another option would be to push the handling of all offsets into getOffsets. One small advantage is that you would only need to acquire the lock once instead of twice.

Contributor Author

@vahidhashemian commented Jan 13, 2017

Great idea because getOffsets already gives us what we want (here).

}
} else {
val isAllPartitions = offsetFetchRequest.isAllPartitions() // header.apiVersion == 2
var groupOffsets: (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = null
Contributor

You could do these assignments at once:

val (groupOffsets, partitions) = ...

We use this pattern just below.

Contributor Author

Would it also work if one assignment depends on the other one? partitions uses groupOffsets. Also, it may not read easily since there is a big type definition for groupOffsets.

Contributor

@hachikuji commented Jan 13, 2017

Does type inference not work? I was thinking something like this:

        val (groupOffsets, partitions) =
          if (isAllPartitions) {
            val groupOffsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, null)
            (groupOffsets, groupOffsets._2.keySet)
          } else {
            val requestPartitions = offsetFetchRequest.partitions.asScala.toList
            (coordinator.handleFetchOffsets(offsetFetchRequest.groupId, requestPartitions), requestPartitions)
          }

Contributor Author

Yes, it works perfectly. Sorry for the premature question!

var groupOffsets: (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = null
val partitions =
if (isAllPartitions) {
groupOffsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, null)
Contributor

Seems like this and the other call to handleFetchOffsets below need to go into the else case after the check for version 0. For version 0, we pull offsets from ZooKeeper. I'm wondering if your first approach, which collected all partitions from the coordinator first, may have been a little cleaner. Another option to consider: perhaps you could do the post-filtering for the isAllPartitions case separately, and continue doing pre-filtering when we are provided the partition list.

(topicPartition, unknownTopicPartitionResponse)
// reject the request if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
if (header.apiVersion >= 2)
Contributor

I was thinking we could handle both of these cases in the same constructor. The constructor would take a single error code and the list of requested partitions. If the version is greater than or equal to 2, the partitions are ignored; otherwise, the error code is written into the partition errors.
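That single-constructor idea might look roughly like this (simplified, assumed names; not the actual OffsetFetchResponse code): for version 2 and later the error is kept at the top level and the partition list is ignored, while for older versions the error is written into every requested partition's error slot.

```java
// Illustrative sketch of one error constructor serving all versions.
import java.util.*;

class ErrorResponseSketch {
    final String topLevelError;                 // null for pre-v2 responses
    final Map<String, String> partitionErrors;  // empty for v2+ responses

    ErrorResponseSketch(short version, String error, List<String> partitions) {
        if (version >= 2) {
            this.topLevelError = error;
            this.partitionErrors = Collections.emptyMap();
        } else {
            this.topLevelError = null;
            Map<String, String> errors = new HashMap<>();
            for (String tp : partitions)
                errors.put(tp, error);
            this.partitionErrors = errors;
        }
    }
}
```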

Contributor Author

Good idea. Thanks.

@hachikuji
Contributor

@vahidhashemian I merged KIP-103, which looks like it causes some conflicts in ApiVersion. Hopefully shouldn't be too much trouble to resolve.

new OffsetFetchResponse(results.asJava, Errors.NONE, header.apiVersion)
}
} else {
val isAllPartitions = offsetFetchRequest.isAllPartitions() // header.apiVersion == 2
Contributor

nit: unneeded parenthesis.

@vahidhashemian vahidhashemian force-pushed the KAFKA-3853 branch 2 times, most recently from a6a3220 to ad48ee9 on January 13, 2017 22:16
coordinatorDead();
future.raise(error);
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
Contributor Author

Still not too sure about this. Did you want to move this error check up in the first if block?

Contributor

I'm not sure I see the problem. Would we ever see this at the top level?

Contributor Author

@vahidhashemian commented Jan 13, 2017

I'm referring to your earlier comment, and I'm not sure if it implied modifying this method too.

Contributor

I was mainly concerned that we'd need to check errors in both places, but I think we're good now since we ensure that top-level error codes will always appear at the top level (even for older versions).

Contributor Author

Great, thanks for clarifying.

@Override
public OffsetFetchRequest build() {
if (isAllTopicPartitions() && version() < minVersion)
Contributor

Actually I guess we could make this a little simpler. We know we need version 2, so maybe we can use it directly and remove minVersion?

Contributor Author

Sure, I thought about that too, but wanted to keep it in sync with ListOffsetRequest. I'll update.

Contributor

Yeah, we're still feeling out the best patterns for handling older versions.


// for version 2 and later use the top level error code (in ERROR_CODE_KEY_NAME) from the response
// for older versions there is no top level error in the response and all errors are partition errors,
// so if there is a group or coordinator error at the partition level use that as the top level error
Contributor

Could we mention that we do this so that the client can depend on the top-level error code regardless of the offset fetch version?

@asfbot

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/866/
Test PASSed (JDK 8 and Scala 2.11).

private final Map<TopicPartition, PartitionData> responseData;
private final Errors error;
private Set<Errors> partitionErrors;
Contributor

Do we need another field if the errors are already contained in PartitionData?

Contributor Author

I added this to keep track of partition errors that are needed by AdminClient here. Unless it's okay to process PartitionData on the fly?

Contributor

Seems just as efficient to me, especially since we only throw the first error.

Contributor Author

Sure; then I think we at least need another Errors member for that first partition error, so we don't have to process partitionData multiple times to check for the existence of an error and then actually retrieve it (in hasPartitionErrors and partitionErrors).

Contributor

Hmm.. It just doesn't seem worth optimizing for. Processing the partition data means what? Looping over it and checking if error is NONE? Does it matter if we do that twice? We could also just leave off the hasPartitionErrors and do a single iteration and raise the error on the first exception.

Contributor Author

Would something like this work?

public Errors getPartitionError() {
    Collection<PartitionData> partitionsData = this.responseData.values();
    for (PartitionData data : partitionsData) {
        if (data.hasError())
            return data.error;
    }
    return null;
}

Contributor

@hachikuji commented Jan 13, 2017

Sure, that would work. Maybe getFirstPartitionError is a clearer name? Or you could bundle the exception throwing as well into a single maybeThrowFirstPartitionError? Either way is fine with me, but I'd prefer not to add additional fields without a clear case that they're needed.

@@ -146,3 +148,9 @@ case object KAFKA_0_10_2_IV0 extends ApiVersion {
val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
val id: Int = 9
}

case object KAFKA_0_10_2_IV1 extends ApiVersion {
Contributor

Talked to @ijuma about this, and I don't think we need to bump the internal version number since the brokers do not use offset fetches themselves.

Contributor Author

Sounds good. I'll remove the internal version.

None) // we are not interested in all offsets in the group here
} else {
// the request is for all group offsets (api version 2 and later)
val groupOffsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, None)
Contributor

I think this is close, but it's a bit annoying that we have to call handleFetchOffsets in two places, right? Wouldn't it be better to delay the check for isAllPartitions and the filtering until after the call to handleFetchOffsets below? That gives us a clean separation of the Kafka and ZooKeeper offset handling. So maybe the logic can be something like this:

  1. Filter unauthorized partitions
  2. Check if this is version 0
    a. Fetch from ZooKeeper for version 0
    b. Fetch from Kafka for versions 1 and above. After receiving the fetched offsets, check if isAllPartitions is set. If so, additionally filter out the fetched offsets for topics we are not authorized for.

Does that make sense?
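The flow proposed above can be sketched as a small standalone function (helper data and names are assumed for illustration; this is not the KafkaApis code): version 0 reads ZooKeeper-committed offsets after pre-filtering the request, while later versions read Kafka-committed offsets and post-filter unauthorized topics when the request asked for all partitions.

```java
// Illustrative sketch of the suggested offset-fetch handling flow.
import java.util.*;
import java.util.function.Predicate;

class OffsetFetchFlowSketch {
    static Map<String, Long> handle(int version, boolean isAllPartitions,
                                    List<String> requested,
                                    Predicate<String> authorized,
                                    Map<String, Long> zkOffsets,
                                    Map<String, Long> kafkaOffsets) {
        // version 0: ZooKeeper storage; version 1+: Kafka storage
        Map<String, Long> source = (version == 0) ? zkOffsets : kafkaOffsets;
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : source.entrySet()) {
            // "all partitions" is only meaningful for v1+; otherwise honor the list
            boolean wanted = (version != 0 && isAllPartitions)
                || requested.contains(e.getKey());
            if (wanted && authorized.test(e.getKey()))
                result.put(e.getKey(), e.getValue());
        }
        return result;
    }
}
```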

Contributor Author

If I understand this correctly, for version 1 and above, to receive the fetched offsets we already need to check isAllPartitions to determine if None or Some(partitions) should be passed to handleFetchOffsets.

Contributor

Yeah, that's fair. The point is that it should happen in the Kafka branch of that if and not before.

Contributor Author

I think we need to call isAllPartitions upfront anyway because we need to make sure offsetFetchRequest.partitions is not null before starting to filter.

Contributor

Ack

…p describe command (new consumer)

When there are no active new consumers in a consumer group, report the offsets within the group instead of reporting that the group has no active members.

This PR also implements the API change proposed by [KIP-88](https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update).
}
else
new OffsetFetchResponse(offsets._1)
}
Contributor Author

I hope this is now closer to what you described.

Contributor

@hachikuji left a comment

LGTM. Thanks for being so responsive throughout the review iterations!

@asfbot

asfbot commented Jan 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/875/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/864/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/864/
Test FAILed (JDK 8 and Scala 2.12).

@asfgit asfgit closed this in c2d9b95 Jan 14, 2017
@asfbot

asfbot commented Jan 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/873/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/873/
Test FAILed (JDK 8 and Scala 2.12).

soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
…a consumer group (KIP-88)

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>, Jason Gustafson <jason@confluent.io>

Closes apache#2074 from vahidhashemian/KAFKA-3853
efeg pushed a commit to efeg/kafka that referenced this pull request May 29, 2024