Skip to content

Commit

Permalink
IGNITE-9322: MVCC: implement deadlock detector. This closes #5579.
Browse files Browse the repository at this point in the history
  • Loading branch information
pavlukhin authored and gvvinblade committed Jan 22, 2019
1 parent bc209d0 commit 457ffc1
Show file tree
Hide file tree
Showing 35 changed files with 1,750 additions and 61 deletions.
Expand Up @@ -44,6 +44,8 @@
import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail; import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.mvcc.DeadlockProbe;
import org.apache.ignite.internal.processors.cache.mvcc.ProbedTx;
import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -168,6 +170,9 @@ public static void main(String[] args) throws Exception {


MessageCodeGenerator gen = new MessageCodeGenerator(srcDir); MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);


gen.generateAndWrite(ProbedTx.class);
gen.generateAndWrite(DeadlockProbe.class);

// gen.generateAll(true); // gen.generateAll(true);


// gen.generateAndWrite(GridCacheMessage.class); // gen.generateAndWrite(GridCacheMessage.class);
Expand Down
Expand Up @@ -260,6 +260,18 @@ public final class IgniteSystemProperties {
*/ */
public static final String IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT = "IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT"; public static final String IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT = "IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT";


/**
* Specifies delay in milliseconds before starting deadlock detection procedure when tx encounters locked key.
* <p>
* Following values could be used:
* <ul>
* <li>&lt; 0 disable detection;</li>
* <li>0 start detection without a delay;</li>
* <li>&gt; 0 start detection after a specified number of milliseconds.</li>
* </ul>
*/
public static final String IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY = "IGNITE_TX_DEADLOCK_DETECTION_INITIAL_DELAY";

/** /**
* System property to enable pending transaction tracker. * System property to enable pending transaction tracker.
* Affects impact of {@link IgniteSystemProperties#IGNITE_DISABLE_WAL_DURING_REBALANCING} property: * Affects impact of {@link IgniteSystemProperties#IGNITE_DISABLE_WAL_DURING_REBALANCING} property:
Expand Down
Expand Up @@ -139,7 +139,10 @@ public enum GridTopic {
TOPIC_GEN_ENC_KEY, TOPIC_GEN_ENC_KEY,


/** */ /** */
TOPIC_SERVICES; TOPIC_SERVICES,

/** */
TOPIC_DEADLOCK_DETECTION;


/** Enum values. */ /** Enum values. */
private static final GridTopic[] VALS = values(); private static final GridTopic[] VALS = values();
Expand Down
Expand Up @@ -124,8 +124,10 @@
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
import org.apache.ignite.internal.processors.cache.mvcc.DeadlockProbe;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionImpl;
import org.apache.ignite.internal.processors.cache.mvcc.ProbedTx;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryId; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryId;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
Expand Down Expand Up @@ -1132,6 +1134,16 @@ public GridIoMessageFactory(MessageFactory[] ext) {


break; break;


case 170:
msg = new DeadlockProbe();

break;

case 171:
msg = new ProbedTx();

break;

// [-3..119] [124..129] [-23..-28] [-36..-55] - this // [-3..119] [124..129] [-23..-28] [-36..-55] - this
// [120..123] - DR // [120..123] - DR
// [-4..-22, -30..-35] - SQL // [-4..-22, -30..-35] - SQL
Expand Down
Expand Up @@ -1151,7 +1151,7 @@ else if (res.resultType() == ResultType.LOCKED) {


GridFutureAdapter<GridCacheUpdateTxResult> resFut = new GridFutureAdapter<>(); GridFutureAdapter<GridCacheUpdateTxResult> resFut = new GridFutureAdapter<>();


IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, lockVer);


lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, mvccVer, lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, mvccVer,
op, needHistory, noCreate, resFut, needOldVal, filter, retVal, entryProc, invokeArgs)); op, needHistory, noCreate, resFut, needOldVal, filter, retVal, entryProc, invokeArgs));
Expand Down Expand Up @@ -1300,7 +1300,7 @@ else if (res.resultType() == ResultType.LOCKED) {


GridFutureAdapter<GridCacheUpdateTxResult> resFut = new GridFutureAdapter<>(); GridFutureAdapter<GridCacheUpdateTxResult> resFut = new GridFutureAdapter<>();


IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, lockVer);


lockFut.listen(new MvccRemoveLockListener(tx, this, affNodeId, topVer, mvccVer, needHistory, lockFut.listen(new MvccRemoveLockListener(tx, this, affNodeId, topVer, mvccVer, needHistory,
resFut, needOldVal, retVal, filter)); resFut, needOldVal, retVal, filter));
Expand Down Expand Up @@ -1392,7 +1392,7 @@ else if (res.resultType() == ResultType.LOCKED) {


GridFutureAdapter<GridCacheUpdateTxResult> resFut = new GridFutureAdapter<>(); GridFutureAdapter<GridCacheUpdateTxResult> resFut = new GridFutureAdapter<>();


IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, lockVer);


lockFut.listen(new MvccAcquireLockListener(tx, this, mvccVer, resFut)); lockFut.listen(new MvccAcquireLockListener(tx, this, mvccVer, resFut));


Expand Down Expand Up @@ -5193,7 +5193,7 @@ else if (res.resultType() == ResultType.FILTERED) {
else if (res.resultType() == ResultType.LOCKED) { else if (res.resultType() == ResultType.LOCKED) {
entry.unlockEntry(); entry.unlockEntry();


IgniteInternalFuture<?> lockFuture = cctx.kernalContext().coordinators().waitFor(cctx, res.resultVersion()); IgniteInternalFuture<?> lockFuture = cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, res.resultVersion());


lockFuture.listen(this); lockFuture.listen(this);


Expand Down Expand Up @@ -5323,7 +5323,7 @@ private static class MvccAcquireLockListener implements IgniteInClosure<IgniteIn
else if (res.resultType() == ResultType.LOCKED) { else if (res.resultType() == ResultType.LOCKED) {
entry.unlockEntry(); entry.unlockEntry();


cctx.kernalContext().coordinators().waitFor(cctx, res.resultVersion()).listen(this); cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, res.resultVersion()).listen(this);


return; return;
} }
Expand Down Expand Up @@ -5497,7 +5497,7 @@ private static class MvccUpdateLockListener implements IgniteInClosure<IgniteInt
else if (res.resultType() == ResultType.LOCKED) { else if (res.resultType() == ResultType.LOCKED) {
entry.unlockEntry(); entry.unlockEntry();


cctx.kernalContext().coordinators().waitFor(cctx, res.resultVersion()).listen(this); cctx.kernalContext().coordinators().waitForLock(cctx, mvccVer, res.resultVersion()).listen(this);


return; return;
} }
Expand Down
Expand Up @@ -98,6 +98,7 @@
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache; import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache;
import org.apache.ignite.internal.processors.cache.mvcc.DeadlockDetectionManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager; import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
Expand Down Expand Up @@ -3218,6 +3219,8 @@ private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,


MvccCachingManager mvccCachingMgr = new MvccCachingManager(); MvccCachingManager mvccCachingMgr = new MvccCachingManager();


DeadlockDetectionManager deadlockDetectionMgr = new DeadlockDetectionManager();

return new GridCacheSharedContext( return new GridCacheSharedContext(
kernalCtx, kernalCtx,
tm, tm,
Expand All @@ -3236,7 +3239,8 @@ private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
evict, evict,
jta, jta,
storeSesLsnrs, storeSesLsnrs,
mvccCachingMgr mvccCachingMgr,
deadlockDetectionMgr
); );
} }


Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
import org.apache.ignite.internal.processors.cache.mvcc.DeadlockDetectionManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager; import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
Expand Down Expand Up @@ -115,7 +116,7 @@ public class GridCacheSharedContext<K, V> {
/** Database manager. */ /** Database manager. */
private IgniteCacheDatabaseSharedManager dbMgr; private IgniteCacheDatabaseSharedManager dbMgr;


/** Snp manager. */ /** Snapshot manager. */
private IgniteCacheSnapshotManager snpMgr; private IgniteCacheSnapshotManager snpMgr;


/** Page store manager. {@code Null} if persistence is not enabled. */ /** Page store manager. {@code Null} if persistence is not enabled. */
Expand All @@ -127,12 +128,15 @@ public class GridCacheSharedContext<K, V> {
/** Ttl cleanup manager. */ /** Ttl cleanup manager. */
private GridCacheSharedTtlCleanupManager ttlMgr; private GridCacheSharedTtlCleanupManager ttlMgr;


/** */ /** Partitons evict manager. */
private PartitionsEvictManager evictMgr; private PartitionsEvictManager evictMgr;


/** Mvcc caching manager. */ /** Mvcc caching manager. */
private MvccCachingManager mvccCachingMgr; private MvccCachingManager mvccCachingMgr;


/** Deadlock detection manager. */
private DeadlockDetectionManager deadlockDetectionMgr;

/** Cache contexts map. */ /** Cache contexts map. */
private ConcurrentHashMap<Integer, GridCacheContext<K, V>> ctxMap; private ConcurrentHashMap<Integer, GridCacheContext<K, V>> ctxMap;


Expand Down Expand Up @@ -187,12 +191,17 @@ public class GridCacheSharedContext<K, V> {
* @param walMgr WAL manager. {@code Null} if persistence is not enabled. * @param walMgr WAL manager. {@code Null} if persistence is not enabled.
* @param walStateMgr WAL state manager. * @param walStateMgr WAL state manager.
* @param depMgr Deployment manager. * @param depMgr Deployment manager.
* @param dbMgr Database manager.
* @param snpMgr Snapshot manager.
* @param exchMgr Exchange manager. * @param exchMgr Exchange manager.
* @param affMgr Affinity manager. * @param affMgr Affinity manager.
* @param ioMgr IO manager. * @param ioMgr IO manager.
* @param ttlMgr Ttl cleanup manager. * @param ttlMgr Ttl cleanup manager.
* @param evictMgr Partitons evict manager.
* @param jtaMgr JTA manager. * @param jtaMgr JTA manager.
* @param storeSesLsnrs Store session listeners. * @param storeSesLsnrs Store session listeners.
* @param mvccCachingMgr Mvcc caching manager.
* @param deadlockDetectionMgr Deadlock detection manager.
*/ */
public GridCacheSharedContext( public GridCacheSharedContext(
GridKernalContext kernalCtx, GridKernalContext kernalCtx,
Expand All @@ -212,7 +221,8 @@ public GridCacheSharedContext(
PartitionsEvictManager evictMgr, PartitionsEvictManager evictMgr,
CacheJtaManagerAdapter jtaMgr, CacheJtaManagerAdapter jtaMgr,
Collection<CacheStoreSessionListener> storeSesLsnrs, Collection<CacheStoreSessionListener> storeSesLsnrs,
MvccCachingManager mvccCachingMgr MvccCachingManager mvccCachingMgr,
DeadlockDetectionManager deadlockDetectionMgr
) { ) {
this.kernalCtx = kernalCtx; this.kernalCtx = kernalCtx;


Expand All @@ -233,7 +243,8 @@ public GridCacheSharedContext(
ioMgr, ioMgr,
ttlMgr, ttlMgr,
evictMgr, evictMgr,
mvccCachingMgr mvccCachingMgr,
deadlockDetectionMgr
); );


this.storeSesLsnrs = storeSesLsnrs; this.storeSesLsnrs = storeSesLsnrs;
Expand Down Expand Up @@ -400,8 +411,8 @@ void onReconnected(boolean active) throws IgniteCheckedException {
ioMgr, ioMgr,
ttlMgr, ttlMgr,
evictMgr, evictMgr,
mvccCachingMgr mvccCachingMgr,
); deadlockDetectionMgr);


this.mgrs = mgrs; this.mgrs = mgrs;


Expand All @@ -428,20 +439,7 @@ private boolean restartOnDisconnect(GridCacheSharedManager<?, ?> mgr) {
return mgr instanceof GridCacheDeploymentManager || mgr instanceof GridCachePartitionExchangeManager; return mgr instanceof GridCacheDeploymentManager || mgr instanceof GridCachePartitionExchangeManager;
} }


/** /** */
* @param mgrs Managers list.
* @param txMgr Transaction manager.
* @param jtaMgr JTA manager.
* @param verMgr Version manager.
* @param mvccMgr MVCC manager.
* @param pageStoreMgr Page store manager. {@code Null} if persistence is not enabled.
* @param walStateMgr WAL state manager.
* @param depMgr Deployment manager.
* @param exchMgr Exchange manager.
* @param affMgr Affinity manager.
* @param ioMgr IO manager.
* @param ttlMgr Ttl cleanup manager.
*/
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void setManagers( private void setManagers(
List<GridCacheSharedManager<K, V>> mgrs, List<GridCacheSharedManager<K, V>> mgrs,
Expand All @@ -460,8 +458,8 @@ private void setManagers(
GridCacheIoManager ioMgr, GridCacheIoManager ioMgr,
GridCacheSharedTtlCleanupManager ttlMgr, GridCacheSharedTtlCleanupManager ttlMgr,
PartitionsEvictManager evictMgr, PartitionsEvictManager evictMgr,
MvccCachingManager mvccCachingMgr MvccCachingManager mvccCachingMgr,
) { DeadlockDetectionManager deadlockDetectionMgr) {
this.mvccMgr = add(mgrs, mvccMgr); this.mvccMgr = add(mgrs, mvccMgr);
this.verMgr = add(mgrs, verMgr); this.verMgr = add(mgrs, verMgr);
this.txMgr = add(mgrs, txMgr); this.txMgr = add(mgrs, txMgr);
Expand All @@ -478,6 +476,7 @@ private void setManagers(
this.ttlMgr = add(mgrs, ttlMgr); this.ttlMgr = add(mgrs, ttlMgr);
this.evictMgr = add(mgrs, evictMgr); this.evictMgr = add(mgrs, evictMgr);
this.mvccCachingMgr = add(mgrs, mvccCachingMgr); this.mvccCachingMgr = add(mgrs, mvccCachingMgr);
this.deadlockDetectionMgr = add(mgrs, deadlockDetectionMgr);
} }


/** /**
Expand Down Expand Up @@ -830,6 +829,13 @@ public MvccCachingManager mvccCaching() {
return mvccCachingMgr; return mvccCachingMgr;
} }


/**
* @return Deadlock detection manager.
*/
public DeadlockDetectionManager deadlockDetectionMgr() {
return deadlockDetectionMgr;
}

/** /**
* @return Node ID. * @return Node ID.
*/ */
Expand Down
Expand Up @@ -508,6 +508,8 @@ private void continueLoop(boolean ignoreCntr) {
updateFut.listen(new CI1<IgniteInternalFuture<GridCacheUpdateTxResult>>() { updateFut.listen(new CI1<IgniteInternalFuture<GridCacheUpdateTxResult>>() {
@Override public void apply(IgniteInternalFuture<GridCacheUpdateTxResult> fut) { @Override public void apply(IgniteInternalFuture<GridCacheUpdateTxResult> fut) {
try { try {
tx.incrementLockCounter();

processEntry(entry0, op, fut.get(), val0, backups0); processEntry(entry0, op, fut.get(), val0, backups0);


continueLoop(true); continueLoop(true);
Expand All @@ -523,6 +525,8 @@ private void continueLoop(boolean ignoreCntr) {
} }
} }


tx.incrementLockCounter();

processEntry(entry, op, res, val0, backups); processEntry(entry, op, res, val0, backups);
} }


Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.GridLeanMap;
Expand Down Expand Up @@ -940,4 +941,20 @@ protected final IgniteInternalFuture<GridNearTxPrepareResponse> chainOnePhasePre
return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(), return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(),
"dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString()); "dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString());
} }

/**
* Increments lock counter.
*/
public void incrementLockCounter() {
txCounters(true).incrementLockCounter();
}

/**
* @return Current value of lock counter.
*/
public int lockCounter() {
TxCounters txCntrs = txCounters(false);

return txCntrs != null ? txCntrs.lockCounter() : 0;
}
} }
Expand Up @@ -17,6 +17,8 @@


package org.apache.ignite.internal.processors.cache.distributed.near; package org.apache.ignite.internal.processors.cache.distributed.near;


import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCacheRestartingException;
Expand Down Expand Up @@ -484,6 +486,11 @@ protected long remainingTime() throws IgniteTxTimeoutCheckedException {
*/ */
protected abstract void map(boolean topLocked); protected abstract void map(boolean topLocked);


/**
* @return Nodes from which current future waits responses.
*/
public abstract Set<UUID> pendingResponseNodes();

/** /**
* Lock request timeout object. * Lock request timeout object.
*/ */
Expand Down

0 comments on commit 457ffc1

Please sign in to comment.