Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignite-17834 Fixed. #1168

Merged
merged 7 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ public TransactionException(UUID traceId, int code, Throwable cause) {
super(traceId, code, cause);
}

/**
* Creates a new transaction exception with the given error code and detail message.
*
* @param code Full error code.
* @param message Detail message.
*/
public TransactionException(int code, String message) {
super(code, message);
}

/**
* Creates a new transaction exception with the given error code, detail message and cause.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ public static class Transactions {

/** Failed to rollback a transaction. */
public static final int TX_ROLLBACK_ERR = TX_ERR_GROUP.registerErrorCode(8);

/** Failed to enlist operation into already finished transaction. */
public static final int TX_ALREADY_FINISHED_ERR = TX_ERR_GROUP.registerErrorCode(9);
}

/** Replicator error group. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

/**
* Replica response interface with a timestamp to adjust a hybrid logical clock.
* TODO:IGNITE-17258 Add a specific response type for a replica listener. (@Transferable(ReplicaMessageGroup.TYPE_RESPONSE))
*/
@Transferable(ReplicaMessageGroup.TIMESTAMP_AWARE_REPLICA_RESPONSE)
public interface TimestampAwareReplicaResponse extends ReplicaResponse, TimestampAware {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ private RaftGroupOptions groupOptionsForPartition(
raftGroupOptions = RaftGroupOptions.forPersistentStores();
}

//TODO Revisit peers String representation: https://issues.apache.org/jira/browse/IGNITE-17420
//TODO Revisit peers String representation: https://issues.apache.org/jira/browse/IGNITE-17814
raftGroupOptions.snapshotStorageFactory(new PartitionSnapshotStorageFactory(
raftMgr.topologyService(),
//TODO IGNITE-17302 Use miniumum from mv storage and tx state storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,17 @@ public CompletableFuture<T> apply(T r, Throwable e) {
throw (RuntimeException) e;
}); // Preserve failed state.
} else {
tx0.enlistResultFuture(fut);
try {
tx0.enlistResultFuture(fut);
} catch (TransactionException enlistResultException) {
return tx0.rollbackAsync().handle((ignored, err) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you rolling back the transaction if it is already finished?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it's required to cleanup locks and writeIntents for operation that failed to enlistResultFuture.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that will not happen because the transaction is already finished.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right we need an escape path in order to rollback partially finished transaction.

if (err != null) {
e.addSuppressed(err);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e is always null here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}

throw enlistResultException;
});
}

return implicit ? tx0.commitAsync().thenApply(ignored -> r) : completedFuture(r);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.NotNull;

/**
Expand Down Expand Up @@ -65,6 +66,7 @@ public interface InternalTransaction extends Transaction {
* Enlists operation future in transaction. It's used in order to wait corresponding tx operations before commit.
*
* @param resultFuture Operation result future.
* @throws TransactionException if failed to enlist operation result future because transaction was already moved to finishing state.
*/
void enlistResultFuture(CompletableFuture<?> resultFuture);
void enlistResultFuture(CompletableFuture<?> resultFuture) throws TransactionException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.ignite.internal.tx.impl;

import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;

Expand Down Expand Up @@ -54,10 +56,13 @@ public class TransactionImpl implements InternalTransaction {
private final TxManager txManager;

/** Enlisted replication groups: replication group id -> (primary replica node, raft term). */
private Map<String, IgniteBiTuple<ClusterNode, Long>> enlisted = new ConcurrentSkipListMap<>();
private final Map<String, IgniteBiTuple<ClusterNode, Long>> enlisted = new ConcurrentSkipListMap<>();

/** Enlisted operation futures in this transaction. */
private volatile List<CompletableFuture<?>> enlistedResults = new ArrayList<>();
private final List<CompletableFuture<?>> enlistedResults = new ArrayList<>();

/** Guard that prevents finishing or enlisting new operations into already finished or finishing transaction. */
private boolean txInFinishState = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Way do not you want to make txInFinishState to AtomicBoolean and get rid of synchronized in the finish method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it's required not only to change txInFinishState atomically but also to populate enlistedResults with new value

    public synchronized void enlistResultFuture(CompletableFuture<?> resultFuture) {
        if (txInFinishState) {
            throw new TransactionException(TX_ALREADY_FINISHED_ERR,
                    "Failed to enlist operation into already finished transaction txId={" + id + '}');
        }

        enlistedResults.add(resultFuture);
    }


/**
* The constructor.
Expand Down Expand Up @@ -137,6 +142,14 @@ public CompletableFuture<Void> rollbackAsync() {
* @return The future.
*/
private CompletableFuture<Void> finish(boolean commit) {
synchronized (this) {
if (txInFinishState) {
return failedFuture(new TransactionException(commit ? TX_COMMIT_ERR : TX_ROLLBACK_ERR,
"Failed to finish already finished transaction txId={" + id + '}'));
}
txInFinishState = true;
}

// TODO: https://issues.apache.org/jira/browse/IGNITE-17688 Add proper exception handling.
return CompletableFuture
.allOf(enlistedResults.toArray(new CompletableFuture[0]))
Expand Down Expand Up @@ -181,7 +194,12 @@ private CompletableFuture<Void> finish(boolean commit) {

/** {@inheritDoc} */
@Override
public void enlistResultFuture(CompletableFuture<?> resultFuture) {
public synchronized void enlistResultFuture(CompletableFuture<?> resultFuture) {
if (txInFinishState) {
throw new TransactionException(TX_ALREADY_FINISHED_ERR,
"Failed to enlist operation into already finished transaction txId={" + id + '}');
}

enlistedResults.add(resultFuture);
}
}