-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue 5476]Fix message deduplicate issue while using external sequence id with batch produce #5491
Conversation
It's hard to handle sequence id in key based batcher, if user set an external sequence id, we can't rewrite it and the sequence ids will distributed into inner batchers of key based batcher. Current implementation will throw exception if client can ensure the message is duplicated, since we do not throws exceptions before, is it ok to throws exception? |
run java8 tests |
hello @codelipenghui can you fix the test case:
|
run java8 tests |
try { | ||
producer.newMessage().sequenceId(producerThread.getLastSeqId() + 1).value("end").send(); | ||
fail("should failed, because send a duplication"); | ||
} catch (PulsarClientException.InvalidMessageException ignore) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is a valid assumption. The logic of dedup is to transparently perform the deduplication, without giving error to the application in case of duplicates.
if (sequenceId <= lastSequenceIdPublished) { | ||
callback.sendComplete(new PulsarClientException | ||
.InvalidMessageException("Message is definitely a duplicate")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't trigger an error to the application when there are dups, rather the contract is that it gets an OK. In any case we need to think through the implication of triggering an error just for one message out of band.
This is a very different behavior from the other failure modes where all the messages are failed after 1 failure.
I believe that handling this in broker side is much preferable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If users use the external sequence id and enable batch on producer, sequence id 1,2,3,1 will happens if we do not the check, we can just throw an exception when using the external sequence id?
callback.sendComplete(new PulsarClientException | ||
.InvalidMessageException("Message is a definitely a duplicate or not cannot be " + | ||
"determined at this time")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't know whether is dup or not, then why are we triggering error here?
|
||
/// Add lowest and highest sequence id to support external sequence id | ||
optional uint64 lowest_sequence_id = 6 [default = 0]; | ||
optional uint64 highest_sequence_id = 7 [default = 0]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need 2 new fields here? Don't we just need to a new last_sequence_id
? isn't lowest_sequence_id
the same as sequence_id
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can only add a new last_sequence_id, will fix.
@@ -76,6 +76,14 @@ default long getOriginalSequenceId() { | |||
} | |||
|
|||
void completed(Exception e, long ledgerId, long entryId); | |||
|
|||
default long getLowestSequenceId() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see how these sequence ids relate to a Topic
.
- What's the lowest sequenceId ? I think this only apply to 1 single batch.
- Sequence id are per producer anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is in the inner class named PublishContext of Topic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is in the inner class named PublishContext of Topic
Which already contains a sequence id field
@@ -257,6 +285,9 @@ public ServerCnx getCnx() { | |||
private String originalProducerName; | |||
private long originalSequenceId; | |||
|
|||
private long lowestSequenceId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does it mean the lowest sequence id of a producer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is in the inner class named MessagePublishContext of Producer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, though as in other comment, there's already a sequenceId
field. Is that now ignored? If yes, then it should be removed.
Also, as you can see, there's an originalSequenceId
field. This is used in the context of geo-replication and it would have to be accounted for as well.
@merlimat Thanks for the review, i have addressed your comments, please take a look again. |
@@ -257,6 +285,9 @@ public ServerCnx getCnx() { | |||
private String originalProducerName; | |||
private long originalSequenceId; | |||
|
|||
private long lowestSequenceId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, though as in other comment, there's already a sequenceId
field. Is that now ignored? If yes, then it should be removed.
Also, as you can see, there's an originalSequenceId
field. This is used in the context of geo-replication and it would have to be accounted for as well.
@@ -76,6 +76,14 @@ default long getOriginalSequenceId() { | |||
} | |||
|
|||
void completed(Exception e, long ledgerId, long entryId); | |||
|
|||
default long getLowestSequenceId() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is in the inner class named PublishContext of Topic
Which already contains a sequence id field
@@ -367,6 +371,11 @@ public void sendAsync(Message<T> message, SendCallback callback) { | |||
msgMetadataBuilder.setSequenceId(sequenceId); | |||
} else { | |||
sequenceId = msgMetadataBuilder.getSequenceId(); | |||
if (sequenceId <= lastSequenceIdPushed) { | |||
callback.sendComplete(new PulsarClientException | |||
.InvalidMessageException("Message is definitely a duplicate")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not correct on 2 levels:
- If
sequenceId <= lastSequenceIdPushed
we don't know yet whether the message is already dup, because the previous attempt might still fail. This has to be disambiguated by the broker which has visibility at the storage level. - As mentioned before, we cannot throw error when there's a duplicate, rather we need to return "ok" to the application.
run java8 tests |
run java8 tests |
@merlimat Please help take a look, i have addressed your comments |
run java8 tests |
@@ -298,7 +344,8 @@ public void completed(Exception exception, long ledgerId, long entryId) { | |||
if (!(exception instanceof TopicClosedException)) { | |||
// For TopicClosed exception there's no need to send explicit error, since the client was | |||
// already notified | |||
producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, sequenceId, | |||
long callBackSequenceId = lastSequenceId >= sequenceId ? lastSequenceId : sequenceId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be simplified with the following statement?
long callbackSequenceId = Math.max(lastSequenceId, sequenceId);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add a common function to get the callback sequenceId
@@ -330,8 +377,9 @@ public void run() { | |||
// stats | |||
rateIn.recordMultipleEvents(batchSize, msgSize); | |||
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS); | |||
long callBackSequenceId = lastSequenceId >= sequenceId ? lastSequenceId : sequenceId;; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above
@@ -80,6 +81,12 @@ public void add(MessageImpl<?> msg, SendCallback callback) { | |||
previousCallback = callback; | |||
currentBatchSizeBytes += msg.getDataBuffer().readableBytes(); | |||
messages.add(msg); | |||
if (lowestSequenceId == -1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
if (lowestSequenceId == -1) { | |
if (lowestSequenceId == -1L) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
@@ -50,7 +50,8 @@ | |||
|
|||
private PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder(); | |||
// sequence id for this batch which will be persisted as a single entry by broker | |||
private long sequenceId = -1; | |||
private long lowestSequenceId = -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private long lowestSequenceId = -1; | |
private long lowestSequenceId = -1L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
@@ -50,7 +50,8 @@ | |||
|
|||
private PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder(); | |||
// sequence id for this batch which will be persisted as a single entry by broker | |||
private long sequenceId = -1; | |||
private long lowestSequenceId = -1; | |||
private long highestSequenceId = -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private long highestSequenceId = -1; | |
private long highestSequenceId = -1L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
@@ -776,8 +798,7 @@ void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) { | |||
} | |||
return; | |||
} | |||
|
|||
long expectedSequenceId = op.sequenceId; | |||
long expectedSequenceId = op.highestSequenceId > 0 ? op.highestSequenceId : op.sequenceId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you make a common function for this statement here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
@@ -803,7 +824,7 @@ void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) { | |||
if (callback) { | |||
op = pendingCallbacks.poll(); | |||
if (op != null) { | |||
lastSequenceIdPublished = op.sequenceId + op.numMessagesInBatch - 1; | |||
lastSequenceIdPublished = sequenceId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be highest sequence id?
@@ -1391,6 +1429,9 @@ private void processOpSendMsg(OpSendMsg op) { | |||
batchMessageAndSend(); | |||
} | |||
pendingMessages.put(op); | |||
if (op.msg != null) { | |||
lastSequenceIdPushed = op.sequenceId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I think we need to keep track highestSequenceId, no?
- We should only assign the highest sequence id, no?
lastSequenceIdPushed = Math.max(lastSequenceIdPushed, op.lastSequenceIdPushed);
@@ -127,6 +127,9 @@ message MessageMetadata { | |||
// transaction related message info | |||
optional uint64 txnid_least_bits = 22 [default = 0]; | |||
optional uint64 txnid_most_bits = 23 [default = 0]; | |||
|
|||
/// Add last sequence id to support batch message with external sequence id | |||
optional uint64 last_sequence_id = 24 [default = 0]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that we use highest
sequence id at the client side, but use last
at the wire protocol and broker side. Can we make it consistent by just using highest
across the places?
|
||
public void publishMessage(long producerId, long sequenceId, long lastSequenceId, | ||
ByteBuf headersAndPayload, long batchSize) { | ||
if (sequenceId > lastSequenceId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sequenceId
=> lowestSequenceId
lastSequenceId
=> highestSequenceId
It is very confusing using sequenceId
and lastSequenceId
.
run java8 tests |
run java8 tests |
1 similar comment
run java8 tests |
run java8 tests |
@@ -803,7 +824,7 @@ void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) { | |||
if (callback) { | |||
op = pendingCallbacks.poll(); | |||
if (op != null) { | |||
lastSequenceIdPublished = op.sequenceId + op.numMessagesInBatch - 1; | |||
lastSequenceIdPublished = getHighestSequenceId(op); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should :
lastSequenceIdPublished = Math.max(lastSequenceIdPublished, getHighestSequenceId(op));
because the duplicated publishes will still succeed, right. It can override a larger sequence Id with a smaller one.
@@ -839,7 +864,7 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) | |||
log.debug("[{}] [{}] Got send failure for timed out msg {}", topic, producerName, sequenceId); | |||
} | |||
} else { | |||
long expectedSequenceId = op.sequenceId; | |||
long expectedSequenceId = op.highestSequenceId > 0 ? op.highestSequenceId : op.sequenceId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expectedSequenceId = getHighestSequenceId(op);
run java8 tests |
2 similar comments
run java8 tests |
run java8 tests |
a29257d
to
d982c8d
Compare
run java8 tests |
2 similar comments
run java8 tests |
run java8 tests |
run cpp tests |
run java8 tests |
ping @merlimat PTAL again. |
run java8 tests |
2 similar comments
run java8 tests |
run java8 tests |
ping @merlimat PTAL again. |
@merlimat PTAL again, thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Nice work @codelipenghui
@codelipenghui The change is not compatible with 2.4.2, i will move the |
## Motivation Since #5491 merged, while user use new pulsar client to produce batch messages to older version broker(e.g. 2.4.0), send ack error will occur: ``` [pulsar-client-io-8-2] WARN org.apache.pulsar.client.impl.ProducerImpl - [persistent://sandbox/pressure-test/test-A-partition-11] [pulsar-cluster-test-13-294] Got ack for msg. expecting: 13 - got: 224 - queue-size: 9 ``` The problem is client use highest sequence id to match the response sequence id, but in old version broker can not return the highest id. So, this pr is try to fix the problem of produce batch message with new version client and old version broker. ### Modifications Add highest sequence id to CommandSendReceipt. If the response highest sequence id of send receipt > lowest sequence id, it means broker is a new version broker, so we need to verify the highest sequence id, otherwise we only verify the lowest sequence id.
Fixes #5476
Motivation
Fix #5476
Modifications
last_sequence_id
in MessageMetadata and CommandSend, use sequence id and last_sequence_id to indicate the batchlowest_sequence_id
andhighest_sequence_id
.last_sequence_id
to client and add message deduplicate check in clientVerifying this change
Added new unit tests to verify this change
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation