From 37af9454221a057d8af4a57bda1c77ae9e4f771e Mon Sep 17 00:00:00 2001 From: apopov Date: Wed, 29 Nov 2017 15:51:24 +0300 Subject: [PATCH 1/2] IGNITE-7047 NPE at org.jsr166.ConcurrentLinkedHashMap.replace --- .../GridDhtTxOnePhaseCommitAckRequest.java | 7 + .../cache/transactions/IgniteTxManager.java | 12 +- ...GridCacheMissingCommitVersionSelfTest.java | 137 ++++++++++++++++-- 3 files changed, 139 insertions(+), 17 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java index 67eacd3f8c5f8..20b0dcd5e31f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java @@ -28,6 +28,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import static org.apache.ignite.internal.managers.communication.GridIoMessage.STRIPE_DISABLED_PART; + /** * One Phase Commit Near transaction ack request. */ @@ -132,6 +134,11 @@ public Collection versions() { return reader.afterMessageRead(GridDhtTxOnePhaseCommitAckRequest.class); } + /** {@inheritDoc} */ + @Override public int partition() { + return STRIPE_DISABLED_PART; + } + /** {@inheritDoc} */ @Override public short directType() { return -27; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 77634bdccb23f..3a6a4082eebf3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteReducer; @@ -1042,10 +1043,15 @@ public void removeTxReturn(GridCacheVersion xidVer) { if (Boolean.FALSE.equals(prev)) // Tx can be rolled back. return; - assert prev instanceof GridCacheReturnCompletableWrapper: - prev + " instead of GridCacheReturnCompletableWrapper"; + if (!(prev instanceof GridCacheReturnCompletableWrapper)) + LT.warn(log(), prev + " instead of GridCacheReturnCompletableWrapper for " + xidVer.toString() + + ", size=" + completedVersHashMap.sizex()); - boolean res = completedVersHashMap.replace(xidVer, prev, true); + boolean res; + if (prev == null) + res = (null == completedVersHashMap.putIfAbsent(xidVer, true)); + else + res = completedVersHashMap.replace(xidVer, prev, true); assert res; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java index a6f202218d28c..955149dde5350 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java @@ -17,15 +17,22 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCache; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryAbstractSelfTest; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteFutureTimeoutException; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -34,6 +41,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; /** * @@ -45,23 +53,16 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes /** */ private String maxCompletedTxCnt; - /** - */ - public GridCacheMissingCommitVersionSelfTest() { - super(true); - } + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration() throws Exception { - maxCompletedTxCnt = System.getProperty(IGNITE_MAX_COMPLETED_TX_COUNT); - - System.setProperty(IGNITE_MAX_COMPLETED_TX_COUNT, String.valueOf(5)); - - IgniteConfiguration cfg = super.getConfiguration(); + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + discoSpi.setIpFinder(IP_FINDER); cfg.setDiscoverySpi(discoSpi); @@ -70,24 +71,38 @@ public GridCacheMissingCommitVersionSelfTest() { ccfg.setCacheMode(PARTITIONED); ccfg.setAtomicityMode(TRANSACTIONAL); ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setBackups(1); cfg.setCacheConfiguration(ccfg); return cfg; } + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + maxCompletedTxCnt = System.getProperty(IGNITE_MAX_COMPLETED_TX_COUNT); + + System.setProperty(IGNITE_MAX_COMPLETED_TX_COUNT, String.valueOf(5)); + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { + super.afterTest(); + System.setProperty(IGNITE_MAX_COMPLETED_TX_COUNT, maxCompletedTxCnt != null ? maxCompletedTxCnt : ""); - super.afterTest(); + stopAllGrids(); } /** * @throws Exception If failed. */ public void testMissingCommitVersion() throws Exception { - final IgniteCache cache = jcache(); + startGrid(0); + + final IgniteCache cache = jcache(0); final int KEYS_PER_THREAD = 10_000; @@ -133,4 +148,98 @@ public void testMissingCommitVersion() throws Exception { } } } + + /** + * OnePhaseCommitAckRequest may come with missing version. It could be removed by completedVersHashMap itself if + * reaches the maximum size under heavy load. This tests verifies IGNITE-7047 fix + * @throws Exception If failed. + */ + public void testMissingVersionForOnePhaseCommitAckRequest() throws Exception { + final int GRID_SIZE = 2; + final AtomicBoolean finished = new AtomicBoolean(); + final AtomicInteger uncaughtCnt = new AtomicInteger(); + + // catch possible asserts from removeTxReturn + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable e) { + log().error("uncaughtException " + e.getMessage()); + uncaughtCnt.getAndIncrement(); + } + }); + + startGridsMultiThreaded(GRID_SIZE); + + IgniteInternalFuture restarterFut = runRestarts(finished); + + IgniteInternalFuture updaterFut = runUpdates(finished); + + try { + U.sleep(10_000); + } + finally { + finished.set(true); + + restarterFut.get(); + updaterFut.get(); + } + + assertEquals(0, uncaughtCnt.get()); + + checkOnePhaseCommitReturnValuesCleaned(GRID_SIZE); + } + + + /** + * Creates and run a thread that restarts Node-0 every 300 ms. + * @param finished Boolean atomic that stops thread infinite loop by True value. + * @return future. + */ + private IgniteInternalFuture runRestarts(final AtomicBoolean finished) { + return runAsync(new Callable() { + @Override public Object call() throws Exception { + while (!finished.get()) { + stopGrid(0); + + U.sleep(300); + + startGrid(0); + + awaitPartitionMapExchange(); + } + + return null; + } + }); + } + + /** + * Creates and run a thread that executes Cache getAndPut operations (at Node-1) in infinite loop. + * @param finished Boolean atomic that stops thread infinite loop by True value. + * @return future. + */ + private IgniteInternalFuture runUpdates(final AtomicBoolean finished) { + final int keysCnt = 10_000; + + return runAsync(new Callable() { + @Override public Object call() throws Exception { + int iter = 0; + + while (!finished.get()) { + try { + IgniteCache cache = ignite(1).cache(DEFAULT_CACHE_NAME); + + Integer val = ++iter; + + for (int i = 0; i < keysCnt; i++) + cache.getAndPut(i, val); + } + catch (Exception ignored) { + // No-op. + } + } + + return null; + } + }); + } } From 734c93537b4868e40e9df80b0281cfb5ac3c5240 Mon Sep 17 00:00:00 2001 From: apopov Date: Tue, 5 Dec 2017 15:58:30 +0300 Subject: [PATCH 2/2] IGNITE-7047: review fixes --- .../cache/transactions/IgniteTxHandler.java | 4 +- .../cache/transactions/IgniteTxManager.java | 29 +++++---- ...GridCacheMissingCommitVersionSelfTest.java | 65 ++++++++++--------- 3 files changed, 54 insertions(+), 44 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 38c877b030016..f102de2bb6a51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1288,9 +1288,9 @@ protected void finish( if (tx == null) { if (req.commit()) // Must be some long time duplicate, but we add it anyway. - ctx.tm().addCommittedTx(tx, req.version(), null); + ctx.tm().addCommittedTx(null, req.version(), null); else - ctx.tm().addRolledbackTx(tx, req.version()); + ctx.tm().addRolledbackTx(null, req.version()); if (log.isDebugEnabled()) log.debug("Received finish request for non-existing transaction (added to completed set) " + diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 3a6a4082eebf3..af3d38656fd7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -933,6 +933,7 @@ public void addCommittedTx(IgniteInternalTx tx) { * @param tx Committed transaction. */ public void addCommittedTxReturn(IgniteInternalTx tx, GridCacheReturnCompletableWrapper ret) { + // nearXidVersion is passed to xidVersion for onePhaseCommit transaction addCommittedTxReturn(tx.nearXidVersion(), null, ret); } @@ -951,7 +952,7 @@ public boolean addRolledbackTx(IgniteInternalTx tx) { * @return If transaction was not already present in completed set. */ public boolean addCommittedTx( - IgniteInternalTx tx, + @Nullable IgniteInternalTx tx, GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer ) { @@ -997,7 +998,7 @@ private void addCommittedTxReturn( * @return If transaction was not already present in completed set. */ public boolean addRolledbackTx( - IgniteInternalTx tx, + @Nullable IgniteInternalTx tx, GridCacheVersion xidVer ) { Object committed0 = completedVersHashMap.putIfAbsent(xidVer, false); @@ -1035,6 +1036,11 @@ public GridCacheReturnCompletableWrapper getCommittedTxReturn(GridCacheVersion x } /** + * removeTxReturn. completedVersHashMap could have the following values for xidVer: + * false - for rollbacked tx, + * true - for regular finished tx, + * GridCacheReturn - for onePhaseCommit case, + * null - for lost tx. * @param xidVer xidVer Completed transaction version. */ public void removeTxReturn(GridCacheVersion xidVer) { @@ -1043,17 +1049,16 @@ public void removeTxReturn(GridCacheVersion xidVer) { if (Boolean.FALSE.equals(prev)) // Tx can be rolled back. return; - if (!(prev instanceof GridCacheReturnCompletableWrapper)) - LT.warn(log(), prev + " instead of GridCacheReturnCompletableWrapper for " + xidVer.toString() + - ", size=" + completedVersHashMap.sizex()); - - boolean res; - if (prev == null) - res = (null == completedVersHashMap.putIfAbsent(xidVer, true)); - else - res = completedVersHashMap.replace(xidVer, prev, true); + if (prev == null) { + if (log.isDebugEnabled()) + log.debug("removeTxReturn: could not find " + xidVer); + return; + } - assert res; + if (prev instanceof GridCacheReturnCompletableWrapper) { + boolean res = completedVersHashMap.replace(xidVer, prev, true); + assert res; + } } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java index 955149dde5350..a8834cbe9df12 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java @@ -17,17 +17,15 @@ package org.apache.ignite.internal.processors.cache; -import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryAbstractSelfTest; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteFutureTimeoutException; @@ -78,6 +76,11 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes return cfg; } + /** {@inheritDoc} */ + @Override public String getTestIgniteInstanceName(int idx) { + return "NODE-" + Integer.toString(idx); + } + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -152,55 +155,52 @@ public void testMissingCommitVersion() throws Exception { /** * OnePhaseCommitAckRequest may come with missing version. It could be removed by completedVersHashMap itself if * reaches the maximum size under heavy load. This tests verifies IGNITE-7047 fix + * Log can be enabled by adding org.apache.ignite.internal.processors.cache.transactions category to log4j-test.xml * @throws Exception If failed. */ public void testMissingVersionForOnePhaseCommitAckRequest() throws Exception { final int GRID_SIZE = 2; - final AtomicBoolean finished = new AtomicBoolean(); - final AtomicInteger uncaughtCnt = new AtomicInteger(); + + final AtomicReference unexpectedE = new AtomicReference(); // catch possible asserts from removeTxReturn Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log().error("uncaughtException " + e.getMessage()); - uncaughtCnt.getAndIncrement(); + unexpectedE.set(e); } }); startGridsMultiThreaded(GRID_SIZE); - IgniteInternalFuture restarterFut = runRestarts(finished); + // run NODE-0 restarts 3 times in 100 msec + IgniteInternalFuture restartFut = runNodeRestarts(); - IgniteInternalFuture updaterFut = runUpdates(finished); + // run simultaneous transactions at NODE-1 + IgniteInternalFuture txFut = runTransactions(restartFut); - try { - U.sleep(10_000); - } - finally { - finished.set(true); + restartFut.get(); + txFut.get(); - restarterFut.get(); - updaterFut.get(); - } - - assertEquals(0, uncaughtCnt.get()); + assertEquals(null, unexpectedE.get()); checkOnePhaseCommitReturnValuesCleaned(GRID_SIZE); } /** - * Creates and run a thread that restarts Node-0 every 300 ms. - * @param finished Boolean atomic that stops thread infinite loop by True value. + * Creates and runs a thread that restarts Node-0 every 100 ms. * @return future. */ - private IgniteInternalFuture runRestarts(final AtomicBoolean finished) { + private IgniteInternalFuture runNodeRestarts() { return runAsync(new Callable() { @Override public Object call() throws Exception { - while (!finished.get()) { + U.sleep(200); + + for (int i = 0; i < 3; i++) { stopGrid(0); - U.sleep(300); + U.sleep(100); startGrid(0); @@ -213,25 +213,30 @@ private IgniteInternalFuture runRestarts(final AtomicBoolean finished) { } /** - * Creates and run a thread that executes Cache getAndPut operations (at Node-1) in infinite loop. - * @param finished Boolean atomic that stops thread infinite loop by True value. + * Creates and runs a thread that executes Cache getAndPut operations (at Node-1) in infinite loop. + * @param fut Future that is used as a signal to stop thread infinite loop. * @return future. */ - private IgniteInternalFuture runUpdates(final AtomicBoolean finished) { + private IgniteInternalFuture runTransactions(final IgniteInternalFuture fut) { final int keysCnt = 10_000; return runAsync(new Callable() { @Override public Object call() throws Exception { int iter = 0; - while (!finished.get()) { + while (!fut.isDone()) { try { IgniteCache cache = ignite(1).cache(DEFAULT_CACHE_NAME); Integer val = ++iter; - for (int i = 0; i < keysCnt; i++) - cache.getAndPut(i, val); + final Affinity affinity = ignite(1).affinity(cache.getName()); + + // run transactions for primary partitions at NODE-1 to have more unexpected + // OnePhaseCommitAckRequest at NODE-0. + for (int i = 0; i < keysCnt && !fut.isDone(); i++) + if (affinity.mapKeyToNode(i) == ignite(1).cluster().localNode()) + cache.getAndPut(i, val); } catch (Exception ignored) { // No-op.