Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[transaction][acknowledge] Introduce in-memory PENDING_ACK state in acknowledgement path #4265

Merged
merged 19 commits into from
Jun 17, 2019

Conversation

MarvinCai
Copy link
Contributor

@MarvinCai MarvinCai commented May 12, 2019

Master Issue: #2664

Motivation:
Add acknowledgeMessage, commit, abort for transaction in PersistentSubscription.

Changes:
Will put message in Pending_ACK status when acknowledgeMessage is called with TxnID.
No real status class introduced, only added collection to hold messages in Pending_ACK status.
Current PR only keep Pending_ACK state in memory, in subsequent PR will also persistent these pending acks so we can recover from broker failure.

Add commitTxn to put message to Deleted status.
Add abortTxn to put message to Pending status.

For normal acknowledgeMessage and redeliverUnacknowledgedMessages, will check to see if
message if message is in Pending_ACK first. If true, will ignore those acks/redeliverys.

Add unit test.

@MarvinCai
Copy link
Contributor Author

@sijie @Tuten @zymap Can you help review this pr.

@sijie
Copy link
Member

sijie commented May 14, 2019

@MarvinCai thank you for picking this up! Great work!

I am traveling this week. I will probably get to it late this week.

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MarvinCai overall looks great!

I left some comments. PTAL

@@ -122,6 +128,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a duplicated dependency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, didn't see there's already one for pulsar-transaction-common, removed this one.

"pendingCumulativeAckMessage");

// Transaction currently using cumulative ack.
private volatile TxnID txnID;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a better name might be pendingCumulativeAckTxnId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, that would be a more informative name.

if (this.txnID != null) {
log.warn("[{}][{}] An ongoing transaction:{}{} is doing cumulative ack, " +
"new cumulative ack is not allowed till the transaction is committed.",
this.txnID.getMostSigBits(), this.txnID.getLeastSigBits(), topicName, subName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we just have a toString implementation on TxnID? so we don't need to logging two parts separately.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I think the argument sequence in this log statement should be topicName, subName, txnID, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.


// If single ack is within range of cumulative ack of an ongoing transaction, fail the ack.
if (null != this.pendingCumulativeAckMessage &&
((PositionImpl) position).compareTo((PositionImpl) this.pendingCumulativeAckMessage) < 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to check if position is an instance of PositionImpl?

Alternatively, maybe we can consider making Position extending Comparable interface?

Copy link
Contributor Author

@MarvinCai MarvinCai May 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we mostly check type in catch clause or when passed in parameter is an Object, and PositionImpl is only implementation class of Position so I skip the check.
But seems I do find some places in BK code base that we're checking if it's a PositionImpl instance. Also added here, just for safe.


// If single ack is within range of cumulative ack of an ongoing transaction, fail the ack.
if (null != this.pendingCumulativeAckMessage &&
((PositionImpl) position).compareTo((PositionImpl) this.pendingCumulativeAckMessage) < 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be <=?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, markDelete include the last position, so should be <=.

pendingAckMessage.add(position);
this.pendingAckMessages.add(position);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to persistent these pending acks to cursor. so when a broker crashes, we can restore/recover those pending acks back. otherwise if a broker crashes, all the pending acks are gone.

This can probably be done in a subsequent change though. We can consider updating the description of this PR to introduce in-memory PENDING_ACK state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, you're right, I missed this part, will add in next pr.

String errorMsg = "[" + topicName + "][" + subName + "] Transaction with id:" + txnID.getMostSigBits()
+ txnID.getLeastSigBits() + " not found.";
log.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw TransactionNotFoundException?

Copy link
Contributor Author

@MarvinCai MarvinCai May 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's a better one, I'll merge mainline and update.

* @throws TransactionConflictException
*/
public synchronized void commitTxn(TxnID txnID, Map<String,Long> properties) throws TransactionConflictException {
if (!this.pendingAckMessagesMap.containsKey(txnID)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this check can be moved down to line 890. so we just do remove and check if the TxnID can be be removed. so it is one operation to prevent any concurrent commits on same TxnID.

 ConcurrentOpenHashSet<Position> pendingAckMessageCurTxn = this.pendingAckMessageMap.remove(txnID);
if (null == pendingAckMessageCurTxn) {
     throw new TransactionNotFoundException(...);
} 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense, will update.

* @throws IllegalArgumentException
* @throws TransactionConflictException
*/
public synchronized void commitTxn(TxnID txnID, Map<String,Long> properties) throws TransactionConflictException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should return CompletableFuture<Void> here, because a transaction is only committed when those positions are deleted.

* @param txnID {@link TxnID} to identify the transaction.
* @throws IllegalArgumentException
*/
public synchronized void abortTxn(TxnID txnID) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar comments as commitTxn - return CompletableFuture<Void>.

In this PR you can handle in-memory pending_ack state, but in subsequent PRs, we need to look into persistenting those states into cursor ledger for handling broker crashes.

throw new IllegalArgumentException(errorMsg);
}

// This shouldn't happen.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reconsideration, I think this check here doesn't really make sense.
When client commit a transaction, it doesn't necessary to be the ongoing transaction that did cumulative ack on this subscription before.
But we could add timeout check here, so when a transaction did cumulative ack but did commit within time, it's position will be removed. So other subsequent transaction can still use cumulative ack.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm

@MarvinCai
Copy link
Contributor Author

@sijie Can you take another look, thanks!

@@ -45,4 +45,8 @@
*/
private final long leastSigBits;

@Override
public String toString() {
return String.valueOf(mostSigBits) + leastSigBits;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe "(" + mostSigBits + "," + leastSigBits + ")"?

@@ -76,13 +90,38 @@
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;

// Map to keep track of message ack by each txn.
private final ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashSet<Position>> pendingAckMessagesMap;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually one last comment:

@MarvinCai : can you make pendingAckMessagesMap and pendingAckMessages non-final and initialize them when they are needed? so we don't need to construct these two structures if people don't use these features, especially when transaction is a feature under development.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, changed to only initialize them when client start ack message for transaction, and add null check for other places using them.

@sijie
Copy link
Member

sijie commented Jun 1, 2019

@MarvinCai

Can you rebase this pull request to latest master and also address my last comment?

Make pendingAckMessagesMap and pendingAckMessages set non-final to we only initialize them when needed.
@MarvinCai
Copy link
Contributor Author

rerun java8 tests
rerun cpp tests
rerun integration tests

@MarvinCai
Copy link
Contributor Author

rerun java8 tests

A message should be redeliver if the map and set holding in-memory pending_ack message is null
which meas consumer haven't start transaction and ack message yet.
@MarvinCai
Copy link
Contributor Author

rerun cpp tests

1 similar comment
@MarvinCai
Copy link
Contributor Author

rerun cpp tests

…test to fail.

Bug is when get unacked message from consumer and check if message is in pending_ack status for some transaction,
it's not constructing position correctly, so causing ledger read exception.

Also update java test case so it covers this scenario.
@MarvinCai
Copy link
Contributor Author

run java8 tests

@sijie
Copy link
Member

sijie commented Jun 7, 2019

@merlimat @rdhabalia @jiazhai this change is mostly isolated from existing code. but since it adds a few fields in the acknowledge path for transaction support. the code path will not be activated until transaction is enabled. I would like you guys also to have a look at this path as well.

@sijie sijie added this to the 2.5.0 milestone Jun 7, 2019
@sijie
Copy link
Member

sijie commented Jun 11, 2019

ping @merlimat @rdhabalia @jiazhai

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MarvinCai missed a few review comments. PTAL

}

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
// Check if message is in pending_ack status.
List<PositionImpl> pendingPositions = new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MarvinCai

I missed this in the previous reviews. We should avoid creating a new array list when this is called when transaction is not used.

So the logic should be something like:

List<PositionImpl> pendingPositions = positions;

if (pendingAckMessages != null || cumulativeAckPosition != null) {
    pendingPositions = new ArrayList<>();
   // process the logic here
}

ConcurrentLongLongPairHashMap positionMap = consumer.getPendingAcks();
// Check if message is in pending_ack status.
if (null != positionMap) {
List<PositionImpl> pendingPositions = new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as what I made in the other redeliverUnacknowledgedMessages method.

@@ -232,6 +276,27 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions);
}

synchronized (PersistentSubscription.this) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add this block under an if check statement. so this code will not be processed when transaction is not used.

if (pendingAckMessages != null || pendingCumulativeAckMessage != null) {
    // process the logic
}

…ge we only check if message is acked by ongoing transaction when there is an ongoing transaction.
@MarvinCai
Copy link
Contributor Author

@sijie Addressed your comment

@codelipenghui codelipenghui changed the title [transaction][acknowledge] Introduce PENDING_ACK state in acknowledgement path [transaction][acknowledge] Introduce in-memory PENDING_ACK state in acknowledgement path Jun 13, 2019
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MarvinCai
Overall looks good to me, just few problem i'm not sure it is correct.

// It's valid to create transaction then commit without doing any operation, which will cause
// pendingAckMessagesMap to be null.
ConcurrentOpenHashSet<Position> pendingAckMessageForCurrentTxn = pendingAckMessagesMap != null ?
this.pendingAckMessagesMap.remove(txnId) : new ConcurrentOpenHashSet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to create a new Set here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, it's actually unnecessary to create this empty set, will update in next commit.

* @param txnId {@link TxnID} to identify the transaction.
* @throws IllegalArgumentException if given {@link TxnID} is not found in this subscription.
*/
public synchronized CompletableFuture<Void> abortTxn(TxnID txnId) {
Copy link
Contributor

@codelipenghui codelipenghui Jun 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is abortTxn need to redeliver messages the txnId contains? because if broker handle
redeliverUnacknowledgedMessages will skip the messages which is in a transaction, if abortTxn do not redeliver these messages, i think message will never be redelivery.

I think need to call dispatcher.redeliverUnacknowledgedMessages(consumer, pendingAckMessagesMap.get(txnId))

Copy link
Contributor Author

@MarvinCai MarvinCai Jun 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, redelivery all un acked messages after aborting a transaction is more desired behavior for most transaction use case. So will update to call dispatcher.redeliverUnacknowledgedMessages after abort a transaction.

if (pendingAckMessages != null && this.pendingAckMessages.contains(position)) {
log.warn("[{}][{}] Invalid acks position conflict with an ongoing transaction:{}.",
topicName, subName, this.pendingCumulativeAckTxnId.toString());
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if consumer ack 1,2,3,4,5 and 1 is acknowledged by ongoing transaction, is 2,3,4,5 can not be acked?

Copy link
Contributor Author

@MarvinCai MarvinCai Jun 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2, 3, 4, 5 should be acked, return in foreach will just skip to next item.
Current logic here isn't actually handling the case correctly, will update.

for (Position position : positions) {
// If try to ack message already acked by some transaction(can be itself), throw exception.
// Acking single message within range of cumulative ack(if exist) is considered valid operation.
if (this.pendingAckMessages.contains(position)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ack message already acked by consumer without transaction, shall we need to abort the transaction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should raise conflict, will update.

1. Remove creation of unnecessary set when committing a transaction.
2. Correctly skip message already acked by ongoing transaction when doing normal ack.
3. When acking message for transaction, also check if messge has been acked.
4. After aborting a transaction, redeliver all messages delivery to a consumer during that transaction.
@MarvinCai
Copy link
Contributor Author

run java8 tests

1 similar comment
@MarvinCai
Copy link
Contributor Author

run java8 tests

@MarvinCai
Copy link
Contributor Author

@codelipenghui thanks for your comments, I've addressed them, can you take another look?

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM +1

@sijie sijie modified the milestones: 2.5.0, 2.4.0 Jun 17, 2019
@sijie sijie merged commit e66ba27 into apache:master Jun 17, 2019
@gaoran10 gaoran10 mentioned this pull request Nov 21, 2020
32 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants