Ack response implementation #8996

Merged

congbobo184
Contributor

@congbobo184 congbobo184 commented Dec 18, 2020

Motivation

This PR implements the ack response. Once it is merged, I will handle #8997.

Implementation

  1. Implement a new PersistentAcknowledgmentsWithResponseGroupingTracker.
  2. Add two ackRequests structures, one for async flush and one for sync flush.
  3. Add a timer to handle timeouts. The timeout task does not need to take a lock, because timeouts are handled sequentially.
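
The timer idea in item 3 can be pictured with a small stand-alone sketch. It is not the PR's code: `PendingAckRequests`, `register`, `complete` and `expire` are hypothetical names, and the current time is passed in explicitly so the example stays deterministic. The assumption is that requests are registered in order with the same timeout, so the oldest request always expires first and the timeout task only has to look at the head of a queue.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the bookkeeping of in-flight ack requests.
class PendingAckRequests {
    private static final class Entry {
        final long requestId;
        final long deadlineMillis;

        Entry(long requestId, long deadlineMillis) {
            this.requestId = requestId;
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final Map<Long, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();
    private final Queue<Entry> timeouts = new ConcurrentLinkedQueue<>();
    private final long timeoutMillis;

    PendingAckRequests(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Remember the request and the moment it should be considered lost.
    CompletableFuture<Void> register(long requestId, long nowMillis) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.put(requestId, future);
        timeouts.add(new Entry(requestId, nowMillis + timeoutMillis));
        return future;
    }

    // Called when the broker's response for requestId arrives.
    boolean complete(long requestId) {
        CompletableFuture<Void> future = pending.remove(requestId);
        return future != null && future.complete(null);
    }

    // Called by a single timer task; returns how many requests it failed.
    int expire(long nowMillis) {
        int expired = 0;
        Entry head;
        while ((head = timeouts.peek()) != null && head.deadlineMillis <= nowMillis) {
            timeouts.poll();
            CompletableFuture<Void> future = pending.remove(head.requestId);
            if (future != null) {
                future.completeExceptionally(
                        new TimeoutException("ack request " + head.requestId + " timed out"));
                expired++;
            }
        }
        return expired;
    }
}
```

Only one task ever calls `expire`, so it needs no lock of its own in this sketch; requests that were already answered are simply skipped when their queue entry comes due.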

Verifying this change

Add the tests for it

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)

@congbobo184
Contributor Author

/pulsarbot run-failure-checks

congbo added 2 commits December 23, 2020 00:36
# Conflicts:
#	pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

pendingIndividualTransactionAcks
.add(Triple.of(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits(), msgId));
consumer.onAcknowledgeCumulative(msgId, null);
if (((BatchMessageIdImpl) msgId).ackCumulative()) {
Contributor

What happens if we don't have this class? Should we add an instanceof test?

Contributor

The type is already checked on line 183. @congbobo184, I think you can use batchMessageId, since you already cast to BatchMessageIdImpl on line 184.
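
The concern in this thread, an unchecked cast on a message ID that may not be a batch ID, can be illustrated with stand-in types. `MessageId`, `SimpleMessageId`, `BatchMessageId` and `AckGuard` below are hypothetical simplifications rather than Pulsar's classes; the sketch only shows the check-then-reuse pattern the reviewers describe.

```java
// Hypothetical, simplified message ID hierarchy.
interface MessageId {
}

final class SimpleMessageId implements MessageId {
}

final class BatchMessageId implements MessageId {
    private final boolean wholeBatchAcked;

    BatchMessageId(boolean wholeBatchAcked) {
        this.wholeBatchAcked = wholeBatchAcked;
    }

    // Stand-in for the ackCumulative() call seen in the reviewed snippet.
    boolean ackCumulative() {
        return wholeBatchAcked;
    }
}

final class AckGuard {
    // Returns true when the ack can be forwarded right away.
    static boolean readyToAck(MessageId msgId) {
        if (msgId instanceof BatchMessageId) {
            // Cast once and reuse the typed reference instead of casting again.
            BatchMessageId batchMessageId = (BatchMessageId) msgId;
            return batchMessageId.ackCumulative();
        }
        // Non-batch IDs have no batch state to consult.
        return true;
    }
}
```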

// When flushing the ack, we bind this ack to currentFuture; during this time currentFuture must not
// change. A read lock is sufficient here: as long as currentFuture does not change,
// any ack operation is allowed.
this.lock.readLock().lock();
Contributor

Move this line out of the finally block
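
The surrounding code is not shown in this thread, so the following is only the conventional `ReentrantReadWriteLock` idiom, not the PR's implementation: the lock is acquired before `try`, and only the unlock lives in `finally`. `AckBinder`, `addAck` and `flush` are hypothetical names; the counter stands in for the acks bound to `currentFuture`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: many threads may add acks under the read lock,
// while a flush swaps the current batch under the write lock.
class AckBinder {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private AtomicInteger currentBatch = new AtomicInteger();

    void addAck() {
        lock.readLock().lock(); // acquire outside try, so finally never unlocks an unheld lock
        try {
            currentBatch.incrementAndGet();
        } finally {
            lock.readLock().unlock();
        }
    }

    // Returns how many acks were bound to the batch that is being flushed.
    int flush() {
        lock.writeLock().lock();
        try {
            int flushed = currentBatch.get();
            currentBatch = new AtomicInteger();
            return flushed;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```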

@sijie
Member

sijie commented Jan 6, 2021

@codelipenghui Can you review it?

congbo added 2 commits January 6, 2021 23:12
# Conflicts:
#	pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
#	pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
#	pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
#	pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
# Conflicts:
#	pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java

@congbobo184 congbobo184 reopened this Jan 11, 2021

Contributor

@codelipenghui codelipenghui left a comment

We need to consider compatibility when a new client connects to an older broker that does not support the ack response. If the broker never writes the ack response to the client, could the client get stuck on the message acknowledgment?

This should also be covered by the compatibility tests in the tests module.
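
One generic way to keep a caller from waiting forever on a response that never arrives is to attach a timeout to the future. The sketch below uses only the JDK (`CompletableFuture.orTimeout`, available since Java 9); `AckReceiptTimeout` and its methods are hypothetical helpers, and nothing here claims to be how this PR handles older brokers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: fail an ack future when no receipt shows up in time.
class AckReceiptTimeout {
    static CompletableFuture<Void> withTimeout(CompletableFuture<Void> ackFuture, long timeoutMillis) {
        // orTimeout completes the future exceptionally with TimeoutException
        // if it is still incomplete when the timeout elapses.
        return ackFuture.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Blocks until the future settles and reports whether it timed out.
    static boolean timedOut(CompletableFuture<Void> future) {
        try {
            future.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }
}
```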

/**
* Consumer ack for response timeout.
*/
public static class AckResponseTimeoutException extends PulsarClientException {
Contributor

Would PulsarClientException.TimeoutException work here?

Contributor Author

No, it can be deleted.

* @param ackResponseEnabled {@link Boolean} is enable ack for response
* @return the consumer builder instance
*/
ConsumerBuilder<T> enableAckResponse(boolean ackResponseEnabled);
Contributor

Suggested change
- ConsumerBuilder<T> enableAckResponse(boolean ackResponseEnabled);
+ ConsumerBuilder<T> isAckReceiptEnabled(boolean ackReceiptEnabled);

Contributor

I think "response" is a little ambiguous here, since the client also gets a returned future without this feature.

Comment on lines 229 to 247
private MessageIdImpl modifyBatchMessageIdAndStatusInConsumer(BatchMessageIdImpl batchMessageId) {
MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
modifyMessageIdStatusInConsumerCommon(messageId);
return messageId;
}

private void modifyMessageIdStatusInConsumer(MessageIdImpl messageId) {
consumer.getStats().incrementNumAcksSent(1);
modifyMessageIdStatusInConsumerCommon(messageId);
}

private void modifyMessageIdStatusInConsumerCommon(MessageIdImpl messageId) {
consumer.getUnAckedMessageTracker().remove(messageId);
if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
}
}
Contributor

These methods are not clear. It should be stats, not status. The stats update and the consumer cleanup should also be split into independent methods, which will improve the code readability.

Comment on lines 482 to 491
ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
AckType.Cumulative, null, Collections.emptyMap(), requestId);
cnx.newAckForResponseWithFuture(cmd, requestId, currentCumulativeAckFuture);
this.currentCumulativeAckFuture = new TimedCompletableFuture<>();
} else {
ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
AckType.Cumulative, null, Collections.emptyMap(), -1);
cnx.ctx().write(cmd, cnx.ctx().voidPromise());
Contributor

Please remove the duplicated code.

Contributor Author

If we deduplicate this code, we will have to check ackResponseEnabled twice, because the sequence is: get the requestId, create the command, then write it. Getting the requestId and writing differ between a normal ack and an ack with response, so the middle step, creating the ack command, can't be optimized.

Comment on lines 604 to 606
long requestId = consumer.getClient().newRequestId();
ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
completableFuture = cnx.newAckForResponse(cmd, requestId);
Contributor

Shouldn't the command be written to the broker here? Also, please consider reducing the duplicated code.

Contributor Author

This is newImmediateAckAndFlush, so the command should be written to the broker immediately.

Comment on lines 613 to 614
// if don't support multi message ack, it also support ack response, so we should not think about the
// ack response in this logic
Contributor

Why is this supported only for multi-message ack? Can't a single-message acknowledgment also enable the ack receipt?

Contributor Author

This path handles chunked messages. To use the ack response here, the broker must support multi-message ack; we don't need to return an ack response for brokers that don't support multi-message ack.

Comment on lines 623 to 635
if (ackResponseEnabled) {
long requestId = consumer.getClient().newRequestId();
ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), bitSet,
ackType, null, map, requestId);
completableFuture = cnx.newAckForResponse(cmd, requestId);
} else {
ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), bitSet,
ackType, null, map, -1);
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
completableFuture = CompletableFuture.completedFuture(null);
}
}
return completableFuture;
Contributor

I think we can use a method getRequestId() and a method getCompletableFuture(). I think this will make the logic simpler
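
A rough sketch of what such helpers could look like, using stand-in types only. `AckSender`, `nextRequestId` and `futureFor` are hypothetical names, and the `registerPending` callback stands in for the `cnx.newAckForResponse(cmd, requestId)` call from the reviewed diff. Note that this shape still checks the flag in two places, which is the trade-off the author raised earlier in the review.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;

// Hypothetical helper that hides the "receipt enabled or not" branching.
class AckSender {
    private final boolean ackReceiptEnabled;
    private final AtomicLong requestIds = new AtomicLong();

    AckSender(boolean ackReceiptEnabled) {
        this.ackReceiptEnabled = ackReceiptEnabled;
    }

    // -1 marks an ack that expects no receipt, mirroring the reviewed diff.
    long nextRequestId() {
        return ackReceiptEnabled ? requestIds.getAndIncrement() : -1L;
    }

    // With a real request ID, wait for the receipt; otherwise complete at once.
    CompletableFuture<Void> futureFor(long requestId,
                                      LongFunction<CompletableFuture<Void>> registerPending) {
        return requestId >= 0
                ? registerPending.apply(requestId)
                : CompletableFuture.completedFuture(null);
    }
}
```

With this shape, the command can be built once from whatever `nextRequestId()` returns, and the caller returns `futureFor(...)` in both modes.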

Comment on lines 840 to 842
private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId,
RequestType requestType, boolean flush,
TimedCompletableFuture<T> future) {
Contributor

Do we need to return the future again?

Contributor

I think you can use 2 methods to handle this case

private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId,
                                             RequestType requestType, boolean flush,
                                             TimedCompletableFuture<T> future) {

}

private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId,
                                                             RequestType requestType, boolean flush) {
    TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
    sendRequestAndHandleTimeout(... future);
    return future;
}

@lhotari
Member

lhotari commented Jul 7, 2021

@congbobo184 @codelipenghui @Anonymitaet @merlimat Has the "ackReceipt" feature added by this PR been documented? The only documentation that I can find is in the javadoc of the isAckReceiptEnabled method: "Ack will return receipt but does not mean that the message will not be resent after get receipt."
Would it be possible to clarify this in the Pulsar documentation?
Usually PIP documents are created for changes that change the wire protocol. I didn't find any PIP documents or mailing list discussions about this change. What problem is this change solving? The issue and PR description aren't very informative. Please elaborate on the context behind this change.

Would it make sense to document this change in the "Pulsar binary protocol specification" document, https://pulsar.apache.org/docs/en/develop-binary-protocol/ https://github.com/apache/pulsar/blob/master/site2/docs/developing-binary-protocol.md ?

@congbobo184
Contributor Author

@lhotari Hi, this PR only means that the server receives the client's ack request and then sends a response to the client. The sentence "Ack will return receipt but does not mean that the message will not be resent after get receipt." can be added to the Pulsar documentation.

@lhotari lhotari added the doc-required label Jul 8, 2021
@lhotari
Member

lhotari commented Jul 8, 2021

Hi @Anonymitaet , could you help @congbobo184 in documenting this feature?

Since this PR also changed the wire protocol (there were protobuf changes), I think that this requires creating a PIP document. This is a common rule in Pulsar development that there must be a PIP for wire protocol changes. @codelipenghui Could you help with a PIP document for the "ack receipt" feature? /cc @merlimat @sijie

@codelipenghui
Contributor

@lhotari The ack response proposal is contained in the transaction proposal, which you can find here: https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.r370pcotoarj

For the documentation, you can just create an issue to track it.

@Anonymitaet
Member

I will work with @congbobo184 on the docs and track the issue here: #11272

@lhotari
Member

lhotari commented Jul 20, 2021

@congbobo184 I recently came across #7683 . Does this PR somehow help resolve that issue?

@Anonymitaet
Member

Doc status update: I've discussed w/ @liangyepianzhou, he will add docs and @momo-jun will help review.

@Anonymitaet Anonymitaet added the doc-complete label and removed the doc-required label May 10, 2022
@Anonymitaet
Member

Anonymitaet commented May 10, 2022

Doc is added #15497

BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jun 15, 2022
### Motivation

There were several issues caused by the lack of thread safety in
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in the `flushAsync` method. But the fields
are accessed directly, so thread safety cannot be guaranteed.

In addition, the current `LastCumulativeAck` class was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there were two CAS operations in the `doCumulativeAck`
method in case it was called concurrently, though the composite CAS
operation was not atomic.

However, after apache#8996, only one CAS operation was performed, and it
compared against a `LastCumulativeAck` object, not the two fields
(`messageId` and `bitSetRecyclable`).

### Modifications

To solve the thread safety issue, this PR moves `LastCumulativeAck`
out of `PersistentAcknowledgmentsGroupingTracker` to disable
direct access to the internal fields. Then, two synchronized methods
were added to guarantee thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values.
- `moveOwnershipTo`: This method moves the ownership to another
  `LastCumulativeAck` object, which will be responsible to recycle the
  `BitSetRecyclable` field after that.

With the methods above, each time `flushAsync` is called, move the
ownership of `lastCumulativeAck` field to another thread local field to
send the ACK command and recycle the `BitSetRecyclable` field.

- `lastCumulativeAck` updates the latest message ID and bit set, the
  update operations can be performed by multiple threads and
  `lastCumulativeAck` saves the latest value.
- `threadLocalLastCumulativeAckToFlush` only acts as a temporary cache
  to the latest value in `flushAsync`.
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jun 16, 2022
### Motivation

There were several issues caused by the lack of thread safety in
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in the `flushAsync` method. But the fields
are accessed directly, so thread safety cannot be guaranteed.

In addition, the current `LastCumulativeAck` class was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there were two CAS operations in the `doCumulativeAck`
method in case it was called concurrently, though the composite CAS
operation was not atomic.

However, after apache#8996, only one CAS operation was performed, and it
compared against a `LastCumulativeAck` object, not the two fields
(`messageId` and `bitSetRecyclable`).

### Modifications

To solve the thread safety issue, this PR moves `LastCumulativeAck`
out of `PersistentAcknowledgmentsGroupingTracker` to disable
direct access to the internal fields. Then, the following synchronized
methods were added to guarantee thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values.
- `moveOwnershipTo`: This method moves the ownership to another
  `LastCumulativeAck` object. After that, the `update` operation on this
  object won't recycle the `BitSetRecyclable` field.
- `restoreOwnershipIfEmpty`: Restore the ownership from another
  `LastCumulativeAck` object.

With the methods above, each time `flushAsync` is called, move the
ownership of `lastCumulativeAck` field to another thread local field to
send the ACK command. After that, restore the ownership to
`lastCumulativeAck` unless it has been updated in other threads.
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jun 21, 2022
### Motivation

There were several issues caused by the lack of thread safety in
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in the `flushAsync` method. But the fields
are accessed directly, so thread safety cannot be guaranteed.

In addition, the current `LastCumulativeAck` class was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there were two CAS operations in the `doCumulativeAck`
method in case it was called concurrently, though the composite CAS
operation was not atomic.

However, after apache#8996, only one CAS operation was performed, and it
compared against a `LastCumulativeAck` object, not the two fields
(`messageId` and `bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR moves `LastCumulativeAck`
out of `PersistentAcknowledgmentsGroupingTracker` to disable
direct access to the internal fields. Then, the following synchronized
methods were added to guarantee thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. Then mark it as no need to flush.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jun 21, 2022
### Motivation

There were several issues caused by the lack of thread safety in
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in the `flushAsync` method. But the fields
are accessed directly, so thread safety cannot be guaranteed.

In addition, the current `LastCumulativeAck` class was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there were two CAS operations in the `doCumulativeAck`
method in case it was called concurrently, though the composite CAS
operation was not atomic.

However, after apache#8996, only one CAS operation was performed, and it
compared against a `LastCumulativeAck` object, not the two fields
(`messageId` and `bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR moves `LastCumulativeAck`
out of `PersistentAcknowledgmentsGroupingTracker` to disable
direct access to the internal fields. Then, the following synchronized
methods were added to guarantee thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. The bit set is deep copied to avoid the original reference being
  recycled in another `update` call.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

`LastCumulativeAckTest` is added to verify the semantics above.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.

Remove unused field

Don't reset in LastCumulativeAck#flush
BewareMyPower added a commit that referenced this pull request Jun 22, 2022
…6072)

### Motivation

There were several issues caused by the lack of thread safety in
`LastCumulativeAck`, see:
- #10586
- #12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in the `flushAsync` method. But the fields
are accessed directly, so thread safety cannot be guaranteed.

In addition, the current `LastCumulativeAck` class was added in
#8996 to hold two object
references, but this modification is wrong.

Before #8996, there were two CAS operations in the `doCumulativeAck`
method in case it was called concurrently, though the composite CAS
operation was not atomic.

However, after #8996, only one CAS operation was performed, and it
compared against a `LastCumulativeAck` object, not the two fields
(`messageId` and `bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR moves `LastCumulativeAck`
out of `PersistentAcknowledgmentsGroupingTracker` to disable
direct access to the internal fields. Then, the following synchronized
methods were added to guarantee thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. The bit set is deep copied to avoid the original reference being
  recycled in another `update` call.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

`LastCumulativeAckTest` is added to verify the semantics above.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.
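
The `update`/`flush` design described in the commit message above can be approximated with a simplified sketch. This is an illustration under stated assumptions, not the merged code: a `long` replaces the message ID type, `java.util.BitSet` replaces `BitSetRecyclable` (so nothing is recycled), a plain `Snapshot` object replaces the thread-local instance, and clearing the flush flag inside `flush` is assumed from the statement that concurrent flushes no longer send the same ACK twice. `LastCumulativeAckSketch` and `Snapshot` are hypothetical names.

```java
import java.util.BitSet;

// Simplified stand-in for a thread-safe holder of the latest cumulative ack.
class LastCumulativeAckSketch {
    // Immutable copy handed to the code that sends the ACK command.
    static final class Snapshot {
        final long messageId;
        final BitSet bitSet;

        Snapshot(long messageId, BitSet bitSet) {
            this.messageId = messageId;
            this.bitSet = bitSet;
        }
    }

    private long messageId = -1L;
    private BitSet bitSet;
    private boolean flushRequired;

    // Both fields change together under one monitor, so readers never see a mix.
    synchronized void update(long newMessageId, BitSet newBitSet) {
        this.messageId = newMessageId;
        this.bitSet = newBitSet;
        this.flushRequired = true;
    }

    // Returns a deep copy to send, or null when there is nothing new to flush.
    synchronized Snapshot flush() {
        if (!flushRequired) {
            return null;
        }
        flushRequired = false;
        BitSet copy = bitSet == null ? null : (BitSet) bitSet.clone();
        return new Snapshot(messageId, copy);
    }
}
```

Because the snapshot owns its own copy of the bit set, a later `update` that replaces or reuses the original bit set cannot change what the flushing thread is about to send.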
codelipenghui pushed a commit that referenced this pull request Jun 28, 2022
…6072)

(cherry picked from commit 936d6fd)
mattisonchao pushed a commit that referenced this pull request Jul 2, 2022
…6072)

### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- #10586
- #12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
#8996 to hold two object
references, but this modification is wrong.

Before #8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after #8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR moves `LastCumulativeAck`
out of `PersistentAcknowledgmentsGroupingTracker` to prevent direct
access to its internal fields. Then, the following synchronized
methods were added to guarantee thread safety:
- `update`: Guarantees safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and marks
  the object as ready to be flushed.
- `flush`: If it can be flushed, returns a thread-local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. The bit set is deep copied to avoid the original reference being
  recycled in another `update` call.
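
The two methods above can be sketched roughly as follows. This is a
simplified illustration, not the code from the PR: the class name
`LastCumulativeAckSketch`, the nested `Snapshot` type, the `String`
message ID and the plain `java.util.BitSet` are all assumptions made to
keep the example self-contained. The sketch also returns a fresh
snapshot object from `flush` rather than a thread-local instance, and it
does not model recycling.

```java
import java.util.BitSet;

// Hypothetical, simplified stand-in for LastCumulativeAck.
class LastCumulativeAckSketch {

    // Immutable copy handed to the caller that performs the flush.
    static final class Snapshot {
        final String messageId;
        final BitSet ackSet;

        Snapshot(String messageId, BitSet ackSet) {
            this.messageId = messageId;
            this.ackSet = ackSet;
        }
    }

    // volatile so getMessageId() sees the latest reference without locking
    private volatile String messageId;
    private BitSet ackSet;
    private boolean flushRequired = false;

    // Safe write: both fields and the flag change under one lock.
    // (The real class would recycle the old bit set here.)
    public synchronized void update(String messageId, BitSet ackSet) {
        this.messageId = messageId;
        this.ackSet = ackSet;
        this.flushRequired = true;
    }

    // Returns a snapshot to send, or null if nothing changed since the
    // previous flush, so concurrent callers cannot both send the same ACK.
    public synchronized Snapshot flush() {
        if (!flushRequired) {
            return null;
        }
        flushRequired = false;
        // Deep copy so a later update() cannot affect the snapshot.
        BitSet copy = (ackSet == null) ? null : (BitSet) ackSet.clone();
        return new Snapshot(messageId, copy);
    }

    public String getMessageId() {
        return messageId;
    }
}
```

In this sketch, a caller such as `flushAsync` would send an ACK command
only when `flush()` returns a non-null snapshot.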

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

`LastCumulativeAckTest` is added to verify the semantics above.

Based on the new design, we only need to maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.

(cherry picked from commit 936d6fd)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jul 4, 2022
…ache#16072)
(cherry picked from commit 936d6fd)
(cherry picked from commit 5eefdf1)
BewareMyPower added a commit that referenced this pull request Jul 29, 2022
…6072)
(cherry picked from commit 936d6fd)
Labels
doc-complete Your PR changes impact docs and the related docs have been already added.
7 participants