Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][txn] Add getState in transaction for client API #17423

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1469,4 +1469,52 @@ public void testTBRecoverChangeStateError() throws InterruptedException, Timeout
Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
}
}

@Test
public void testGetTxnState() throws Exception {
Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();

// test OPEN and TIMEOUT
assertEquals(transaction.getState(), Transaction.State.OPEN);
Transaction timeoutTxn = transaction;
Awaitility.await().until(() -> timeoutTxn.getState() == Transaction.State.TIME_OUT);

// test abort
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
transaction.abort().get();
assertEquals(transaction.getState(), Transaction.State.ABORTED);

// test commit
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
transaction.commit().get();
assertEquals(transaction.getState(), Transaction.State.COMMITTED);

// test error
transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();
pulsarServiceList.get(0).getTransactionMetadataStoreService()
.endTransaction(transaction.getTxnID(), 0, false);
transaction.commit();
Transaction errorTxn = transaction;
Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR);

// test committing
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
transaction.commit();
Transaction committingTxn = transaction;
Awaitility.await().until(() -> committingTxn.getState() == Transaction.State.COMMITTING);

// test aborting
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
transaction.abort();
Transaction abortingTxn = transaction;
Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
}
}
Expand Up @@ -1052,7 +1052,7 @@ public void testTxnTimeOutInClient() throws Exception{
.build().get();
producer.newMessage().send();
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIME_OUT);
});

try {
Expand Down
Expand Up @@ -29,6 +29,55 @@
@InterfaceStability.Evolving
public interface Transaction {

enum State {

/**
* When a transaction is in the `OPEN` state, messages can be produced and acked with this transaction.
*
* When a transaction is in the `OPEN` state, it can commit or abort.
*/
OPEN,

/**
* When a client invokes a commit, the transaction state is changed from `OPEN` to `COMMITTING`.
*/
COMMITTING,

/**
* When a client invokes an abort, the transaction state is changed from `OPEN` to `ABORTING`.
*/
ABORTING,

/**
* When a client receives a response to a commit, the transaction state is changed from
* `COMMITTING` to `COMMITTED`.
*/
COMMITTED,

/**
* When a client receives a response to an abort, the transaction state is changed from `ABORTING` to `ABORTED`.
*/
ABORTED,

/**
* When a client invokes a commit or an abort, but a transaction does not exist in a coordinator,
* then the state is changed to `ERROR`.
*
* When a client invokes a commit, but the transaction state in a coordinator is `ABORTED` or `ABORTING`,
* then the state is changed to `ERROR`.
*
* When a client invokes an abort, but the transaction state in a coordinator is `COMMITTED` or `COMMITTING`,
* then the state is changed to `ERROR`.
*/
ERROR,
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved

/**
* When a transaction is timed out and the transaction state is `OPEN`,
* then the transaction state is changed from `OPEN` to `TIME_OUT`.
*/
TIME_OUT
}

/**
* Commit the transaction.
*
Expand All @@ -48,4 +97,12 @@ public interface Transaction {
* @return {@link TxnID} the txnID.
*/
TxnID getTxnID();

/**
* Get transaction state.
*
* @return {@link State} the state of the transaction.
*/
State getState();

}
Expand Up @@ -70,17 +70,7 @@ public class TransactionImpl implements Transaction , TimerTask {

@Override
public void run(Timeout timeout) throws Exception {
STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
}

public enum State {
OPEN,
COMMITTING,
ABORTING,
COMMITTED,
ABORTED,
ERROR,
TIMEOUT
STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIME_OUT);
}

TransactionImpl(PulsarClientImpl client,
Expand Down Expand Up @@ -215,6 +205,11 @@ public TxnID getTxnID() {
return new TxnID(txnIdMostBits, txnIdLeastBits);
}

@Override
public State getState() {
return state;
}

public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
if (state == State.OPEN) {
return true;
Expand Down