Introduce topic reader in client API #371
@rdhabalia Updated with tests
I will review it soon.
@@ -332,6 +329,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final long consumerId = subscribe.getConsumerId();
final SubType subType = subscribe.getSubType();
final String consumerName = subscribe.getConsumerName();
final boolean isDurable = subscribe.getDurable();
Old clients don't pass this flag, so the broker could treat such a subscriber as non-durable. Should we check `subscribe.hasDurable()`?
The flag is optional in the proto definition but it defaults to `true`, so there's no need to check for `hasDurable()`.
I see, default is true
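To make the point about the default concrete, here is a minimal sketch of how an optional field with an explicit default behaves. `SubscribeCommandSketch` is a hypothetical hand-written stand-in, not the generated `CommandSubscribe` class; it only mirrors the proto2 rule that the getter returns the declared default when the field was never set, while the `has` accessor reports presence.

```java
// Hypothetical stand-in for a generated proto2 message with
// "optional bool durable = 8 [default = true]".
final class SubscribeCommandSketch {
    // null models "field absent on the wire" (e.g. sent by an old client).
    private final Boolean durable;

    SubscribeCommandSketch(Boolean durable) {
        this.durable = durable;
    }

    // Presence check: true only if the sender explicitly set the field.
    boolean hasDurable() {
        return durable != null;
    }

    // Value accessor: falls back to the declared default (true) when unset.
    boolean getDurable() {
        return durable != null ? durable : true;
    }

    public static void main(String[] args) {
        SubscribeCommandSketch oldClient = new SubscribeCommandSketch(null);
        SubscribeCommandSketch readerClient = new SubscribeCommandSketch(false);
        System.out.println("old client durable: " + oldClient.getDurable());
        System.out.println("reader client durable: " + readerClient.getDurable());
    }
}
```

Under that assumption, a subscribe command from an old client is still treated as durable without an explicit presence check.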
"Consumer is already present on the connection"));
ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady
: getErrorCode(existingConsumerFuture);
;
extra `;`?
yes
Consumer consumer = new Consumer(subscription, subType, consumerId, priorityLevel, consumerName,
maxUnackedMessages, cnx, cnx.getRole());
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
Could we instead avoid creating the `Consumer` if `cnx` is not active?
if (!cnx.isActive()) {
future.completeExceptionally(new BrokerServiceException("Connection was closed while the opening the cursor "));
return;
}
This part of the code is exactly the same as before; I just refactored the way to get the `subscriptionFuture`. As for the specific concern, the problem is that the connection can flip to closed even after we have checked `isActive()`, so we need to double-check the connection status anyway after we have created the consumer, which internally registers itself to get notified when the connection is gone.
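The check-after-register pattern described above can be sketched as follows. All types here (`SketchConnection`, `SketchConsumer`, `SubscribeSketch`) are hypothetical simplifications, not the broker's classes, and the `raceWindow` callback only simulates the connection closing at an inconvenient moment.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the broker-side connection.
final class SketchConnection {
    private volatile boolean active = true;
    final List<SketchConsumer> consumers = new CopyOnWriteArrayList<>();

    boolean isActive() {
        return active;
    }

    // Closing only notifies the consumers that are already registered.
    void close() {
        active = false;
        for (SketchConsumer c : consumers) {
            c.closed = true;
        }
    }
}

// Hypothetical consumer that registers itself with the connection on creation.
final class SketchConsumer {
    volatile boolean closed = false;

    SketchConsumer(SketchConnection cnx) {
        cnx.consumers.add(this);
    }
}

final class SubscribeSketch {
    static CompletableFuture<SketchConsumer> subscribe(SketchConnection cnx, Runnable raceWindow) {
        CompletableFuture<SketchConsumer> future = new CompletableFuture<>();
        // An earlier isActive() check could have passed before this point.
        raceWindow.run();
        SketchConsumer consumer = new SketchConsumer(cnx);
        // Re-check after registration: from here on, close() will reach the consumer.
        if (!cnx.isActive()) {
            consumer.closed = true;
            future.completeExceptionally(
                    new IllegalStateException("Connection was closed while opening the cursor"));
            return future;
        }
        future.complete(consumer);
        return future;
    }

    public static void main(String[] args) {
        SketchConnection cnx = new SketchConnection();
        System.out.println(subscribe(cnx, cnx::close).isCompletedExceptionally());
    }
}
```

If the connection closes before registration, the second check fails the future; if it closes afterwards, the registered consumer is notified by `close()`. Checking only before creating the consumer would leave the window in between uncovered.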
LGTM, just a few minor comments.
@@ -0,0 +1,82 @@
package com.yahoo.pulsar.client.api;
The license header is missing.
Oops.
 * @param msg
 *            the message object
 */
void received(Message msg);
I think it's better if the API also passes the reader: `void received(Reader reader, Message msg)`, similar to consumerListener's `void received(Consumer consumer, Message msg)`. It helps with logging or any additional operation on the reader, e.g. logging `reader.getTopic()`.
Good point. I'll make the change
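A minimal sketch of what the suggested callback shape could look like. The `Sketch*` interfaces are hypothetical placeholders for `Reader`, `Message` and the listener in this PR; only `received(reader, msg)` and `getTopic()` come from the discussion above, and `getData()` is an assumption made so the example has something to print.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical placeholders for the client API types.
interface SketchMessage {
    byte[] getData();
}

interface SketchReader {
    String getTopic();
}

// Listener that receives the reader along with each message.
@FunctionalInterface
interface SketchReaderListener {
    void received(SketchReader reader, SketchMessage msg);
}

final class ReaderListenerSketch {
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        // Having the reader in the callback lets the listener log the topic.
        SketchReaderListener listener = (reader, msg) ->
                log.add(reader.getTopic() + ": " + new String(msg.getData(), StandardCharsets.UTF_8));

        SketchReader reader = () -> "my-topic";
        SketchMessage msg = () -> "hello".getBytes(StandardCharsets.UTF_8);
        listener.received(reader, msg);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Passing the reader also means one listener instance can be shared across several readers and still tell them apart.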
// Signal wether the subscription should be backed by a
// durable cursor or not
optional bool durable = 8 [default = true];
extra space
incomingMessages.drainTo(currentMessageQueue);
if (!currentMessageQueue.isEmpty()) {
return (MessageIdImpl) currentMessageQueue.get(0).getMessageId();
} else if (lastDequeuedMessage != null) {
I think we update `lastDequeuedMessage` every time the reader reads a message from the queue. So if `lastDequeuedMessage` is not null, we can use that position to start from. Should we change the sequence to check `if (lastDequeuedMessage != null) {` first, and only fall back to `if (!currentMessageQueue.isEmpty()) {`, initializing `currentMessageQueue` and draining `incomingMessages` to get the first position, when it is null?
Actually, this code is wrong... I had changed the logic for `startMessageId` from referring to the "first message to be returned" to the "last message that has been consumed"... but that wasn't reflected here.
@rdhabalia Apart from the wrong logic here (delete pos vs read pos), the tricky problem is telling the broker the exact message from which we want to resume reading after reconnection.
If we do have messages in the queue, we cannot rely on `lastDequeuedMessage`, because another thread might be dequeuing at the same time. So we could see `msg1` there while the application has already dequeued `msg2`. At that point `msg2` will get duplicated after reconnection.
By atomically draining the queue on the side, we can establish which was the last message "seen" by the application. Checking the queue size and draining must be done atomically.
Many good points. Will fix that.
@rdhabalia Addressed comments
MessageIdImpl startMessageId;
synchronized (this) {
currentSize = incomingMessages.size();
startMessageId = clearReceiverQueue();
Should we just call `incomingMessages.clear()` if the subscription is in durable mode?
I don't expect this to have a practical impact on the consumer performance, since it only happens on reconnections.
} else if (lastDequeuedMessage != null) {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return lastDequeuedMessage;
If I understood correctly: when the client receives a message from the queue, the consumer calls `messageProcessed`, which always updates `lastDequeuedMessage`, so we know which message the client saw last. In that case, should we give first preference to `lastDequeuedMessage` and derive the position from `incomingMessages` only if `lastDequeuedMessage` is null? Then we don't have to drain `incomingMessages` and can directly return `lastDequeuedMessage`.
Not exactly. If the queue was already empty, then we can rely on `lastDequeuedMessage`, but if the queue has messages, it means the application is actually getting those messages and then updating `lastDequeuedMessage`, though not atomically.
By draining the queue, we can establish with 100% accuracy what the next message would be, the one that the application has surely not received yet. With that, we can just take one step back and get the last-dequeued message id.
> Not exactly, if the queue was already empty, then we can rely on the lastDequeuedMessage, but if the queue has messages, it means the application is actually getting those messages and then updating the lastDequeuedMessage

Sorry, I couldn't understand the actual reason. Let's say the queue received msgs 1,2,3,4 and the `Reader` read msgs 1 and 2. As soon as the reader reads msg=2, `lastDequeuedMessage` gets updated with value=2 and the queue is left with 3,4. Now the reader loses the connection and reconnects, so according to the logic, `incomingMessages` drains 3,4 to `currentMessageQueue` and `previousMessage=3-1=2`. So, is there any issue if the reader starts with `lastDequeuedMessage=2`, or just takes `incomingMessages.poll(0, TimeUnit.MILLISECONDS)-1` and then calls `incomingMessages.clear()`?
> So, is there any issue if reader starts with lastDequeuedMessage=2 or just takes incomingMessages.poll(0, TimeUnit.MILLISECONDS)-1 and then incomingMessages.clear()

The goal here is to have exact precision on which message to ask the broker for after the reconnection. When we drain the queue, we can determine (with no race conditions) which was the first message in the queue.
If we just rely on `lastDequeuedMessage`, the dequeuing and the update of `lastDequeuedMessage` are not synchronized, and thus that could lead to some redeliveries.
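The drain-then-step-back approach from this thread can be sketched roughly as below. This is a simplified illustration under stated assumptions, not the client's actual implementation: message ids are modeled as plain contiguous `long` entry ids so that "one step back" is `id - 1`, and `ReceiverQueueSketch`, `enqueue` and `receive` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ReceiverQueueSketch {
    // Assumption: ids are contiguous longs; real message ids are richer than this.
    private final BlockingQueue<Long> incomingMessages = new LinkedBlockingQueue<>();
    private volatile Long lastDequeuedMessage = null;

    void enqueue(long id) {
        incomingMessages.add(id);
    }

    // Application-side read: the poll and the update below are two separate steps,
    // which is why lastDequeuedMessage alone can lag behind what was actually handed out.
    Long receive() {
        Long id = incomingMessages.poll();
        if (id != null) {
            lastDequeuedMessage = id;
        }
        return id;
    }

    // Called on reconnection. Returns the id of the last message the application
    // has surely seen, or null if nothing was ever delivered.
    synchronized Long clearReceiverQueue() {
        List<Long> drained = new ArrayList<>();
        incomingMessages.drainTo(drained);
        if (!drained.isEmpty()) {
            // The first drained message was never handed to the application,
            // so the one just before it is the last one that was.
            return drained.get(0) - 1;
        }
        // Queue was empty: nothing is in flight, so lastDequeuedMessage is reliable.
        return lastDequeuedMessage;
    }

    public static void main(String[] args) {
        ReceiverQueueSketch q = new ReceiverQueueSketch();
        for (long i = 1; i <= 4; i++) {
            q.enqueue(i);
        }
        q.receive();
        q.receive();
        System.out.println("resume after: " + q.clearReceiverQueue());
    }
}
```

In the 1,2,3,4 example from the thread, both strategies give 2 when no reader thread is mid-dequeue; the drained queue matters in the window where a message has been polled but `lastDequeuedMessage` has not been updated yet.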
👍
@rdhabalia Can you take a 2nd look at #366 as well?
…pache#371) 1. remove log if the active transaction map is empty. 2. return the `highWaterMark` if there is no active transaction.
Motivation
As explained in #355, introduce `Reader` as a new way to receive data in the client API.
Modifications
This PR is based on top of #366. In addition to the non-durable cursors, it adds the `Reader` client API and support in the wire protocol to signal the broker where to start reading from.
Please take a look. I'll add unit tests later on this same PR.