From f4c36dcd9dbb509209d14bddc1ea11238a0451d0 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Mon, 6 May 2024 14:29:59 +0300 Subject: [PATCH 1/7] IGNITE-22130 Fix retries. --- .../ignite/internal/util/ExceptionUtils.java | 24 ++ .../internal/replicator/ReplicaService.java | 30 ++- .../runner/app/ItIgniteNodeRestartTest.java | 3 +- .../ignite/internal/app/IgniteImpl.java | 3 +- .../storage/InternalTableImpl.java | 242 +++++++++--------- .../ignite/distributed/ItTxTestCluster.java | 7 +- 6 files changed, 184 insertions(+), 125 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java index dbad400afa4..63a365689c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java @@ -567,6 +567,30 @@ public static int extractCodeFrom(Throwable t) { return INTERNAL_ERR; } + /** + * Determine if a particular error matches any of passed error codes. + * + * @param t Unwrapped throwable. + * @param code The code. + * @param codes Other codes. + * @return {@code True} if exception allows retry. + */ + public static boolean matchAny(Throwable t, int code, int... codes) { + int errCode = extractCodeFrom(t); + + if (code == errCode) { + return true; + } + + for (int c0 : codes) { + if (c0 == errCode) { + return true; + } + } + + return false; + } + // TODO: https://issues.apache.org/jira/browse/IGNITE-19870 // This method should be removed or re-worked and usages should be changed to IgniteExceptionMapperUtil.mapToPublicException. /** diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java index 245b3b4bdba..059c627cf92 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java @@ -17,17 +17,22 @@ package org.apache.ignite.internal.replicator; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.util.ExceptionUtils.matchAny; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.internal.util.ExceptionUtils.withCause; import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR; import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.lang.NodeStoppingException; @@ -45,10 +50,14 @@ import org.apache.ignite.internal.replicator.message.ReplicaResponse; import org.apache.ignite.internal.replicator.message.TimestampAware; import org.apache.ignite.network.ClusterNode; +import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; /** The service is intended to execute requests on replicas. */ public class ReplicaService { + /** Retry timeout. */ + private static final int RETRY_TIMEOUT_MILLIS = 10; + /** Message service. */ private final MessagingService messagingService; @@ -59,6 +68,8 @@ public class ReplicaService { private final ReplicationConfiguration replicationConfiguration; + private final ScheduledExecutorService retryExecutor; + /** Requests to retry. */ private final Map> pendingInvokes = new ConcurrentHashMap<>(); @@ -82,7 +93,8 @@ public ReplicaService( messagingService, clock, ForkJoinPool.commonPool(), - replicationConfiguration + replicationConfiguration, + null ); } @@ -93,24 +105,27 @@ public ReplicaService( * @param clock A hybrid logical clock. * @param partitionOperationsExecutor Partition operation executor. * @param replicationConfiguration Replication configuration. + * @param retryExecutor Retry executor. */ public ReplicaService( MessagingService messagingService, HybridClock clock, Executor partitionOperationsExecutor, - ReplicationConfiguration replicationConfiguration + ReplicationConfiguration replicationConfiguration, + @Nullable ScheduledExecutorService retryExecutor ) { this.messagingService = messagingService; this.clock = clock; this.partitionOperationsExecutor = partitionOperationsExecutor; this.replicationConfiguration = replicationConfiguration; + this.retryExecutor = retryExecutor; } /** * Sends request to the replica node. * * @param targetNodeConsistentId A consistent id of the replica node.. - * @param req Replica request. + * @param req Replica request. * @return Response future with either evaluation result or completed exceptionally. * @see NodeStoppingException If either supplier or demander node is stopping. * @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet. @@ -210,7 +225,14 @@ private CompletableFuture sendToReplica(String targetNodeConsistentId, Re return null; }); } else { - res.completeExceptionally(errResp.throwable()); + if (retryExecutor != null && matchAny(unwrapCause(errResp.throwable()), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR)) { + retryExecutor.schedule( + // Need to resubmit again to pool which is valid for synchronous IO execution. + () -> partitionOperationsExecutor.execute(() -> res.completeExceptionally(errResp.throwable())), + RETRY_TIMEOUT_MILLIS, MILLISECONDS); + } else { + res.completeExceptionally(errResp.throwable()); + } } } else { res.complete((R) ((ReplicaResponse) response).result()); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index b88f8d3b99c..b3653120496 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -391,7 +391,8 @@ private PartialNode startPartialNode( messagingServiceReturningToStorageOperationsPool, hybridClock, threadPoolsManager.partitionOperationsExecutor(), - replicationConfiguration + replicationConfiguration, + threadPoolsManager.commonScheduler() ); var lockManager = new HeapLockManager(); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index c90b014454a..67c260bc01b 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -610,7 +610,8 @@ public class IgniteImpl implements Ignite { messagingServiceReturningToStorageOperationsPool, clock, threadPoolsManager.partitionOperationsExecutor(), - replicationConfig + replicationConfig, + threadPoolsManager.commonScheduler() ); LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 4987f375f18..4b0cbeca2f6 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.function.Function.identity; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_DELETE_ALL; import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET; @@ -29,19 +30,20 @@ import static org.apache.ignite.internal.util.CompletableFutures.completedOrFailedFuture; import static org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.internal.util.ExceptionUtils.matchAny; +import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.internal.util.ExceptionUtils.withCause; import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR; import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR; -import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_REPLICA_UNAVAILABLE_ERR; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import java.net.ConnectException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.BitSet; @@ -52,12 +54,10 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; @@ -75,8 +75,6 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; -import org.apache.ignite.internal.replicator.exception.ReplicationException; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; @@ -101,7 +99,6 @@ import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; -import org.apache.ignite.internal.tx.LockException; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.impl.TransactionInflights; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; @@ -110,7 +107,6 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.utils.PrimaryReplica; -import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterNodeResolver; import org.apache.ignite.tx.TransactionException; @@ -365,16 +361,16 @@ private CompletableFuture enlistInTx( if (implicit) { long ts = (txStartTs == null) ? actualTx.startTimestamp().getPhysical() : txStartTs; - if (exceptionAllowsTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) { + if (exceptionAllowsImplicitTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) { return enlistInTx(row, null, fac, noWriteChecker, ts); } } - throw wrapReplicationException(e); + sneakyThrow(e); } return completedFuture(r); - }).thenCompose(x -> x); + }).thenCompose(identity()); } /** @@ -484,16 +480,16 @@ private CompletableFuture enlistInTx( if (implicit) { long ts = (txStartTs == null) ? actualTx.startTimestamp().getPhysical() : txStartTs; - if (exceptionAllowsTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) { + if (exceptionAllowsImplicitTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) { return enlistInTx(keyRows, null, fac, reducer, noOpChecker, ts); } } - throw wrapReplicationException(e); + sneakyThrow(e); } return completedFuture(r); - }).thenCompose(x -> x); + }).thenCompose(identity()); } private InternalTransaction startImplicitRwTxIfNeeded(@Nullable InternalTransaction tx) { @@ -626,50 +622,71 @@ private CompletableFuture trackingInvoke( || request instanceof MultipleRowPkReplicaRequest && ((MultipleRowPkReplicaRequest) request).requestType() != RW_GET_ALL || request instanceof SwapRowReplicaRequest; - if (write && !full) { - // Track only write requests from explicit transactions. - if (!transactionInflights.addInflight(tx.id(), false)) { - return failedFuture( - new TransactionException(TX_ALREADY_FINISHED_ERR, format( - "Transaction is already finished [tableName={}, partId={}, txState={}].", - tableName, - partId, - tx.state() - ))); - } + if (full) { // Full transaction retries are handled in postEnlist. + return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), request); + } else { + if (write) { // Track only write requests from explicit transactions. + if (!transactionInflights.addInflight(tx.id(), false)) { + return failedFuture( + new TransactionException(TX_ALREADY_FINISHED_ERR, format( + "Transaction is already finished [tableName={}, partId={}, txState={}].", + tableName, + partId, + tx.state() + ))); + } - return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), request).thenApply(res -> { - assert noWriteChecker != null; + return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), request).thenApply(res -> { + assert noWriteChecker != null; - // Remove inflight if no replication was scheduled, otherwise inflight will be removed by delayed response. - if (noWriteChecker.test(res, request)) { - transactionInflights.removeInflight(tx.id()); - } + // Remove inflight if no replication was scheduled, otherwise inflight will be removed by delayed response. + if (noWriteChecker.test(res, request)) { + transactionInflights.removeInflight(tx.id()); + } + + return res; + }).handle((r, e) -> { + if (e != null) { + if (retryOnLockConflict > 0 && matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR)) { + transactionInflights.removeInflight(tx.id()); // Will be retried. + + return trackingInvoke( + tx, + partId, + ignored -> request, + false, + primaryReplicaAndConsistencyToken, + noWriteChecker, + retryOnLockConflict - 1 + ); + } - return res; - }).handle((r, e) -> { - if (e != null) { - if (retryOnLockConflict > 0 && e.getCause() instanceof LockException) { - transactionInflights.removeInflight(tx.id()); // Will be retried. - - return trackingInvoke( - tx, - partId, - ignored -> request, - full, - primaryReplicaAndConsistencyToken, - noWriteChecker, - retryOnLockConflict - 1 - ); + sneakyThrow(e); } - ExceptionUtils.sneakyThrow(e); - } + return completedFuture(r); + }).thenCompose(identity()); + } else { // Explicit reads should be retried too. + return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), request).handle((r, e) -> { + if (e != null) { + if (retryOnLockConflict > 0 && matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR)) { + return trackingInvoke( + tx, + partId, + ignored -> request, + false, + primaryReplicaAndConsistencyToken, + noWriteChecker, + retryOnLockConflict - 1 + ); + } - return completedFuture(r); - }).thenCompose(x -> x); - } else { - return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), request); + sneakyThrow(e); + } + + return completedFuture(r); + }).thenCompose(identity()); + } } } @@ -690,30 +707,25 @@ private CompletableFuture postEnlist(CompletableFuture fut, boolean au if (full) { // Full txn is already finished remotely. Just update local state. txManager.finishFull(observableTimestampTracker, tx0.id(), e == null); - return e != null ? failedFuture(wrapReplicationException(e)) : completedFuture(r); + return e != null ? failedFuture(e) : completedFuture(r); } if (e != null) { - RuntimeException e0 = wrapReplicationException(e); - return tx0.rollbackAsync().handle((ignored, err) -> { if (err != null) { - e0.addSuppressed(err); + e.addSuppressed(err); } - throw e0; + sneakyThrow(e); + return null; }); // Preserve failed state. } else { if (autoCommit) { - return tx0.commitAsync() - .exceptionally(ex -> { - throw wrapReplicationException(ex); - }) - .thenApply(ignored -> r); + return tx0.commitAsync().thenApply(ignored -> r); } else { return completedFuture(r); } } - }).thenCompose(x -> x); + }).thenCompose(identity()); } /** @@ -829,29 +841,24 @@ private CompletableFuture evaluateReadOnlyPrimaryNode( private CompletableFuture postEvaluate(CompletableFuture fut, InternalTransaction tx) { return fut.handle((BiFunction>) (r, e) -> { if (e != null) { - RuntimeException e0 = wrapReplicationException(e); - return tx.finish(false, clock.now()) .handle((ignored, err) -> { - if (err != null) { - e0.addSuppressed(err); + e.addSuppressed(err); } - throw e0; + + sneakyThrow(e); + return null; }); // Preserve failed state. } - return tx.finish(true, clock.now()) - .exceptionally(ex -> { - throw wrapReplicationException(ex); - }) - .thenApply(ignored -> r); - }).thenCompose(x -> x); + return tx.finish(true, clock.now()).thenApply(ignored -> r); + }).thenCompose(identity()); } /** {@inheritDoc} */ @Override - public CompletableFuture get(BinaryRowEx keyRow, InternalTransaction tx) { + public CompletableFuture get(BinaryRowEx keyRow, @Nullable InternalTransaction tx) { if (tx == null) { return evaluateReadOnlyPrimaryNode( keyRow, @@ -882,7 +889,7 @@ public CompletableFuture get(BinaryRowEx keyRow, InternalTransaction .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestType(RW_GET) .timestampLong(clock.nowLong()) - .full(tx == null) + .full(false) .coordinatorId(txo.coordinatorId()) .build(), (res, req) -> false @@ -1022,6 +1029,10 @@ private static boolean allSchemaVersionsSame(Collection row boolean first = true; for (BinaryRow row : rows) { + if (row == null) { + continue; + } + if (first) { schemaVersion = row.schemaVersion(); first = false; @@ -1066,7 +1077,7 @@ private TablePartitionIdMessage serializeTablePartitionId(TablePartitionId id) { /** {@inheritDoc} */ @Override - public CompletableFuture upsert(BinaryRowEx row, InternalTransaction tx) { + public CompletableFuture upsert(BinaryRowEx row, @Nullable InternalTransaction tx) { return enlistInTx( row, tx, @@ -1088,7 +1099,7 @@ public CompletableFuture upsert(BinaryRowEx row, InternalTransaction tx) { /** {@inheritDoc} */ @Override - public CompletableFuture upsertAll(Collection rows, InternalTransaction tx) { + public CompletableFuture upsertAll(Collection rows, @Nullable InternalTransaction tx) { return enlistInTx( rows, tx, @@ -1101,9 +1112,29 @@ public CompletableFuture upsertAll(Collection rows, InternalT /** {@inheritDoc} */ @Override public CompletableFuture updateAll(Collection rows, @Nullable BitSet deleted, int partition) { + return updateAllWithRetry(rows, deleted, partition, null); + } + + /** + * Update all with retry. + * + * @param rows Rows. + * @param deleted Deleted. + * @param partition The partition. + * @param txStartTs Start timestamp. + * @return The future. + */ + private CompletableFuture updateAllWithRetry( + Collection rows, + @Nullable BitSet deleted, + int partition, + @Nullable Long txStartTs + ) { InternalTransaction tx = txManager.begin(observableTimestampTracker); TablePartitionId partGroupId = new TablePartitionId(tableId, partition); + assert rows.stream().allMatch(row -> partitionId(row) == partition) : "Invalid batch for partition " + partition; + CompletableFuture fut = enlistAndInvoke( tx, partition, @@ -1112,7 +1143,20 @@ public CompletableFuture updateAll(Collection rows, @Nullable null ); - return postEnlist(fut, false, tx, true); // Will be committed in one RTT. + // Will be finished in one RTT. + return postEnlist(fut, false, tx, true).handle((r, e) -> { + if (e != null) { + long ts = (txStartTs == null) ? tx.startTimestamp().getPhysical() : txStartTs; + + if (exceptionAllowsImplicitTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) { + return updateAllWithRetry(rows, deleted, partition, ts); + } + + sneakyThrow(e); + } + + return completedFuture(r); + }).thenCompose(identity()); } /** {@inheritDoc} */ @@ -1712,7 +1756,7 @@ private CompletableFuture completeScan( } return fut; - }).thenCompose(Function.identity()); + }).thenCompose(identity()); } /** @@ -2151,34 +2195,6 @@ protected CompletableFuture evaluateReadOnlyRecipientNode(int partI }); } - /** - * Casts any exception type to a client exception, wherein {@link ReplicationException} and {@link LockException} are wrapped to - * {@link TransactionException}, but another exceptions are wrapped to a common exception. The method does not wrap an exception if the - * exception already inherits type of {@link RuntimeException}. - * - * @param e An instance exception to cast to client side one. - * @return {@link IgniteException} An instance of client side exception. - */ - private RuntimeException wrapReplicationException(Throwable e) { - if (e instanceof CompletionException) { - e = e.getCause(); - } - - RuntimeException e0; - - if (e instanceof ReplicationException || e instanceof ConnectException || e instanceof TimeoutException) { - e0 = withCause(TransactionException::new, TX_REPLICA_UNAVAILABLE_ERR, e); - } else if (e instanceof LockException) { - e0 = withCause(TransactionException::new, ACQUIRE_LOCK_ERR, e); - } else if (!(e instanceof RuntimeException)) { - e0 = withCause(IgniteException::new, INTERNAL_ERR, e); - } else { - e0 = (RuntimeException) e; - } - - return e0; - } - @Override public @Nullable PendingComparableValuesTracker getPartitionSafeTimeTracker(int partitionId) { return safeTimeTrackerByPartitionId.get(partitionId); @@ -2265,13 +2281,7 @@ private ReplicaRequest upsertAllInternal( * @param e Exception to check. * @return True if retrying is possible, false otherwise. */ - private static boolean exceptionAllowsTxRetry(Throwable e) { - Throwable ex = unwrapCause(e); - - while (ex instanceof TransactionException && ex.getCause() != null) { - ex = ex.getCause(); - } - - return ex instanceof LockException || ex instanceof PrimaryReplicaMissException; + private static boolean exceptionAllowsImplicitTxRetry(Throwable e) { + return matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR); } } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index a7b3fe44abe..530dcbca3f4 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -247,7 +247,6 @@ public class ItTxTestCluster { protected String localNodeName; private final ClusterNodeResolver nodeResolver = new ClusterNodeResolver() { - @Override public @Nullable ClusterNode getById(String id) { for (ClusterService service : cluster) { @@ -425,7 +424,8 @@ public void prepareCluster() throws Exception { clusterService.messagingService(), clock, partitionOperationsExecutor, - replicationConfiguration + replicationConfiguration, + executor )); replicaServices.put(node.name(), replicaSvc); @@ -981,7 +981,8 @@ private void startClient() throws InterruptedException { client.messagingService(), clientClock, partitionOperationsExecutor, - replicationConfiguration + replicationConfiguration, + executor )); LOG.info("The client has been started"); From 00c429b1d846de5978112f13669d15a9cc960e98 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Mon, 6 May 2024 16:31:37 +0300 Subject: [PATCH 2/7] IGNITE-22130 Fix tests. --- .../org/apache/ignite/internal/table/ItTableScanTest.java | 6 +++++- .../internal/rebalance/ItRebalanceDistributedTest.java | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java index d1b31ab5961..9fd16e4dc55 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java @@ -135,7 +135,11 @@ public void afterTest() { private void checkResourcesAreReleased(IgniteImpl ignite) { checkCursorsAreClosed(ignite); - assertTrue(ignite.txManager().lockManager().isEmpty()); + try { + assertTrue(waitForCondition(() -> ignite.txManager().lockManager().isEmpty(), 1000)); + } catch (InterruptedException e) { + fail("Unexpected interruption"); + } } /** diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 0c742a98016..e08d866cd05 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1098,7 +1098,8 @@ private class Node { clusterService.messagingService(), hybridClock, threadPoolsManager.partitionOperationsExecutor(), - replicationConfiguration + replicationConfiguration, + threadPoolsManager.commonScheduler() ); var resourcesRegistry = new RemotelyTriggeredResourceRegistry(); From d0d0bb95fa99561995e37f81f198183161d1856d Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Mon, 6 May 2024 17:09:39 +0300 Subject: [PATCH 3/7] IGNITE-22130 Remove wrapReplicationException. --- .../internal/replicator/ReplicaService.java | 7 +-- .../TransactionExceptionMapperProvider.java | 46 +++++++++++++++++++ 2 files changed, 50 insertions(+), 3 deletions(-) create mode 100644 modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java index 059c627cf92..20fbf12ab13 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java @@ -27,6 +27,7 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; +import java.net.ConnectException; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -246,7 +247,7 @@ private CompletableFuture sendToReplica(String targetNodeConsistentId, Re /** * Sends a request to the given replica {@code node} and returns a future that will be completed with a result of request processing. * - * @param node Replica node. + * @param node Replica node. * @param request Request. * @return Response future with either evaluation result or completed exceptionally. * @see NodeStoppingException If either supplier or demander node is stopping. @@ -274,8 +275,8 @@ public CompletableFuture invoke(String replicaConsistentId, ReplicaReques /** * Sends a request to the given replica {@code node} and returns a future that will be completed with a result of request processing. * - * @param node Replica node. - * @param request Request. + * @param node Replica node. + * @param request Request. * @param storageId Storage id. * @return Response future with either evaluation result or completed exceptionally. * @see NodeStoppingException If either supplier or demander node is stopping. diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java new file mode 100644 index 00000000000..7bd924599ce --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import static org.apache.ignite.internal.lang.IgniteExceptionMapper.unchecked; + +import com.google.auto.service.AutoService; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.ignite.internal.lang.IgniteExceptionMapper; +import org.apache.ignite.internal.lang.IgniteExceptionMappersProvider; +import org.apache.ignite.internal.replicator.exception.ReplicationException; +import org.apache.ignite.tx.TransactionException; + +/** + * SQL module exception mapper. + */ +@AutoService(IgniteExceptionMappersProvider.class) +public class TransactionExceptionMapperProvider implements IgniteExceptionMappersProvider { + @Override + public Collection> mappers() { + List> mappers = new ArrayList<>(); + + mappers.add(unchecked(LockException.class, err -> new TransactionException(err.traceId(), err.code(), err.getMessage(), err))); + mappers.add(unchecked(ReplicationException.class, + err -> new TransactionException(err.traceId(), err.code(), err.getMessage(), err))); + + return mappers; + } +} From 6175e954ff21a570af30155bcdcd1c653f51c53f Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Mon, 6 May 2024 17:11:29 +0300 Subject: [PATCH 4/7] IGNITE-22130 Remove wrapReplicationException. --- .../org/apache/ignite/internal/replicator/ReplicaService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java index 20fbf12ab13..29faa1fb10b 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java @@ -27,7 +27,6 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; -import java.net.ConnectException; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; From e00fb94597ef2082796653b89e22b169e1c8c99e Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Mon, 6 May 2024 17:11:55 +0300 Subject: [PATCH 5/7] IGNITE-22130 Remove wrapReplicationException. --- .../ignite/internal/tx/TransactionExceptionMapperProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java index 7bd924599ce..7c0dc5c40c5 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java @@ -29,7 +29,7 @@ import org.apache.ignite.tx.TransactionException; /** - * SQL module exception mapper. + * Transaction module exception mapper. */ @AutoService(IgniteExceptionMappersProvider.class) public class TransactionExceptionMapperProvider implements IgniteExceptionMappersProvider { From 8e1147b0803a18f6b7de0090bb0d5b10a67804a6 Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Tue, 7 May 2024 15:55:21 +0300 Subject: [PATCH 6/7] IGNITE-22130 Fix style. --- .../internal/table/distributed/storage/InternalTableImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 4b0cbeca2f6..77e91f0f9cc 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -103,7 +103,6 @@ import org.apache.ignite.internal.tx.impl.TransactionInflights; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; import org.apache.ignite.internal.util.CollectionUtils; -import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.utils.PrimaryReplica; From b9754dc3305cb0afdf1ce6cc3941497fd48bd88b Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Wed, 8 May 2024 17:26:48 +0300 Subject: [PATCH 7/7] IGNITE-22130 Add nullable. --- .../org/apache/ignite/internal/replicator/ReplicaService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java index 29faa1fb10b..a174eae3b1d 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java @@ -68,7 +68,7 @@ public class ReplicaService { private final ReplicationConfiguration replicationConfiguration; - private final ScheduledExecutorService retryExecutor; + private @Nullable final ScheduledExecutorService retryExecutor; /** Requests to retry. */ private final Map> pendingInvokes = new ConcurrentHashMap<>();