Skip to content

Commit

Permalink
GG-28022 [IGNITE-12746] Regression in GridCacheColocatedDebugTest: pu…
Browse files Browse the repository at this point in the history
…tAll of sorted keys causes deadlock
  • Loading branch information
glukos committed Mar 18, 2020
1 parent c42fd02 commit 9671a56
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -677,10 +677,13 @@ public boolean recheck(GridCacheVersion ver) {
unlockEntry();
}

boolean lockedByThreadChainVer = owner != null && owner.hasCandidate(ver);

// If locked by the thread chain version no need to do recursive thread chain scans for the same chain.
// This call must be made outside of synchronization.
checkOwnerChanged(prev, owner, val, true);
checkOwnerChanged(prev, owner, val, lockedByThreadChainVer);

return owner == null || !owner.hasCandidate(ver); // Will return false if locked by thread chain version.
return !lockedByThreadChainVer;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,13 @@ public boolean recheck(GridCacheVersion ver) {
unlockEntry();
}

checkOwnerChanged(prev, owner, val, true);
boolean lockedByThreadChainVer = owner != null && owner.hasCandidate(ver);

return owner == null || !owner.hasCandidate(ver); // Will return false if locked by thread chain version.
// If locked by the thread chain version no need to do recursive thread chain scans for the same chain.
// This call must be made outside of synchronization.
checkOwnerChanged(prev, owner, val, lockedByThreadChainVer);

return !lockedByThreadChainVer;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
* @return All entries. Returned collection is copy of internal collection.
*/
public synchronized Collection<IgniteTxEntry> allEntriesCopy() {
return txMap == null ? Collections.<IgniteTxEntry>emptySet() : new HashSet<>(txMap.values());
return txMap == null ? Collections.<IgniteTxEntry>emptySet() : new ArrayList<>(txMap.values());
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.ignite.Ignite;
Expand All @@ -33,21 +34,25 @@
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheTestStore;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Test;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_TO_STRING_MAX_LENGTH;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
Expand Down Expand Up @@ -982,6 +987,116 @@ public void testExplicitLocksDistributed() throws Exception {
}
}

/**
* Version of check thread chain case for optimistic transactions.
*
* @throws Exception If failed.
*/
@Test
@WithSystemProperty(key = IGNITE_TO_STRING_MAX_LENGTH, value = "100000")
public void testConcurrentCheckThreadChainOptimistic() throws Exception {
testConcurrentCheckThreadChain(OPTIMISTIC);
}

/**
* Version of check thread chain case for pessimistic transactions.
*
* @throws Exception If failed.
*/
@Test
@WithSystemProperty(key = IGNITE_TO_STRING_MAX_LENGTH, value = "100000")
public void testConcurrentCheckThreadChainPessimistic() throws Exception {
testConcurrentCheckThreadChain(PESSIMISTIC);
}

/**
* Covers scenario when thread chain locks acquisition for XID 1 should be continued during unsuccessful attempt
* to acquire lock on certain key for XID 2 (XID 1 with uncompleted chain becomes owner of this key instead).
*
* Scenario:
* 1) Start 1 server node with transactional cache
* 2) With 10 threads, perform 100000 transactions with massive puts
* 2.1) Every put affects the same common key, which is not first and not last in the keys list
* 3) Expected: 100000 transaction should end, load threads shouldn't hang for more than 5 seconds
*
* @throws Exception If failed.
*/
protected void testConcurrentCheckThreadChain(TransactionConcurrency txConcurrency) throws Exception {
storeEnabled = false;

startGrid(0);

try {
final AtomicLong iterCnt = new AtomicLong();

int commonKey = 1000;

int otherKeyPickVariance = 10;

int otherKeysCnt = 5;

int maxIterCnt = MAX_ITER_CNT * 10;

IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
@Override public void run() {
long threadId = Thread.currentThread().getId();

long itNum;

while ((itNum = iterCnt.getAndIncrement()) < maxIterCnt) {
Map<Integer, String> vals = U.newLinkedHashMap(otherKeysCnt * 2 + 1);

for (int i = 0; i < otherKeysCnt; i++) {
int key = ThreadLocalRandom.current().nextInt(
otherKeyPickVariance * i, otherKeyPickVariance * (i + 1));

vals.put(key, String.valueOf(key) + threadId);
}

vals.put(commonKey, String.valueOf(commonKey) + threadId);

for (int i = 0; i < otherKeysCnt; i++) {
int key = ThreadLocalRandom.current().nextInt(
commonKey + otherKeyPickVariance * (i + 1), otherKeyPickVariance * (i + 2) + commonKey);

vals.put(key, String.valueOf(key) + threadId);
}

try (Transaction tx = grid(0).transactions().txStart(txConcurrency, READ_COMMITTED)) {
jcache(0).putAll(vals);

tx.commit();
}

if (itNum > 0 && itNum % 5000 == 0)
info(">>> " + itNum + " iterations completed.");
}
}
}, THREAD_CNT);

while (true) {
long prevIterCnt = iterCnt.get();

try {
fut.get(5_000);

break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
if (iterCnt.get() == prevIterCnt) {
Collection<IgniteInternalTx> hangingTxes =
ignite(0).context().cache().context().tm().activeTransactions();

fail(hangingTxes.toString());
}
}
}
}
finally {
stopAllGrids();
}
}

/**
* Gets key for which given node is primary.
*
Expand Down

0 comments on commit 9671a56

Please sign in to comment.