diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index ca2dcdc2b98f7..186c87cb46dee 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -44,6 +44,8 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.mvcc.DeadlockProbe; +import org.apache.ignite.internal.processors.cache.mvcc.ProbedTx; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; @@ -168,6 +170,9 @@ public static void main(String[] args) throws Exception { MessageCodeGenerator gen = new MessageCodeGenerator(srcDir); + gen.generateAndWrite(ProbedTx.class); + gen.generateAndWrite(DeadlockProbe.class); + // gen.generateAll(true); // gen.generateAndWrite(GridCacheMessage.class); diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 6b16de5cd7db3..5027ebcfb9fd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -260,6 +260,18 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT = "IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT"; + /** + * Specifies delay in milliseconds before starting deadlock detection procedure when tx encounters locked key. + *

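+ * The following values can be used: + * <ul> + * <li>negative value - deadlock detection is not started;</li> + * <li>0 - deadlock detection is started immediately, without a delay;</li> + * <li>positive value - deadlock detection is started after the specified number of milliseconds.</li> + * </ul> + * Default value is 10000.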
+ */ + public static final String IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY = "IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY"; + /** * System property to enable pending transaction tracker. * Affects impact of {@link IgniteSystemProperties#IGNITE_DISABLE_WAL_DURING_REBALANCING} property: diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 437ee4dca23a3..5677178568e3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -139,7 +139,10 @@ public enum GridTopic { TOPIC_GEN_ENC_KEY, /** */ - TOPIC_SERVICES; + TOPIC_SERVICES, + + /** */ + TOPIC_DEADLOCK_DETECTION; /** Enum values. */ private static final GridTopic[] VALS = values(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index be467f5f62e93..b3772705bf613 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -124,8 +124,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest; +import org.apache.ignite.internal.processors.cache.mvcc.DeadlockProbe; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionImpl; +import org.apache.ignite.internal.processors.cache.mvcc.ProbedTx; import 
org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryId; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx; @@ -1132,6 +1134,16 @@ public GridIoMessageFactory(MessageFactory[] ext) { break; + case 170: + msg = new DeadlockProbe(); + + break; + + case 171: + msg = new ProbedTx(); + + break; + // [-3..119] [124..129] [-23..-28] [-36..-55] - this // [120..123] - DR // [-4..-22, -30..-35] - SQL diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index d659aa5c7db91..be1b9e2495f11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1151,7 +1151,7 @@ else if (res.resultType() == ResultType.LOCKED) { GridFutureAdapter resFut = new GridFutureAdapter<>(); - IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); + IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, lockVer); lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, mvccVer, op, needHistory, noCreate, resFut, needOldVal, filter, retVal, entryProc, invokeArgs)); @@ -1300,7 +1300,7 @@ else if (res.resultType() == ResultType.LOCKED) { GridFutureAdapter resFut = new GridFutureAdapter<>(); - IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); + IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, lockVer); lockFut.listen(new MvccRemoveLockListener(tx, this, affNodeId, topVer, mvccVer, needHistory, resFut, needOldVal, retVal, filter)); @@ -1392,7 +1392,7 @@ else if (res.resultType() == ResultType.LOCKED) 
{ GridFutureAdapter resFut = new GridFutureAdapter<>(); - IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); + IgniteInternalFuture lockFut = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, lockVer); lockFut.listen(new MvccAcquireLockListener(tx, this, mvccVer, resFut)); @@ -5193,7 +5193,7 @@ else if (res.resultType() == ResultType.FILTERED) { else if (res.resultType() == ResultType.LOCKED) { entry.unlockEntry(); - IgniteInternalFuture lockFuture = cctx.kernalContext().coordinators().waitFor(cctx, res.resultVersion()); + IgniteInternalFuture lockFuture = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, res.resultVersion()); lockFuture.listen(this); @@ -5323,7 +5323,7 @@ private static class MvccAcquireLockListener implements IgniteInClosure { /** Database manager. */ private IgniteCacheDatabaseSharedManager dbMgr; - /** Snp manager. */ + /** Snapshot manager. */ private IgniteCacheSnapshotManager snpMgr; /** Page store manager. {@code Null} if persistence is not enabled. */ @@ -127,12 +128,15 @@ public class GridCacheSharedContext { /** Ttl cleanup manager. */ private GridCacheSharedTtlCleanupManager ttlMgr; - /** */ + /** Partitons evict manager. */ private PartitionsEvictManager evictMgr; /** Mvcc caching manager. */ private MvccCachingManager mvccCachingMgr; + /** Deadlock detection manager. */ + private DeadlockDetectionManager deadlockDetectionMgr; + /** Cache contexts map. */ private ConcurrentHashMap> ctxMap; @@ -187,12 +191,17 @@ public class GridCacheSharedContext { * @param walMgr WAL manager. {@code Null} if persistence is not enabled. * @param walStateMgr WAL state manager. * @param depMgr Deployment manager. + * @param dbMgr Database manager. + * @param snpMgr Snapshot manager. * @param exchMgr Exchange manager. * @param affMgr Affinity manager. * @param ioMgr IO manager. * @param ttlMgr Ttl cleanup manager. + * @param evictMgr Partitons evict manager. * @param jtaMgr JTA manager. 
* @param storeSesLsnrs Store session listeners. + * @param mvccCachingMgr Mvcc caching manager. + * @param deadlockDetectionMgr Deadlock detection manager. */ public GridCacheSharedContext( GridKernalContext kernalCtx, @@ -212,7 +221,8 @@ public GridCacheSharedContext( PartitionsEvictManager evictMgr, CacheJtaManagerAdapter jtaMgr, Collection storeSesLsnrs, - MvccCachingManager mvccCachingMgr + MvccCachingManager mvccCachingMgr, + DeadlockDetectionManager deadlockDetectionMgr ) { this.kernalCtx = kernalCtx; @@ -233,7 +243,8 @@ public GridCacheSharedContext( ioMgr, ttlMgr, evictMgr, - mvccCachingMgr + mvccCachingMgr, + deadlockDetectionMgr ); this.storeSesLsnrs = storeSesLsnrs; @@ -400,8 +411,8 @@ void onReconnected(boolean active) throws IgniteCheckedException { ioMgr, ttlMgr, evictMgr, - mvccCachingMgr - ); + mvccCachingMgr, + deadlockDetectionMgr); this.mgrs = mgrs; @@ -428,20 +439,7 @@ private boolean restartOnDisconnect(GridCacheSharedManager mgr) { return mgr instanceof GridCacheDeploymentManager || mgr instanceof GridCachePartitionExchangeManager; } - /** - * @param mgrs Managers list. - * @param txMgr Transaction manager. - * @param jtaMgr JTA manager. - * @param verMgr Version manager. - * @param mvccMgr MVCC manager. - * @param pageStoreMgr Page store manager. {@code Null} if persistence is not enabled. - * @param walStateMgr WAL state manager. - * @param depMgr Deployment manager. - * @param exchMgr Exchange manager. - * @param affMgr Affinity manager. - * @param ioMgr IO manager. - * @param ttlMgr Ttl cleanup manager. 
- */ + /** */ @SuppressWarnings("unchecked") private void setManagers( List> mgrs, @@ -460,8 +458,8 @@ private void setManagers( GridCacheIoManager ioMgr, GridCacheSharedTtlCleanupManager ttlMgr, PartitionsEvictManager evictMgr, - MvccCachingManager mvccCachingMgr - ) { + MvccCachingManager mvccCachingMgr, + DeadlockDetectionManager deadlockDetectionMgr) { this.mvccMgr = add(mgrs, mvccMgr); this.verMgr = add(mgrs, verMgr); this.txMgr = add(mgrs, txMgr); @@ -478,6 +476,7 @@ private void setManagers( this.ttlMgr = add(mgrs, ttlMgr); this.evictMgr = add(mgrs, evictMgr); this.mvccCachingMgr = add(mgrs, mvccCachingMgr); + this.deadlockDetectionMgr = add(mgrs, deadlockDetectionMgr); } /** @@ -830,6 +829,13 @@ public MvccCachingManager mvccCaching() { return mvccCachingMgr; } + /** + * @return Deadlock detection manager. + */ + public DeadlockDetectionManager deadlockDetectionMgr() { + return deadlockDetectionMgr; + } + /** * @return Node ID. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java index 25242c62a21cd..3f53b48220383 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java @@ -508,6 +508,8 @@ private void continueLoop(boolean ignoreCntr) { updateFut.listen(new CI1>() { @Override public void apply(IgniteInternalFuture fut) { try { + tx.incrementLockCounter(); + processEntry(entry0, op, fut.get(), val0, backups0); continueLoop(true); @@ -523,6 +525,8 @@ private void continueLoop(boolean ignoreCntr) { } } + tx.incrementLockCounter(); + processEntry(entry, op, res, val0, backups); } diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 6ef9000b05e39..daaa5b492580a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; +import org.apache.ignite.internal.processors.cache.transactions.TxCounters; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanMap; @@ -940,4 +941,20 @@ protected final IgniteInternalFuture chainOnePhasePre return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(), "dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString()); } + + /** + * Increments lock counter. + */ + public void incrementLockCounter() { + txCounters(true).incrementLockCounter(); + } + + /** + * @return Current value of lock counter. + */ + public int lockCounter() { + TxCounters txCntrs = txCounters(false); + + return txCntrs != null ? 
txCntrs.lockCounter() : 0; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java index b78248336a0b8..afed6e037819a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.distributed.near; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.ignite.IgniteCacheRestartingException; @@ -484,6 +486,11 @@ protected long remainingTime() throws IgniteTxTimeoutCheckedException { */ protected abstract void map(boolean topLocked); + /** + * @return Nodes from which current future waits responses. + */ + public abstract Set pendingResponseNodes(); + /** * Lock request timeout object. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java index d98065ddb21d5..87d2dc6e03b2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java @@ -22,11 +22,13 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -164,7 +166,7 @@ private void sendNextBatches(@Nullable UUID nodeId) { boolean first = (nodeId != null); // Need to unlock topology to avoid deadlock with binary descriptors registration. 
- if(!topLocked && cctx.topology().holdsLock()) + if (!topLocked && cctx.topology().holdsLock()) cctx.topology().readUnlock(); for (Batch batch : next) { @@ -618,6 +620,14 @@ else if (res0.success() && !res.result().success()) return true; } + /** {@inheritDoc} */ + @Override public Set pendingResponseNodes() { + return batches.entrySet().stream() + .filter(e -> e.getValue().ready()) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearTxEnlistFuture.class, this, super.toString()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java index b0a83dc05168d..deeb8b733d79c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java @@ -18,7 +18,10 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; +import java.util.Collections; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -135,7 +138,7 @@ protected GridNearTxQueryEnlistFuture( boolean clientFirst = false; // Need to unlock topology to avoid deadlock with binary descriptors registration. 
- if(!topLocked && cctx.topology().holdsLock()) + if (!topLocked && cctx.topology().holdsLock()) cctx.topology().readUnlock(); for (ClusterNode node : F.view(primary, F.remoteNodes(cctx.localNodeId()))) { @@ -329,6 +332,19 @@ public void onResult(UUID nodeId, GridNearTxQueryEnlistResponse res) { mini.onResult(res, null); } + /** {@inheritDoc} */ + @Override public Set pendingResponseNodes() { + if (initialized() && !isDone()) { + return futures().stream() + .map(MiniFuture.class::cast) + .filter(mini -> !mini.isDone()) + .map(mini -> mini.node.id()) + .collect(Collectors.toSet()); + } + + return Collections.emptySet(); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearTxQueryEnlistFuture.class, this, super.toString()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java index e4d74f2942473..e3bbed498a640 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java @@ -22,11 +22,13 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -151,7 +153,7 @@ private void sendNextBatches(@Nullable UUID nodeId) { boolean first = 
(nodeId != null); // Need to unlock topology to avoid deadlock with binary descriptors registration. - if(!topLocked && cctx.topology().holdsLock()) + if (!topLocked && cctx.topology().holdsLock()) cctx.topology().readUnlock(); for (Batch batch : next) { @@ -584,6 +586,14 @@ public boolean checkResponse(UUID nodeId, GridNearTxQueryResultsEnlistResponse r return true; } + /** {@inheritDoc} */ + @Override public Set pendingResponseNodes() { + return batches.entrySet().stream() + .filter(e -> e.getValue().ready()) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearTxQueryResultsEnlistFuture.class, this, super.toString()); @@ -674,5 +684,4 @@ public void ready(boolean ready) { this.ready = ready; } } - } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockDetectionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockDetectionManager.java new file mode 100644 index 0000000000000..9b1969b1031b6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockDetectionManager.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxAbstractEnlistFuture; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; + +import static java.util.Collections.singleton; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY; +import static org.apache.ignite.internal.GridTopic.TOPIC_DEADLOCK_DETECTION; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.belongToSameTx; + +/** + * Component participating in deadlock detection in a cluster. Detection process is collaborative and it is performed + * by relaying special probe messages from waiting transaction to it's blocker. + *

+ * The detection algorithm borrows its ideas from the Chandy-Misra-Haas deadlock detection algorithm for the resource + * model. + *

+ * Current implementation assumes that transactions obeys 2PL. + */ +public class DeadlockDetectionManager extends GridCacheSharedManagerAdapter { + /** */ + private final long detectionStartDelay = Long.getLong(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY, 10_000); + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + cctx.gridIO().addMessageListener(TOPIC_DEADLOCK_DETECTION, (nodeId, msg, plc) -> { + if (msg instanceof DeadlockProbe) { + if (log.isDebugEnabled()) + log.debug("Received a probe message [msg=" + msg + ']'); + + DeadlockProbe msg0 = (DeadlockProbe)msg; + + handleDeadlockProbe(msg0); + } + else + log.warning("Unexpected message received [node=" + nodeId + ", msg=" + msg + ']'); + }); + } + + /** + * Starts a dedlock detection after a delay. + * + * @param waiterVer Version of the waiting transaction. + * @param blockerVer Version of the waited for transaction. + * @return Cancellable computation. + */ + public DelayedDeadlockComputation initDelayedComputation(MvccVersion waiterVer, MvccVersion blockerVer) { + if (detectionStartDelay < 0) + return null; + + if (detectionStartDelay == 0) { + startComputation(waiterVer, blockerVer); + + return null; + } + + return new DelayedDeadlockComputation(waiterVer, blockerVer, detectionStartDelay); + } + + /** + * Starts a deadlock detection for a given pair of transaction versions (wait-for edge). + * + * @param waiterVer Version of the waiting transaction. + * @param blockerVer Version of the waited for transaction. 
+ */ + private void startComputation(MvccVersion waiterVer, MvccVersion blockerVer) { + if (log.isDebugEnabled()) + log.debug("Starting deadlock detection [waiterVer=" + waiterVer + ", blockerVer=" + blockerVer + ']'); + + Optional waitingTx = findTx(waiterVer); + + Optional blockerTx = findTx(blockerVer); + + if (waitingTx.isPresent() && blockerTx.isPresent()) { + GridDhtTxLocalAdapter wTx = waitingTx.get(); + + GridDhtTxLocalAdapter bTx = blockerTx.get(); + + sendProbe( + bTx.eventNodeId(), + wTx.xidVersion(), + // real start time will be filled later when corresponding near node is visited + singleton(new ProbedTx(wTx.nodeId(), wTx.xidVersion(), wTx.nearXidVersion(), -1, wTx.lockCounter())), + new ProbedTx(bTx.nodeId(), bTx.xidVersion(), bTx.nearXidVersion(), -1, bTx.lockCounter()), + true); + } + } + + /** */ + private Optional findTx(MvccVersion mvccVer) { + return cctx.tm().activeTransactions().stream() + .filter(tx -> tx.local() && tx.mvccSnapshot() != null) + .filter(tx -> belongToSameTx(mvccVer, tx.mvccSnapshot())) + .map(GridDhtTxLocalAdapter.class::cast) + .findAny(); + } + + /** + * Handles received deadlock probe. Possible outcomes: + *

<ol>
+ * <li>Deadlock is found.</li>
+ * <li>Probe is relayed to other blocking transactions.</li>
+ * <li>Probe is discarded because receiving transaction is not blocked.</li>
+ * </ol>
+ * + * @param probe Received probe message. + */ + private void handleDeadlockProbe(DeadlockProbe probe) { + if (probe.nearCheck()) + handleDeadlockProbeForNear(probe); + else + handleDeadlockProbeForDht(probe); + } + + /** */ + private void handleDeadlockProbeForNear(DeadlockProbe probe) { + // a probe is simply discarded if next wait-for edge is not found + ProbedTx blocker = probe.blocker(); + + GridNearTxLocal nearTx = cctx.tm().tx(blocker.nearXidVersion()); + + if (nearTx == null) + return; + + // probe each blocker + for (UUID pendingNodeId : getPendingResponseNodes(nearTx)) { + sendProbe( + pendingNodeId, + probe.initiatorVersion(), + probe.waitChain(), + // real start time is filled here + blocker.withStartTime(nearTx.startTime()), + false); + } + } + + /** */ + private void handleDeadlockProbeForDht(DeadlockProbe probe) { + // a probe is simply discarded if next wait-for edge is not found + cctx.tm().activeTransactions().stream() + .filter(IgniteInternalTx::local) + .filter(tx -> tx.nearXidVersion().equals(probe.blocker().nearXidVersion())) + .findAny() + .map(GridDhtTxLocalAdapter.class::cast) + .ifPresent(tx -> { + // search for locally checked tx (identified as blocker previously) in the wait chain + Optional repeatedTx = probe.waitChain().stream() + .filter(wTx -> wTx.xidVersion().equals(tx.xidVersion())) + .findAny(); + + if (repeatedTx.isPresent()) { + // a deadlock found + resolveDeadlock(probe, repeatedTx.get(), tx); + } + else + relayProbeIfLocalTxIsWaiting(probe, tx); + }); + } + + /** */ + private void resolveDeadlock(DeadlockProbe probe, ProbedTx repeatedTx, GridDhtTxLocalAdapter locTx) { + if (log.isDebugEnabled()) + log.debug("Deadlock detected [probe=" + probe + ']'); + + ProbedTx victim = chooseVictim( + // real start time is filled here for repeated tx + repeatedTx.withStartTime(probe.blocker().startTime()), + probe.waitChain()); + + if (victim.xidVersion().equals(locTx.xidVersion())) { + if (log.isDebugEnabled()) + log.debug("Chosen 
victim is on local node, tx will be aborted [victim=" + victim + ']'); + + // if a victim tx has made a progress since it was identified as waiting + // it means that detected deadlock was broken by other means (e.g. timeout of another tx) + if (victim.lockCounter() == locTx.lockCounter()) + abortTx(locTx); + } + else { + if (log.isDebugEnabled()) + log.debug("Chosen victim is on remote node, message will be sent [victim=" + victim + ']'); + + // destination node must determine itself as a victim + sendProbe(victim.nodeId(), probe.initiatorVersion(), singleton(victim), victim, false); + } + } + + /** */ + private void relayProbeIfLocalTxIsWaiting(DeadlockProbe probe, GridDhtTxLocalAdapter locTx) { + assert locTx.mvccSnapshot() != null; + + cctx.coordinators().checkWaiting(locTx.mvccSnapshot()) + .flatMap(this::findTx) + .ifPresent(nextBlocker -> { + ArrayList waitChain = new ArrayList<>(probe.waitChain().size() + 1); + waitChain.addAll(probe.waitChain()); + // real start time is filled here + waitChain.add(new ProbedTx(locTx.nodeId(), locTx.xidVersion(), locTx.nearXidVersion(), + probe.blocker().startTime(), locTx.lockCounter())); + + // real start time will be filled later when corresponding near node is visited + ProbedTx nextProbedTx = new ProbedTx(nextBlocker.nodeId(), nextBlocker.xidVersion(), + nextBlocker.nearXidVersion(), -1, nextBlocker.lockCounter()); + + sendProbe( + nextBlocker.eventNodeId(), + probe.initiatorVersion(), + waitChain, + nextProbedTx, + true); + }); + } + + /** + * Chooses victim basing on tx start time. Algorithm chooses victim in such way that every site detected a deadlock + * will choose the same victim. As a result only one tx participating in a deadlock will be aborted. + *

+ * Local tx is needed here because start time for it might not be filled yet for corresponding entry in wait chain. + * + * @param locTx Deadlocked tx on local node. + * @param waitChain Wait chain. + * @return Tx chosen as a victim. + */ + @SuppressWarnings("StatementWithEmptyBody") + private ProbedTx chooseVictim(ProbedTx locTx, Collection waitChain) { + Iterator it = waitChain.iterator(); + + // skip until local tx (inclusive), because txs before are not deadlocked + while (it.hasNext() && !it.next().xidVersion().equals(locTx.xidVersion())); + + ProbedTx victim = locTx; + long maxStartTime = locTx.startTime(); + + while (it.hasNext()) { + ProbedTx tx = it.next(); + + // seek for youngest tx in order to guarantee forward progress + if (tx.startTime() > maxStartTime) { + maxStartTime = tx.startTime(); + victim = tx; + } + // tie-breaking + else if (tx.startTime() == maxStartTime && tx.nearXidVersion().compareTo(victim.nearXidVersion()) > 0) + victim = tx; + } + + return victim; + } + + /** */ + private void abortTx(GridDhtTxLocalAdapter tx) { + cctx.coordinators().failWaiter(tx.mvccSnapshot(), new IgniteTxRollbackCheckedException( + "Deadlock detected. 
Transaction will be rolled back [tx=" + tx + ']')); + } + + /** */ + private Set getPendingResponseNodes(GridNearTxLocal tx) { + IgniteInternalFuture lockFut = tx.lockFuture(); + + if (lockFut instanceof GridNearTxAbstractEnlistFuture) + return ((GridNearTxAbstractEnlistFuture)lockFut).pendingResponseNodes(); + + return Collections.emptySet(); + } + + /** */ + private void sendProbe(UUID destNodeId, GridCacheVersion initiatorVer, Collection waitChain, + ProbedTx blocker, boolean near) { + + DeadlockProbe probe = new DeadlockProbe(initiatorVer, waitChain, blocker, near); + + if (log.isDebugEnabled()) + log.debug("Sending probe [probe=" + probe + ", destNode=" + destNodeId + ']'); + + try { + cctx.gridIO().sendToGridTopic(destNodeId, TOPIC_DEADLOCK_DETECTION, probe, SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException ignored) { + } + catch (IgniteCheckedException e) { + log.warning("Failed to send a deadlock probe [nodeId=" + destNodeId + ']', e); + } + } + + /** + * Delayed deadlock probe computation which can be cancelled. 
+ */ + public class DelayedDeadlockComputation extends GridTimeoutObjectAdapter { + /** */ + private final MvccVersion waiterVer; + + /** */ + private final MvccVersion blockerVer; + + /** {@inheritDoc} */ + @Override public void onTimeout() { + startComputation(waiterVer, blockerVer); + } + + /** */ + private DelayedDeadlockComputation(MvccVersion waiterVer, MvccVersion blockerVer, long timeout) { + super(timeout); + this.waiterVer = waiterVer; + this.blockerVer = blockerVer; + + cctx.kernalContext().timeout().addTimeoutObject(this); + } + + /** */ + public void cancel() { + cctx.kernalContext().timeout().removeTimeoutObject(this); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockProbe.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockProbe.java new file mode 100644 index 0000000000000..47bad8cfb96cd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/DeadlockProbe.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import java.util.Collection; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Probe message travelling between transactions (from waiting to blocking) during deadlock detection. + * @see DeadlockDetectionManager + */ +public class DeadlockProbe implements Message { + /** */ + private static final long serialVersionUID = 0; + + /** */ + private GridCacheVersion initiatorVer; + /** */ + @GridToStringInclude + @GridDirectCollection(ProbedTx.class) + private Collection waitChain; + /** */ + private ProbedTx blocker; + /** */ + private boolean nearCheck; + + /** */ + public DeadlockProbe() { + } + + /** */ + @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") + public DeadlockProbe(GridCacheVersion initiatorVer, Collection waitChain, + ProbedTx blocker, boolean nearCheck) { + this.initiatorVer = initiatorVer; + this.waitChain = waitChain; + this.blocker = blocker; + this.nearCheck = nearCheck; + } + + /** + * @return Identifier of the transaction that started the deadlock detection process. Can be used for diagnostics. + */ + public GridCacheVersion initiatorVersion() { + return initiatorVer; + } + + /** + * @return Chain of transactions identified as waiting during deadlock detection. 
+ */ + @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") + public Collection waitChain() { + return waitChain; + } + + /** + * @return Identifier of a transaction identified as blocking the last transaction in the wait chain + * during deadlock detection. + */ + public ProbedTx blocker() { + return blocker; + } + + /** + * @return {@code True} if the probe checks whether a near transaction is waiting. {@code False} if it checks a dht transaction. + */ + public boolean nearCheck() { + return nearCheck; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("blocker", blocker)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("initiatorVer", initiatorVer)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeBoolean("nearCheck", nearCheck)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeCollection("waitChain", waitChain, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + blocker = reader.readMessage("blocker"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + initiatorVer = reader.readMessage("initiatorVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + nearCheck = reader.readBoolean("nearCheck"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + waitChain = reader.readCollection("waitChain", MessageCollectionItemType.MSG); + + 
if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(DeadlockProbe.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 170; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DeadlockProbe.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java index 9f8f702627953..2910eedd5afb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.mvcc; +import java.util.Optional; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.CacheConfiguration; @@ -93,11 +94,15 @@ public interface MvccProcessor extends GridProcessor { boolean hasLocalTransaction(long crd, long cntr); /** + * Stands in the lock wait queue for the current lock holder. + * * @param cctx Cache context. - * @param locked Version the entry is locked by. + * @param waiterVer Version of the waiting tx. + * @param blockerVer Version the entry is locked by. * @return Future, which is completed as soon as the lock is released. */ - IgniteInternalFuture waitFor(GridCacheContext cctx, MvccVersion locked); + IgniteInternalFuture waitForLock(GridCacheContext cctx, MvccVersion waiterVer, + MvccVersion blockerVer); /** * @param locked Version the entry is locked by. 
@@ -206,4 +211,21 @@ public interface MvccProcessor extends GridProcessor { * @throws IgniteCheckedException If failed to initialize. */ void ensureStarted() throws IgniteCheckedException; + + /** + * Checks whether one tx is waiting for another tx. + * It is assumed that locks on data nodes are requested one by one, so a tx can wait for only one other tx here. + * + * @param mvccVer Version of the transaction which is checked for waiting. + * @return Version of tx which blocks checked tx. + */ + Optional checkWaiting(MvccVersion mvccVer); + + /** + * Unfreezes waiter for specific version failing it with passed exception. + * + * @param mvccVer Version of a waiter to fail. + * @param e Exception reflecting failure reason. + */ + void failWaiter(MvccVersion mvccVer, Exception e); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java index f3c563c98db59..b333d33f5c209 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.UUID; @@ -119,6 +120,7 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_READ_OP_CNTR; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_START_CNTR; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_START_OP_CNTR; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.belongToSameTx; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.compare; import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.hasNewVersion; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.isVisible; @@ -234,7 +236,7 @@ public MvccProcessorImpl(GridKernalContext ctx) { @Override public void start() throws IgniteCheckedException { ctx.event().addDiscoveryEventListener(this::onDiscovery, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); - ctx.io().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener()); + ctx.io().addMessageListener(TOPIC_CACHE_COORDINATOR, new MvccMessageListener()); ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class, new CustomEventListener() { @@ -634,15 +636,23 @@ private void initialize(MvccCoordinator curCrd0) { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture waitFor(GridCacheContext cctx, MvccVersion locked) { - TxKey key = new TxKey(locked.coordinatorVersion(), locked.counter()); + @Override public IgniteInternalFuture waitForLock(GridCacheContext cctx, MvccVersion waiterVer, + MvccVersion blockerVer) { + TxKey key = new TxKey(blockerVer.coordinatorVersion(), blockerVer.counter()); - LockFuture fut = new LockFuture(cctx.ioPolicy()); + LockFuture fut = new LockFuture(cctx.ioPolicy(), waiterVer); Waiter waiter = waitMap.merge(key, fut, Waiter::concat); if (!waiter.hasLocalTransaction() && (waiter = waitMap.remove(key)) != null) waiter.run(ctx); + else { + DeadlockDetectionManager.DelayedDeadlockComputation delayedComputation + = ctx.cache().context().deadlockDetectionMgr().initDelayedComputation(waiterVer, blockerVer); + + if (delayedComputation != null) + fut.listen(fut0 -> delayedComputation.cancel()); + } return fut; } @@ -1059,13 +1069,11 @@ void startVacuumWorkers() { vacuumWorkers.add(new VacuumScheduler(ctx, log, this)); - for (int i = 0; i < ctx.config().getMvccVacuumThreadCount(); i++) { + for (int i = 0; i < ctx.config().getMvccVacuumThreadCount(); i++) vacuumWorkers.add(new VacuumWorker(ctx, log, cleanupQueue)); - } - 
for (GridWorker worker : vacuumWorkers) { + for (GridWorker worker : vacuumWorkers) new IgniteThread(worker).start(); - } return; } @@ -1107,9 +1115,8 @@ void stopVacuumWorkers() { if (!queue.isEmpty()) { IgniteCheckedException ex = vacuumCancelledException(); - for (VacuumTask task : queue) { + for (VacuumTask task : queue) task.onDone(ex); - } } } } @@ -1628,17 +1635,35 @@ void onNodeLeft(UUID nodeId) { } } + /** {@inheritDoc} */ + @Override public Optional checkWaiting(MvccVersion mvccVer) { + return waitMap.entrySet().stream() + .filter(e -> e.getValue().lockFuture(mvccVer) != null) + .map(Map.Entry::getKey) + .map(key -> new MvccVersionImpl(key.major(), key.minor(), 0)) + .findAny(); + } + + /** {@inheritDoc} */ + @Override public void failWaiter(MvccVersion mvccVer, Exception e) { + waitMap.values().stream() + .map(w -> w.lockFuture(mvccVer)) + .filter(Objects::nonNull) + .findAny() + .ifPresent(w -> w.onDone(e)); + } + /** * */ - private class CoordinatorMessageListener implements GridMessageListener { + private class MvccMessageListener implements GridMessageListener { /** {@inheritDoc} */ @Override public void onMessage(UUID nodeId, Object msg, byte plc) { MvccMessage msg0 = (MvccMessage)msg; if (msg0.waitForCoordinatorInit() && !initFut.isDone()) { initFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture future) { + @Override public void apply(IgniteInternalFuture fut) { assert curCrd.local(); processMessage(nodeId, msg); @@ -1680,7 +1705,7 @@ else if (msg instanceof MvccRecoveryFinishedMessage) /** {@inheritDoc} */ @Override public String toString() { - return "CoordinatorMessageListener[]"; + return "MvccMessageListener[]"; } } @@ -1782,18 +1807,28 @@ private interface Waiter { * @return {@code True} if it is a compound waiter. */ boolean compound(); + + /** + * @param checkedVer Version of transaction checking for wait. + * @return Lock future corresponding to checked transaction or {@code null} if it is not waiting. 
+ */ + @Nullable GridFutureAdapter lockFuture(MvccVersion checkedVer); } /** */ private static class LockFuture extends GridFutureAdapter implements Waiter, Runnable { /** */ private final byte plc; + /** */ + private final MvccVersion waitingTxVer; /** * @param plc Pool policy. + * @param waitingTxVer Waiting tx version. */ - LockFuture(byte plc) { + LockFuture(byte plc, MvccVersion waitingTxVer) { this.plc = plc; + this.waitingTxVer = waitingTxVer; } /** {@inheritDoc} */ @@ -1804,7 +1839,8 @@ private static class LockFuture extends GridFutureAdapter implements Waite /** {@inheritDoc} */ @Override public void run(GridKernalContext ctx) { try { - ctx.pools().poolForPolicy(plc).execute(this); + if (!isDone()) + ctx.pools().poolForPolicy(plc).execute(this); } catch (IgniteCheckedException e) { U.error(ctx.log(LockFuture.class), e); @@ -1825,6 +1861,11 @@ private static class LockFuture extends GridFutureAdapter implements Waite @Override public boolean compound() { return false; } + + /** {@inheritDoc} */ + @Override public GridFutureAdapter lockFuture(MvccVersion checkedVer) { + return belongToSameTx(waitingTxVer, checkedVer) ? 
this : null; + } } /** */ @@ -1848,6 +1889,11 @@ private static class LocalTransactionMarker implements Waiter { @Override public boolean compound() { return false; } + + /** {@inheritDoc} */ + @Override public GridFutureAdapter lockFuture(MvccVersion checkedVer) { + return null; + } } /** */ @@ -1888,9 +1934,8 @@ else if (((CompoundWaiter)waiter).inner.getClass() == ArrayList.class) /** {@inheritDoc} */ @Override public void run(GridKernalContext ctx) { if (inner.getClass() == ArrayList.class) { - for (Waiter waiter : (List)inner) { + for (Waiter waiter : (List)inner) waiter.run(ctx); - } } else ((Waiter)inner).run(ctx); @@ -1910,6 +1955,22 @@ else if (((CompoundWaiter)waiter).inner.getClass() == ArrayList.class) @Override public boolean compound() { return true; } + + /** {@inheritDoc} */ + @Override public GridFutureAdapter lockFuture(MvccVersion checkedVer) { + if (inner.getClass() == ArrayList.class) { + for (Waiter waiter : (List)inner) { + GridFutureAdapter waitFut; + + if ((waitFut = waiter.lockFuture(checkedVer)) != null) + return waitFut; + } + + return null; + } + else + return ((Waiter)inner).lockFuture(checkedVer); + } } /** */ @@ -2250,7 +2311,7 @@ private void cleanup(GridDhtLocalPartition part, KeyCacheObject key, List Return type. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/ProbedTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/ProbedTx.java new file mode 100644 index 0000000000000..a13a37d7c56fa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/ProbedTx.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Contains attributes of tx visited during deadlock detection. + */ +public class ProbedTx implements Message { + /** */ + private static final long serialVersionUID = 0; + + /** */ + private UUID nodeId; + /** */ + private GridCacheVersion xidVer; + /** */ + private GridCacheVersion nearXidVer; + /** */ + private long startTime; + /** */ + private int lockCntr; + + /** */ + public ProbedTx() { + } + + /** + * @param nodeId Node on which probed transaction runs. + * @param xidVer Identifier of transaction. + * @param nearXidVer Identifier of near transaction. + * @param startTime Transaction start time. + * @param lockCntr Number of locks acquired by probed transaction at a time of probe handling. 
+ */ + public ProbedTx(UUID nodeId, GridCacheVersion xidVer, GridCacheVersion nearXidVer, long startTime, + int lockCntr) { + this.nodeId = nodeId; + this.xidVer = xidVer; + this.nearXidVer = nearXidVer; + this.startTime = startTime; + this.lockCntr = lockCntr; + } + + /** + * @return Node on which probed transaction runs. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Identifier of transaction. + */ + public GridCacheVersion xidVersion() { + return xidVer; + } + + /** + * @return Identifier of near transaction. + */ + public GridCacheVersion nearXidVersion() { + return nearXidVer; + } + + /** + * @return Transaction start time. + */ + public long startTime() { + return startTime; + } + + /** + * @return Number of locks acquired by probed transaction at a time of probe handling. + */ + public int lockCounter() { + return lockCntr; + } + + /** + * Creates a copy of this instance with modified transaction start time. + * + * @param updStartTime New start time value. + * @return Instance with updated start time. 
+ */ + public ProbedTx withStartTime(long updStartTime) { + return new ProbedTx( + nodeId, + xidVer, + nearXidVer, + updStartTime, + lockCntr + ); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt("lockCntr", lockCntr)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("nearXidVer", nearXidVer)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeUuid("nodeId", nodeId)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeLong("startTime", startTime)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMessage("xidVer", xidVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + lockCntr = reader.readInt("lockCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + nearXidVer = reader.readMessage("nearXidVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + nodeId = reader.readUuid("nodeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + startTime = reader.readLong("startTime"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + xidVer = reader.readMessage("xidVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(ProbedTx.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 171; + } + 
+ /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ProbedTx.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 6e5759dd88f5e..39ece36a1872e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -368,8 +368,8 @@ private FileDescriptor readFileDescriptor(File file, FileIOFactory ioFactory) { return new GridCacheSharedContext<>( kernalCtx, null, null, null, null, null, null, dbMgr, null, - null, null, null, null, - null, null,null, null, null + null, null, null, null, null, + null,null, null, null, null ); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java index 550ec09ae00f2..b12ee561a491c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; import org.jetbrains.annotations.Nullable; @@ -38,6 +39,9 @@ public 
class TxCounters { /** Final update counters for cache partitions in the end of transaction */ private Collection updCntrs; + /** Counter tracking number of entries locked by tx. */ + private final AtomicInteger lockCntr = new AtomicInteger(); + /** * Accumulates size change for cache partition. * @@ -127,4 +131,18 @@ private AtomicLong accumulator(Map> accMap, in return acc; } + + /** + * Increments lock counter. + */ + public void incrementLockCounter() { + lockCntr.incrementAndGet(); + } + + /** + * @return Current value of lock counter. + */ + public int lockCounter() { + return lockCntr.get(); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java index a0eff327bc960..62bc06a31018f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java @@ -242,7 +242,7 @@ private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode) /** * @throws Exception If failed. */ - @Ignore("https://issues.apache.org/jira/browse/IGNITE-9322") // Fix diagnostic message or disable test. + @Ignore("https://issues.apache.org/jira/browse/IGNITE-10637") // Support diagnostics message or disable test. @Test public void testSeveralLongRunningMvccTxs() throws Exception { checkSeveralLongRunningTxs(TRANSACTIONAL_SNAPSHOT); @@ -364,7 +364,7 @@ private int countTxKeysInASingleBlock(String log) { /** * @throws Exception If failed. */ - @Ignore("https://issues.apache.org/jira/browse/IGNITE-9322") // Fix diagnostic message or disable test. + @Ignore("https://issues.apache.org/jira/browse/IGNITE-10637") // Support diagnostic messages or disable test. 
@Test public void testLongRunningMvccTx() throws Exception { checkLongRunningTx(TRANSACTIONAL_SNAPSHOT); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java index 0f6c03f3b1e51..cc1cf43e15411 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java @@ -151,6 +151,7 @@ private void checkInvariantSwitchSegmentSize(int serVer) throws Exception { null, null, null, + null, null) ).createSerializer(serVer); @@ -476,6 +477,7 @@ private T2 initiate( null, null, null, + null, null ); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index f51056f73f867..d48cb0f8a094a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -80,6 +80,7 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { null, null, null, + null, null ); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index 9a7d63b1e901d..333ff63a3fb79 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -80,6 +80,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest null, null, null, + null, null ); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java index cbf9dea928c62..89b0cdfeecc8a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java @@ -95,6 +95,7 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest { null, null, null, + null, null ); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index 1190899496f6b..e8f6ead1e48dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -89,6 +89,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { null, null, null, + null, null ); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 7591dd74b2111..fea7d2b2d32a3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -315,6 +315,7 @@ private PageMemoryImpl createPageMemory(PageMemoryImpl.ThrottlingPolicy throttli null, null, null, + null, null ); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java index 03e72d5edc9ba..b030890824999 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTopologyChangeTest.java @@ -93,7 +93,7 @@ public class TxRollbackOnTopologyChangeTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { if (MvccFeatureChecker.forcedMvcc()) - fail("https://issues.apache.org/jira/browse/IGNITE-9322"); //Won't start nodes if the only test mutes. + fail("https://issues.apache.org/jira/browse/IGNITE-10807"); //Won't start nodes if the only test mutes. 
super.beforeTest(); diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index ad04ded6fde50..c6d9e18438bd0 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -83,6 +83,7 @@ public GridCacheTestContext(GridTestKernalContext ctx) throws Exception { new PartitionsEvictManager(), new CacheNoopJtaManager(), null, + null, null ), defaultCacheConfiguration(), @@ -110,4 +111,4 @@ public GridCacheTestContext(GridTestKernalContext ctx) throws Exception { store().initialize(null, new IdentityHashMap()); } -} \ No newline at end of file +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java index 199cfad017b56..02f091d74dff4 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java @@ -24,7 +24,7 @@ /** */ public class CacheMvccPartitionedSqlTxQueriesTest extends CacheMvccSqlTxQueriesAbstractTest { /** {@inheritDoc} */ - protected CacheMode cacheMode() { + @Override protected CacheMode cacheMode() { return PARTITIONED; } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java index 5841b093e8acb..7f0dcf05bcb80 100644 --- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesAbstractTest.java @@ -67,6 +67,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL_SUM; @@ -81,6 +82,20 @@ */ @RunWith(JUnit4.class) public abstract class CacheMvccSqlTxQueriesAbstractTest extends CacheMvccAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + System.setProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY, "-1"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY); + + super.afterTestsStopped(); + } + /** * @throws Exception If failed. 
*/ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java index 684631a2b3ea4..f0a586a061645 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlTxQueriesWithReducerAbstractTest.java @@ -44,6 +44,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; import static org.apache.ignite.testframework.GridTestUtils.runMultiThreaded; @@ -55,6 +56,20 @@ */ @RunWith(JUnit4.class) public abstract class CacheMvccSqlTxQueriesWithReducerAbstractTest extends CacheMvccAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + System.setProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY, "-1"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY); + + super.afterTestsStopped(); + } + /** */ private static final int TIMEOUT = 3000; diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDeadlockDetectionTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDeadlockDetectionTest.java new file mode 100644 index 0000000000000..2a2d3caa3b822 --- /dev/null +++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDeadlockDetectionTest.java @@ -0,0 +1,652 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; 
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.SF;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/** Tests that deadlocks between pessimistic txs on {@code TRANSACTIONAL_SNAPSHOT} caches end with one tx aborted. */
+@RunWith(JUnit4.class)
+public class MvccDeadlockDetectionTest extends GridCommonAbstractTest {
+    /** Sets {@code IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY} to {@code "0"} before any test of this class. */
+    @BeforeClass
+    public static void setUpClass() {
+        System.setProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY, "0");
+    }
+
+    /** Clears the {@code IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY} system property after the last test. */
+    @AfterClass
+    public static void tearDownClass() {
+        System.clearProperty(IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY);
+    }
+
+    /** Node started last by {@code setUpGrids}, after client mode has been switched on. */
+    private IgniteEx client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); // blockProbe() casts the SPI to this type
+
+        return cfg;
+    }
+
+    /** Starts {@code n} nodes, creates the default cache, then starts node {@code n} with client mode on. */
+    private void setUpGrids(int n, boolean indexed) throws Exception {
+        Ignite ign = startGridsMultiThreaded(n);
+        CacheConfiguration ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(TRANSACTIONAL_SNAPSHOT);
+        if (indexed)
+            ccfg.setIndexedTypes(Integer.class, Integer.class); // needed by tests that run SQL on table Integer
+
+        ign.getOrCreateCache(ccfg);
+
+        G.setClientMode(true); // NOTE(review): never switched back here - confirm later node starts are unaffected
+
+        client = startGrid(n);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @After
+    public void cleanupTest() throws Exception {
+        stopAllGrids(); // every test starts its own topology through setUpGrids()
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void detectSimpleDeadlock() throws Exception {
+        setUpGrids(2, false);
+
+        Integer key0 = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
+        Integer key1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+        IgniteCache cache = client.cache(DEFAULT_CACHE_NAME);
+
+        assert client.configuration().isClientMode();
+
+        CyclicBarrier b = new CyclicBarrier(2); // trips once both txs have written their first key
+
+        IgniteInternalFuture fut0 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(key0, 0);
+                b.await();
+                cache.put(key1, 1); // key1 has already been written by the tx in fut1
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture fut1 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(key1, 1);
+                b.await();
+                cache.put(key0, 0); // key0 has already been written by the tx in fut0
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void detectSimpleDeadlockFastUpdate() throws Exception {
+        setUpGrids(2, true); // indexed = true: registers the Integer/Integer types queried by the SQL below
+
+        IgniteCache cache = client.cache(DEFAULT_CACHE_NAME);
+
+        Integer key0 = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
+        Integer key1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key0, -1));
+        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key1, -1));
+
+        assert client.configuration().isClientMode();
+
+        CyclicBarrier b = new CyclicBarrier(2); // trips once both txs have updated their first row
+
+        IgniteInternalFuture fut0 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.query(new SqlFieldsQuery("update Integer set _val = 0 where _key = ?").setArgs(key0));
+                b.await(); // from here on the row of key1 has been updated by the tx in fut1
+                cache.query(new SqlFieldsQuery("update Integer set _val = 0 where _key = ?").setArgs(key1));
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture fut1 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.query(new SqlFieldsQuery("update Integer set _val = 1 where _key = ?").setArgs(key1));
+                b.await(); // from here on the row of key0 has been updated by the tx in fut0
+                cache.query(new SqlFieldsQuery("update Integer set _val = 1 where _key = ?").setArgs(key0));
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void detect3Deadlock() throws Exception {
+        setUpGrids(3, false); // one key per node; the txs below request them in a cycle key0 -> key1 -> key2 -> key0
+
+        Integer key0 = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
+        Integer key1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+        Integer key2 = primaryKey(grid(2).cache(DEFAULT_CACHE_NAME));
+
+        IgniteCache cache = client.cache(DEFAULT_CACHE_NAME);
+
+        assert client.configuration().isClientMode();
+
+        CyclicBarrier b = new CyclicBarrier(3); // trips once all three txs have written their first key
+
+        IgniteInternalFuture fut0 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(key0, 0);
+                b.await();
+                cache.put(key1, 1); // key1 has already been written by the tx in fut1
+
+                tx.rollback();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture fut1 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(key1, 0);
+                b.await();
+                cache.put(key2, 1); // key2 has already been written by the tx in fut2
+
+                tx.rollback();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture fut2 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(key2, 1);
+                b.await();
+                cache.put(key0, 0); // key0 has already been written by the tx in fut0, which closes the cycle
+
+                tx.rollback();
+            }
+
+            return null;
+        });
+
+        assertExactlyOneAbortedDueDeadlock(fut0, fut1, fut2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void detectMultipleLockWaitDeadlock() throws Exception {
+        // Wait-for graph (T0, T1, T2 are the txs run by fut0, fut1, fut2): T0 -> T1
+        //                                                                     \-> T2 -> T0
+        setUpGrids(3, true);
+
+        IgniteCache cache = client.cache(DEFAULT_CACHE_NAME);
+
+        Integer key0 = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
+        Integer key1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+        Integer key2 = primaryKey(grid(2).cache(DEFAULT_CACHE_NAME));
+
+        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key0, -1));
+        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key1, -1));
+        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key2, -1));
+
+        CyclicBarrier b = new CyclicBarrier(3); // trips once each tx has updated its own row
+
+        IgniteInternalFuture fut2 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.query(new SqlFieldsQuery("update Integer set _val = 2 where _key = ?").setArgs(key2));
+                b.await();
+                cache.query(new SqlFieldsQuery("update Integer set _val = 2 where _key = ?").setArgs(key0));
+
+                // Roll back instead of committing so the waiting tx is not aborted due to a write conflict.
+                tx.rollback();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture fut1 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.query(new SqlFieldsQuery("update Integer set _val = 1 where _key = ?").setArgs(key1));
+                b.await();
+                GridTestUtils.waitForCondition(fut2::isDone, 1000); // keep this tx open until fut2 is done (max 1 s)
+
+                // Roll back instead of committing so the waiting tx is not aborted due to a write conflict.
+                tx.rollback();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture fut0 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.query(new SqlFieldsQuery("update Integer set _val = 0 where _key = ?").setArgs(key0));
+                b.await();
+                cache.query(
+                    new SqlFieldsQuery("update Integer set _val = 0 where _key = ? or _key = ?").setArgs(key2, key1));
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        fut1.get(10, TimeUnit.SECONDS); // the tx in fut1 requests no second key, so it must finish normally
+
+        assertExactlyOneAbortedDueDeadlock(fut0, fut2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void detectDeadlockLocalEntriesEnlistFuture() throws Exception {
+        setUpGrids(1, false); // single node besides the client, so both keys have the same primary
+
+        List keys = primaryKeys(grid(0).cache(DEFAULT_CACHE_NAME), 2);
+
+        IgniteCache cache = client.cache(DEFAULT_CACHE_NAME);
+
+        assert client.configuration().isClientMode();
+
+        CyclicBarrier b = new CyclicBarrier(2); // trips once both txs have written their first key
+
+        IgniteInternalFuture fut0 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(keys.get(0), 11);
+                b.await();
+                cache.put(keys.get(1), 11); // already written by the tx in fut1
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture fut1 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(keys.get(1), 22);
+                b.await();
+                cache.put(keys.get(0), 22); // already written by the tx in fut0
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void detectDeadlockLocalPrimary() throws Exception {
+        // Checks the case when a near tx enlists an entry locally on the same node, so that no dht tx is created.
+
+        setUpGrids(2, false);
+
+        IgniteCache cache0 = grid(0).cache(DEFAULT_CACHE_NAME);
+        IgniteCache cache1 = grid(1).cache(DEFAULT_CACHE_NAME);
+
+        int key0 = primaryKey(cache0);
+        int key1 = primaryKey(cache1);
+
+        CyclicBarrier b = new CyclicBarrier(2); // trips once each tx has written the key owned by the other node
+
+        IgniteInternalFuture fut0 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache0.put(key1, 11);
+                b.await();
+                cache0.put(key0, 11); // key0 is primary on grid(0), the node that runs this tx
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture fut1 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = grid(1).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache1.put(key0, 22);
+                b.await();
+                cache1.put(key1, 22); // key1 is primary on grid(1), the node that runs this tx
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void detectDeadlockLocalQueryEnlistFuture() throws Exception {
+        setUpGrids(1, true); // single node besides the client; indexed types are needed for the SQL below
+
+        List<Integer> keys = primaryKeys(grid(0).cache(DEFAULT_CACHE_NAME), 2);
+
+        Collections.sort(keys); // guarantees key0 < key1, which the range predicates below rely on
+
+        Integer key0 = keys.get(0), key1 = keys.get(1);
+
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        assert client.configuration().isClientMode();
+
+        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key0, -1));
+        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, ?)").setArgs(key1, -1));
+
+        CyclicBarrier b = new CyclicBarrier(2); // trips once both txs have updated their first row
+
+        IgniteInternalFuture<?> fut0 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.query(new SqlFieldsQuery("update Integer set _val = 0 where _key <= ?").setArgs(key0));
+                b.await();
+                cache.query(new SqlFieldsQuery("update Integer set _val = 0 where _key >= ?").setArgs(key1));
+                TimeUnit.SECONDS.sleep(2); // NOTE(review): purpose of this pause before commit is unclear - confirm
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.query(new SqlFieldsQuery("update Integer set _val = 1 where _key >= ?").setArgs(key1));
+                b.await();
+                cache.query(new SqlFieldsQuery("update Integer set _val = 1 where _key <= ?").setArgs(key0));
+
+                tx.commit();
+            }
+
+            return null;
+        });
+
+        assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void nonDeadlockedTxDetectsDeadlock1() throws Exception {
+        setUpGrids(2, false);
+
+        Integer key0 = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
+        Integer key1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+        IgniteCache cache = client.cache(DEFAULT_CACHE_NAME);
+
+        assert client.configuration().isClientMode();
+
+        CyclicBarrier b = new CyclicBarrier(3); // two tx threads plus the thread running this test method
+
+        IgniteInternalFuture fut0 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                blockProbe(grid(1), tx); // grid(1) blocks probes whose initiator version is this tx's xid version
+
+                cache.put(key0, 0);
+                b.await();
+                cache.put(key1, 1);
+
+                tx.rollback();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture fut1 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                blockProbe(grid(0), tx); // grid(0) blocks probes whose initiator version is this tx's xid version
+
+                cache.put(key1, 1);
+                b.await();
+                cache.put(key0, 0);
+
+                tx.rollback();
+            }
+
+            return null;
+        });
+
+        b.await(); // both txs have written their first key
+        tryPutRepeatedly(cache, key0); // separate txs requesting key0, which the tx in fut0 has written
+
+        assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void nonDeadlockedTxDetectsDeadlock2() throws Exception {
+        setUpGrids(2, false);
+
+        List<Integer> keys0 = primaryKeys(grid(0).cache(DEFAULT_CACHE_NAME), 2);
+        Integer key00 = keys0.get(0);
+        Integer key01 = keys0.get(1);
+        Integer key1 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
+
+        IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME);
+
+        assert client.configuration().isClientMode();
+
+        CyclicBarrier b = new CyclicBarrier(3); // two tx threads plus the thread running this test method
+
+        IgniteInternalFuture<?> fut0 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                blockProbe(grid(1), tx); // grid(1) blocks probes whose initiator version is this tx's xid version
+
+                cache.put(key00, 0);
+                b.await();
+                cache.put(key1, 1);
+
+                tx.rollback();
+            }
+
+            return null;
+        });
+
+        IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                blockProbe(grid(0), tx); // grid(0) blocks probes whose initiator version is this tx's xid version
+
+                cache.put(key1, 1);
+                cache.put(key01, 0); // extra key written only by this tx; the outside tx below requests it
+                b.await();
+                cache.put(key00, 0);
+
+                tx.rollback();
+            }
+
+            return null;
+        });
+
+        b.await(); // both txs have written their initial keys
+        tryPutRepeatedly(cache, key01); // separate txs requesting key01, which the tx in fut1 has written
+
+        assertExactlyOneAbortedDueDeadlock(fut0, fut1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void randomizedPuts() throws Exception {
+        int gridCnt = SF.applyLB(10, 2);
+        int opsByWorker = SF.applyLB(1000, 10);
+
+        setUpGrids(gridCnt, false);
+
+        List<Integer> keys = new ArrayList<>(); // three keys per node, i.e. 3 * gridCnt keys in total
+        for (int i = 0; i < gridCnt; i++)
+            keys.addAll(primaryKeys(grid(i).cache(DEFAULT_CACHE_NAME), 3));
+
+        AtomicInteger aborted = new AtomicInteger();
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+        for (int i = 0; i < gridCnt * 2; i++) { // two workers per node
+            IgniteEx ign = grid(i % gridCnt);
+            IgniteCache<Object, Object> cache = ign.cache(DEFAULT_CACHE_NAME);
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+                for (int k = 0; k < opsByWorker; k++) {
+                    try (Transaction tx = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        ArrayList<Integer> keys0 = new ArrayList<>(keys);
+                        Collections.shuffle(keys0);
+                        int nkeys = Math.min(keys0.size(), ThreadLocalRandom.current().nextInt(8) + 5); // 5..12, capped
+                        for (int j = 0; j < nkeys; j++)
+                            cache.put(keys0.get(j), j);
+
+                        tx.rollback();
+                    }
+                    catch (Exception e) { // only tx rollbacks are counted; any other failure is ignored here
+                        if (X.hasCause(e, IgniteTxRollbackCheckedException.class))
+                            aborted.incrementAndGet();
+                    }
+                }
+            });
+            futs.add(fut);
+        }
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get(10, TimeUnit.MINUTES); // the test only requires that every worker finishes, i.e. nothing hangs
+
+        log.info("Number of txs aborted: " + aborted);
+    }
+
+    /** Makes {@code ign} block each {@link DeadlockProbe} whose initiator version is the xid version of {@code tx}. */
+    private static void blockProbe(IgniteEx ign, Transaction tx) {
+        ((TestRecordingCommunicationSpi)ign.configuration().getCommunicationSpi())
+            .blockMessages((node, msg) -> {
+                if (msg instanceof DeadlockProbe) {
+                    DeadlockProbe msg0 = (DeadlockProbe)msg;
+                    GridNearTxLocal tx0 = ((TransactionProxyImpl)tx).tx();
+                    return msg0.initiatorVersion().equals(tx0.xidVersion());
+                }
+
+                return false;
+            });
+    }
+
+    /** Waits up to 10 s for each future and fails unless exactly one ended with a tx rollback exception. */
+    private void assertExactlyOneAbortedDueDeadlock(IgniteInternalFuture<?>... futs) throws IgniteCheckedException {
+        assert futs.length > 0;
+
+        int aborted = 0;
+
+        for (IgniteInternalFuture<?> fut : futs) {
+            try {
+                fut.get(10, TimeUnit.SECONDS);
+            }
+            catch (IgniteCheckedException e) {
+                // TODO check expected exceptions once https://issues.apache.org/jira/browse/IGNITE-9470 is resolved
+                if (X.hasCause(e, IgniteTxRollbackCheckedException.class))
+                    aborted++;
+                else
+                    throw e; // anything but a rollback (a get() timeout included) fails the test right away
+            }
+        }
+
+        if (aborted != 1)
+            fail("Exactly one tx is expected to be aborted, but was " + aborted);
+    }
+
+    /** Tries up to 100 times to put {@code key0} in a tx started with a 200 ms timeout; stops at first success. */
+    private void tryPutRepeatedly(IgniteCache<Object, Object> cache, Integer key0) {
+        for (int i = 0; i < 100; i++) {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 200, 1)) {
+                cache.put(key0, 33);
+
+                break; // the put went through; the tx is closed without commit
+            }
+            catch (Exception ignored) { // best effort: a failed attempt is simply retried
+            }
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java index 04189145dba35..d1ed2f3e6953e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxNodeMappingTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest; +import org.apache.ignite.internal.processors.cache.mvcc.MvccDeadlockDetectionTest; import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadBulkOpsTest; import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadOperationsTest; import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest; @@ -93,6 +94,8 @@ CacheMvccTxNodeMappingTest.class, + MvccDeadlockDetectionTest.class, + // SQL vs CacheAPI consistency. 
MvccRepeatableReadOperationsTest.class, MvccRepeatableReadBulkOpsTest.class,