From 8a07a47327c7a929a5fa964068c76113f25fae2e Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Fri, 7 Oct 2022 14:24:36 +0300 Subject: [PATCH 1/9] IGNITE-17839 Introduce async txn closures. --- .../apache/ignite/tx/IgniteTransactions.java | 23 +++++- .../ignite/internal/table/TxAbstractTest.java | 75 ++++++++++++++++++- .../internal/tx/impl/TxManagerImpl.java | 11 +-- 3 files changed, 95 insertions(+), 14 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java index 770892b5a75..bf829bafbd8 100644 --- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java @@ -84,6 +84,7 @@ public interface IgniteTransactions { /** * Executes a closure within a transaction. * + *

Please make sure all asynchronous operations are enlisted into the transaction before returning from the callback. *

If the closure is executed normally (no exceptions) the transaction is automatically committed. * * @param clo The closure. @@ -100,11 +101,9 @@ default void runInTransaction(Consumer clo) throws TransactionExcep /** * Executes a closure within a transaction and returns a result. * + *

Please make sure all asynchronous operations are enlisted into the transaction before returning from the callback. *

If the closure is executed normally (no exceptions) the transaction is automatically committed. * - *

This method will automatically enlist all tables into the transaction, but the execution of - * the transaction shouldn't leave starting thread or an exception will be thrown. - * * @param clo The closure. * @param Closure result type. * @return The result. @@ -121,6 +120,7 @@ default T runInTransaction(Function clo) throws TransactionE return ret; } catch (Throwable t) { + // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries try { tx.rollback(); // Try rolling back on user exception. } catch (Exception e) { @@ -130,4 +130,21 @@ default T runInTransaction(Function clo) throws TransactionE throw t; } } + + /** + * Executes a closure within a transaction asynchronously. + * + *

A returned future must be the last in the asynchronous chain. This means all transaction operations happen before the future + * is completed. + *

If the asynchronous chain resulted in no exception, the commitAsync will be automatically called. + * + * @param clo The closure. + * @param Closure result type. + * @return The result. + */ + default CompletableFuture runInTransactionAsync(Function> clo) { + // Rollback is expected to be called by the failure handling code + // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries + return beginAsync().thenCompose(tx -> clo.apply(tx).thenCompose(val -> tx.commitAsync().thenApply(ignored -> val))); + } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java index 1771b25d13f..886d529ff86 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.table; import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -37,6 +38,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Flow; @@ -286,6 +288,40 @@ public void testTxClosureKeyValueView() throws TransactionException { assertEquals(5, txManager(accounts).finished()); } + /** + * Tests positive transfer scenario. + */ + @Test + public void testTxClosureAsync() { + double balance1 = 200.; + double balance2 = 300.; + double delta = 50.; + Tuple ret = transferAsync(balance1, balance2, delta).join(); + + RecordView view = accounts.recordView(); + + assertEquals(balance1 - delta, view.get(null, makeKey(1)).doubleValue("balance")); + assertEquals(balance2 + delta, view.get(null, makeKey(2)).doubleValue("balance")); + assertEquals(balance1, ret.doubleValue("balance1")); + assertEquals(balance2, ret.doubleValue("balance2")); + } + + /** + * Tests negative transfer scenario. + */ + @Test + public void testTxClosureAbortAsync() { + double balance1 = 10.; + double balance2 = 300.; + double delta = 50.; + assertThrows(CompletionException.class, () -> transferAsync(balance1, balance2, delta).join()); + + RecordView view = accounts.recordView(); + + assertEquals(balance1, view.get(null, makeKey(1)).doubleValue("balance")); + assertEquals(balance2, view.get(null, makeKey(2)).doubleValue("balance")); + } + @Test public void testBatchPutConcurrently() { Transaction tx = igniteTransactions.begin(); @@ -1646,7 +1682,7 @@ protected LockManager lockManager(Table t) { protected abstract boolean assertPartitionsSame(Table table, int partId); /** - * Validates a balances. + * Validates balances. * * @param rows Rows. * @param expected Expected values. @@ -1661,4 +1697,41 @@ private void validateBalance(Collection rows, double... expected) { assertEquals(v, rows0.get(i).doubleValue("balance")); } } + + /** + * @param balance1 First account initial balance. + * @param balance2 Second account initial balance. + * @param delta Delta. + * @return The future holding tuple with previous balances. + */ + private CompletableFuture transferAsync(double balance1, double balance2, double delta) { + RecordView view = accounts.recordView(); + + view.upsert(null, makeValue(1, balance1)); + view.upsert(null, makeValue(2, balance2)); + + return igniteTransactions.runInTransactionAsync(tx -> { + // Attempt to withdraw from first account. + CompletableFuture fut1 = view.getAsync(tx, makeKey(1)) + .thenCompose(val1 -> { + double prev = val1.doubleValue("balance"); + double balance = prev - delta; + + if (balance < 0) + return tx.rollbackAsync().thenApply(ignored -> null); + + return view.upsertAsync(tx, makeValue(1, balance)).thenApply(ignored -> prev); + }); + + // Optimistically deposit to second account. + CompletableFuture fut2 = view.getAsync(tx, makeKey(2)) + .thenCompose(val2 -> { + double prev = val2.doubleValue("balance"); + return view.upsertAsync(tx, makeValue(2, prev + delta)).thenApply(ignored -> prev); + }); + + return fut1.thenCompose(val1 -> fut2.thenCompose(val2 -> + completedFuture(Tuple.create().set("balance1", val1).set("balance2", val2)))); + }); + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index d6e8473e4cb..95a5f2186d4 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -95,15 +95,6 @@ public TxState state(UUID txId) { return states.get(txId); } - /** - * Unlocks all locks for the timestamp. - * - * @param txId Transaction id. - */ - private void unlockAll(UUID txId) { - lockManager.locks(txId).forEachRemaining(lockManager::release); - } - /** {@inheritDoc} */ @Override public boolean changeState(UUID txId, TxState before, TxState after) { @@ -119,7 +110,7 @@ public boolean changeState(UUID txId, TxState before, TxState after) { /** {@inheritDoc} */ @Override public CompletableFuture writeLock(UUID lockId, ByteBuffer keyData, UUID txId) { - // TODO IGNITE-15933 process tx messages in striped fasion to avoid races. But locks can be acquired from any thread ! + // TODO IGNITE-15933 process tx messages in strips to avoid races. But locks can be acquired from any thread ! TxState state = state(txId); if (state != null && state != TxState.PENDING) { From 8afc5fdcef17cae2e9a02c933aeb8a692a89471b Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Mon, 10 Oct 2022 11:39:31 +0300 Subject: [PATCH 2/9] IGNITE-17839 Disable hanging test. --- .../java/org/apache/ignite/internal/table/TxAbstractTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java index 886d529ff86..03564ed7e1a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -309,7 +309,7 @@ public void testTxClosureAsync() { /** * Tests negative transfer scenario. */ - @Test + @Disabled("https://issues.apache.org/jira/browse/IGNITE-17861") public void testTxClosureAbortAsync() { double balance1 = 10.; double balance2 = 300.; From d29015e7c63a56968c380ba2af3bf8f1be4ba485 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Mon, 10 Oct 2022 13:36:58 +0300 Subject: [PATCH 3/9] IGNITE-17839 Fixing style. --- .../main/java/org/apache/ignite/tx/IgniteTransactions.java | 3 +++ .../org/apache/ignite/internal/table/TxAbstractTest.java | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java index bf829bafbd8..db632645b24 100644 --- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java @@ -85,6 +85,7 @@ public interface IgniteTransactions { * Executes a closure within a transaction. * *

Please make sure all asynchronous operations are enlisted into the transaction before returning from the callback. + * *

If the closure is executed normally (no exceptions) the transaction is automatically committed. * * @param clo The closure. @@ -102,6 +103,7 @@ default void runInTransaction(Consumer clo) throws TransactionExcep * Executes a closure within a transaction and returns a result. * *

Please make sure all asynchronous operations are enlisted into the transaction before returning from the callback. + * *

If the closure is executed normally (no exceptions) the transaction is automatically committed. * * @param clo The closure. @@ -136,6 +138,7 @@ default T runInTransaction(Function clo) throws TransactionE * *

A returned future must be the last in the asynchronous chain. This means all transaction operations happen before the future * is completed. + * *

If the asynchronous chain resulted in no exception, the commitAsync will be automatically called. * * @param clo The closure. diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java index 03564ed7e1a..234d5e71b77 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -1699,6 +1699,8 @@ private void validateBalance(Collection rows, double... expected) { } /** + * Transfers money between accounts. + * * @param balance1 First account initial balance. * @param balance2 Second account initial balance. * @param delta Delta. @@ -1717,8 +1719,9 @@ private CompletableFuture transferAsync(double balance1, double balance2, double prev = val1.doubleValue("balance"); double balance = prev - delta; - if (balance < 0) + if (balance < 0) { return tx.rollbackAsync().thenApply(ignored -> null); + } return view.upsertAsync(tx, makeValue(1, balance)).thenApply(ignored -> prev); }); From 414dc449a3e1e6cf2438be4d51c3a2ce4c21398f Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Tue, 11 Oct 2022 18:14:25 +0300 Subject: [PATCH 4/9] IGNITE-17839 Improving docs. --- .../apache/ignite/tx/IgniteTransactions.java | 43 +++++++++++++++- .../ignite/internal/table/TxAbstractTest.java | 50 +++++++++++++++++++ 2 files changed, 91 insertions(+), 2 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java index db632645b24..cf1d034e487 100644 --- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java @@ -102,7 +102,30 @@ default void runInTransaction(Consumer clo) throws TransactionExcep /** * Executes a closure within a transaction and returns a result. * - *

Please make sure all asynchronous operations are enlisted into the transaction before returning from the callback. + *

This method expects that all transaction operations are completed before the closure returns. The safest way to achieve that is + * to use synchronous table API. + * + *

Take care then using the asynchronous operations inside the closure. For example, the following snippet is incorrect, + * because the last operation goes out of the scope of the closure unfinished: + *

+     * {@code
+     * igniteTransactions.runInTransaction(tx -> {
+     *     var key = Tuple.create().set("accountId", 1);
+     *     Tuple acc = view.get(tx, key);
+     *        view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100));
+     *     });
+     * }
+     * 
+ * + *

The correct variant will be + *

+     * {@code
+     * igniteTransactions.runInTransaction(tx -> {
+     *     view.getAsync(tx, Tuple.create().set("accountId", 1)).thenCompose(acc ->
+     *         view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100))).join();
+     *     });
+     * }
+     * 
* *

If the closure is executed normally (no exceptions) the transaction is automatically committed. * @@ -139,6 +162,13 @@ default T runInTransaction(Function clo) throws TransactionE *

A returned future must be the last in the asynchronous chain. This means all transaction operations happen before the future * is completed. * + *

Consider the example: + *

+     * {@code
+     *     igniteTransactions.runInTransactionAsync(tx -> view.getAsync(tx, Tuple.create().set("accountId", 1)).thenCompose(
+     *         acc -> view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100))));
+     * }
+     * 
*

If the asynchronous chain resulted in no exception, the commitAsync will be automatically called. * * @param clo The closure. @@ -148,6 +178,15 @@ default T runInTransaction(Function clo) throws TransactionE default CompletableFuture runInTransactionAsync(Function> clo) { // Rollback is expected to be called by the failure handling code // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries - return beginAsync().thenCompose(tx -> clo.apply(tx).thenCompose(val -> tx.commitAsync().thenApply(ignored -> val))); + return beginAsync().thenCompose(tx -> { + try { + return clo.apply(tx).thenCompose(val -> tx.commitAsync().thenApply(ignored -> val)); + } catch (Exception e) { + return tx.rollbackAsync().exceptionally(e0 -> { + e.addSuppressed(e0); + return null; + }).thenCompose(ignored -> CompletableFuture.failedFuture(e)); + } + }); } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java index 234d5e71b77..5a0ae0f2e52 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -322,6 +322,56 @@ public void testTxClosureAbortAsync() { assertEquals(balance2, view.get(null, makeKey(2)).doubleValue("balance")); } + /** + * Tests uncaught exception in the closure. + */ + @Test + public void testTxClosureUncaughtExceptionAsync() { + double balance = 10.; + double delta = 50.; + + RecordView view = accounts.recordView(); + view.upsert(null, makeValue(1, balance)); + + CompletableFuture fut0 = igniteTransactions.runInTransactionAsync(tx -> { + CompletableFuture fut = view.getAsync(tx, makeKey(1)) + .thenCompose(val2 -> { + double prev = val2.doubleValue("balance"); + return view.upsertAsync(tx, makeValue(1, delta + 20)).thenApply(ignored -> prev); + }); + + fut.join(); + + if (true) + throw new IllegalArgumentException(); + + return fut; + }); + + var err = assertThrows(CompletionException.class, fut0::join); + assertEquals(IllegalArgumentException.class, err.getCause().getClass()); + assertEquals(balance, view.get(null, makeKey(1)).doubleValue("balance")); + } + + /** + * Tests uncaught exception in the chain. + */ + @Test + public void testTxClosureUncaughtExceptionInChainAsync() { + RecordView view = accounts.recordView(); + + CompletableFuture fut0 = igniteTransactions.runInTransactionAsync(tx -> { + return view.getAsync(tx, makeKey(2)) + .thenCompose(val2 -> { + double prev = val2.doubleValue("balance"); // val2 is null - NPE is thrown here + return view.upsertAsync(tx, makeValue(1, 100)).thenApply(ignored -> prev); + }); + }); + + var err = assertThrows(CompletionException.class, fut0::join); + assertEquals(NullPointerException.class, err.getCause().getClass()); + } + @Test public void testBatchPutConcurrently() { Transaction tx = igniteTransactions.begin(); From 8589d8d860f6a0e130de6d4a78a31abc38863865 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Wed, 12 Oct 2022 09:00:19 +0300 Subject: [PATCH 5/9] IGNITE-17839 Style fix. --- .../src/main/java/org/apache/ignite/tx/IgniteTransactions.java | 1 + .../java/org/apache/ignite/internal/table/TxAbstractTest.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java index cf1d034e487..15161a9612f 100644 --- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java @@ -169,6 +169,7 @@ default T runInTransaction(Function clo) throws TransactionE * acc -> view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100)))); * } * + * *

If the asynchronous chain resulted in no exception, the commitAsync will be automatically called. * * @param clo The closure. diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java index 5a0ae0f2e52..49861338b75 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -342,8 +342,9 @@ public void testTxClosureUncaughtExceptionAsync() { fut.join(); - if (true) + if (true) { throw new IllegalArgumentException(); + } return fut; }); From 04bdab71030fad2ab7d0a8345994950fabe16ce6 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Wed, 12 Oct 2022 09:50:56 +0300 Subject: [PATCH 6/9] IGNITE-17839 Minor. --- .../apache/ignite/tx/IgniteTransactions.java | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java index 15161a9612f..9eae77ab325 100644 --- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java @@ -82,9 +82,32 @@ public interface IgniteTransactions { CompletableFuture beginAsync(); /** - * Executes a closure within a transaction. + * Executes a closure within a transaction and returns a result. + * + *

This method expects that all transaction operations are completed before the closure returns. The safest way to achieve that is + * to use synchronous table API. + * + *

Take care then using the asynchronous operations inside the closure. For example, the following snippet is incorrect, + * because the last operation goes out of the scope of the closure unfinished: + *

+     * {@code
+     * igniteTransactions.runInTransaction(tx -> {
+     *     var key = Tuple.create().set("accountId", 1);
+     *     Tuple acc = view.get(tx, key);
+     *        view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100));
+     *     });
+     * }
+     * 
* - *

Please make sure all asynchronous operations are enlisted into the transaction before returning from the callback. + *

The correct variant will be: + *

+     * {@code
+     * igniteTransactions.runInTransaction(tx -> {
+     *     view.getAsync(tx, Tuple.create().set("accountId", 1)).thenCompose(acc ->
+     *         view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100))).join();
+     *     });
+     * }
+     * 
* *

If the closure is executed normally (no exceptions) the transaction is automatically committed. * @@ -117,7 +140,7 @@ default void runInTransaction(Consumer clo) throws TransactionExcep * } * * - *

The correct variant will be + *

The correct variant will be: *

      * {@code
      * igniteTransactions.runInTransaction(tx -> {

From c7a150a2b455e759b19b5ceac9684ac955d4a4f6 Mon Sep 17 00:00:00 2001
From: Alexey Scherbakov 
Date: Wed, 12 Oct 2022 10:40:04 +0300
Subject: [PATCH 7/9] IGNITE-17839 Minor 2.

---
 .../src/main/java/org/apache/ignite/tx/IgniteTransactions.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
index 9eae77ab325..cd33377f939 100644
--- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
+++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
@@ -82,7 +82,7 @@ public interface IgniteTransactions {
     CompletableFuture beginAsync();
 
     /**
-     * Executes a closure within a transaction and returns a result.
+     * Executes a closure within a transaction.
      *
      * 

This method expects that all transaction operations are completed before the closure returns. The safest way to achieve that is * to use synchronous table API. From c7de5602ea0d1e7a929dbdb9b149a5f9ec2c1819 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Wed, 12 Oct 2022 17:21:17 +0300 Subject: [PATCH 8/9] IGNITE-17839 Formatting. --- .../org/apache/ignite/tx/IgniteTransactions.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java index cd33377f939..b8d231611f4 100644 --- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java @@ -94,8 +94,8 @@ public interface IgniteTransactions { * igniteTransactions.runInTransaction(tx -> { * var key = Tuple.create().set("accountId", 1); * Tuple acc = view.get(tx, key); - * view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100)); - * }); + * view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100)); + * }); * } *

* @@ -105,7 +105,7 @@ public interface IgniteTransactions { * igniteTransactions.runInTransaction(tx -> { * view.getAsync(tx, Tuple.create().set("accountId", 1)).thenCompose(acc -> * view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100))).join(); - * }); + * }); * } * * @@ -135,8 +135,8 @@ default void runInTransaction(Consumer clo) throws TransactionExcep * igniteTransactions.runInTransaction(tx -> { * var key = Tuple.create().set("accountId", 1); * Tuple acc = view.get(tx, key); - * view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100)); - * }); + * view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100)); + * }); * } * * @@ -146,7 +146,7 @@ default void runInTransaction(Consumer clo) throws TransactionExcep * igniteTransactions.runInTransaction(tx -> { * view.getAsync(tx, Tuple.create().set("accountId", 1)).thenCompose(acc -> * view.upsertAsync(tx, Tuple.create().set("accountId", 1).set("balance", acc.longValue("balance") + 100))).join(); - * }); + * }); * } * * From 81eacee07b48135d26fcce2a8d46452234ebeda2 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Wed, 12 Oct 2022 18:03:50 +0300 Subject: [PATCH 9/9] IGNITE-17839 Handle uncaught exception inside chain. --- .../org/apache/ignite/tx/IgniteTransactions.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java index b8d231611f4..de22c0273d0 100644 --- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java +++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java @@ -17,6 +17,9 @@ package org.apache.ignite.tx; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.function.Function.identity; + import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; @@ -200,11 +203,19 @@ default T runInTransaction(Function clo) throws TransactionE * @return The result. */ default CompletableFuture runInTransactionAsync(Function> clo) { - // Rollback is expected to be called by the failure handling code // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries return beginAsync().thenCompose(tx -> { try { - return clo.apply(tx).thenCompose(val -> tx.commitAsync().thenApply(ignored -> val)); + return clo.apply(tx).handle((res, e) -> { + if (e != null) { + return tx.rollbackAsync().exceptionally(e0 -> { + e.addSuppressed(e0); + return null; + }).thenCompose(ignored -> CompletableFuture.failedFuture(e)); + } + + return completedFuture(res); + }).thenCompose(identity()).thenCompose(val -> tx.commitAsync().thenApply(ignored -> val)); } catch (Exception e) { return tx.rollbackAsync().exceptionally(e0 -> { e.addSuppressed(e0);