From 7acf6b173df1cfc763dea72360fd9d7b13c20dfc Mon Sep 17 00:00:00 2001 From: voipp Date: Fri, 30 Jun 2017 18:06:53 +0300 Subject: [PATCH 1/6] IGNITE-5712 Context switching between threads for optimistic transactions --- .../ignite/tests/utils/TestTransaction.java | 10 + .../cache/GridCacheSharedContext.java | 24 + ...OptimisticSerializableTxPrepareFuture.java | 5 +- .../distributed/near/GridNearTxLocal.java | 45 ++ .../GridCacheDistributedQueryManager.java | 2 +- .../store/GridCacheStoreManagerAdapter.java | 10 + .../cache/transactions/IgniteTxAdapter.java | 15 +- .../cache/transactions/IgniteTxManager.java | 23 + .../cache/transactions/IgniteTxStateImpl.java | 2 - .../transactions/TransactionProxyImpl.java | 49 +- .../ignite/transactions/Transaction.java | 14 + .../ignite/transactions/TransactionState.java | 5 +- ...ractTransactionsInMultipleThreadsTest.java | 131 ++++ ...ansactionsInMultipleThreadsClientTest.java | 538 ++++++++++++++ ...sactionsInMultipleThreadsFailoverTest.java | 202 ++++++ ...sticTransactionsInMultipleThreadsTest.java | 678 ++++++++++++++++++ ...sticTransactionsInMultipleThreadsTest.java | 125 ++++ .../cache/GridAbstractCacheStoreSelfTest.java | 10 + .../testsuites/IgniteCacheTestSuite5.java | 9 + .../processors/cache/jta/CacheJtaManager.java | 49 +- .../cache/jta/CacheJtaResource.java | 29 +- .../GridJtaTransactionManagerSelfTest.java | 176 +++++ .../ignite/testsuites/IgniteJtaTestSuite.java | 3 + 23 files changed, 2099 insertions(+), 55 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/AbstractTransactionsInMultipleThreadsTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsClientTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsFailoverTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PessimisticTransactionsInMultipleThreadsTest.java create mode 100644 modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java index 4a03d25a44b63..e587bd7862282 100644 --- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java @@ -140,4 +140,14 @@ public class TestTransaction implements Transaction { @Override public IgniteFuture rollbackAsync() throws IgniteException { return null; } + + /** {@inheritDoc} */ + @Override public void suspend() throws IgniteException{ + // No-op. + } + + /** {@inheritDoc} */ + @Override public void resume() throws IgniteException { + // No-op. + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index efd90a88108c4..8bf94310a47d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -943,6 +943,30 @@ public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) throws IgniteChe return tx.rollbackNearTxLocalAsync(); } + /** + * Suspends transaction. It could be resume later. Supported only for optimistic transactions. + * + * @param tx Transaction to suspend. + * @throws IgniteCheckedException If suspension failed. + */ + public void suspendTx(GridNearTxLocal tx) throws IgniteCheckedException { + tx.txState().awaitLastFut(this); + + tx.suspend(); + } + + /** + * Resume transaction if it was previously suspended. + * + * @param tx Transaction to resume. + * @throws IgniteCheckedException If resume failed. + */ + public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException { + tx.txState().awaitLastFut(this); + + tx.resume(); + } + /** * @return Store session listeners. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 72ddc67e0a291..8cc059f2c81d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -188,10 +188,11 @@ private void onError(@Nullable GridDistributedTxMapping m, Throwable e) { tx.removeMapping(m.primary().id()); } - ERR_UPD.compareAndSet(this, null, e); - if (keyLockFut != null) keyLockFut.onDone(e); + + if (ERR_UPD.compareAndSet(this, null, e)) + onComplete(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 81e5ca86ab643..9d2d789965660 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -105,12 +105,14 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER; import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER; +import static org.apache.ignite.transactions.TransactionState.ACTIVE; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.PREPARED; import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; +import static org.apache.ignite.transactions.TransactionState.SUSPENDED; import static org.apache.ignite.transactions.TransactionState.UNKNOWN; /** @@ -2850,6 +2852,28 @@ public void addKeyMapping(IgniteTxKey key, ClusterNode node) { return txState.singleWrite(); } + /** + * Suspends transaction. It could be resumed later. Supported only for optimistic transactions. + * + * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out. + */ + public void suspend() throws IgniteCheckedException { + if (pessimistic()) + throw new UnsupportedOperationException("Suspension is not supported for pessimistic transactions."); + + checkValid(); + + synchronized (this) { + if (ACTIVE != state()) + throw new IgniteCheckedException("Trying to suspend transaction with incorrect state " + + "[expected=" + ACTIVE + ", actual=" + state() + ']'); + + cctx.tm().detachThread(this); + + state(SUSPENDED); + } + } + /** * @param maps Mappings. */ @@ -2991,6 +3015,27 @@ private void readyNearLock(IgniteTxEntry txEntry, } } + /** + * Resumes transaction (possibly in another thread) if it was previously suspended. + * + * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out. + */ + public void resume() throws IgniteCheckedException { + checkValid(); + + synchronized (this) { + if (SUSPENDED != state()) + throw new IgniteCheckedException("Trying to resume transaction with incorrect state " + + "[expected=" + SUSPENDED + ", actual=" + state() + ']'); + + cctx.tm().attachCurrentThread(this); + + threadId = Thread.currentThread().getId(); + + state(ACTIVE); + } + } + /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"}) @Override public boolean localFinish(boolean commit) throws IgniteCheckedException { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 7f859a204e138..a67527c296a53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -211,7 +211,7 @@ protected void removeQueryFuture(long reqId) { cctx.cacheId(), req.id(), new IgniteCheckedException("Received request for incorrect cache [expected=" + cctx.name() + - ", actual=" + req.cacheName()), + ", actual=" + req.cacheName() + ']'), cctx.deploymentEnabled()); sendQueryResponse(sndId, res, 0); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index c02e2c73b2710..bb16ad1db2722 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -1357,6 +1357,11 @@ private static class TxProxy implements Transaction { return tx.state(); } + /** {@inheritDoc} */ + @Override public void suspend() throws IgniteException { + throw new UnsupportedOperationException(); + } + /** {@inheritDoc} */ @Override public long timeout() { return tx.timeout(); @@ -1402,6 +1407,11 @@ private static class TxProxy implements Transaction { throw new UnsupportedOperationException(); } + /** {@inheritDoc} */ + @Override public void resume() throws IgniteException { + throw new UnsupportedOperationException(); + } + /** {@inheritDoc} */ @Override public IgniteAsyncSupport withAsync() { throw new UnsupportedOperationException(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 880d9b91299aa..829cec41fb3be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -97,6 +97,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; +import static org.apache.ignite.transactions.TransactionState.SUSPENDED; /** * Managed transaction adapter. @@ -981,10 +982,10 @@ protected boolean state(TransactionState state, boolean timedOut) { switch (state) { case ACTIVE: { - valid = false; + valid = prev == SUSPENDED; break; - } // Active is initial state and cannot be transitioned to. + } case PREPARING: { valid = prev == ACTIVE; @@ -1029,7 +1030,7 @@ protected boolean state(TransactionState state, boolean timedOut) { } case MARKED_ROLLBACK: { - valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED; + valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == SUSPENDED; break; } @@ -1037,7 +1038,13 @@ protected boolean state(TransactionState state, boolean timedOut) { case ROLLING_BACK: { valid = prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING || - prev == PREPARED || (prev == COMMITTING && local() && !dht()); + prev == PREPARED || (prev == COMMITTING && local() && !dht()) || prev == SUSPENDED; + + break; + } + + case SUSPENDED: { + valid = prev == ACTIVE; break; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 3a3b7668ec3dc..5cabc5b4d6490 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2236,6 +2236,29 @@ public Collection> deadlockDetectionFutures() { return (Collection>)values; } + /** + * Attaches current thread to transaction. + * + * @param tx Transaction to be attached. + */ + public void attachCurrentThread(IgniteInternalTx tx) { + assert tx != null; + assert !threadMap.containsKey(tx.threadId()); + + assert threadMap.putIfAbsent(Thread.currentThread().getId(), tx) == null; + } + + /** + * Detaches thread from the transaction. + * + * @param tx Transaction to be detached. + */ + public void detachThread(IgniteInternalTx tx) { + assert tx != null; + + assert threadMap.remove(tx.threadId(), tx); + } + /** * Timeout object for node failure handler. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index ad4ca61011381..22ca4571c2cd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -29,8 +29,6 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.cache.CacheStoppedException; -import org.apache.ignite.internal.managers.discovery.DiscoCache; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java index 8750cab00df4d..0944a31f3b820 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java @@ -96,8 +96,13 @@ public GridNearTxLocal tx() { /** * Enters a call. + * + * @param checkThreadId Whether threadId should be checked. */ - private void enter() { + private void enter(boolean checkThreadId) { + assert Thread.currentThread().getId() == threadId() || !checkThreadId : + "Only thread owning transaction is permitted to operate it."; + if (cctx.deploymentEnabled()) cctx.deploy().onEnter(); @@ -203,6 +208,20 @@ private void leave() { return tx.state(); } + /** {@inheritDoc} */ + @Override public void suspend() throws IgniteException { + enter(true); + + try { + cctx.suspendTx(tx); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } finally { + leave(); + } + } + /** {@inheritDoc} */ @Override public long timeout(long timeout) { return tx.timeout(timeout); @@ -226,7 +245,7 @@ private void leave() { /** {@inheritDoc} */ @Override public boolean setRollbackOnly() { - enter(); + enter(true); try { return tx.setRollbackOnly(); @@ -238,7 +257,7 @@ private void leave() { /** {@inheritDoc} */ @Override public boolean isRollbackOnly() { - enter(); + enter(true); try { if (async) @@ -253,7 +272,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void commit() { - enter(); + enter(true); try { IgniteInternalFuture commitFut = cctx.commitTxAsync(tx); @@ -273,7 +292,7 @@ private void leave() { /** {@inheritDoc} */ @Override public IgniteFuture commitAsync() throws IgniteException { - enter(); + enter(true); try { return (IgniteFuture)createFuture(cctx.commitTxAsync(tx)); @@ -285,7 +304,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void close() { - enter(); + enter(true); try { cctx.endTx(tx); @@ -300,7 +319,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void rollback() { - enter(); + enter(true); try { IgniteInternalFuture rollbackFut = cctx.rollbackTxAsync(tx); @@ -320,7 +339,7 @@ private void leave() { /** {@inheritDoc} */ @Override public IgniteFuture rollbackAsync() throws IgniteException { - enter(); + enter(true); try { return (IgniteFuture)(new IgniteFutureImpl(cctx.rollbackTxAsync(tx))); @@ -333,6 +352,20 @@ private void leave() { } } + /** {@inheritDoc} */ + @Override public void resume() throws IgniteException { + enter(false); + + try { + cctx.resumeTx(tx); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } finally { + leave(); + } + } + /** * @param res Result to convert to finished future. */ diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java index 57a2b00eefa24..7f5c6c02b01f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java @@ -272,4 +272,18 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport { * @throws IgniteException If rollback failed. */ public IgniteFuture rollbackAsync() throws IgniteException; + + /** + * Resume transaction if it was previously suspended. + * + * @throws IgniteException If resume failed. + */ + public void resume() throws IgniteException; + + /** + * Suspends transaction. It could be resumed later. Supported only for optimistic transactions. + * + * @throws IgniteException If suspension failed. + */ + public void suspend() throws IgniteException; } \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java index 19802421ba31a..b235a6980cc8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java @@ -48,7 +48,10 @@ public enum TransactionState { ROLLED_BACK, /** Transaction rollback failed or is otherwise unknown state. */ - UNKNOWN; + UNKNOWN, + + /** Transaction has been suspended by user. */ + SUSPENDED; /** Enumerated values. */ private static final TransactionState[] VALS = values(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/AbstractTransactionsInMultipleThreadsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/AbstractTransactionsInMultipleThreadsTest.java new file mode 100644 index 0000000000000..3a210073b091b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/AbstractTransactionsInMultipleThreadsTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * + */ +public abstract class AbstractTransactionsInMultipleThreadsTest extends GridCommonAbstractTest { + /** Transaction isolation level. */ + protected TransactionIsolation transactionIsolation; + + /** Id of node, started transaction. */ + protected int txInitiatorNodeId = 0; + + /** + * Creates new cache configuration. + * + * @return CacheConfiguration New cache configuration. + */ + protected CacheConfiguration getCacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + + return cacheCfg; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setClientMode(false); + cfg.setCacheConfiguration(getCacheConfiguration()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + checkAllTransactionsHasEnded(); + } + + /** + * Checks whether all transactions has ended. + */ + private void checkAllTransactionsHasEnded() { + for (Ignite ignite : G.allGrids()) { + GridCacheSharedContext cctx = ((IgniteKernal)ignite).context().cache().context(); + + IgniteTxManager txMgr = cctx.tm(); + + assertTrue(txMgr.activeTransactions().isEmpty()); + } + } + + /** + * Starts test scenario for all transaction isolation levels. + * + * @param testScenario Test scenario. + * @throws Exception If scenario failed. + */ + protected void runWithAllIsolations(IgniteCallable testScenario) throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + this.transactionIsolation = isolation; + + try { + testScenario.call(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + } + + /** + * Waits all transactions to finish. + * + * @throws IgniteInterruptedCheckedException If interrupted. + */ + protected void waitAllTransactionsHasFinished() throws IgniteInterruptedCheckedException { + boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + GridCacheAdapter cache = ((IgniteKernal)ignite(0)).internalCache(DEFAULT_CACHE_NAME); + + IgniteTxManager txMgr = cache.isNear() ? + ((GridNearCacheAdapter)cache).dht().context().tm() : + cache.context().tm(); + + return txMgr.activeTransactions().isEmpty(); + } + }, 10000); + + assertTrue(txFinished); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsClientTest.java new file mode 100644 index 0000000000000..30a5f702897da --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsClientTest.java @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionState; +import org.jsr166.LongAdder8; + +/** + * + */ +public class OptimisticTransactionsInMultipleThreadsClientTest extends OptimisticTransactionsInMultipleThreadsTest { + /** Number of concurrently running threads, which tries to perform transaction operations. */ + private int concurrentThreadsNum = 25; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(getTestIgniteInstanceName(1), getConfiguration().setClientMode(true)); + awaitPartitionMapExchange(); + + txInitiatorNodeId = 1; + } + + /** + * Test start 1 transaction, resuming it in another thread. And then start another transaction, trying to write + * the same key and commit it. + * + * @throws Exception If failed. + */ + public void testResumeTxWhileStartingAnotherTx() throws Exception { + for (final TransactionIsolation firstTxIsolation : TransactionIsolation.values()) + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + resumeTxWhileStartingAnotherTx(firstTxIsolation); + + return null; + } + }); + } + + /** + * @param firstTxIsolation Isolation level for first tx. + * @throws IgniteCheckedException If failed. + */ + private void resumeTxWhileStartingAnotherTx(TransactionIsolation firstTxIsolation) throws Exception { + final IgniteCache clientCache = jcache(txInitiatorNodeId); + final IgniteCache remoteCache = jcache(0); + + final CyclicBarrier barrier = new CyclicBarrier(2); + + final String remotePrimaryKey = String.valueOf(primaryKey(remoteCache)); + + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + + final Transaction clientTx = txs.txStart(TransactionConcurrency.OPTIMISTIC, + firstTxIsolation); + + clientCache.put(remotePrimaryKey, 1); + + clientTx.suspend(); + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + assertNull(txs.tx()); + assertEquals(TransactionState.SUSPENDED, clientTx.state()); + + clientTx.resume(); + + clientCache.put(remotePrimaryKey, 2); + + barrier.await(); + + barrier.await(); + + clientTx.commit(); + + return true; + } + }); + + barrier.await(); + + final Transaction clientTx2 = ignite(txInitiatorNodeId).transactions().txStart(TransactionConcurrency.OPTIMISTIC, + transactionIsolation); + + clientCache.put(remotePrimaryKey, 3); + + clientTx2.commit(); + + barrier.await(); + + fut.get(5000); + + assertEquals(2, jcache(0).get(remotePrimaryKey)); + + jcache(0).removeAll(); + } + + /** + * Test start 1 transaction, suspend it. And then start another transaction, trying to write + * the same key and commit it. + * + * @throws Exception If failed. + */ + public void testSuspendTxAndStartNewTx() throws Exception { + for (final TransactionIsolation firstTxIsolation : TransactionIsolation.values()) + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + suspendTxAndStartNewTx(firstTxIsolation); + + return null; + } + }); + } + + /** + * @param isolation1 Isolation level for first tx. + * @throws IgniteCheckedException If failed. + */ + private void suspendTxAndStartNewTx(TransactionIsolation isolation1) throws IgniteCheckedException { + final IgniteCache clientCache = jcache(txInitiatorNodeId); + final IgniteCache remoteCache = jcache(0); + + String remotePrimaryKey = String.valueOf(primaryKey(remoteCache)); + + Ignite clientIgnite = ignite(txInitiatorNodeId); + + final Transaction clientTx = clientIgnite.transactions().txStart(TransactionConcurrency.OPTIMISTIC, isolation1); + + clientCache.put(remotePrimaryKey, 1); + + clientTx.suspend(); + + final Transaction clientTx2 = clientIgnite.transactions().txStart(TransactionConcurrency.OPTIMISTIC, + transactionIsolation); + + clientCache.put(remotePrimaryKey, 2); + + clientTx2.commit(); + + assertEquals(2, jcache(0).get(remotePrimaryKey)); + + clientTx.close(); + + remoteCache.removeAll(); + } + + /** + * Test for concurrent transaction suspend. + * + * @throws Exception If failed. + */ + public void testTxConcurrentSuspend() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + txConcurrentSuspend(); + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + private void txConcurrentSuspend() throws Exception { + final IgniteCache clientCache = jcache(txInitiatorNodeId); + final IgniteCache remoteCache = jcache(0); + + final CyclicBarrier barrier = new CyclicBarrier(concurrentThreadsNum + 1); + final LongAdder8 failedTxNumber = new LongAdder8(); + final AtomicInteger threadCnt = new AtomicInteger(); + final AtomicInteger successfulResume = new AtomicInteger(); + + String remotePrimaryKey = String.valueOf(primaryKey(remoteCache)); + + final Transaction clientTx = ignite(txInitiatorNodeId).transactions().txStart(TransactionConcurrency.OPTIMISTIC, + transactionIsolation); + + clientCache.put(remotePrimaryKey, 1); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + waitAndPerformOperation(threadCnt, barrier, clientTx, successfulResume, failedTxNumber); + + return null; + } + }, concurrentThreadsNum, "th-suspend"); + + barrier.await(); + + clientTx.suspend(); + + fut.get(); + + // if transaction was not closed after resume, then close it now. + if (successfulResume.get() == 0) + clientTx.close(); + + assertTrue(successfulResume.get() < 2); + assertEquals(concurrentThreadsNum, failedTxNumber.intValue() + successfulResume.intValue()); + assertNull(remoteCache.get(remotePrimaryKey)); + } + + /** + * Test for concurrent transaction resume. + * + * @throws Exception If failed. + */ + public void testTxConcurrentResume() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + txConcurrentResume(); + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + private void txConcurrentResume() throws Exception { + final IgniteCache clientCache = jcache(txInitiatorNodeId); + final IgniteCache remoteCache = jcache(0); + + final CyclicBarrier barrier = new CyclicBarrier(concurrentThreadsNum); + final LongAdder8 failNumber = new LongAdder8(); + final AtomicInteger threadCnt = new AtomicInteger(); + final AtomicInteger successfulResume = new AtomicInteger(); + + String remotePrimaryKey = String.valueOf(primaryKey(remoteCache)); + + final Transaction clientTx = ignite(txInitiatorNodeId).transactions().txStart(TransactionConcurrency.OPTIMISTIC, + transactionIsolation); + + clientCache.put(remotePrimaryKey, 1); + + clientTx.suspend(); + + multithreaded(new Callable() { + @Override public Object call() throws Exception { + waitAndPerformOperation(threadCnt, barrier, clientTx, successfulResume, failNumber); + + return null; + } + }, concurrentThreadsNum); + + assertEquals(1, successfulResume.get()); + assertEquals(concurrentThreadsNum - 1, failNumber.intValue()); + assertNull(remoteCache.get(remotePrimaryKey)); + } + + /** + * Test for concurrent transaction commit. + * + * @throws Exception If failed. + */ + public void testTxConcurrentCommit() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + txConcurrentCommit(); + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + private void txConcurrentCommit() throws Exception { + final IgniteCache clientCache = jcache(txInitiatorNodeId); + final IgniteCache remoteCache = jcache(0); + + final CyclicBarrier barrier = new CyclicBarrier(concurrentThreadsNum + 1); + final LongAdder8 failNumber = new LongAdder8(); + final AtomicInteger threadCnt = new AtomicInteger(); + final AtomicInteger successfulResume = new AtomicInteger(); + + String remotePrimaryKey = String.valueOf(primaryKey(remoteCache)); + + final Transaction clientTx = ignite(txInitiatorNodeId).transactions().txStart(TransactionConcurrency.OPTIMISTIC, + transactionIsolation); + + clientCache.put(remotePrimaryKey, 1); + + clientTx.suspend(); + + multithreaded(new Callable() { + @Override public Object call() throws Exception { + clientTx.resume(); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + waitAndPerformOperation(threadCnt, barrier, clientTx, successfulResume, failNumber); + + return null; + } + }, concurrentThreadsNum, "th-commit"); + + barrier.await(); + + clientTx.commit(); + + fut.get(); + + return null; + } + }, 1); + + assertEquals(0, successfulResume.get()); + assertEquals(concurrentThreadsNum, failNumber.intValue()); + assertEquals(1, jcache(0).get(remotePrimaryKey)); + } + + /** + * Test for concurrent transaction rollback. + * + * @throws Exception If failed. + */ + public void testTxConcurrentRollback() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + txConcurrentRollback(); + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + private void txConcurrentRollback() throws Exception { + final IgniteCache clientCache = jcache(txInitiatorNodeId); + final IgniteCache remoteCache = jcache(0); + + final CyclicBarrier barrier = new CyclicBarrier(concurrentThreadsNum + 1); + final LongAdder8 failNumber = new LongAdder8(); + final AtomicInteger threadCnt = new AtomicInteger(); + final AtomicInteger successfulResume = new AtomicInteger(); + + String remotePrimaryKey = String.valueOf(primaryKey(remoteCache)); + + final Transaction clientTx = ignite(txInitiatorNodeId).transactions().txStart(TransactionConcurrency.OPTIMISTIC, + transactionIsolation); + + clientCache.put(remotePrimaryKey, 1); + + clientTx.suspend(); + + multithreaded(new Callable() { + @Override public Object call() throws Exception { + clientTx.resume(); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + waitAndPerformOperation(threadCnt, barrier, clientTx, successfulResume, failNumber); + + return null; + } + }, concurrentThreadsNum, "th-rollback"); + + barrier.await(); + + clientTx.rollback(); + + fut.get(); + + return null; + } + }, 1); + + assertEquals(0, successfulResume.get()); + assertEquals(concurrentThreadsNum, failNumber.intValue()); + assertNull(jcache(0).get(remotePrimaryKey)); + } + + /** + * Test for concurrent transaction close. + * + * @throws Exception If failed. + */ + public void testTxConcurrentClose() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + txConcurrentClose(); + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + private void txConcurrentClose() throws Exception { + final IgniteCache clientCache = jcache(txInitiatorNodeId); + final IgniteCache remoteCache = jcache(0); + + final CyclicBarrier barrier = new CyclicBarrier(concurrentThreadsNum + 1); + final LongAdder8 failNumber = new LongAdder8(); + final AtomicInteger threadCnt = new AtomicInteger(); + final AtomicInteger successfulResume = new AtomicInteger(); + + String remotePrimaryKey = String.valueOf(primaryKey(remoteCache)); + + final Transaction clientTx = ignite(txInitiatorNodeId).transactions().txStart(TransactionConcurrency.OPTIMISTIC, + transactionIsolation); + + clientCache.put(remotePrimaryKey, 1); + + clientTx.suspend(); + + multithreaded(new Callable() { + @Override public Object call() throws Exception { + clientTx.resume(); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + waitAndPerformOperation(threadCnt, barrier, clientTx, successfulResume, failNumber); + + return null; + } + }, concurrentThreadsNum, "th-close"); + + barrier.await(); + + clientTx.close(); + + fut.get(); + + return null; + } + }, 1); + + assertEquals(0, successfulResume.get()); + assertEquals(concurrentThreadsNum, failNumber.intValue()); + assertNull(jcache(0).get(remotePrimaryKey)); + } + + /** + * Thread begin waiting on barrier and then performs some operation. + * + * @param threadCnt Common counter for threads. + * @param barrier Barrier, all threads are waiting on. + * @param clientTx Transaction instance that we test. + * @param successfulResume Counter for successful resume operations. + * @param failedTxNumber Counter for failed operations. + * @throws Exception If failed. + */ + private void waitAndPerformOperation(AtomicInteger threadCnt, CyclicBarrier barrier, Transaction clientTx, + AtomicInteger successfulResume, LongAdder8 failedTxNumber) throws Exception { + try { + int threadNum = threadCnt.incrementAndGet(); + + switch (threadNum % 5) { + case 0: + barrier.await(); + + clientTx.suspend(); + + break; + + case 1: + barrier.await(); + + clientTx.resume(); + + successfulResume.incrementAndGet(); + + clientTx.close(); + + return; + + case 2: + barrier.await(); + + clientTx.commit(); + + break; + + case 3: + barrier.await(); + + clientTx.rollback(); + + break; + + case 4: + barrier.await(); + + clientTx.close(); + + break; + + default: + assert false; + } + + fail("Concurrent operation must failed, because it doesn't own transaction."); + } + catch (Throwable e) { + failedTxNumber.increment(); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsFailoverTest.java new file mode 100644 index 0000000000000..48bbad031d203 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsFailoverTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionState; + +/** + * + */ +public class OptimisticTransactionsInMultipleThreadsFailoverTest extends AbstractTransactionsInMultipleThreadsTest { + /** + * Starts transaction, breaks node and then resuming it in another thread. + * + * @param key Key to put. + * @param breakNodeIdx Node id to brake. + * @param initiatingNodeIdx Node, starting transaction on. + * @throws IgniteCheckedException If failed. + */ + private void performTransactionFailover(String key, + int breakNodeIdx, int initiatingNodeIdx) throws IgniteCheckedException { + final IgniteTransactions txs = grid(initiatingNodeIdx).transactions(); + IgniteCache cache = jcache(initiatingNodeIdx); + + final Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + + cache.put(key, 1); + + tx.suspend(); + + G.stop(ignite(breakNodeIdx).name(), true); + + assertNull(txs.tx()); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + assertNull(txs.tx()); + assertEquals(TransactionState.SUSPENDED, tx.state()); + + tx.resume(); + + assertEquals(TransactionState.ACTIVE, tx.state()); + + tx.commit(); + + return true; + } + }); + + fut.get(); + } + + /** + * Starts tx locally with remote residing keys and then remote node fails. + */ + public void testTxRemoteNodeFailover() throws Exception { + startGrid(getTestIgniteInstanceName(0)); + + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + txRemoteNodeFailover(); + + return null; + } + }); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + private void txRemoteNodeFailover() throws Exception { + startGrid(1); + awaitPartitionMapExchange(); + + IgniteCache remoteCache = jcache(1); + + String remotePrimaryKey = String.valueOf(primaryKey(remoteCache)); + + assert remotePrimaryKey != null; + + performTransactionFailover(remotePrimaryKey, 1, 0); + + waitAllTransactionsHasFinished(); + + IgniteCache clientCache = jcache(0); + + assertEquals(1, (long)clientCache.get(remotePrimaryKey)); + + clientCache.removeAll(); + } + + /** + * Starts tx locally with locally residing keys and then local node fails. + */ + public void testTxLocalNodeFailover() throws Exception { + startGrid(getTestIgniteInstanceName(0)); + + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + txLocalNodeFailover(); + + return null; + } + }); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + private void txLocalNodeFailover() throws Exception { + startGrid(1); + awaitPartitionMapExchange(); + + IgniteCache localCache = jcache(1); + + String localPrimaryKey = String.valueOf(primaryKey(localCache)); + + assert localPrimaryKey != null; + + try { + performTransactionFailover(localPrimaryKey, 1, 1); + } + catch (IgniteCheckedException ignore) { + // ignoring node breakage exception. + } + + IgniteCache remoteCache = jcache(0); + + assertFalse(remoteCache.containsKey(localPrimaryKey)); + } + + /** + * Starts tx locally on client, and break remote primary node. + */ + public void testTxOnClientBreakRemote() throws Exception { + startGrid(2); + + startGrid(getTestIgniteInstanceName(0), getConfiguration().setClientMode(true)); + + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + txOnClientBreakRemote(); + + return null; + } + }); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + private void txOnClientBreakRemote() throws Exception { + startGrid(1); + awaitPartitionMapExchange(); + + IgniteCache remoteCache = jcache(1); + + String remotePrimaryKey = String.valueOf(primaryKey(remoteCache)); + + assert remotePrimaryKey != null; + + performTransactionFailover(remotePrimaryKey, 1, 0); + + waitAllTransactionsHasFinished(); + + IgniteCache clientCache = jcache(0); + + assertEquals(1, (long)clientCache.get(remotePrimaryKey)); + + clientCache.removeAll(); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java new file mode 100644 index 0000000000000..334b9d686aad1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java @@ -0,0 +1,678 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionState; +import org.apache.ignite.transactions.TransactionTimeoutException; + +/** + * + */ +public class OptimisticTransactionsInMultipleThreadsTest extends AbstractTransactionsInMultipleThreadsTest { + /** Name for test cache */ + private static final String TEST_CACHE_NAME = "testCache"; + + /** Name for second test cache */ + private static final String TEST_CACHE_NAME2 = "testCache2"; + + /** Transaction timeout. */ + private static final long TIMEOUT = 100; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(0); + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(true); + } + + /** + * Test for transaction starting in one thread, continuing in another. + * + * @throws Exception If failed. + */ + public void testSimpleTransactionInAnotherThread() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + simpleTransactionInAnotherThread(); + + return null; + } + }); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void simpleTransactionInAnotherThread() throws IgniteCheckedException { + final IgniteCache cache = jcache(txInitiatorNodeId); + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + + assertNull(cache.get("key1")); + + final Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + + cache.put("key1", 1); + cache.put("key2", 2); + + tx.suspend(); + + assertNull(cache.get("key1")); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + assertNull(txs.tx()); + assertEquals(TransactionState.SUSPENDED, tx.state()); + + tx.resume(); + + assertEquals(TransactionState.ACTIVE, tx.state()); + + cache.put("key3", 3); + cache.remove("key2"); + + tx.commit(); + + return true; + } + }); + + fut.get(5000); + + assertEquals(TransactionState.COMMITTED, tx.state()); + assertEquals((long)1, (long)cache.get("key1")); + assertEquals((long)3, (long)cache.get("key3")); + assertFalse(cache.containsKey("key2")); + + cache.removeAll(); + } + + /** + * Test for transaction starting in one thread, continuing in another, and resuming in initiating thread. + * + * @throws Exception If failed. + */ + public void testSimpleTransactionInAnotherThreadContinued() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + simpleTransactionInAnotherThreadContinued(); + + return null; + } + }); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void simpleTransactionInAnotherThreadContinued() throws IgniteCheckedException { + final IgniteCache cache = jcache(txInitiatorNodeId); + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + + assertNull(cache.get("key1")); + + final Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + + cache.put("key1", 1); + cache.put("key2", 2); + cache.put("key1'", 1); + + tx.suspend(); + + assertNull(cache.get("key1")); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + assertNull(txs.tx()); + assertEquals(TransactionState.SUSPENDED, tx.state()); + + tx.resume(); + + assertEquals(TransactionState.ACTIVE, tx.state()); + + cache.put("key3", 3); + cache.put("key2'", 2); + cache.remove("key2", 2); + + tx.suspend(); + + return true; + } + }); + + fut.get(5000); + + assertNull(txs.tx()); + assertEquals(TransactionState.SUSPENDED, tx.state()); + + tx.resume(); + + assertEquals(TransactionState.ACTIVE, tx.state()); + + cache.remove("key1'", 1); + cache.remove("key2'", 2); + cache.put("key3'", 3); + + tx.commit(); + + assertEquals(TransactionState.COMMITTED, tx.state()); + assertEquals((long)1, (long)cache.get("key1")); + assertEquals((long)3, (long)cache.get("key3")); + assertEquals((long)3, (long)cache.get("key3'")); + assertFalse(cache.containsKey("key2")); + assertFalse(cache.containsKey("key1'")); + assertFalse(cache.containsKey("key2'")); + + cache.removeAll(); + } + + /** + * Test for transaction starting in one thread, continuing in another. Cache operations performed for a couple of + * caches. + * + * @throws Exception If failed. + */ + public void testCrossCacheTransactionInAnotherThread() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + crossCacheTransactionInAnotherThread(); + + return null; + } + }); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void crossCacheTransactionInAnotherThread() throws IgniteCheckedException { + Ignite ignite = ignite(txInitiatorNodeId); + final IgniteTransactions txs = ignite.transactions(); + final IgniteCache cache1 = ignite.getOrCreateCache(getCacheConfiguration().setName(TEST_CACHE_NAME)); + final IgniteCache cache2 = ignite.getOrCreateCache(getCacheConfiguration().setName(TEST_CACHE_NAME2)); + + final Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + + cache1.put("key1", 1); + cache2.put("key2", 2); + + tx.suspend(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + assertNull(txs.tx()); + assertEquals(TransactionState.SUSPENDED, tx.state()); + + tx.resume(); + + assertEquals(TransactionState.ACTIVE, tx.state()); + + cache1.put("key3", 3); + cache2.remove("key2"); + + tx.commit(); + + return true; + } + }); + + fut.get(5000); + + assertEquals(TransactionState.COMMITTED, tx.state()); + assertEquals((long)1, (long)cache1.get("key1")); + assertEquals((long)3, (long)cache1.get("key3")); + assertFalse(cache2.containsKey("key2")); + + cache2.removeAll(); + cache1.removeAll(); + } + + /** + * Test for transaction starting in one thread, continuing in another, and resuming in initiating thread. + * Cache operations performed for a couple of caches. + * + * @throws Exception If failed. + */ + public void testCrossCacheTransactionInAnotherThreadContinued() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + crossCacheTransactionInAnotherThreadContinued(); + + return null; + } + }); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void crossCacheTransactionInAnotherThreadContinued() throws IgniteCheckedException { + Ignite ignite = ignite(txInitiatorNodeId); + final IgniteTransactions txs = ignite.transactions(); + final IgniteCache cache1 = ignite.getOrCreateCache(getCacheConfiguration().setName(TEST_CACHE_NAME)); + final IgniteCache cache2 = ignite.getOrCreateCache(getCacheConfiguration().setName(TEST_CACHE_NAME2)); + + final Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + + cache1.put("key1", 1); + cache2.put("key2", 2); + cache1.put("key1'", 1); + + tx.suspend(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + assertNull(txs.tx()); + assertEquals(TransactionState.SUSPENDED, tx.state()); + + tx.resume(); + + assertEquals(TransactionState.ACTIVE, tx.state()); + + cache1.put("key3", 3); + cache2.put("key2'", 2); + cache2.remove("key2"); + + tx.suspend(); + + return true; + } + }); + + fut.get(5000); + + assertNull(txs.tx()); + assertEquals(TransactionState.SUSPENDED, tx.state()); + + tx.resume(); + + assertEquals(TransactionState.ACTIVE, tx.state()); + + cache1.remove("key1'", 1); + cache2.remove("key2'", 2); + cache1.put("key3'", 3); + + tx.commit(); + + assertEquals(TransactionState.COMMITTED, tx.state()); + assertEquals((long)1, (long)cache1.get("key1")); + assertEquals((long)3, (long)cache1.get("key3")); + assertEquals((long)3, (long)cache1.get("key3'")); + assertFalse(cache2.containsKey("key2")); + assertFalse(cache2.containsKey("key2'")); + assertFalse(cache1.containsKey("key1'")); + + cache1.removeAll(); + cache2.removeAll(); + } + + /** + * Test for transaction rollback. + * + * @throws Exception If failed. + */ + public void testTransactionRollback() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + transactionRollback(); + + return null; + } + }); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void transactionRollback() throws IgniteCheckedException { + final IgniteCache cache = jcache(txInitiatorNodeId); + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + + final Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + + cache.put("key1", 1); + cache.put("key2", 2); + + tx.suspend(); + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + assertNull(txs.tx()); + assertEquals(TransactionState.SUSPENDED, tx.state()); + + tx.resume(); + + assertEquals(TransactionState.ACTIVE, tx.state()); + + cache.put("key3", 3); + + assertTrue(cache.remove("key2")); + + tx.rollback(); + + return true; + } + }); + + fut.get(5000); + + assertEquals(TransactionState.ROLLED_BACK, tx.state()); + assertFalse(cache.containsKey("key1")); + assertFalse(cache.containsKey("key2")); + assertFalse(cache.containsKey("key3")); + + cache.removeAll(); + } + + /** + * Test for starting and suspending transactions, and then resuming and committing in another thread. + * + * @throws IgniteCheckedException If failed. + */ + public void testMultipleTransactionsSuspendResume() throws IgniteCheckedException { + + for (TransactionIsolation isolation : TransactionIsolation.values()) { + transactionIsolation = isolation; + + multipleTransactionsSuspendResume(); + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void multipleTransactionsSuspendResume() throws IgniteCheckedException { + final List txs = new ArrayList<>(); + IgniteCache clientCache = jcache(txInitiatorNodeId); + Ignite clientNode = ignite(txInitiatorNodeId); + Transaction clientTx; + + for (int i = 0; i < 10; i++) { + clientTx = clientNode.transactions().txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + + clientCache.put("1", i); + + clientTx.suspend(); + + txs.add(clientTx); + } + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + assertNull(ignite(txInitiatorNodeId).transactions().tx()); + + for (int i = 0; i < 10; i++) { + Transaction clientTx = txs.get(i); + + assertEquals(TransactionState.SUSPENDED, clientTx.state()); + + clientTx.resume(); + + assertEquals(TransactionState.ACTIVE, clientTx.state()); + + clientTx.commit(); + } + + return true; + } + }); + + fut.get(5000); + + assertEquals(9, jcache(0).get("1")); + + clientCache.removeAll(); + } + + /** + * Test for closing suspended transaction. + */ + public void testCloseSuspendedTransaction() { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + transactionIsolation = isolation; + + closeSuspendedTransaction(); + } + } + + /** + * + */ + private void closeSuspendedTransaction() { + final IgniteCache cache = jcache(txInitiatorNodeId); + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + + Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + + cache.put("key1", 1); + + tx.suspend(); + + tx.close(); + + assertEquals(TransactionState.ROLLED_BACK, tx.state()); + assertNull(cache.get("key1")); + } + + /** + * Test for rolling back suspended transaction. + */ + public void testRollbackSuspendedTransaction() { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + transactionIsolation = isolation; + + rollbackSuspendedTransaction(); + } + } + + /** + * + */ + private void rollbackSuspendedTransaction() { + final IgniteCache cache = jcache(txInitiatorNodeId); + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + + Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + + cache.put("key1", 1); + + tx.suspend(); + + tx.rollback(); + + assertEquals(TransactionState.ROLLED_BACK, tx.state()); + assertNull(cache.get("key1")); + } + + /** + * Test checking commit on suspended transaction leads to exception. + */ + public void testCommitSuspendedTxIsProhibited() { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + transactionIsolation = TransactionIsolation.SERIALIZABLE; + + commitSuspendedTxIsProhibited(); + } + } + + /** + * + */ + private void commitSuspendedTxIsProhibited() { + final IgniteCache cache = jcache(txInitiatorNodeId); + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + + try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation)) { + cache.put("key1", 1); + + tx.suspend(); + + tx.commit(); + + fail("Committing suspended transaction is prohibited."); + } + catch (IgniteException ignore) { + // ignoring commit exception on suspended transaction. + } + } + + /** + * Test checking commit on suspended transaction leads to exception. + */ + public void testRollbackSuspendedTxIsProhibitedFromOtherThread() throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + transactionIsolation = isolation; + + rollbackSuspendedTxIsProhibitedFromOtherThread(); + } + } + + /** + * + */ + private void rollbackSuspendedTxIsProhibitedFromOtherThread() { + final IgniteCache cache = jcache(txInitiatorNodeId); + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + + try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation)) { + cache.put("key1", 1); + + tx.suspend(); + + multithreaded(new Callable() { + @Override public Object call() throws Exception { + tx.rollback(); + + return null; + } + }, 1); + + fail("Rolling back suspended transaction is prohibited from the other thread."); + } + catch (Throwable ignore) { + // ignoring rollback exception on suspended transaction from the other thread. + } + } + + /** + * Test checking timeout on resumed transaction. + * + * @throws Exception If failed. + */ + public void testTransactionTimeoutOnResumedTransaction() throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + transactionIsolation = isolation; + + transactionTimeoutOnResumedTransaction(); + } + } + + /** + * @throws Exception If failed. + */ + private void transactionTimeoutOnResumedTransaction() throws Exception { + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + boolean tryResume = false; + + try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation, TIMEOUT, 0)) { + tx.suspend(); + + long sleep = TIMEOUT * 2; + + Thread.sleep(sleep); + + tryResume = true; + + tx.resume(); + + fail("Transaction must have timed out."); + } + catch (Exception e) { + if (!(X.hasCause(e, TransactionTimeoutException.class))) + throw e; + } + + assert tryResume; + } + + /** + * Test checking timeout on suspended transaction. + * + * @throws Exception If failed. + */ + public void testTransactionTimeoutOnSuspendedTransaction() throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + transactionIsolation = isolation; + + transactionTimeoutOnSuspendedTransaction(); + } + } + + /** + * @throws Exception If failed. + */ + private void transactionTimeoutOnSuspendedTransaction() throws Exception { + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + final IgniteCache cache = jcache(txInitiatorNodeId); + boolean trySuspend = false; + + try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation, TIMEOUT, 0)) { + cache.put("key1", 1); + + long sleep = TIMEOUT * 2; + + Thread.sleep(sleep); + + trySuspend = true; + + tx.suspend(); + + fail("Transaction must have timed out."); + } + catch (Exception e) { + if (!(X.hasCause(e, TransactionTimeoutException.class))) + throw e; + } + + assertNull(cache.get("key1")); + + assert trySuspend; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PessimisticTransactionsInMultipleThreadsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PessimisticTransactionsInMultipleThreadsTest.java new file mode 100644 index 0000000000000..aa2bd11a91512 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PessimisticTransactionsInMultipleThreadsTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; + +/** + * + */ +public class PessimisticTransactionsInMultipleThreadsTest extends AbstractTransactionsInMultipleThreadsTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(2); + awaitPartitionMapExchange(); + } + + /** + * Test for suspension on pessimistic transaction. + * + * @throws Exception If failed. + */ + public void testSuspendPessimisticTransaction() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + suspendPessimisticTransaction(); + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + private void suspendPessimisticTransaction() throws Exception { + final IgniteCache cache = jcache(txInitiatorNodeId); + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + + try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, transactionIsolation)) { + cache.put("key1", 1); + + tx.suspend(); + + fail("Suspend must fail, because it isn't supported for pessimistic transactions."); + } + catch (Throwable e) { + if (!X.hasCause(e, UnsupportedOperationException.class)) + throw e; + } + + assertNull(cache.get("key1")); + } + + /** + * Test for resuming on pessimistic transaction. + * + * @throws Exception If failed. + */ + public void testResumePessimisticTransaction() throws Exception { + runWithAllIsolations(new IgniteCallable() { + @Override public Void call() throws Exception { + resumePessimisticTransaction(); + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + private void resumePessimisticTransaction() throws Exception { + final IgniteCache cache = jcache(txInitiatorNodeId); + final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + + try (Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, transactionIsolation)) { + cache.put("key1", 1); + + tx.suspend(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + tx.resume(); + + return null; + } + }); + + fut.get(); + + fail("Resume must fail, because it isn't supported for pessimistic transactions."); + } + catch (Throwable e) { + if (!X.hasCause(e, UnsupportedOperationException.class)) + throw e; + } + + assertNull(cache.get("key1")); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java index c5673b321fd23..f764212fada90 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java @@ -578,6 +578,16 @@ public static class DummyTx extends GridMetadataAwareAdapter implements Transact // No-op. } + /** {@inheritDoc} */ + @Override public void suspend() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void resume() { + // No-op. + } + /** {@inheritDoc} */ @Override public IgniteFuture rollbackAsync() throws IgniteException { return null; diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index 1395b954df600..8804335e6b5da 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@ -41,6 +41,10 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheGroupsPartitionLossPolicySelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePartitionLossPolicySelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxIteratorSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.OptimisticTransactionsInMultipleThreadsClientTest; +import org.apache.ignite.internal.processors.cache.distributed.OptimisticTransactionsInMultipleThreadsFailoverTest; +import org.apache.ignite.internal.processors.cache.distributed.OptimisticTransactionsInMultipleThreadsTest; +import org.apache.ignite.internal.processors.cache.distributed.PessimisticTransactionsInMultipleThreadsTest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.IgniteCacheAtomicProtocolTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.CacheManualRebalancingTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheSyncRebalanceModeSelfTest; @@ -95,6 +99,11 @@ public static TestSuite suite() throws Exception { suite.addTestSuite(GridCachePartitionEvictionDuringReadThroughSelfTest.class); + suite.addTestSuite(OptimisticTransactionsInMultipleThreadsTest.class); + suite.addTestSuite(OptimisticTransactionsInMultipleThreadsClientTest.class); + suite.addTestSuite(OptimisticTransactionsInMultipleThreadsFailoverTest.class); + suite.addTestSuite(PessimisticTransactionsInMultipleThreadsTest.class); + return suite; } } diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java index 5047491b9410f..468f96fbbd8c7 100644 --- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java +++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java @@ -28,7 +28,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; @@ -36,9 +35,6 @@ * Implementation of {@link CacheJtaManagerAdapter}. */ public class CacheJtaManager extends CacheJtaManagerAdapter { - /** */ - private final ThreadLocal rsrc = new ThreadLocal<>(); - /** */ private TransactionManager jtaTm; @@ -145,47 +141,40 @@ private CacheTmLookup createTmLookup(String tmLookupClsName) throws IgniteChecke } if (jtaTm != null) { - CacheJtaResource rsrc = this.rsrc.get(); - - if (rsrc == null || rsrc.isFinished()) { - try { - Transaction jtaTx = jtaTm.getTransaction(); + try { + Transaction jtaTx = jtaTm.getTransaction(); - if (jtaTx != null) { - GridNearTxLocal tx = cctx.tm().userTx(); + if (jtaTx != null) { + GridNearTxLocal tx = cctx.tm().userTx(); - if (tx == null) { - TransactionConfiguration tCfg = cctx.kernalContext().config() - .getTransactionConfiguration(); + if (tx == null) { + TransactionConfiguration txCfg = cctx.kernalContext().config().getTransactionConfiguration(); - tx = cctx.tm().newTx( + tx = cctx.tm().newTx( /*implicit*/false, /*implicit single*/false, - null, - tCfg.getDefaultTxConcurrency(), - tCfg.getDefaultTxIsolation(), - tCfg.getDefaultTxTimeout(), + null, + txCfg.getDefaultTxConcurrency(), + txCfg.getDefaultTxIsolation(), + txCfg.getDefaultTxTimeout(), /*store enabled*/true, /*tx size*/0 - ); - } + ); - rsrc = new CacheJtaResource(tx, cctx.kernalContext()); + CacheJtaResource rsrc = new CacheJtaResource(tx, cctx.kernalContext()); if (useJtaSync) jtaTx.registerSynchronization(rsrc); else if (!jtaTx.enlistResource(rsrc)) throw new IgniteCheckedException("Failed to enlist XA resource to JTA user transaction."); - - this.rsrc.set(rsrc); } } - catch (SystemException e) { - throw new IgniteCheckedException("Failed to obtain JTA transaction.", e); - } - catch (RollbackException e) { - throw new IgniteCheckedException("Failed to enlist XAResource to JTA transaction.", e); - } + } + catch (SystemException e) { + throw new IgniteCheckedException("Failed to obtain JTA transaction.", e); + } + catch (RollbackException e) { + throw new IgniteCheckedException("Failed to enlist XAResource to JTA transaction.", e); } } } diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java index 649f7c41be7fd..0d855ce207a91 100644 --- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java +++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java @@ -71,12 +71,20 @@ final class CacheJtaResource implements XAResource, Synchronization { } /** {@inheritDoc} */ - @Override public void start(Xid xid, int flags) { + @Override public void start(Xid xid, int flags) throws XAException { if (log.isDebugEnabled()) log.debug("XA resource start(...) [xid=" + xid + ", flags=<" + flags(flags) + ">]"); // Simply save global transaction id. this.xid = xid; + + if ((flags & TMRESUME) == TMRESUME) + try { + cacheTx.resume(); + } + catch (IgniteCheckedException e) { + throwException("Failed to resume cache transaction: " + e.getMessage(), e); + } } /** @@ -121,20 +129,27 @@ private void throwException(String msg, Throwable cause) throws XAException { cacheTx.prepare(); } catch (IgniteCheckedException e) { - throwException("Failed to prepare cache transaction.", e); + throwException("Failed to prepare cache transaction: " + e.getMessage(), e); } return XA_OK; } /** {@inheritDoc} */ - @Override public void end(Xid xid, int flags) { + @Override public void end(Xid xid, int flags) throws XAException { assert this.xid.equals(xid); if (log.isDebugEnabled()) log.debug("XA resource end(...) [xid=" + xid + ", flags=<" + flags(flags) + ">]"); - if ((flags & TMFAIL) > 0) + if ((flags & TMSUSPEND) == TMSUSPEND) + try { + cacheTx.suspend(); + } + catch (Throwable e) { + throwException("Failed to suspend cache transaction: " + e.getMessage(), e); + } + else if ((flags & TMFAIL) > 0) cacheTx.setRollbackOnly(); } @@ -249,7 +264,7 @@ private StringBuilder addFlag(StringBuilder sb, int flags, int mask, String flag cacheTx.prepare(); } catch (IgniteCheckedException e) { - throw new CacheException("Failed to prepare cache transaction.", e); + throw new CacheException("Failed to prepare cache transaction: " + e.getMessage(), e); } } @@ -264,7 +279,7 @@ private StringBuilder addFlag(StringBuilder sb, int flags, int mask, String flag cacheTx.commit(); } catch (IgniteCheckedException e) { - throw new CacheException("Failed to commit cache transaction.", e); + throw new CacheException("Failed to commit cache transaction: " + e.getMessage(), e); } break; @@ -277,7 +292,7 @@ private StringBuilder addFlag(StringBuilder sb, int flags, int mask, String flag cacheTx.rollback(); } catch (IgniteCheckedException e) { - throw new CacheException("Failed to rollback cache transaction.", e); + throw new CacheException("Failed to rollback cache transaction: " + e.getMessage(), e); } break; diff --git a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java new file mode 100644 index 0000000000000..695a2424b2a98 --- /dev/null +++ b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import javax.cache.configuration.Factory; +import javax.transaction.Status; +import javax.transaction.Transaction; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.objectweb.jotm.Current; +import org.objectweb.jotm.Jotm; +import org.objectweb.transaction.jta.TransactionManager; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.transactions.TransactionState.ACTIVE; + +/** + * JTA Tx Manager test. + */ +public class GridJtaTransactionManagerSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 1; + + /** Java Open Transaction Manager facade. */ + private static Jotm jotm; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getTransactionConfiguration().setTxManagerFactory(new Factory() { + private static final long serialVersionUID = 0L; + + @Override public TransactionManager create() { + return jotm.getTransactionManager(); + } + }); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + jotm = new Jotm(true, false); + + Current.setAppServer(false); + + super.beforeTestsStarted(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + jotm.stop(); + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** + * Test for switching tx context by JTA Manager. + * + * @throws Exception If failed. + */ + public void testJtaTransactionContextSwitch() throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + + TransactionConfiguration cfg = grid(0).context().config().getTransactionConfiguration(); + + cfg.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC); + cfg.setDefaultTxIsolation(isolation); + + jtaTransactionContextSwitch((TransactionIsolation.REPEATABLE_READ.equals(isolation))); + } + } + + /** + * @param checkRepeatableRead True if repeatable read scenario should be tested. + * @throws Exception If failed. + */ + private void jtaTransactionContextSwitch(boolean checkRepeatableRead) throws Exception { + TransactionManager jtaTm = jotm.getTransactionManager(); + + IgniteCache cache = jcache(); + + Integer[] vals = {1, 2, 3}; + String[] keys = {"key1", "key2", "key3"}; + + Transaction[] jtaTxs = new Transaction[3]; + + try { + for (int i = 0; i < 3; i++) { + assertNull(ignite(0).transactions().tx()); + + jtaTm.begin(); + + jtaTxs[i] = jtaTm.getTransaction(); + + assertNull(ignite(0).transactions().tx()); + assertNull(cache.getAndPut(keys[i], vals[i])); + + assertNotNull(ignite(0).transactions().tx()); + assertEquals(ACTIVE, ignite(0).transactions().tx().state()); + assertEquals(vals[i], cache.get(keys[i])); + + if (checkRepeatableRead) + for (int j = 0; j < 3; j++) + assertEquals(j == i ? vals[j] : null, cache.get(keys[j])); + + jtaTm.suspend(); + + assertNull(ignite(0).transactions().tx()); + assertNull(cache.get(keys[i])); + } + + for (int i = 0; i < 3; i++) { + jtaTm.resume(jtaTxs[i]); + + assertNotNull(ignite(0).transactions().tx()); + assertEquals(ACTIVE, ignite(0).transactions().tx().state()); + + if (checkRepeatableRead) + for (int j = 0; j < 3; j++) + assertEquals(j == i ? vals[j] : null, cache.get(keys[j])); + else + for (int j = 0; j < 3; j++) + assertEquals(j <= i ? vals[j] : null, cache.get(keys[j])); + + jtaTm.commit(); + + for (int j = 0; j < 3; j++) + assertEquals(j <= i ? vals[j] : null, cache.get(keys[j])); + + assertNull(ignite(0).transactions().tx()); + } + + } + finally { + for (int i = 0; i < 3; i++) + if (jtaTxs[i] != null && jtaTxs[i].getStatus() == Status.STATUS_ACTIVE) + jtaTxs[i].rollback(); + } + + for (int i = 0; i < 3; i++) + assertEquals(vals[i], cache.get(keys[i])); + + cache.removeAll(); + } +} diff --git a/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java b/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java index 4ae5df06d54ec..acd9cc10eca7b 100644 --- a/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java +++ b/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.processors.cache.CacheJndiTmFactorySelfTest; import org.apache.ignite.internal.processors.cache.GridCacheJtaConfigurationValidationSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheJtaFactoryConfigValidationSelfTest; +import org.apache.ignite.internal.processors.cache.GridJtaTransactionManagerSelfTest; import org.apache.ignite.internal.processors.cache.GridPartitionedCacheJtaFactorySelfTest; import org.apache.ignite.internal.processors.cache.GridPartitionedCacheJtaFactoryUseSyncSelfTest; import org.apache.ignite.internal.processors.cache.GridPartitionedCacheJtaLookupClassNameSelfTest; @@ -54,6 +55,8 @@ public static TestSuite suite() throws Exception { suite.addTestSuite(GridCacheJtaConfigurationValidationSelfTest.class); suite.addTestSuite(GridCacheJtaFactoryConfigValidationSelfTest.class); + suite.addTestSuite(GridJtaTransactionManagerSelfTest.class); + // Factory suite.addTestSuite(CacheJndiTmFactorySelfTest.class); From 010ae5da09dddc9d4a0232d1a5957857f7500fce Mon Sep 17 00:00:00 2001 From: voipp Date: Fri, 14 Jul 2017 18:46:51 +0300 Subject: [PATCH 2/6] IGNITE-5712 rollback on suspended tx is prohibited --- .../transactions/TransactionProxyImpl.java | 35 +++++++++++-------- ...sticTransactionsInMultipleThreadsTest.java | 28 ++++++++------- 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java index 0944a31f3b820..9f59e739a709e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java @@ -96,12 +96,10 @@ public GridNearTxLocal tx() { /** * Enters a call. - * - * @param checkThreadId Whether threadId should be checked. */ - private void enter(boolean checkThreadId) { - assert Thread.currentThread().getId() == threadId() || !checkThreadId : - "Only thread owning transaction is permitted to operate it."; + private void enter() { + assert Thread.currentThread().getId() == threadId() || TransactionState.SUSPENDED == state() : + "Only thread owning active transaction is permitted to operate it."; if (cctx.deploymentEnabled()) cctx.deploy().onEnter(); @@ -210,7 +208,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void suspend() throws IgniteException { - enter(true); + enter(); try { cctx.suspendTx(tx); @@ -245,7 +243,10 @@ private void leave() { /** {@inheritDoc} */ @Override public boolean setRollbackOnly() { - enter(true); + enter(); + + assert TransactionState.SUSPENDED != state() : + "Marking rollback operation on suspended transaction is prohibited."; try { return tx.setRollbackOnly(); @@ -257,7 +258,7 @@ private void leave() { /** {@inheritDoc} */ @Override public boolean isRollbackOnly() { - enter(true); + enter(); try { if (async) @@ -272,7 +273,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void commit() { - enter(true); + enter(); try { IgniteInternalFuture commitFut = cctx.commitTxAsync(tx); @@ -292,7 +293,7 @@ private void leave() { /** {@inheritDoc} */ @Override public IgniteFuture commitAsync() throws IgniteException { - enter(true); + enter(); try { return (IgniteFuture)createFuture(cctx.commitTxAsync(tx)); @@ -304,7 +305,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void close() { - enter(true); + enter(); try { cctx.endTx(tx); @@ -319,7 +320,10 @@ private void leave() { /** {@inheritDoc} */ @Override public void rollback() { - enter(true); + enter(); + + assert TransactionState.SUSPENDED != state() : + "Rollback operation on suspended transaction is prohibited."; try { IgniteInternalFuture rollbackFut = cctx.rollbackTxAsync(tx); @@ -339,7 +343,10 @@ private void leave() { /** {@inheritDoc} */ @Override public IgniteFuture rollbackAsync() throws IgniteException { - enter(true); + enter(); + + assert TransactionState.SUSPENDED != state() : + "Rollback operation on suspended transaction is prohibited."; try { return (IgniteFuture)(new IgniteFutureImpl(cctx.rollbackTxAsync(tx))); @@ -354,7 +361,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void resume() throws IgniteException { - enter(false); + enter(); try { cctx.resumeTx(tx); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java index 334b9d686aad1..89b0457771ca4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java @@ -494,37 +494,41 @@ private void closeSuspendedTransaction() { /** * Test for rolling back suspended transaction. */ - public void testRollbackSuspendedTransaction() { + public void testRollbackSuspendedTransactionIsProhibited() { for (TransactionIsolation isolation : TransactionIsolation.values()) { transactionIsolation = isolation; - rollbackSuspendedTransaction(); + rollbackSuspendedTransactionIsProhibited(); } } /** * */ - private void rollbackSuspendedTransaction() { + private void rollbackSuspendedTransactionIsProhibited() { final IgniteCache cache = jcache(txInitiatorNodeId); final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); - Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation)) { + cache.put("key1", 1); - cache.put("key1", 1); + tx.suspend(); - tx.suspend(); + tx.rollback(); - tx.rollback(); + fail("Rolling back suspended transaction is prohibited."); + } + catch (Throwable ignore) { + // ignoring rollback exception on suspended transaction. + } - assertEquals(TransactionState.ROLLED_BACK, tx.state()); assertNull(cache.get("key1")); } /** * Test checking commit on suspended transaction leads to exception. */ - public void testCommitSuspendedTxIsProhibited() { + public void testCommitSuspendedTransactionIsProhibited() { for (TransactionIsolation isolation : TransactionIsolation.values()) { transactionIsolation = TransactionIsolation.SERIALIZABLE; @@ -556,18 +560,18 @@ private void commitSuspendedTxIsProhibited() { /** * Test checking commit on suspended transaction leads to exception. */ - public void testRollbackSuspendedTxIsProhibitedFromOtherThread() throws Exception { + public void testRollbackSuspendedTransactionIsProhibitedFromOtherThread() throws Exception { for (TransactionIsolation isolation : TransactionIsolation.values()) { transactionIsolation = isolation; - rollbackSuspendedTxIsProhibitedFromOtherThread(); + rollbackSuspendedTransactionIsProhibitedFromOtherThread(); } } /** * */ - private void rollbackSuspendedTxIsProhibitedFromOtherThread() { + private void rollbackSuspendedTransactionIsProhibitedFromOtherThread() { final IgniteCache cache = jcache(txInitiatorNodeId); final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); From 41bcf7d2b2bbd88562225c31fdbb59a85e253598 Mon Sep 17 00:00:00 2001 From: voipp Date: Fri, 14 Jul 2017 18:57:51 +0300 Subject: [PATCH 3/6] IGNITE-5712 clean redundant code --- .../cache/query/GridCacheDistributedQueryManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index a67527c296a53..7f859a204e138 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -211,7 +211,7 @@ protected void removeQueryFuture(long reqId) { cctx.cacheId(), req.id(), new IgniteCheckedException("Received request for incorrect cache [expected=" + cctx.name() + - ", actual=" + req.cacheName() + ']'), + ", actual=" + req.cacheName()), cctx.deploymentEnabled()); sendQueryResponse(sndId, res, 0); From aec42f48d36081ace7af572edf8626409d5cd2e1 Mon Sep 17 00:00:00 2001 From: voipp Date: Fri, 14 Jul 2017 19:32:00 +0300 Subject: [PATCH 4/6] IGNITE-5712 small fix test --- .../OptimisticTransactionsInMultipleThreadsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java index 89b0457771ca4..fd6fe8a063ae5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java @@ -530,7 +530,7 @@ private void rollbackSuspendedTransactionIsProhibited() { */ public void testCommitSuspendedTransactionIsProhibited() { for (TransactionIsolation isolation : TransactionIsolation.values()) { - transactionIsolation = TransactionIsolation.SERIALIZABLE; + transactionIsolation = isolation; commitSuspendedTxIsProhibited(); } From 6ebd6f2cad7e4ea8cb61a08951495fe289f51439 Mon Sep 17 00:00:00 2001 From: voipp Date: Fri, 14 Jul 2017 19:40:16 +0300 Subject: [PATCH 5/6] IGNITE-5712 cleanup redundant code 2 --- .../processors/cache/transactions/IgniteTxStateImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index 22ca4571c2cd2..ad4ca61011381 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -29,6 +29,8 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.cache.CacheStoppedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; From dd2912c90fa9127bde57365e63be295dc4d58b37 Mon Sep 17 00:00:00 2001 From: voipp Date: Fri, 14 Jul 2017 19:56:51 +0300 Subject: [PATCH 6/6] IGNITE-5712 small test fixes --- .../distributed/near/GridNearTxLocal.java | 2 +- .../cache/transactions/IgniteTxManager.java | 6 +- .../transactions/TransactionProxyImpl.java | 33 ++-- ...ansactionsInMultipleThreadsClientTest.java | 8 +- ...sactionsInMultipleThreadsFailoverTest.java | 5 + ...sticTransactionsInMultipleThreadsTest.java | 154 +++++++++--------- ...sticTransactionsInMultipleThreadsTest.java | 1 + 7 files changed, 112 insertions(+), 97 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 9d2d789965660..7b9f172e490b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -2868,7 +2868,7 @@ public void suspend() throws IgniteCheckedException { throw new IgniteCheckedException("Trying to suspend transaction with incorrect state " + "[expected=" + ACTIVE + ", actual=" + state() + ']'); - cctx.tm().detachThread(this); + cctx.tm().detachCurrentThread(this); state(SUSPENDED); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 5cabc5b4d6490..12560fdac4803 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2249,14 +2249,14 @@ public void attachCurrentThread(IgniteInternalTx tx) { } /** - * Detaches thread from the transaction. + * Detaches current thread from the transaction. * * @param tx Transaction to be detached. */ - public void detachThread(IgniteInternalTx tx) { + public void detachCurrentThread(IgniteInternalTx tx) { assert tx != null; - assert threadMap.remove(tx.threadId(), tx); + assert threadMap.remove(Thread.currentThread().getId(), tx); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java index 9f59e739a709e..97b46a507bde3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java @@ -96,11 +96,15 @@ public GridNearTxLocal tx() { /** * Enters a call. + * @param permittedOnSuspendedTx {@code True} If operation is permitted on suspended transaction. */ - private void enter() { + private void enter(boolean permittedOnSuspendedTx) { assert Thread.currentThread().getId() == threadId() || TransactionState.SUSPENDED == state() : "Only thread owning active transaction is permitted to operate it."; + assert TransactionState.SUSPENDED != state() || permittedOnSuspendedTx : + "Operation is prohibited on suspended transaction."; + if (cctx.deploymentEnabled()) cctx.deploy().onEnter(); @@ -208,7 +212,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void suspend() throws IgniteException { - enter(); + enter(false); try { cctx.suspendTx(tx); @@ -243,10 +247,7 @@ private void leave() { /** {@inheritDoc} */ @Override public boolean setRollbackOnly() { - enter(); - - assert TransactionState.SUSPENDED != state() : - "Marking rollback operation on suspended transaction is prohibited."; + enter(false); try { return tx.setRollbackOnly(); @@ -258,7 +259,7 @@ private void leave() { /** {@inheritDoc} */ @Override public boolean isRollbackOnly() { - enter(); + enter(false); try { if (async) @@ -273,7 +274,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void commit() { - enter(); + enter(false); try { IgniteInternalFuture commitFut = cctx.commitTxAsync(tx); @@ -293,7 +294,7 @@ private void leave() { /** {@inheritDoc} */ @Override public IgniteFuture commitAsync() throws IgniteException { - enter(); + enter(false); try { return (IgniteFuture)createFuture(cctx.commitTxAsync(tx)); @@ -305,7 +306,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void close() { - enter(); + enter(false); try { cctx.endTx(tx); @@ -320,10 +321,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void rollback() { - enter(); - - assert TransactionState.SUSPENDED != state() : - "Rollback operation on suspended transaction is prohibited."; + enter(false); try { IgniteInternalFuture rollbackFut = cctx.rollbackTxAsync(tx); @@ -343,10 +341,7 @@ private void leave() { /** {@inheritDoc} */ @Override public IgniteFuture rollbackAsync() throws IgniteException { - enter(); - - assert TransactionState.SUSPENDED != state() : - "Rollback operation on suspended transaction is prohibited."; + enter(false); try { return (IgniteFuture)(new IgniteFutureImpl(cctx.rollbackTxAsync(tx))); @@ -361,7 +356,7 @@ private void leave() { /** {@inheritDoc} */ @Override public void resume() throws IgniteException { - enter(); + enter(true); try { cctx.resumeTx(tx); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsClientTest.java index 30a5f702897da..fc31c87015c2a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsClientTest.java @@ -46,6 +46,7 @@ public class OptimisticTransactionsInMultipleThreadsClientTest extends Optimisti super.beforeTestsStarted(); startGrid(getTestIgniteInstanceName(1), getConfiguration().setClientMode(true)); + awaitPartitionMapExchange(); txInitiatorNodeId = 1; @@ -170,6 +171,8 @@ private void suspendTxAndStartNewTx(TransactionIsolation isolation1) throws Igni assertEquals(2, jcache(0).get(remotePrimaryKey)); + clientTx.resume(); + clientTx.close(); remoteCache.removeAll(); @@ -224,8 +227,11 @@ private void txConcurrentSuspend() throws Exception { fut.get(); // if transaction was not closed after resume, then close it now. - if (successfulResume.get() == 0) + if (successfulResume.get() == 0) { + clientTx.resume(); + clientTx.close(); + } assertTrue(successfulResume.get() < 2); assertEquals(concurrentThreadsNum, failedTxNumber.intValue() + successfulResume.intValue()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsFailoverTest.java index 48bbad031d203..acc9ba380f795 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsFailoverTest.java @@ -96,6 +96,7 @@ public void testTxRemoteNodeFailover() throws Exception { */ private void txRemoteNodeFailover() throws Exception { startGrid(1); + awaitPartitionMapExchange(); IgniteCache remoteCache = jcache(1); @@ -137,6 +138,7 @@ public void testTxLocalNodeFailover() throws Exception { */ private void txLocalNodeFailover() throws Exception { startGrid(1); + awaitPartitionMapExchange(); IgniteCache localCache = jcache(1); @@ -165,6 +167,8 @@ public void testTxOnClientBreakRemote() throws Exception { startGrid(getTestIgniteInstanceName(0), getConfiguration().setClientMode(true)); + awaitPartitionMapExchange(); + runWithAllIsolations(new IgniteCallable() { @Override public Void call() throws Exception { txOnClientBreakRemote(); @@ -181,6 +185,7 @@ public void testTxOnClientBreakRemote() throws Exception { */ private void txOnClientBreakRemote() throws Exception { startGrid(1); + awaitPartitionMapExchange(); IgniteCache remoteCache = jcache(1); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java index fd6fe8a063ae5..6368f0b503dcf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OptimisticTransactionsInMultipleThreadsTest.java @@ -53,6 +53,7 @@ public class OptimisticTransactionsInMultipleThreadsTest extends AbstractTransac super.beforeTestsStarted(); startGrid(0); + awaitPartitionMapExchange(); } @@ -407,7 +408,6 @@ private void transactionRollback() throws IgniteCheckedException { * @throws IgniteCheckedException If failed. */ public void testMultipleTransactionsSuspendResume() throws IgniteCheckedException { - for (TransactionIsolation isolation : TransactionIsolation.values()) { transactionIsolation = isolation; @@ -462,136 +462,142 @@ private void multipleTransactionsSuspendResume() throws IgniteCheckedException { } /** - * Test for closing suspended transaction. + * Test checking all operations(exception resume) on suspended transaction are prohibited. */ - public void testCloseSuspendedTransaction() { - for (TransactionIsolation isolation : TransactionIsolation.values()) { - transactionIsolation = isolation; + public void testOperationsAreProhibitedOnSuspendedTx() { + for (int opIdx = 0; opIdx < 7; opIdx++) + for (TransactionIsolation isolation : TransactionIsolation.values()) { + transactionIsolation = isolation; - closeSuspendedTransaction(); - } + operationsAreProhibitedOnSuspendedTx(opIdx); + } } /** + * Test checking operation(exception resume) on suspended transaction is prohibited. * + * @param opIdx Operation index. */ - private void closeSuspendedTransaction() { + private void operationsAreProhibitedOnSuspendedTx(final int opIdx) { final IgniteCache cache = jcache(txInitiatorNodeId); final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); - cache.put("key1", 1); + try { + cache.put("key1", 1); - tx.suspend(); + tx.suspend(); + + performOperation(tx, opIdx); + + fail("Operation on suspended transaction is prohibited."); + } + catch (Throwable ignore) { + // ignoring exception on suspended transaction. + } + + tx.resume(); tx.close(); - assertEquals(TransactionState.ROLLED_BACK, tx.state()); assertNull(cache.get("key1")); } /** - * Test for rolling back suspended transaction. + * Test checking all operations(exception resume) on suspended transaction from the other thread are prohibited. */ - public void testRollbackSuspendedTransactionIsProhibited() { - for (TransactionIsolation isolation : TransactionIsolation.values()) { - transactionIsolation = isolation; + public void testOperationsAreProhibitedOnSuspendedTxFromTheOtherThread() throws Exception { + for (int opIdx = 0; opIdx < 7; opIdx++) + for (TransactionIsolation isolation : TransactionIsolation.values()) { + transactionIsolation = isolation; - rollbackSuspendedTransactionIsProhibited(); - } + operationsAreProhibitedOnSuspendedTxFromTheOtherThread(opIdx); + } } /** + * Test checking operation(exception resume) on suspended transaction from the other thread is prohibited. * + * @param opIdx Operation index. */ - private void rollbackSuspendedTransactionIsProhibited() { + private void operationsAreProhibitedOnSuspendedTxFromTheOtherThread(final int opIdx) { final IgniteCache cache = jcache(txInitiatorNodeId); final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); - try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation)) { + final Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation); + + try { cache.put("key1", 1); tx.suspend(); - tx.rollback(); + multithreaded(new Callable() { + @Override public Object call() throws Exception { + performOperation(tx, opIdx); + + return null; + } + }, 1); - fail("Rolling back suspended transaction is prohibited."); + fail("Operation on suspended transaction is prohibited from the other thread."); } catch (Throwable ignore) { - // ignoring rollback exception on suspended transaction. + // ignoring exception on suspended transaction. } - assertNull(cache.get("key1")); - } + tx.resume(); - /** - * Test checking commit on suspended transaction leads to exception. - */ - public void testCommitSuspendedTransactionIsProhibited() { - for (TransactionIsolation isolation : TransactionIsolation.values()) { - transactionIsolation = isolation; + tx.close(); - commitSuspendedTxIsProhibited(); - } + assertNull(cache.get("key1")); } /** + * Performs operation based on its index. Resume operation is not supported. * + * @param tx Transaction, operation is performed on. + * @param opIdx Operation index. */ - private void commitSuspendedTxIsProhibited() { - final IgniteCache cache = jcache(txInitiatorNodeId); - final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + private void performOperation(Transaction tx, final int opIdx) { + switch (opIdx) { + case 0: + tx.suspend(); - try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation)) { - cache.put("key1", 1); + break; - tx.suspend(); + case 1: + tx.close(); - tx.commit(); + break; - fail("Committing suspended transaction is prohibited."); - } - catch (IgniteException ignore) { - // ignoring commit exception on suspended transaction. - } - } + case 2: + tx.commit(); - /** - * Test checking commit on suspended transaction leads to exception. - */ - public void testRollbackSuspendedTransactionIsProhibitedFromOtherThread() throws Exception { - for (TransactionIsolation isolation : TransactionIsolation.values()) { - transactionIsolation = isolation; + break; - rollbackSuspendedTransactionIsProhibitedFromOtherThread(); - } - } + case 3: + tx.commitAsync(); - /** - * - */ - private void rollbackSuspendedTransactionIsProhibitedFromOtherThread() { - final IgniteCache cache = jcache(txInitiatorNodeId); - final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + break; - try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation)) { - cache.put("key1", 1); + case 4: - tx.suspend(); + tx.rollback(); - multithreaded(new Callable() { - @Override public Object call() throws Exception { - tx.rollback(); + break; - return null; - } - }, 1); + case 5: + tx.rollbackAsync(); - fail("Rolling back suspended transaction is prohibited from the other thread."); - } - catch (Throwable ignore) { - // ignoring rollback exception on suspended transaction from the other thread. + break; + case 6: + tx.setRollbackOnly(); + + break; + + default: + assert false; } } @@ -613,6 +619,7 @@ public void testTransactionTimeoutOnResumedTransaction() throws Exception { */ private void transactionTimeoutOnResumedTransaction() throws Exception { final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); + boolean tryResume = false; try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation, TIMEOUT, 0)) { @@ -655,6 +662,7 @@ public void testTransactionTimeoutOnSuspendedTransaction() throws Exception { private void transactionTimeoutOnSuspendedTransaction() throws Exception { final IgniteTransactions txs = ignite(txInitiatorNodeId).transactions(); final IgniteCache cache = jcache(txInitiatorNodeId); + boolean trySuspend = false; try (Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, transactionIsolation, TIMEOUT, 0)) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PessimisticTransactionsInMultipleThreadsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PessimisticTransactionsInMultipleThreadsTest.java index aa2bd11a91512..1b081fb9dbedd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PessimisticTransactionsInMultipleThreadsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PessimisticTransactionsInMultipleThreadsTest.java @@ -36,6 +36,7 @@ public class PessimisticTransactionsInMultipleThreadsTest extends AbstractTransa super.beforeTestsStarted(); startGrids(2); + awaitPartitionMapExchange(); }