Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-17839 #1183

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public interface IgniteTransactions {
/**
* Executes a closure within a transaction.
*
* <p>Please make sure all asynchronous operations are enlisted into the transaction before returning from the callback.
sanpwc marked this conversation as resolved.
Show resolved Hide resolved
*
* <p>If the closure is executed normally (no exceptions) the transaction is automatically committed.
*
* @param clo The closure.
Expand All @@ -100,10 +102,9 @@ default void runInTransaction(Consumer<Transaction> clo) throws TransactionExcep
/**
* Executes a closure within a transaction and returns a result.
*
* <p>If the closure is executed normally (no exceptions) the transaction is automatically committed.
* <p>Please make sure all asynchronous operations are enlisted into the transaction before returning from the callback.
*
* <p>This method will automatically enlist all tables into the transaction, but the execution of
* the transaction shouldn't leave starting thread or an exception will be thrown.
* <p>If the closure is executed normally (no exceptions) the transaction is automatically committed.
*
* @param clo The closure.
* @param <T> Closure result type.
Expand All @@ -121,6 +122,7 @@ default <T> T runInTransaction(Function<Transaction, T> clo) throws TransactionE

return ret;
} catch (Throwable t) {
// TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries
try {
tx.rollback(); // Try rolling back on user exception.
} catch (Exception e) {
Expand All @@ -130,4 +132,22 @@ default <T> T runInTransaction(Function<Transaction, T> clo) throws TransactionE
throw t;
}
}

/**
* Executes a closure within a transaction asynchronously.
*
* <p>A returned future must be the last in the asynchronous chain. This means all transaction operations happen before the future
* is completed.
*
* <p>If the asynchronous chain resulted in no exception, the commitAsync will be automatically called.
*
* @param clo The closure.
* @param <T> Closure result type.
* @return The result.
*/
default <T> CompletableFuture<T> runInTransactionAsync(Function<Transaction, CompletableFuture<T>> clo) {
// Rollback is expected to be called by the failure handling code
sanpwc marked this conversation as resolved.
Show resolved Hide resolved
// TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries
return beginAsync().thenCompose(tx -> clo.apply(tx).thenCompose(val -> tx.commitAsync().thenApply(ignored -> val)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.table;

import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -37,6 +38,7 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Flow;
Expand Down Expand Up @@ -286,6 +288,40 @@ public void testTxClosureKeyValueView() throws TransactionException {
assertEquals(5, txManager(accounts).finished());
}

/**
* Tests positive transfer scenario.
*/
@Test
public void testTxClosureAsync() {
double balance1 = 200.;
double balance2 = 300.;
double delta = 50.;
Tuple ret = transferAsync(balance1, balance2, delta).join();

RecordView<Tuple> view = accounts.recordView();

assertEquals(balance1 - delta, view.get(null, makeKey(1)).doubleValue("balance"));
assertEquals(balance2 + delta, view.get(null, makeKey(2)).doubleValue("balance"));
assertEquals(balance1, ret.doubleValue("balance1"));
assertEquals(balance2, ret.doubleValue("balance2"));
}

/**
* Tests negative transfer scenario.
*/
@Disabled("https://issues.apache.org/jira/browse/IGNITE-17861")
public void testTxClosureAbortAsync() {
double balance1 = 10.;
double balance2 = 300.;
double delta = 50.;
assertThrows(CompletionException.class, () -> transferAsync(balance1, balance2, delta).join());

RecordView<Tuple> view = accounts.recordView();

assertEquals(balance1, view.get(null, makeKey(1)).doubleValue("balance"));
assertEquals(balance2, view.get(null, makeKey(2)).doubleValue("balance"));
}

@Test
public void testBatchPutConcurrently() {
Transaction tx = igniteTransactions.begin();
Expand Down Expand Up @@ -1646,7 +1682,7 @@ protected LockManager lockManager(Table t) {
protected abstract boolean assertPartitionsSame(Table table, int partId);

/**
* Validates a balances.
* Validates balances.
*
* @param rows Rows.
* @param expected Expected values.
Expand All @@ -1661,4 +1697,44 @@ private void validateBalance(Collection<Tuple> rows, double... expected) {
assertEquals(v, rows0.get(i).doubleValue("balance"));
}
}

/**
* Transfers money between accounts.
*
* @param balance1 First account initial balance.
* @param balance2 Second account initial balance.
* @param delta Delta.
* @return The future holding tuple with previous balances.
*/
private CompletableFuture<Tuple> transferAsync(double balance1, double balance2, double delta) {
RecordView<Tuple> view = accounts.recordView();

view.upsert(null, makeValue(1, balance1));
view.upsert(null, makeValue(2, balance2));

return igniteTransactions.runInTransactionAsync(tx -> {
// Attempt to withdraw from first account.
CompletableFuture<Double> fut1 = view.getAsync(tx, makeKey(1))
.thenCompose(val1 -> {
double prev = val1.doubleValue("balance");
double balance = prev - delta;

if (balance < 0) {
return tx.rollbackAsync().thenApply(ignored -> null);
}

return view.upsertAsync(tx, makeValue(1, balance)).thenApply(ignored -> prev);
});

// Optimistically deposit to second account.
CompletableFuture<Double> fut2 = view.getAsync(tx, makeKey(2))
.thenCompose(val2 -> {
double prev = val2.doubleValue("balance");
return view.upsertAsync(tx, makeValue(2, prev + delta)).thenApply(ignored -> prev);
});

return fut1.thenCompose(val1 -> fut2.thenCompose(val2 ->
completedFuture(Tuple.create().set("balance1", val1).set("balance2", val2))));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,6 @@ public TxState state(UUID txId) {
return states.get(txId);
}

/**
* Unlocks all locks for the timestamp.
*
* @param txId Transaction id.
*/
private void unlockAll(UUID txId) {
lockManager.locks(txId).forEachRemaining(lockManager::release);
}

/** {@inheritDoc} */
@Override
public boolean changeState(UUID txId, TxState before, TxState after) {
Expand All @@ -119,7 +110,7 @@ public boolean changeState(UUID txId, TxState before, TxState after) {
/** {@inheritDoc} */
@Override
public CompletableFuture<Lock> writeLock(UUID lockId, ByteBuffer keyData, UUID txId) {
// TODO IGNITE-15933 process tx messages in striped fasion to avoid races. But locks can be acquired from any thread !
// TODO IGNITE-15933 process tx messages in strips to avoid races. But locks can be acquired from any thread !
TxState state = state(txId);

if (state != null && state != TxState.PENDING) {
Expand Down