Skip to content

Commit

Permalink
ignite-7049 Fixed error in tx timeout processing for optimistic/seria…
Browse files Browse the repository at this point in the history
…lizable tx
  • Loading branch information
ascherbakoff authored and sboikov committed Dec 2, 2017
1 parent 9398813 commit cd0d2eb
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private void onError(@Nullable GridDistributedTxMapping m, Throwable e) {
}
}

if (e instanceof IgniteTxOptimisticCheckedException || e instanceof IgniteTxTimeoutCheckedException) {
if (e instanceof IgniteTxOptimisticCheckedException) {
if (m != null)
tx.removeMapping(m.primary().id());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,31 @@
package org.apache.ignite.internal.processors.cache.transactions;

import java.util.Collection;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
Expand All @@ -44,8 +51,11 @@
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionOptimisticException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jsr166.LongAdder8;

import static java.lang.Thread.sleep;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
Expand All @@ -55,6 +65,9 @@
* Tests an ability to eagerly rollback timed out transactions.
*/
public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
/** */
private static final long DURATION = 60 * 1000L;

/** */
private static final long TX_MIN_TIMEOUT = 1;

Expand All @@ -73,6 +86,8 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {

((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);

cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());

boolean client = "client".equals(igniteInstanceName);

cfg.setClientMode(client);
Expand Down Expand Up @@ -372,6 +387,175 @@ public void testSimple() throws Exception {
}
}

/**
* Test timeouts with random values and different tx configurations.
*/
public void testRandomMixedTxConfigurations() throws Exception {
final Ignite client = startClient();

final AtomicBoolean stop = new AtomicBoolean();

final long seed = System.currentTimeMillis();

final Random r = new Random(seed);

log.info("Using seed: " + seed);

final int threadsCnt = Runtime.getRuntime().availableProcessors() * 2;

for (int k = 0; k < threadsCnt; k++)
grid(0).cache(CACHE_NAME).put(k, (long)0);

final TransactionConcurrency[] TC_VALS = TransactionConcurrency.values();
final TransactionIsolation[] TI_VALS = TransactionIsolation.values();

final LongAdder8 cntr0 = new LongAdder8();
final LongAdder8 cntr1 = new LongAdder8();
final LongAdder8 cntr2 = new LongAdder8();
final LongAdder8 cntr3 = new LongAdder8();

final IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
@Override public void run() {
while (!stop.get()) {
int nodeId = r.nextInt(GRID_CNT + 1);

Ignite node = nodeId == GRID_CNT || nearCacheEnabled() ? client : grid(nodeId);

TransactionConcurrency conc = TC_VALS[r.nextInt(TC_VALS.length)];
TransactionIsolation isolation = TI_VALS[r.nextInt(TI_VALS.length)];

int k = r.nextInt(threadsCnt);

long timeout = r.nextInt(200) + 50;

// Roughly 50% of transactions should time out.
try (Transaction tx = node.transactions().txStart(conc, isolation, timeout, 1)) {
cntr0.add(1);

final Long v = (Long)node.cache(CACHE_NAME).get(k);

final int delay = r.nextInt(400);

if (delay > 0)
sleep(delay);

node.cache(CACHE_NAME).put(k, v + 1);

tx.commit();

cntr1.add(1);
}
catch (TransactionOptimisticException | InterruptedException e) {
// Expected.
cntr3.add(1);
}
catch (TransactionTimeoutException e) {
cntr2.add(1);
}
catch (CacheException e) {
assertEquals(TransactionTimeoutException.class, X.getCause(e).getClass());

cntr2.add(1);
}
}
}
}, threadsCnt, "tx-async-thread");

sleep(DURATION);

stop.set(true);

fut.get(10_000);

log.info("Tx test stats: started=" + cntr0.sum() +
", completed=" + cntr1.sum() +
", failed=" + cntr3.sum() +
", timedOut=" + cntr2.sum());

assertEquals("Expected finished count same as started count", cntr0.sum(), cntr1.sum() + cntr2.sum() + cntr3.sum());
}

/**
* Tests timeout on DHT primary node for all tx configurations.
*
* @throws Exception If failed.
*/
public void testTimeoutOnPrimaryDHTNode() throws Exception {
final ClusterNode n0 = grid(0).affinity(CACHE_NAME).mapKeyToNode(0);

final Ignite prim = G.ignite(n0.id());

for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
for (TransactionIsolation isolation : TransactionIsolation.values())
testTimeoutOnPrimaryDhtNode0(prim, concurrency, isolation);
}
}

/**
*
* @param prim Primary node.
* @param conc Concurrency.
* @param isolation Isolation.
* @throws Exception If failed.
*/
private void testTimeoutOnPrimaryDhtNode0(final Ignite prim, final TransactionConcurrency conc,
final TransactionIsolation isolation)
throws Exception {

log.info("concurrency=" + conc + ", isolation=" + isolation);

// Force timeout on primary DHT node by blocking DHT prepare response.
toggleBlocking(GridDhtTxPrepareResponse.class, prim, true);

final int val = 0;

try {
multithreaded(new Runnable() {
@Override public void run() {
try (Transaction txOpt = prim.transactions().txStart(conc, isolation, 300, 1)) {

prim.cache(CACHE_NAME).put(val, val);

txOpt.commit();
}
}
}, 1, "tx-async-thread");

fail();
}
catch (TransactionTimeoutException e) {
// Expected.
}

toggleBlocking(GridDhtTxPrepareResponse.class, prim, false);

AffinityTopologyVersion topVer = new AffinityTopologyVersion(GRID_CNT + 1, 0);

for (Ignite ignite : G.allGrids())
((IgniteEx)ignite).context().cache().context().partitionReleaseFuture(topVer).get(10_000);
}

/**
* @param cls Message class.
* @param nodeToBlock Node to block.
* @param block Block.
*/
private void toggleBlocking(Class<? extends Message> cls, Ignite nodeToBlock, boolean block) {
for (Ignite ignite : G.allGrids()) {
if (ignite == nodeToBlock)
continue;

final TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();

if (block)
spi.blockMessages(cls, nodeToBlock.name());
else
spi.stopBlock(true);
}
}

/**
* @param concurrency Concurrency.
* @param isolation Isolation.
Expand Down Expand Up @@ -652,4 +836,4 @@ private void waitingTxUnblockedOnThreadDeath0(final Ignite near,

fut2.get();
}
}
}

0 comments on commit cd0d2eb

Please sign in to comment.