diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index ba5fe3b7a3f4e..cc269f46a192f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -637,12 +637,10 @@ else if (!crd && !fetchFuts.containsKey(grp.groupId())) { /** * @param msg Change request. * @param topVer Current topology version. - * @param crd Coordinator flag. * @return Closed caches IDs. */ private Set processCacheCloseRequests( ClientCacheChangeDummyDiscoveryMessage msg, - boolean crd, AffinityTopologyVersion topVer ) { Set cachesToClose = msg.cachesToClose(); @@ -704,7 +702,7 @@ void processClientCachesRequests(ClientCacheChangeDummyDiscoveryMessage msg) { // Check and close caches via dummy message. if (msg.cachesToClose() != null) - closedCaches = processCacheCloseRequests(msg, crd, topVer); + closedCaches = processCacheCloseRequests(msg, topVer); // Shedule change message. if (startedCaches != null || closedCaches != null) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 7f71c823a2172..88d44cc23090f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -502,7 +502,7 @@ public void cacheConfigurationEnrichment(CacheConfigurationEnrichment cacheCfgEn ", clientStartOnly=" + clientStartOnly + ", stop=" + stop + ", destroy=" + destroy + - ", disabledAfterStart" + disabledAfterStart + + ", disabledAfterStart=" + disabledAfterStart + ']'; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index b31c6f0f5c0d3..cbe7df41b81ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -93,14 +93,14 @@ public boolean clientOnlyExchange() { * @return New caches start requests. */ public Collection cacheStartRequests() { - return cachesToStart != null ? cachesToStart.values() : Collections.emptyList(); + return cachesToStart != null ? cachesToStart.values() : Collections.emptyList(); } /** * @return Stop cache requests. */ public Collection cacheStopRequests() { - return cachesToStop != null ? cachesToStop.values() : Collections.emptyList(); + return cachesToStop != null ? cachesToStop.values() : Collections.emptyList(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java index 1b9e61066b65a..4c361d93bfa7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java @@ -1599,7 +1599,7 @@ private GridCacheGateway checkProxyIsValid(@Nullable GridCacheGateway proxy = context().kernalContext().cache().publicJCache(context().name()); + IgniteCacheProxy proxy = context().kernalContext().cache().publicJCache(context().name()); if (proxy != null) { proxyImpl.opportunisticRestart(proxy.internalProxy()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5f39b50ac2b3b..135bac5515c45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1577,13 +1577,17 @@ else if (interceptorVal != val0) recordNodeId(affNodeId, topVer); - if (metrics && cctx.statisticsEnabled()) { + if (metrics && cctx.statisticsEnabled() && tx != null) { cctx.cache().metrics0().onWrite(); - T2 entryProcRes = tx.entry(txKey()).entryProcessorCalculatedValue(); + IgniteTxEntry txEntry = tx.entry(txKey()); - if (entryProcRes != null && UPDATE.equals(entryProcRes.get1())) - cctx.cache().metrics0().onInvokeUpdate(old != null); + if (txEntry != null) { + T2 entryProcRes = txEntry.entryProcessorCalculatedValue(); + + if (entryProcRes != null && UPDATE.equals(entryProcRes.get1())) + cctx.cache().metrics0().onInvokeUpdate(old != null); + } } if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 5a80d8e07703d..1f27819b9602a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1048,7 +1048,7 @@ public GridDhtPartitionTopology clientTopology(int grpId, DiscoCache discoCache) CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId); - assert grpDesc != null : grpId; + assert grpDesc != null : "grpId=" + grpId; CacheConfiguration ccfg = grpDesc.config(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 67ee23fc18db8..7933b03dc4373 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -287,7 +287,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Transaction interface implementation. */ private IgniteTransactionsImpl transactions; - /** Pending cache starts. */ + /** Pending cache operations. */ private ConcurrentMap pendingFuts = new ConcurrentHashMap<>(); /** Template configuration add futures. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index fb3a892baecc9..53320e6ebe733 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -3202,7 +3202,8 @@ private void collectInfo() { for (Map.Entry info : store.entrySet()) { GridCacheAdapter cacheCtx = info.getKey().context().cache(); - metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info); + if (cacheCtx != null) + metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info); } store.clear(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 31fe6a4c20452..a163bb04fb9b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2742,14 +2742,11 @@ void reset( PendingMessage pm = new PendingMessage(msg); this.msgs.add(pm); - - if (pm.customMsg && pm.id.equals(customDiscardId)) - this.customDiscardId = customDiscardId; - - if (!pm.customMsg && pm.id.equals(discardId)) - this.discardId = discardId; } } + + this.discardId = discardId; + this.customDiscardId = customDiscardId; } /** @@ -6218,32 +6215,8 @@ private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) { */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean waitForNotification) { if (isLocalNodeCoordinator()) { - boolean delayMsg; - - assert ring.minimumNodeVersion() != null : ring; - - boolean joiningEmpty; - - synchronized (mux) { - joiningEmpty = joiningNodes.isEmpty(); - } - - delayMsg = msg.topologyVersion() == 0L && !joiningEmpty; - - if (delayMsg) { - if (log.isDebugEnabled()) { - synchronized (mux) { - log.debug("Delay custom message processing, there are joining nodes [msg=" + msg + - ", joiningNodes=" + joiningNodes + ']'); - } - } - - synchronized (mux) { - pendingCustomMsgs.add(msg); - } - + if (posponeUndeliveredMessages(msg)) return; - } if (!msg.verified()) { msg.verify(getLocalNodeId()); @@ -6327,6 +6300,36 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa } } + /** + * If new node is in the progress of being added we must store and resend undelivered messages. + * + * @param msg Processed message. + * @return {@code true} If message was appended to pending queue. + */ + private boolean posponeUndeliveredMessages(final TcpDiscoveryCustomEventMessage msg) { + boolean joiningEmpty; + + synchronized (mux) { + joiningEmpty = joiningNodes.isEmpty(); + + if (log.isDebugEnabled()) + log.debug("Delay custom message processing, there are joining nodes [msg=" + msg + + ", joiningNodes=" + joiningNodes + ']'); + } + + boolean delayMsg = msg.topologyVersion() == 0L && !joiningEmpty; + + if (delayMsg) { + synchronized (mux) { + pendingCustomMsgs.add(msg); + } + + return true; + } + + return false; + } + /** * Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node is still in the * ring and node detected failure left ring. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 65859cac27d11..07251acd4fc6a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -74,7 +74,6 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionOptimisticException; -import org.junit.Ignore; import org.junit.Test; import static java.util.concurrent.TimeUnit.SECONDS; @@ -186,7 +185,7 @@ private void txStreamerLoad(boolean allowOverwrite) throws Exception { private void txStreamerLoad(Ignite ignite, Integer key, String cacheName, - boolean allowOverwrite) throws Exception { + boolean allowOverwrite) { IgniteCache cache = ignite.cache(cacheName); log.info("Test key: " + key); @@ -2824,7 +2823,6 @@ public void testReadWriteTxConflict() throws Exception { /** * @throws Exception If failed. */ - @Ignore("https://issues.apache.org/jira/browse/IGNITE-9226") @Test public void testReadWriteTransactionsNoDeadlock() throws Exception { checkReadWriteTransactionsNoDeadlock(false); @@ -2833,7 +2831,6 @@ public void testReadWriteTransactionsNoDeadlock() throws Exception { /** * @throws Exception If failed. */ - @Ignore("https://issues.apache.org/jira/browse/IGNITE-9226") @Test public void testReadWriteTransactionsNoDeadlockMultinode() throws Exception { checkReadWriteTransactionsNoDeadlock(true); @@ -2844,8 +2841,6 @@ public void testReadWriteTransactionsNoDeadlockMultinode() throws Exception { * @throws Exception If failed. */ private void checkReadWriteTransactionsNoDeadlock(final boolean multiNode) throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-9226"); - final Ignite ignite0 = ignite(0); for (final CacheConfiguration ccfg : cacheConfigurations()) { @@ -4140,7 +4135,7 @@ private void accountTx(final boolean getAll, if (nonSer) { nonSerFut = runMultiThreadedAsync(new Callable() { - @Override public Void call() throws Exception { + @Override public Void call() { int nodeIdx = idx.getAndIncrement() % clients.size(); Ignite node = clients.get(nodeIdx); @@ -4198,7 +4193,7 @@ private void accountTx(final boolean getAll, } final IgniteInternalFuture fut = runMultiThreadedAsync(new Callable() { - @Override public Void call() throws Exception { + @Override public Void call() { int nodeIdx = idx.getAndIncrement() % clients.size(); Ignite node = clients.get(nodeIdx); @@ -4210,8 +4205,8 @@ private void accountTx(final boolean getAll, final IgniteTransactions txs = node.transactions(); final IgniteCache cache = - nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration()) : - node.cache(cacheName); + nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<>()) : + node.cache(cacheName); assertNotNull(cache); diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index e8a2aa7a9ffa0..2999ec8e48c33 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -1353,10 +1353,8 @@ protected List backupKeys(IgniteCache cache, int cnt, int startFr * @param cnt Keys count. * @param startFrom Start value for keys search. * @return Collection of keys for which given cache is neither primary nor backup. - * @throws IgniteCheckedException If failed. */ - protected List nearKeys(IgniteCache cache, int cnt, int startFrom) - throws IgniteCheckedException { + protected List nearKeys(IgniteCache cache, int cnt, int startFrom) { return findKeys(cache, cnt, startFrom, 2); } @@ -1543,10 +1541,8 @@ protected Integer backupKey(IgniteCache cache) /** * @param cache Cache. * @return Key for which given cache is neither primary nor backup. - * @throws IgniteCheckedException If failed. */ - protected Integer nearKey(IgniteCache cache) - throws IgniteCheckedException { + protected Integer nearKey(IgniteCache cache) { return nearKeys(cache, 1, 1).get(0); }