-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15561: Client support for new SubscriptionPattern based subscription #15188
base: trunk
Are you sure you want to change the base?
Conversation
@lianetm, PTAL, thanks in advance. |
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
Show resolved
Hide resolved
@@ -86,7 +86,6 @@ | |||
import static org.mockito.Mockito.when; | |||
|
|||
public class OffsetsRequestManagerTest { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I find it's better to avoid changes in unrelated files, even if minor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please revert this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @Phuc-Hong-Tran , thanks for the changes! I took a first pass, some high level comments:
- This feature is not yet implemented on the broker, so there are a few areas that may need alignment. Would maybe make sense to wait for some progress in the broker and then give this another push.
- This PR attempts to ensure that the new regex is included in the subscription state and that's definitely needed. But then we're missing the other half of the story: we should make sure that the regex is passed-on to the broker on the next heartbeat request (see HeartbeatRequestManager here)
- We need to add some tests to cover the new changes. Some unit tests, and also integration tests, probably similar to the ones defined for the subscribe with Pattern (see PlainTextConsumerTest). Of course, at this point we would be blocked because the server does not support it yet.
Thanks!
@lianetm, thanks for the comments, I will make sure to address those points in my next PR. Regarding your point about passing the regex for HeartbeatRequestManager, I origninally included that in my code change, then I came across this PR #14956 and decided that we need to wait for the broker to implement new regex logic first. |
This is the task to closely follow https://issues.apache.org/jira/browse/KAFKA-14517, where the broker will support the new regex. |
Thanks @lianetm |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @Phuc-Hong-Tran !
I did a first pass and left a couple of comments.
I could not find any unit tests. I also think there is something missing regarding getting the new pattern subscription into the heartbeat request.
I also have a more organizational question. We would like to get this ticket done as soon as possible. Do you have time to work on it?
If not, do not worry, I could take over and we would become co-authors. Let us please know what you think. The next release is coming up soon.
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
Show resolved
Hide resolved
I do not understand why we want to wait. We should make progress as much as possible even if the broker code is not there yet. In the worst case, we would get an error back from the broker until the broker-side is implemented. But that would be fine in my opinion. |
@cadonna thanks for the comment, I can still finish one as the deadline required, there is no need to wait for the logic on broker to be finnished, though isn't it this one aiming for 3.8 release? |
Yes, but we have time-based releases in Apache Kafka. That means that the deadline for the release will be somewhere in the beginning of April. We need to keep enough time before this data for testing. |
I understand, will get back to speed on this one |
In my opinion, we should not merge the client side only after the server side is implemented. The reason is that we need to change the RPC (this is actually missing in this PR) and this should be driven by the server side work. |
@dajac OK, but we can implement and unit test everything up to the RPC, right? |
@dajac, when we're talking about the RPC, do we mean the field for the regex in ConsumerGroupHeartbeatRequest.json? |
@cadonna Yes. We could for instance commit the RPC and then work independently on the client and the server. My only concern is that we usually discover issues while working on the server side. This is why I usually prefer to get the server code into a reasonable state first. In this case, the risk is low as we are talking about a single field. |
I have a question. The subscribe method that use Pattern override the subscription with topic(s) that match the Pattern. When user choose to use SubscriptionPattern, but already used Pattern beforehand, should we clear out the old subscription? |
@dajac The javadocs of
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Line 727 in f0087ac
One could argue that both subscriptions are pattern subscriptions, but they are quite different internally. I am wondering how complex it is to allow mixed usage. If it is not that complex, I agree with your proposal, otherwise I am doubting whether it is really worth it. In any case the KIP is not clear about the expected behavior. |
@dajac Is it also possible to verify the validity of the regular expression client-side? The benefit would be to find the mistakes in regular expressions without a request to the brokers. The downside of it is that we need some way to validate the regular expressions client-side like the corresponding Google library in Java and I do not know what dependency are needed for clients in other languages. |
I don't think we can include the Google library in the client code. I saw the comment about it on the pull request for the implementation of the regex logic on the broker. Will find it again and quote it here. Edit: here is the comment #14327 (review) |
Ah, I see! Thanks for the link! |
@cadonna I'll see if there is a way to go around with not using the Google library to check regex validity (finger-crossed!) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @Phuc-Hong-Tran !
Here my feedback!
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
Outdated
Show resolved
Hide resolved
{ "name": "SubscribedTopicRegex", "type": "string", "versions": "1+", "nullableVersions": "1+", "default": "null", | ||
"about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't we say we want to commit this change in a separate PR?
#15188 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will change this to a separate PR
@@ -86,7 +86,6 @@ | |||
import static org.mockito.Mockito.when; | |||
|
|||
public class OffsetsRequestManagerTest { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please revert this change?
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
Show resolved
Hide resolved
Regarding the validation of the regex, I lean towards having a new error, like @dajac suggested. Just to clearly tell the user that it is using an invalid regex, without overcomplicating the experience with an InvalidRequest that would have the user wondering what exactly went wrong, probably hinting a a much larger surface area, since it indicates that something wen't wrong in the client/server interaction. This means I think it's better to validate it on the client side too. I could be missing something, but to my understanding, the point of avoiding the new regex engine on the client was not to avoid the dependency itself, it was more a position we still keep, that the broker is the sole responsible for resolving the topics to include in a subscription when a regex is used. We would still keep that principle if we introduce the dependency on the client just to validate the regex. |
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
This reverts commit 53fb53d.
…o SubscriptionPattern, refactored subscribeInternal for AsyncKafkaConsumer
Regex validity check will be included in the next pull request, I'll try to get it done by this weekend. |
This reverts commit 443739a.
…ubscriptionPattern
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. |
* The pattern matching will be done on the broker against topics existing at the time of check | ||
* and matching topic(s) will be returned to the client. | ||
* <p> | ||
* This is a short-hand for {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, which |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be "shorthand".
|
||
/** | ||
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions. | ||
* The pattern matching will be done on the broker against topics existing at the time of check |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better would be "at the time of the check and matching topics will be returned".
@@ -754,6 +752,49 @@ public void subscribe(Pattern pattern) { | |||
delegate.subscribe(pattern); | |||
} | |||
|
|||
/** | |||
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions. | |||
* The pattern matching will be done on the broker against topics existing at the time of check |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better would be "at the time of the check and matching topics will be returned". No need to (s) which is a bit messy.
@@ -141,6 +141,16 @@ public synchronized void subscribe(Pattern pattern) { | |||
subscribe(pattern, Optional.empty()); | |||
} | |||
|
|||
@Override | |||
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be very easy to adapt the implementation of subscribe(Pattern pattern, ConsumerRebalanceListener callback)
for this rather than leaving it blank.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. This is necessary for full coverage of the client-side changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AndrewJSchofield, just a question, why do we need to adapt the implementation of subscribe(Pattern pattern, ConsumerRebalanceListener callback)
while the SubscriptionPattern
is to be resolved on the broker, not locally like Pattern.
If we go that route the MockConsumer
won't really simulate how the AsyncConsumer
works
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't mind about the details of the implementation here, but I expect you do need a mocked implemented of this new method in order to complete the PR.
|
||
@Override | ||
public void subscribe(SubscriptionPattern pattern) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And here.
} else { | ||
this.pattern = pattern; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Extraneous blank line
public class SubscriptionPattern { | ||
final private String pattern; | ||
public SubscriptionPattern(final String pattern) { | ||
if (pattern == null || pattern.equals("")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest using String.isEmpty()
rather than equals("")
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or use org.apache.kafka.common.utils.Utils.isBlank()
which also catches the whitespace case:
if (pattern == null || pattern.equals("")) { | |
if (Utils.isBlank(pattern)) { |
@@ -553,7 +553,7 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { | |||
} | |||
} else { | |||
// SubscribedTopicRegex - only sent if has changed since the last heartbeat | |||
// - not supported yet | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not go ahead and implement the SentFields
code and call data.setSubscribedTopicRegex()
?
/* the pattern user has requested */ | ||
private Pattern subscribedPattern; | ||
|
||
/* RE2J compatible regex */ | ||
private SubscriptionPattern subscriptionPattern; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @cadonna. It seems very odd to me having subscribedPattern
and subscriptionPattern
. I understand that there are old patterns and new patterns, but really they are achieving the same thing. Hiding the difference inside subscribedPattern
seems sensible to me.
@@ -13,6 +13,8 @@ | |||
// See the License for the specific language governing permissions and | |||
// limitations under the License. | |||
|
|||
//Version 1 added SubscribedTopicRegex to the request for KIP-848 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would expect you to actually define v1 including the extra field, and also v1 of the response (which I suppose is structurally identical to v0).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @Phuc-Hong-Tran!
Let a few comments, mostly minor.
@@ -141,6 +141,16 @@ public synchronized void subscribe(Pattern pattern) { | |||
subscribe(pattern, Optional.empty()); | |||
} | |||
|
|||
@Override | |||
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. This is necessary for full coverage of the client-side changes.
public class SubscriptionPattern { | ||
final private String pattern; | ||
public SubscriptionPattern(final String pattern) { | ||
if (pattern == null || pattern.equals("")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or use org.apache.kafka.common.utils.Utils.isBlank()
which also catches the whitespace case:
if (pattern == null || pattern.equals("")) { | |
if (Utils.isBlank(pattern)) { |
@@ -495,6 +496,16 @@ public void subscribe(Pattern pattern) { | |||
subscribeInternal(pattern, Optional.empty()); | |||
} | |||
|
|||
@Override | |||
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { | |||
throw new IllegalArgumentException("Operation not supported in the classic group protocol"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwing an IllegalArgumentException
seems just a tiny bit weird. What about UnsupportedOperationException
instead?
public synchronized SubscriptionPattern subscriptionPattern() { | ||
return this.subscriptionPattern; | ||
} | ||
|
||
public synchronized Pattern subscribedPattern() { | ||
return this.subscribedPattern; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super nit: I've seen committers request that we drop unnecessary this
qualifiers:
public synchronized SubscriptionPattern subscriptionPattern() { | |
return this.subscriptionPattern; | |
} | |
public synchronized Pattern subscribedPattern() { | |
return this.subscribedPattern; | |
} | |
public synchronized SubscriptionPattern subscriptionPattern() { | |
return subscriptionPattern; | |
} | |
public synchronized Pattern subscribedPattern() { | |
return subscribedPattern; | |
} | |
@Phuc-Hong-Tran—I wanted to make sure that you weren't held up waiting for input from others. After the number of review comments gets to a certain size, it can be hard to tell where we stand. Let me know if you need us to review or respond to any outstanding questions. Thanks! |
Change:
Committer Checklist (excluded from commit message)