-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cpp client: add PatternMultiTopicsConsumerImpl to support regex subscribe #2219
Conversation
retest this please for |
retest this please |
@merlimat @ivankelly can you guys spend sometime on reviewing this? |
/** | ||
* subscribe for multiple topics, which match given regexPattern, under the same namespace. | ||
*/ | ||
Result subscribe(const std::string& regexPattern, const std::string& consumerName, Consumer& consumer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of using boolean parameter, we could change the method signature into subscribeWithRegex()
or similar, to make it more explicit in the application code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot to submit the review :)
return; | ||
} | ||
|
||
ClientConnectionPtr conn = clientCnx.lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This gets unlocked when it goes out of scope?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. lock is for newrequestId and newcommand.
@@ -856,28 +856,35 @@ void ClientConnection::handleIncomingCommand() { | |||
<< " -- req_id: " << error.request_id()); | |||
|
|||
Lock lock(mutex_); | |||
PendingRequestsMap::iterator it = pendingRequests_.find(error.request_id()); | |||
if (it != pendingRequests_.end()) { | |||
if (pendingRequests_.find(error.request_id()) != pendingRequests_.end()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should be able to avoid calling find twice with
if ((PendingRequestMap::iterator it = pendingRequests_.find(error.request_id()) != pendingRequests_.end()) {
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not supported, will change back to avoid find twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you need to declare the iterator outside of the if.
#include <iostream>
#include <unordered_map>
int main() {
std::unordered_map<int,int> m;
m.insert({1, 2});
m.insert({2, 3});
std::unordered_map<int,int>::iterator it;
if ((it = m.find(2)) != m.end()) {
std::cout << "found " << it->second << std::endl;
} else {
std::cout << "not found" << std::endl;
}
}
for (int i = 0; i < numTopics; i++) { | ||
// remove partition part | ||
const std::string& topicName = response.topics(i); | ||
int pos = topicName.find("-partition-"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should already be library code to do this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. searched again in google. seems substr() only support using position.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, there should be library code within pulsar c++ to do this? It seems a fairly common operation.
std::string filteredName = topicName.substr(0, pos); | ||
|
||
// filter duped topic name | ||
if (std::find(topicsPtr->begin(), topicsPtr->end(), filteredName) == |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is O(N^2). If you used an unordered_set for topics it'd be average time O(1). It can be converted to a vector at the end.
pulsar-client-cpp/lib/ClientImpl.cc
Outdated
@@ -212,6 +217,65 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat | |||
consumers_.push_back(reader->getConsumer()); | |||
} | |||
|
|||
void ClientImpl::subscribeAsync(const std::string& regexPattern, const std::string& consumerName, | |||
const ConsumerConfiguration& conf, bool useRegex, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove useRegex. If we see more different pattern types happening in future, then rename to subscribeRegexAsync. I can't see what other pattern you'd use though. Regex is ubiquitous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assumes that the regex will be somehting like "my-tenant/my-ns/my-topic-[0-9]*". As in, the namespace has to exist in the pattern if it's non-default?
We should validate that the tenant and namespace are valid and that the user hasn't tried to put a regex there also. I guess this will happen automatically with TopicName::get.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 minor comment, but gtg otherwise
}; | ||
|
||
// call to subscribe new added topics, then in its callback do unsubscribe | ||
onTopicsAdded(topicsListsMinus(*newTopics, *oldTopics), topicsAddedCallback); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You've already declare topicsAdded above, so it can just be passed in.
cherry-pick as |
Looks like the client-feature matrix can now be updated. ref |
In PR #1279 and #1298 we added regex based subscription. This is a catch up work to add PatternMultiTopicsConsumerImpl in cpp client.