-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP-13-2/3: support regex based subscription #1279
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change looks good. Just left few comments
@@ -136,7 +136,8 @@ enum ProtocolVersion { | |||
v10 = 10;// Added proxy to broker | |||
v11 = 11;// C++ consumers before this version are not correctly handling the checksum field | |||
v12 = 12;// Added get topic's last messageId from broker | |||
// Added CommandActiveConsumerChange | |||
// Added CommandActiveConsumerChange | |||
// Added CommandGetTopicsOfNamespace |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
v12
got released in 1.22. We should bump the protocol version again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. will change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, checked again, seems 1.22 still use v11:
https://github.com/apache/incubator-pulsar/blob/branch-1.22/pulsar-common/src/main/proto/PulsarApi.proto#L137
|
||
message CommandGetTopicsOfNamespaceResponse { | ||
required uint64 request_id = 1; | ||
repeated string topics = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably not worth it or even necessary to bother about, but I was thinking how we could compress a little be this response.
For one, we at least know that all topics have the same namespace (though they might have persistent and non-persistent topics together).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. How about transfer back only the last part of local_topics_name?
If this is really a issue, maybe we could do the filter in server side, and only transfer back matched topics? This may need add new command.
@@ -340,6 +344,53 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati | |||
return consumerSubscribedFuture; | |||
} | |||
|
|||
public CompletableFuture<Consumer> subscribeAsync(Pattern topicsPattern, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Take a look at the refactored code that is in master already. There is a single subscribeAsync(ConsumerConfigurationData conf)
method that is used for all the variations of the subscribe.
Then, for this regex subscribe, we should reuse the same code that subscribe to the list of topics, just adding the extra pre-step of figuring out the list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, found it now, will reference it.
public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace) { | ||
CompletableFuture<List<String>> future = new CompletableFuture<>(); | ||
httpClient | ||
.get(String.format("admin/namespaces/%s/destinations", namespace), String[].class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be able to deserialize directly into a List<String>
with Jackson, by passing List.class
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, List<String>
deserialise could be like this: List<String> names = mapper.readValue(jsonInput, new TypeReference<List<String>>(){});
while httpclient.get()
not support this way of TypeReference currently. So make a String[]
to leverage current code.
* | ||
* @param topicsPattern | ||
*/ | ||
ConsumerBuilder topicsPattern(Pattern topicsPattern); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer to just take String
here for the pattern, since most of the time the regex will be very simple.
We can also offer both versions:
ConsumerBuilder topicsPattern(Pattern topicsPattern);
ConsumerBuilder topicsPattern(String topicsPattern);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, will add it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhaijack Actually, I was thinking that probably, using a "glob" pattern should be enough in most cases.
I would expect that almost all usages of this API would be like
"topic-category-*" or very similar.
I cannot really see a case in which a full blown regex is indeed needed.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:) @merlimat , yes, seems most of the case is like that you mentioned. Not sure if user's use case would be different as we thought. It maybe better to provide a wider compatibility for api. Seems Kafka also use this java.util.regex.Pattern
.
How about leave it as you suggested above?:
ConsumerBuilder topicsPattern(Pattern topicsPattern);
ConsumerBuilder topicsPattern(String topicsPattern);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for having both String
and Pattern
builders.
NamespaceName namespace) { | ||
CompletableFuture<List<String>> topicsFuture = new CompletableFuture<List<String>>(); | ||
|
||
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the higher level, we should have a way to retry few times this call, otherwise if we're unlike when starting our consumer we might get exception even if there is a brief transient problem (eg: 1 single broker going down).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, will add it.
updated. Thanks @merlimat |
* | ||
* @param topicsPattern | ||
*/ | ||
ConsumerBuilder topicsPattern(Pattern topicsPattern); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for having both String
and Pattern
builders.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I just have one tiny comment on expanding a little bit the Javadoc for the new methods in the builder.
ConsumerBuilder topicsPattern(Pattern topicsPattern); | ||
|
||
/** | ||
* Specify a pattern for topics that this consumer will subscribe on. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain a bit more in detail what kind of pattern is accepted (say regex/glob .. ) and also provide a short example snippet on how to use this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, will add it.
Motivation
This is a second sub-task for pip-13, which would like to leverage the first task to support regex based subscription.
Modifications
PulsarClientImpl
.PatternTopicsConsumerImpl
, which extendsTopicsConsumerImpl
.Result
old methods behaviour not changed,
user could use new method to subscribe to topics based on regex pattern