Conversation
There was a problem hiding this comment.
Do you see any use of this mutex_ here?
in fact I think it can create a dead-lock.
eg: if incomingMessages-queue is empty then receive() will take the mutex and wait on it until message is received. Now, when message is received and if it tries to take mutex_ then it will create a deadlock.
However, for batch msg we are using pendingReceiveMutex_ into receiveIndividualMessagesFromBatch to sync msg-queue and pending-receive queue.
There was a problem hiding this comment.
A. Yes I need the mutex since batchAcknowledgementTracker_ is not thread safe
B. As discussed - I don't see a deadlock
There was a problem hiding this comment.
Receive Callback should return a Result along with Message
There was a problem hiding this comment.
it's a async-receive and it will never have any failure as a result and it only returns a message when available so, I think there is no need of result in this case.??
There was a problem hiding this comment.
What if consumer is not initialized - most of the other async operations in Consumer.cc return a ResultConsumerNotInitialized .
There was a problem hiding this comment.
then we should add request into pending-queue and it should complete the callback when the message will be available.
There was a problem hiding this comment.
however, we should immediately fail the callback if consumer is closing or already closed (I will make similar change in java-client as well). so, I have added the result into the callback.
pulsar-client-cpp/lib/ConsumerImpl.h
Outdated
There was a problem hiding this comment.
UnboundedBlockingQueue is not a good idea, since you don't want to block if the queue is empty
I would suggest using a vector or a list.
There was a problem hiding this comment.
Sure. actually, my motivation to use UnboundedBlockingQueue to have thread-safe, unbounded collection. Do you see any specific issue with UnboundedBlockingQueue? Also I think std::list/queue creates a copy of object on pop_front()/front()?
There was a problem hiding this comment.
replaced UnboundedBlockingQueue with std::queue
There was a problem hiding this comment.
No need for a separate pendingReceiveMutex_ - I suggest to use mutex_
There was a problem hiding this comment.
pendingReceiveMutex_ is a mutex which requires to sync pendingReceives-queue and incomingMessages-queue together to make sure message-queue is empty while adding into pending-queue and also requires to prevent race-condition on pendingReceive_ .
mutex_ will not work here for example:
- receive() is a blocking call which takes locks on
mutext_until message is received - now, if we use the same
mutex_on message-received then it will create a deadlock.
There was a problem hiding this comment.
Agreed, but the problem is that in PartitionedConsumerImpl:receive we should have released the lock after we checked the state_ and messageListener_ condition since the other data structures are thread safe (see ConsumerImpl::receiveHelper to understand my point)
If you fix this locking then you won't require pendingReceiveMutex_
I am stressing about this minor thing because in all classes we have only one lock to protect all members (mutex_), so as far as possible I don't want to break this convention.
64f10fc to
36795a0
Compare
There was a problem hiding this comment.
Agreed, but the problem is that in PartitionedConsumerImpl:receive we should have released the lock after we checked the state_ and messageListener_ condition since the other data structures are thread safe (see ConsumerImpl::receiveHelper to understand my point)
If you fix this locking then you won't require pendingReceiveMutex_
I am stressing about this minor thing because in all classes we have only one lock to protect all members (mutex_), so as far as possible I don't want to break this convention.
There was a problem hiding this comment.
Why not :
void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition_);
Lock lock(mutex_);
if(!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
lock.unlock();
callback(ResultOk, msg);
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return;
}
messages_.push(msg);
if (messageListener_) {
listenerExecutor_->postWork(boost::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer));
}
}
There was a problem hiding this comment.
for receiveAsync, we need to access pendingReceives_ and messages_ atomically. so, if we release lock before pushing to messages_ then it will face race condition while receiving msg using receiveAsync and callback will never be called.
eg:
- thread-1 comes to
receiveAsync()which first checksmessages_if it's empty then it adds callback intopendingReceives_ - thread-2: in this method, if it adds the message into the
messages_same time then : message will sit intomessages_queue and will never be notified for receiveAsync-Callback.
Therefore, we need separate mutex to access pendingReceives_ and messages_ atomically.
There was a problem hiding this comment.
No need for notifyPendingReceivedCallback function, just call the callback
|
Refer to this link for build results (access rights to CI server needed): |
|
@rdhabalia what is the current state of this PR? are we targeting this for 2.0? If not, can we move it to 2.1? |
|
it seems that this PR was from 1.19.0-incubating. so I will move it out of 2.0, if we need this for 2.0, let's add it back. |
|
sure. I will rebase it and we can take it to next release. |
fcff2bc to
cb5d348
Compare
replace UnboundedBlockingQueue with queue add result to receiveAsync-callback to fail callback on consumer closing/closed check uninitialized consumer state notify callback directly
cb5d348 to
5de943d
Compare
### Motivation In many cases, client requires receiveAsync() api in Consumer. This api is already available into java-client but doesn't exist into CPP-client. ### Modification Add support for receiveAsync() api in cpp-client consumer. This PR is rebased and reopened from #577
Motivation
Sometimes, client wants to receive a message async to complete specific depending task or request. eg. java-client library has receiveAsync api which is used by proxy to receive msg async and completes pending proxy request on message-received. CPP-client library doesn't have
receiveAsyncapi.Modifications
Add
receiveAsyncapi to cpp-client library.