Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14517: Implement regex subscriptions #14327

Open
wants to merge 21 commits into
base: trunk
Choose a base branch
from

Conversation

JimmyWang6
Copy link

@JimmyWang6 JimmyWang6 commented Sep 2, 2023

This pull request implements the server-side functionality for regex subscriptions in the next generation of the consumer rebalance protocol.

For the ConsumerGroupHeartbeat API, the server now accepts a regex pattern, and a constraint is enforced that only one of either the regular subscription pattern or topic name subscription is allowed. The subscribedTopicRegex is eventually transformed into subscribedTopicName in the ConsumerGroupMember.

To handle this, an additional argument, TopicsImage, is introduced in the maybeUpdateSubscribedTopicNames() function. This argument is used to filter the topics that match the regex pattern. During replay, when the function is called with a null TopicsImage argument, the regex expression is not considered.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@JimmyWang6 JimmyWang6 force-pushed the KAFKA-14048 branch 4 times, most recently from be7f5ef to c68c37d Compare September 3, 2023 13:52
@JimmyWang6 JimmyWang6 marked this pull request as draft September 3, 2023 14:15
@dajac
Copy link
Contributor

dajac commented Sep 5, 2023

@a961370183 Thanks for the PR. The approach taken in this PR is incorrect. As explained in KIP-848, the goal is to move the resolution of the regex to the server side. Therefore, the regex is passed to the server via the new ConsumerGroupHeartbeat API. That being said, I think that we should start by implementing the server side. Are you interested in doing do?

Note that the part adding the validation command to the consumer group command line tool is correct. We could keep this part.

@JimmyWang6 JimmyWang6 marked this pull request as ready for review September 11, 2023 23:40
@JimmyWang6
Copy link
Author

@dajac Almost done. Could you pls help to review the code?

Copy link
Contributor

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @JimmyWang6 for the PR.

As @dajac mentioned, we don't want to have the client include the RE2J dependency. The evaluation of the regex is performed strictly on the broker.

/**
* local variable to avoid multiple compile
*/
private Pattern complitedPattern;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: typo

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks

KAFKA-14048:remove client side code
@JimmyWang6
Copy link
Author

Hi, @dajac @kirktrue Review comments have been addressed, please take a look when you have time.

@dajac
Copy link
Contributor

dajac commented Sep 22, 2023

@JimmyWang6 I am really sorry but I haven't had the time to dive into this yet. At a high level, I think that the approach taken so far does not cover all the aspects. For instance, how is the target assignment updates based on the resolved regular expressions?

@vamossagar12
Copy link
Collaborator

vamossagar12 commented Oct 4, 2023

@JimmyWang6 , Apart from all the suggestions made in the PR reviews, I have a minor one. In the PR title, you have mentioned KAFKA-14048 which is the uber ticket for KIP-848. Please change it to KAFKA-14517 instead.

@JimmyWang6
Copy link
Author

@vamossagar12 Much thanks for your review. Will change to KAFKA-14517 later

@JimmyWang6 JimmyWang6 changed the title KAFKA-14048:Implement regex subscriptions KAFKA-14517:Implement regex subscriptions Oct 11, 2023
@JimmyWang6
Copy link
Author

@dajac
Much Thanks for your review!
As for target assignment updates, the function maybeUpdateSubscribedTopicNames(oldMember, newMember)
image
The old member's RegexSubscribedTopics has been converted to subscribedTopicNames(),
and subscriptions can be reduced by using subscribedTopicCount.compute(topicName, ConsumerGroup::decValue).
The new member's RegexSubscribedTopics will be transformed into subscribedTopicNames through getRegexSubscribedTopics().

@JimmyWang6
Copy link
Author

Hi @dajac
Thank you for your feedback. I have just understood your point and realized that there are indeed some issues with my code. I will make it a priority to address them as soon as possible. Thank you for bringing this to my attention.

@JimmyWang6
Copy link
Author

@

@JimmyWang6 , I think for this part

What if a new topic gets created which matches the regex subscription of some member.

we need to use the metadata update mechanism defined by metadata.max.age.ms. This is the interval using which the consumer will keep refreshing the metadata and do the pattern matching periodically against all topics existing at the time of check. Having said that, this looks like a client level config so not sure if it needs to be part of this PR. @dajac , WDYT?

@dajac @vamossagar12
Thank you for your comments!
I have added more unit tests and addressed the issues mentioned in the previous comments. In the case of a newly created topic that matches the regex subscription of certain members, I believe the method public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) will be invoked. Therefore, I have made modifications to the groupsSubscribedToTopic method to identify the consumer groups that have subscribed to topics matching the regular expression and requested a metadata refresh for those groups.

The latest code appears to be functioning well. Do you have any further comments or suggestions regarding this approach?

@Phuc-Hong-Tran
Copy link
Contributor

Phuc-Hong-Tran commented Jan 7, 2024

@JimmyWang6, are we still using re2j.Pattern for server-side regex checking, I don't see its usage in the PR?

@JimmyWang6
Copy link
Author

@JimmyWang6, are we still using re2j.Pattern for server-side regex checking, I don't see its usage in the PR?

@Phuc-Hong-Tran The work to replace the existing regex matching with re4j is not included in this PR. I think we should create a new subtask to address this.

@Phuc-Hong-Tran
Copy link
Contributor

Sure @JimmyWang6, but we need your PR to get merged first before we can process with that task, otherwise whoever gonna work on that task need to fork your branch, which is not an ideal workflow

@JimmyWang6
Copy link
Author

@dajac
I've updated the PR. Please take a look when you have a moment. Thanks.

@dajac
Copy link
Contributor

dajac commented Jan 29, 2024

Hey @JimmyWang6. I am really sorry for the delay on this pull request. We have too many other things to finish before I could really focus on it. I will have more time in the coming weeks. If you're still interested and you also have time, we could get it done now.

@dajac dajac changed the title KAFKA-14517:Implement regex subscriptions KAFKA-14517: Implement regex subscriptions Jan 29, 2024
@dajac
Copy link
Contributor

dajac commented Jan 29, 2024

As a first step, it would be great if we could keep this pull request focused on the RPC and its implementation on the server side. I would extract the command line tool part into its own pull request. Would it be possible?

@Phuc-Hong-Tran
Copy link
Contributor

@dajac I have a question regarding the usage of RE2J on the server side. Shouldn't RE2J be the primary engine that will be used for compiling regular expressions. Why is it not in the scope of this ticket?

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JimmyWang6 I left a few high level comments to start with. Are you on the Apache slack? We could also discuss there offline if you want.

Comment on lines +38 to +39
{ "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must bump the version of the API to version 1 and use version 1+ for this field in order to make it backward compatible.

@@ -1040,6 +1051,7 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
.maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
.maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
.maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we miss a few fundamental things in consumerGroupHeartbeat. Let me explain. The consumerGroupHeartbeat is structured in 3 parts.

  1. We update the member and its subscriptions.
  2. We compute the new target assignment if needed.
  3. We reconcile the member.

I think that in 1), we need to update the subscription for the member as you do here. However, we also need to verify it before storing it and we also need to update the subscription metadata if the regex was changed. See L1080. In step 2), we also need to change the logic to include the topic matching the regex. See L1115. Step 3) is fine as it is.

We also need a mechanism to periodically refresh the regexes in order to catch new topics or deleted topics. What was your plan for this? I thought that we could piggy back the mechanism to refresh the subscription metadata (L1076) to also refresh the regexes.

I think that we also need to store the resolved regular expressions somehow. I mean a mapping from the regex (as string) to the matching topics because we need this for step 2). For this, I was considering whether we could just use a LRU cache. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac, +1 on using LRU cache, it would also save us from unnecessary assignment computations when member rejoining

Comment on lines +1611 to +1616
Pattern pattern = Pattern.compile(regex);
for (String topicName : metadataImage.topics().topicsByName().keySet()) {
if (pattern.matcher(topicName).matches()) {
subscribeGroupToTopic(groupId, topicName);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about this for two reasons: (1) Computing the list of topics is quite expensive so doing it here may not be the best place; and (2) It would not catch the changes as it applies it only when the regex is stored. I think that we need to discuss the high level approach. Take a look at my previous comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one question, why is computing the list of topics to subscribe to expensive here? My guess is that metadataImage is something that we need to load from disk?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Phuc-Hong-Tran It is definitely expensive wherever it will be :). The major downside of doing it here is that when you reload the group coordinator state from the partition, you may have obsolete records containing regex which are not compacted yet. For those, we would compute the list for nothing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac thanks for the explanation

@dajac
Copy link
Contributor

dajac commented Jan 29, 2024

@dajac I have a question regarding the usage of RE2J on the server side. Shouldn't RE2J be the primary engine that will be used for compiling regular expressions. Why is it not in the scope of this ticket?

@Phuc-Hong-Tran Yes, you're right. We need to use RE2J in this ticket. cc @JimmyWang6

@JimmyWang6
Copy link
Author

h level comments to start with. Are you on the Apache slack? We could also discuss there offline if you want.

@dajac
Thanks for your reply!
I will remove this part of code

@Phuc-Hong-Tran
Copy link
Contributor

@dajac Can you invite me to the apache slack as well? The email is phuctran3289@gamil.com, TIA

@dajac
Copy link
Contributor

dajac commented Jan 30, 2024

@dajac Can you invite me to the apache slack as well? The email is phuctran3289@gamil.com, TIA

@Phuc-Hong-Tran Done. You should receive an invitation when the admin approves my request.

@Phuc-Hong-Tran
Copy link
Contributor

@dajac I miss-spelled the email, the correct one is phuctran3289@gmail.com

@JimmyWang6
Copy link
Author

Thanks for your reply!
I will remove this part of code

@dajac
I'm sorry that I replied with the wrong content. Here is my email address: www.wangzhiwang@qq.com and much thanks for your invitation

@Phuc-Hong-Tran
Copy link
Contributor

hi @JimmyWang6, are you still working on this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
5 participants