From 73c02562a485ad013d7a80cc358a56f47e7b1432 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Wed, 7 Mar 2018 20:02:02 +0300 Subject: [PATCH 1/9] IGNITE-7791: add test case for delayed partition exchange after reconnect --- ...ClientReconnectCacheDelayExchangeTest.java | 300 ++++++++++++++++++ .../IgniteClientReconnectCacheTest.java | 8 +- .../IgniteClientReconnectTestSuite.java | 2 + 3 files changed, 308 insertions(+), 2 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java new file mode 100644 index 0000000000000..31f8a872ca467 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * + */ +public class IgniteClientReconnectCacheDelayExchangeTest extends IgniteClientReconnectAbstractTest { + /** */ + private static final int SRV_CNT = 3; + + /** */ + private static final String STATIC_CACHE = "static-cache"; + + /** */ + private UUID nodeId; + + /** + * Map of destination node ID to runnable with logic for real message sending. + * To apply real message sending use run method + */ + private final ConcurrentHashMap rs = new ConcurrentHashMap<>(); + /** + * Flag to redirect {@link GridDhtPartitionsFullMessage}s from real communication channel to {@link #rs} map. + * Applied only to messages not related to particular exchange + */ + private volatile boolean record = false; + + /** */ + private AtomicBoolean replay = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TestCommunicationDelayedSpi commSpi = new TestCommunicationDelayedSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + cfg.setPeerClassLoadingEnabled(false); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setNetworkTimeout(5000); + + if (nodeId != null) { + cfg.setNodeId(nodeId); + + nodeId = null; + } + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setName(STATIC_CACHE); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(SRV_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectCacheDestroyedAndCreatedDelayed() throws Exception { + clientMode = true; + + final Ignite client = startGrid(SRV_CNT); + + assertTrue(client.cluster().localNode().isClient()); + + final Ignite srv = clientRouter(client); + + final IgniteCache clientCache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + assertEquals(ATOMIC, + clientCache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + + awaitPartitionMapExchange(); + + record = true; + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srv.destroyCache(DEFAULT_CACHE_NAME); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(TRANSACTIONAL); + + srv.getOrCreateCache(ccfg); + } + }); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return clientCache.get(1); + } + }, IllegalStateException.class, null); + + // Emulate latest GridDhtPartitionsFullMessages. + //grid(0).context().cache().context().exchange().scheduleResendPartitions(); + + checkCacheDiscoveryData(srv, client, DEFAULT_CACHE_NAME, true, false, false); + + IgniteCache clientCache0 = client.cache(DEFAULT_CACHE_NAME); + + replayMessages(); + + checkCacheDiscoveryData(srv, client, DEFAULT_CACHE_NAME, true, true, false); + + assertEquals(TRANSACTIONAL, + clientCache0.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + + clientCache0.put(1, 1); + + assertEquals(1, clientCache0.get(1)); + } + + /** + * @param srv Server node. + * @param client Client node. + * @param cacheName Cache name. + * @param cacheExists Cache exists flag. + * @param clientCache {@code True} if client node has client cache. + * @param clientNear {@code True} if client node has near-enabled client cache. + * @throws Exception If failed. + */ + private void checkCacheDiscoveryData(Ignite srv, + Ignite client, + final String cacheName, + boolean cacheExists, + final boolean clientCache, + boolean clientNear) throws Exception { + final GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery(); + GridDiscoveryManager clientDisco = ((IgniteKernal)client).context().discovery(); + + ClusterNode srvNode = ((IgniteKernal)srv).localNode(); + final ClusterNode clientNode = ((IgniteKernal)client).localNode(); + + assertFalse(srvDisco.cacheAffinityNode(clientNode, cacheName)); + assertFalse(clientDisco.cacheAffinityNode(clientNode, cacheName)); + + assertEquals(cacheExists, srvDisco.cacheAffinityNode(srvNode, cacheName)); + + if (clientNear) { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srvDisco.cacheNearNode(clientNode, cacheName); + } + }, 5000)); + + assertTrue(srvDisco.cacheNearNode(clientNode, cacheName)); + } + else { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return F.eq(clientCache, srvDisco.cacheClientNode(clientNode, cacheName)); + } + }, 5000)); + + assertEquals(clientCache, srvDisco.cacheClientNode(clientNode, cacheName)); + } + + assertEquals(cacheExists, clientDisco.cacheAffinityNode(srvNode, cacheName)); + + if (clientNear) + assertTrue(clientDisco.cacheNearNode(clientNode, cacheName)); + else + assertEquals(clientCache, clientDisco.cacheClientNode(clientNode, cacheName)); + + if (cacheExists) { + if (clientCache || clientNear) { + assertTrue(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + assertTrue(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + } + else { + assertFalse(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + assertFalse(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + } + } + else { + assertTrue(client.cluster().forClientNodes(cacheName).nodes().isEmpty()); + assertTrue(srv.cluster().forClientNodes(cacheName).nodes().isEmpty()); + } + } + + /** + * Replays all saved messages from map, actual sent is performed. + * + * @throws IgniteInterruptedCheckedException If interrupted. + */ + private void replayMessages() throws IgniteInterruptedCheckedException { + record = false; + + log.info("Start replaying messages"); + + for (Runnable r : rs.values()) + r.run(); // Causes real messages sending. + + assertTrue(replay.compareAndSet(false, true)); + + U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages. + } + + /** + * + */ + private class TestCommunicationDelayedSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackClosure) + throws IgniteSpiException { + final Object msg0 = ((GridIoMessage)msg).message(); + + log.info("Communication message [thread=" + Thread.currentThread().getName() + + ", msg=" + msg0 + + ", node.id()=" + node.id() + + ']'); + + if (msg0 instanceof GridDhtPartitionsSingleMessage && record && + ((GridDhtPartitionsAbstractMessage)msg0).exchangeId() == null) { + + Runnable prevValue = rs.putIfAbsent(node.id(), new Runnable() { + @Override public void run() { + log.info("Message replayed: " + msg); + + TestCommunicationDelayedSpi.super.sendMessage(node, msg, ackClosure); + } + }); + + assert prevValue == null : "Duplicate message registered to [" + node.id() + "]"; + } + else + try { + super.sendMessage(node, msg, ackClosure); + } + catch (Exception e) { + U.log(null, e); + } + } + + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 518e674d6c052..d262f7693f707 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -1460,8 +1460,12 @@ private static class TestCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackClosure) throws IgniteSpiException { - if (msg instanceof GridIoMessage) { - Object msg0 = ((GridIoMessage)msg).message(); + + Object msg0 = ((GridIoMessage)msg).message(); + + log.info("Record message [thread=" + Thread.currentThread().getName() + ", msg=" + msg0 + ']'); + + if (msg0 instanceof GridIoMessage) { synchronized (this) { Set blockNodes = blockCls.get(msg0.getClass()); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java index d0e907cde01f8..3154afa9d5b19 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.IgniteClientReconnectApiExceptionTest; import org.apache.ignite.internal.IgniteClientReconnectAtomicsTest; import org.apache.ignite.internal.IgniteClientReconnectBinaryContexTest; +import org.apache.ignite.internal.IgniteClientReconnectCacheDelayExchangeTest; import org.apache.ignite.internal.IgniteClientReconnectCacheTest; import org.apache.ignite.internal.IgniteClientReconnectCollectionsTest; import org.apache.ignite.internal.IgniteClientReconnectComputeTest; @@ -49,6 +50,7 @@ public static TestSuite suite() throws Exception { suite.addTestSuite(IgniteClientReconnectApiExceptionTest.class); suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class); suite.addTestSuite(IgniteClientReconnectCacheTest.class); + suite.addTestSuite(IgniteClientReconnectCacheDelayExchangeTest.class); suite.addTestSuite(IgniteClientReconnectBinaryContexTest.class); suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class); suite.addTestSuite(IgniteClientReconnectComputeTest.class); From 824ea5f717147e7e615bf961fb235e5b1207a00f Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Wed, 7 Mar 2018 20:14:40 +0300 Subject: [PATCH 2/9] IGNITE-7791: awaitPartitionMapExchange before client reconnects --- .../apache/ignite/internal/IgniteClientReconnectCacheTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index d262f7693f707..306f28bb609df 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -872,6 +872,8 @@ public void testReconnectCacheDestroyedAndCreated() throws Exception { assertEquals(ATOMIC, clientCache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + awaitPartitionMapExchange(); + reconnectClientNode(client, srv, new Runnable() { @Override public void run() { srv.destroyCache(DEFAULT_CACHE_NAME); From 640db004ca522e020512c460713ca66e305669bd Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Thu, 8 Mar 2018 16:32:41 +0300 Subject: [PATCH 3/9] IGNITE-7791: remove awaitPartitionMapExchange before client reconnects --- .../apache/ignite/internal/IgniteClientReconnectCacheTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 306f28bb609df..e2eb4b6446cb6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -872,7 +872,7 @@ public void testReconnectCacheDestroyedAndCreated() throws Exception { assertEquals(ATOMIC, clientCache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); - awaitPartitionMapExchange(); + //awaitPartitionMapExchange(); reconnectClientNode(client, srv, new Runnable() { @Override public void run() { From 8677131101c94e0d55d25dd65aae4bba4da9af24 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Thu, 8 Mar 2018 18:41:14 +0300 Subject: [PATCH 4/9] IGNITE-7791: fix 100 prc issue reproducer --- ...ClientReconnectCacheDelayExchangeTest.java | 144 ++++-------------- 1 file changed, 32 insertions(+), 112 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java index 31f8a872ca467..2733b3414dab2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java @@ -20,7 +20,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; @@ -30,7 +29,6 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; @@ -42,7 +40,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; /** @@ -54,46 +51,23 @@ public class IgniteClientReconnectCacheDelayExchangeTest extends IgniteClientRec /** */ private static final String STATIC_CACHE = "static-cache"; - - /** */ - private UUID nodeId; - /** * Map of destination node ID to runnable with logic for real message sending. - * To apply real message sending use run method */ - private final ConcurrentHashMap rs = new ConcurrentHashMap<>(); - /** - * Flag to redirect {@link GridDhtPartitionsFullMessage}s from real communication channel to {@link #rs} map. - * Applied only to messages not related to particular exchange - */ - private volatile boolean record = false; - - /** */ - private AtomicBoolean replay = new AtomicBoolean(); + private final ConcurrentHashMap recordedMessages = new ConcurrentHashMap<>(); /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); TestCommunicationDelayedSpi commSpi = new TestCommunicationDelayedSpi(); - commSpi.setSharedMemoryPort(-1); cfg.setCommunicationSpi(commSpi); - cfg.setPeerClassLoadingEnabled(false); - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setNetworkTimeout(5000); - if (nodeId != null) { - cfg.setNodeId(nodeId); - - nodeId = null; - } - CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); - ccfg.setName(STATIC_CACHE); cfg.setCacheConfiguration(ccfg); @@ -123,19 +97,11 @@ public void testReconnectCacheDestroyedAndCreatedDelayed() throws Exception { clientMode = true; final Ignite client = startGrid(SRV_CNT); - - assertTrue(client.cluster().localNode().isClient()); - final Ignite srv = clientRouter(client); - final IgniteCache clientCache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + clientMode = false; - assertEquals(ATOMIC, - clientCache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); - - awaitPartitionMapExchange(); - - record = true; + final IgniteCache clientCache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); reconnectClientNode(client, srv, new Runnable() { @Override public void run() { @@ -155,19 +121,15 @@ record = true; } }, IllegalStateException.class, null); - // Emulate latest GridDhtPartitionsFullMessages. - //grid(0).context().cache().context().exchange().scheduleResendPartitions(); - - checkCacheDiscoveryData(srv, client, DEFAULT_CACHE_NAME, true, false, false); + checkCacheDiscoveryData(srv, client, DEFAULT_CACHE_NAME, false); IgniteCache clientCache0 = client.cache(DEFAULT_CACHE_NAME); - replayMessages(); - - checkCacheDiscoveryData(srv, client, DEFAULT_CACHE_NAME, true, true, false); + // Resend delayed GridDhtPartitionsSingleMessage + for (Runnable r : recordedMessages.values()) + r.run(); // Real messages sending. - assertEquals(TRANSACTIONAL, - clientCache0.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + checkCacheDiscoveryData(srv, client, DEFAULT_CACHE_NAME, true); clientCache0.put(1, 1); @@ -178,17 +140,13 @@ record = true; * @param srv Server node. * @param client Client node. * @param cacheName Cache name. - * @param cacheExists Cache exists flag. * @param clientCache {@code True} if client node has client cache. - * @param clientNear {@code True} if client node has near-enabled client cache. * @throws Exception If failed. */ private void checkCacheDiscoveryData(Ignite srv, Ignite client, final String cacheName, - boolean cacheExists, - final boolean clientCache, - boolean clientNear) throws Exception { + final boolean clientCache) throws Exception { final GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery(); GridDiscoveryManager clientDisco = ((IgniteKernal)client).context().discovery(); @@ -198,66 +156,29 @@ private void checkCacheDiscoveryData(Ignite srv, assertFalse(srvDisco.cacheAffinityNode(clientNode, cacheName)); assertFalse(clientDisco.cacheAffinityNode(clientNode, cacheName)); - assertEquals(cacheExists, srvDisco.cacheAffinityNode(srvNode, cacheName)); + assertEquals(true, srvDisco.cacheAffinityNode(srvNode, cacheName)); - if (clientNear) { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return srvDisco.cacheNearNode(clientNode, cacheName); - } - }, 5000)); + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return F.eq(clientCache, srvDisco.cacheClientNode(clientNode, cacheName)); + } + }, 5000)); - assertTrue(srvDisco.cacheNearNode(clientNode, cacheName)); - } - else { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return F.eq(clientCache, srvDisco.cacheClientNode(clientNode, cacheName)); - } - }, 5000)); + assertEquals(clientCache, srvDisco.cacheClientNode(clientNode, cacheName)); - assertEquals(clientCache, srvDisco.cacheClientNode(clientNode, cacheName)); - } + assertEquals(true, clientDisco.cacheAffinityNode(srvNode, cacheName)); - assertEquals(cacheExists, clientDisco.cacheAffinityNode(srvNode, cacheName)); + assertEquals(clientCache, clientDisco.cacheClientNode(clientNode, cacheName)); - if (clientNear) - assertTrue(clientDisco.cacheNearNode(clientNode, cacheName)); - else - assertEquals(clientCache, clientDisco.cacheClientNode(clientNode, cacheName)); - - if (cacheExists) { - if (clientCache || clientNear) { - assertTrue(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); - assertTrue(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); - } - else { - assertFalse(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); - assertFalse(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); - } + if (clientCache) { + assertTrue(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + assertTrue(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); } else { - assertTrue(client.cluster().forClientNodes(cacheName).nodes().isEmpty()); - assertTrue(srv.cluster().forClientNodes(cacheName).nodes().isEmpty()); + assertFalse(client.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); + assertFalse(srv.cluster().forClientNodes(cacheName).nodes().contains(clientNode)); } - } - /** - * Replays all saved messages from map, actual sent is performed. - * - * @throws IgniteInterruptedCheckedException If interrupted. - */ - private void replayMessages() throws IgniteInterruptedCheckedException { - record = false; - - log.info("Start replaying messages"); - - for (Runnable r : rs.values()) - r.run(); // Causes real messages sending. - - assertTrue(replay.compareAndSet(false, true)); - - U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages. } /** @@ -269,23 +190,22 @@ private class TestCommunicationDelayedSpi extends TcpCommunicationSpi { throws IgniteSpiException { final Object msg0 = ((GridIoMessage)msg).message(); - log.info("Communication message [thread=" + Thread.currentThread().getName() + - ", msg=" + msg0 + - ", node.id()=" + node.id() + - ']'); - - if (msg0 instanceof GridDhtPartitionsSingleMessage && record && + if (msg0 instanceof GridDhtPartitionsSingleMessage && ((GridDhtPartitionsAbstractMessage)msg0).exchangeId() == null) { - Runnable prevValue = rs.putIfAbsent(node.id(), new Runnable() { + log.info("GridDhtPartitionsSingleMessage message [thread=" + Thread.currentThread().getName() + + ", msg=" + msg0 + + ", node.id=" + node.id() + + ']'); + + recordedMessages.putIfAbsent(node.id(), new Runnable() { @Override public void run() { - log.info("Message replayed: " + msg); + log.info("GridDhtPartitionsSingleMessage replayed: " + msg); TestCommunicationDelayedSpi.super.sendMessage(node, msg, ackClosure); } }); - assert prevValue == null : "Duplicate message registered to [" + node.id() + "]"; } else try { @@ -294,7 +214,7 @@ private class TestCommunicationDelayedSpi extends TcpCommunicationSpi { catch (Exception e) { U.log(null, e); } - } + } } } From e239167d87253919441f5faacf6ee2bbc248b394 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Mon, 19 Mar 2018 21:20:26 +0300 Subject: [PATCH 5/9] IGNITE-7791: remove client node check --- .../internal/processors/cache/CacheAffinitySharedManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 7397a45627cf4..1019174f81e37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -168,7 +168,7 @@ void onDiscoveryEvent(int type, !DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) return; - if ((!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) || + if ((type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT) || DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) { synchronized (mux) { assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0 : From 26645e5a241a339b56159b463234cc18f827e4f2 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Tue, 27 Mar 2018 21:30:04 +0300 Subject: [PATCH 6/9] IGNITE-7791: more logging --- .../discovery/GridDiscoveryManager.java | 25 +++++++++ .../cache/CacheAffinitySharedManager.java | 55 ++++++++++++++++--- .../GridCachePartitionExchangeManager.java | 38 ++++++++----- .../processors/cache/GridCacheProcessor.java | 6 +- .../dht/GridDhtPartitionTopologyImpl.java | 10 ++-- ...ClientReconnectCacheDelayExchangeTest.java | 5 +- 6 files changed, 109 insertions(+), 30 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index e33666750abf3..43c91f5397b1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -679,6 +679,31 @@ else if (customMsg instanceof ChangeGlobalStateFinishMessage) { ctx.cache().onDiscoveryEvent(type, customMsg, node, nextTopVer, ctx.state().clusterState()); + String typeStr; + switch (type) { + case EVT_NODE_FAILED: + typeStr = "EVT_NODE_FAILED"; + break; + case EVT_NODE_JOINED: + typeStr = "EVT_NODE_JOINED"; + break; + case EVT_NODE_LEFT: + typeStr = "EVT_NODE_LEFT"; + break; + case EVT_CLIENT_NODE_RECONNECTED: + typeStr = "EVT_CLIENT_NODE_RECONNECTED"; + break; + case EVT_DISCOVERY_CUSTOM_EVT: + typeStr = "EVT_DISCOVERY_CUSTOM_EVT"; + break; + case EVT_NODE_METRICS_UPDATED: + typeStr = "EVT_NODE_METRICS_UPDATED"; + break; + default: + typeStr = String.valueOf(type); + } + log.info("onDiscoveryEvent type=" + typeStr + ", customMsg=" + customMsg + ", nextTopVer=" + nextTopVer + ", clusterState=" + ctx.state().clusterState()); + if (type == EVT_DISCOVERY_CUSTOM_EVT) { for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) { List> list = customEvtLsnrs.get(cls); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 1019174f81e37..2237ece533bfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -77,9 +77,12 @@ import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheRebalanceMode.NONE; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; /** * @@ -165,16 +168,51 @@ void onDiscoveryEvent(int type, lastAffVer = null; if ((state.transition() || !state.active()) && - !DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) + !DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) { + log.info("onDiscoveryEvent requiresCentralizedAffinityAssignment customMsg=" + customMsg); return; + } + + String typeStr; + switch (type) { + case EVT_NODE_FAILED: + typeStr = "EVT_NODE_FAILED"; + break; + case EVT_NODE_JOINED: + typeStr = "EVT_NODE_JOINED"; + break; + case EVT_NODE_LEFT: + typeStr = "EVT_NODE_LEFT"; + break; + case EVT_CLIENT_NODE_RECONNECTED: + typeStr = "EVT_CLIENT_NODE_RECONNECTED"; + break; + case EVT_DISCOVERY_CUSTOM_EVT: + typeStr = "EVT_DISCOVERY_CUSTOM_EVT"; + break; + default: + typeStr = String.valueOf(type); + } + + if (type != EVT_NODE_METRICS_UPDATED) + log.info("onDiscoveryEvent lastAffVer=" + lastAffVer + + ", node=" + node + + ", topVer=" + topVer + + ", customMsg=" + customMsg + + ", clientNode=" + CU.clientNode(node) + + ", type=" + typeStr + ); - if ((type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT) || + if ((!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT )) || DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) { + log.info("onDiscoveryEvent before mux"); synchronized (mux) { assert lastAffVer == null || topVer.compareTo(lastAffVer) > 0 : "lastAffVer=" + lastAffVer + ", topVer=" + topVer + ", customMsg=" + customMsg; lastAffVer = topVer; + log.info("onDiscoveryEvent updated lastAffVer=" + lastAffVer); + } } } @@ -212,6 +250,7 @@ boolean onCustomEvent(CacheAffinityChangeMessage msg) { // Skip message if affinity was already recalculated. boolean exchangeNeeded = lastAffVer == null || lastAffVer.equals(msg.topologyVersion()); + log.info("onCustomEvent lastAffVer=" + lastAffVer + ", topologyVersion=" + msg.topologyVersion()); msg.exchangeNeeded(exchangeNeeded); if (exchangeNeeded) { @@ -307,8 +346,12 @@ void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) { } try { - if (msg != null) + if (msg != null) { + log.info("checkRebalanceState affinityChangeMessage=" + msg + + ", top.lastTopologyChangeVersion=" + top.lastTopologyChangeVersion() + + ", top.readyTopologyVersion=" + top.readyTopologyVersion() ); cctx.discovery().sendCustomEvent(msg); + } } catch (IgniteCheckedException e) { U.error(log, "Failed to send affinity change message.", e); @@ -967,10 +1010,8 @@ public void onChangeAffinityMessage(final GridDhtPartitionsExchangeFuture exchFu final AffinityTopologyVersion topVer = exchFut.initialVersion(); - if (log.isDebugEnabled()) { - log.debug("Process affinity change message [exchVer=" + topVer + - ", msgVer=" + msg.topologyVersion() + ']'); - } + log.info("Process affinity change message [exchVer=" + topVer + + ", msgVer=" + msg.topologyVersion() + ']'); final Map>> affChange = msg.assignmentChange(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 4e93822a63168..e5b5228dcbaac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -17,9 +17,12 @@ package org.apache.ignite.internal.processors.cache; +import java.io.PrintWriter; +import java.io.StringWriter; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -41,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.affinity.AffinityFunction; @@ -470,6 +474,7 @@ else if (customMsg instanceof DynamicCacheChangeBatch) { } else if (customMsg instanceof CacheAffinityChangeMessage) { CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg; + AffinityTopologyVersion lastInitTopology = lastInitializedFut.topologyVersion(); if (msg.exchangeId() == null) { if (msg.exchangeNeeded()) { @@ -959,6 +964,10 @@ public IgniteInternalFuture deferStopCachesOnClientReconnect(Collection e.getClassName() + "." + e.getMethodName() + ":" + e.getLineNumber() + "\n") + .collect(Collectors.joining())); + if (timeout == null || timeout.started()) { ResendTimeoutObject update = new ResendTimeoutObject(); @@ -983,14 +992,12 @@ public void refreshPartitions() { ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE); if (oldest == null) { - if (log.isDebugEnabled()) - log.debug("Skip partitions refresh, there are no server nodes [loc=" + cctx.localNodeId() + ']'); + log.info("Skip partitions refresh, there are no server nodes [loc=" + cctx.localNodeId() + ']'); return; } - if (log.isDebugEnabled()) - log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']'); + log.info("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']'); // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -999,8 +1006,11 @@ public void refreshPartitions() { if (!grp.isLocal()) { GridDhtPartitionTopology top = grp.topology(); - if (top != null) + if (top != null) { + log.info("checkRebalanceState [top=" + top + ", grp.groupId()=" + grp.groupId() + ']'); + cctx.affinity().checkRebalanceState(top, grp.groupId()); + } } } @@ -1036,8 +1046,7 @@ private void sendAllPartitions(Collection nodes, m.topologyVersion(msgTopVer); - if (log.isDebugEnabled()) - log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); + log.info("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); for (ClusterNode node : nodes) { try { @@ -1197,8 +1206,7 @@ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExc false, null); - if (log.isDebugEnabled()) - log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); + log.info("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); try { cctx.io().sendNoRetry(node, m, SYSTEM_POOL); @@ -1502,8 +1510,7 @@ private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtP try { if (msg.exchangeId() == null) { - if (log.isDebugEnabled()) - log.debug("Received local partition update [nodeId=" + node.id() + ", parts=" + + log.info("Received local partition update [nodeId=" + node.id() + ", parts=" + msg + ']'); boolean updated = false; @@ -1526,7 +1533,7 @@ else if (!grp.isLocal()) if (top != null) { updated |= top.update(null, entry.getValue(), false); - + log.info("processSinglePartitionUpdate top=" + top + ", grpId=" + grpId); cctx.affinity().checkRebalanceState(top, grpId); } } @@ -2571,8 +2578,13 @@ private class ResendTimeoutObject implements GridTimeoutObject { return; try { - if (started.compareAndSet(false, true)) + if (started.compareAndSet(false, true)) { + log.info("Refreshing partitions onTimeout: "); + log.info(Arrays.stream(Thread.currentThread().getStackTrace()). + map(e -> e.getClassName() + "." + e.getMethodName() + ":" + e.getLineNumber() + "\n") + .collect(Collectors.joining())); refreshPartitions(); + } } finally { busyLock.readLock().unlock(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 4648f2597edfa..d20d93abe2609 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -3279,9 +3279,11 @@ public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion return false; } - if (msg instanceof CacheAffinityChangeMessage) - return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg)); + if (msg instanceof CacheAffinityChangeMessage) { + log.info("onCustomEvent msg=" + msg + ", topVer=" + topVer + ", node=" + node); + return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg)); + } if (msg instanceof SnapshotDiscoveryMessage && ((SnapshotDiscoveryMessage)msg).needExchange()) return true; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 528f0a6e51c7a..193b9920dbef1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1532,10 +1532,9 @@ else if (locPart.state() == RENTING) { consistencyCheck(); - if (log.isDebugEnabled()) { - log.debug("Partition map after full update [grp=" + grp.cacheOrGroupName() + - ", map=" + fullMapString() + ']'); - } + log.info("Partition map after full update [grp=" + grp.cacheOrGroupName() + + ", map=" + fullMapString() + ']'); + if (changed) ctx.exchange().scheduleResendPartitions(); @@ -1784,8 +1783,7 @@ else if (isStaleUpdate(cur, parts)) { consistencyCheck(); - if (log.isDebugEnabled()) - log.debug("Partition map after single update [grp=" + grp.cacheOrGroupName() + ", map=" + fullMapString() + ']'); + log.info("Partition map after single update [grp=" + grp.cacheOrGroupName() + ", map=" + fullMapString() + ']'); if (changed && exchId == null) ctx.exchange().scheduleResendPartitions(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java index 2733b3414dab2..a85095e3dc372 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheDelayExchangeTest.java @@ -193,14 +193,15 @@ private class TestCommunicationDelayedSpi extends TcpCommunicationSpi { if (msg0 instanceof GridDhtPartitionsSingleMessage && ((GridDhtPartitionsAbstractMessage)msg0).exchangeId() == null) { - log.info("GridDhtPartitionsSingleMessage message [thread=" + Thread.currentThread().getName() + + log.info("DelayedSpi GridDhtPartitionsSingleMessage message [thread=" + + Thread.currentThread().getName() + ", msg=" + msg0 + ", node.id=" + node.id() + ']'); recordedMessages.putIfAbsent(node.id(), new Runnable() { @Override public void run() { - log.info("GridDhtPartitionsSingleMessage replayed: " + msg); + log.info("DelayedSpi GridDhtPartitionsSingleMessage replayed: " + msg0); TestCommunicationDelayedSpi.super.sendMessage(node, msg, ackClosure); } From 2fba4cd60b698db5f27ca9c889fa4ace4aba97fb Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Wed, 4 Apr 2018 12:16:26 +0300 Subject: [PATCH 7/9] IGNITE-6842: update log details --- .../processors/cache/CacheAffinitySharedManager.java | 8 +++++++- .../cache/GridCachePartitionExchangeManager.java | 8 ++++++-- .../distributed/dht/GridDhtPartitionTopologyImpl.java | 8 ++++++++ 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 2237ece533bfd..0b6d05eb7d969 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -310,6 +310,8 @@ void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) { Map partWait = waitInfo.waitGrps.get(checkGrpId); + log.info("checkRebalanceState waitInfo=" + waitInfo); + boolean rebalanced = true; if (partWait != null) { @@ -1974,6 +1976,8 @@ private CacheGroupHolder groupHolder(AffinityTopologyVersion topVer, final Cache } }); + log.info("initAffinityOnNodeJoin waitRebalanceInfo=" + waitRebalanceInfo); + return waitRebalanceInfo; } } @@ -2617,7 +2621,9 @@ void add(Integer grpId, Integer part, UUID waitNode, List assignmen /** {@inheritDoc} */ @Override public String toString() { return "WaitRebalanceInfo [topVer=" + topVer + - ", grps=" + (waitGrps != null ? waitGrps.keySet() : null) + ']'; + ", grps=" + (waitGrps != null ? waitGrps.keySet() : null) + + ", assignments=" + assignments.size() + + ", deploymentIds = " + deploymentIds.size() + "]"; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index e5b5228dcbaac..bcb30b9255dfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1531,15 +1531,19 @@ private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtP else if (!grp.isLocal()) top = grp.topology(); + log.info("processSinglePartitionUpdate top=" + top + ", grpId=" + grpId + ", clienTops=" + clientTops); + if (top != null) { updated |= top.update(null, entry.getValue(), false); - log.info("processSinglePartitionUpdate top=" + top + ", grpId=" + grpId); cctx.affinity().checkRebalanceState(top, grpId); } } - if (updated) + if (updated) { + log.info("processSinglePartitionUpdate scheduleResendPartitions"); + scheduleResendPartitions(); + } } else exchangeFuture(msg.exchangeId(), null, null, null, null).onReceiveSingleMessage(node, msg); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 193b9920dbef1..c220d8abd5396 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -2656,6 +2656,14 @@ private void consistencyCheck() { // no-op } + @Override + public String toString() { + return "GridDhtPartitionTopologyImpl{" + + "diffFromAffinityVer=" + diffFromAffinityVer + + ", lastTopChangeVer=" + lastTopChangeVer + + '}'; + } + /** * Iterator over current local partitions. */ From 2e31145ce76392f3ba8374d02fca04d1f983cac0 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Thu, 5 Apr 2018 12:52:51 +0300 Subject: [PATCH 8/9] IGNITE-7791: more logging --- .../cache/CacheAffinitySharedManager.java | 20 +++++++++++++++++++ .../processors/cache/ClusterCachesInfo.java | 15 ++++++++++++++ .../GridCachePartitionExchangeManager.java | 6 ++++++ 3 files changed, 41 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 0b6d05eb7d969..7f5348fdf02c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; @@ -66,6 +67,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; @@ -75,6 +77,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import static java.util.Arrays.stream; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheRebalanceMode.NONE; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; @@ -228,6 +231,12 @@ public void initCachesOnLocalJoin( caches.clear(); caches.init(cacheGroupDescriptors, cacheDescriptors); + + StackTraceElement[] arr = Thread.currentThread().getStackTrace(); + String msg = stream(arr).map(e -> e.getClassName() + "." + e.getMethodName() + ":" + e.getLineNumber() + "\n") + .collect(Collectors.joining(",", "[", "]")); + log.info("initCachesOnLocalJoin complete msg=" + msg + ", cacheGroupDescriptors=" + cacheGroupDescriptors + + ", cacheDescriptors=" + cacheDescriptors); } /** @@ -2689,6 +2698,12 @@ CacheGroupDescriptor group(int grpId) { * @param descs Cache descriptor. */ void initStartedCaches(Collection descs) { + + StackTraceElement[] arr = Thread.currentThread().getStackTrace(); + String msg = stream(arr).map(e -> e.getClassName() + "." + e.getMethodName() + ":" + e.getLineNumber() + "\n") + .collect(Collectors.joining(",", "[", "]")); + log.info("initStartedCaches descs=" + descs + ", msg=" + msg); + for (DynamicCacheDescriptor desc : descs) { CacheGroupDescriptor grpDesc = desc.groupDescriptor(); @@ -2721,6 +2736,11 @@ void updateCachesInfo(ExchangeActions exchActions) { for (ExchangeActions.CacheActionData req : exchActions.cacheStartRequests()) registerCache(req.descriptor()); + + StackTraceElement[] arr = Thread.currentThread().getStackTrace(); + String msg = stream(arr).map(e -> e.getClassName() + "." + e.getMethodName() + ":" + e.getLineNumber() + "\n") + .collect(Collectors.joining(",", "[", "]")); + log.info("updateCachesInfo exchActions=" + exchActions + ", msg=" + msg); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 2b2fb559c182e..06cfddad39360 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -32,6 +32,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheExistsException; @@ -62,6 +64,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static java.util.Arrays.stream; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; @@ -813,6 +816,8 @@ private Serializable joinDiscoveryData() { locJoinCachesCtx = null; + log.info("localJoinCachesContext locJoinCachesCtx=" + locJoinCachesCtx + ", result=" + result); + return result; } @@ -1107,6 +1112,12 @@ public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { private void initStartCachesForLocalJoin(boolean firstNode, boolean reconnect) { assert locJoinCachesCtx == null : locJoinCachesCtx; + StackTraceElement[] arr = Thread.currentThread().getStackTrace(); + String msg = stream(arr).map(e -> e.getClassName() + "." + e.getMethodName() + ":" + e.getLineNumber() + "\n") + .collect(Collectors.joining(",", "[", "]")); + log.info("initStartCachesForLocalJoin firstNode=" + firstNode + + ", reconnect=" + reconnect + ", msg=" + msg); + if (ctx.state().clusterState().transition()) { joinOnTransition = true; @@ -1165,6 +1176,9 @@ private void initStartCachesForLocalJoin(boolean firstNode, boolean reconnect) { } } + log.info("initStartCachesForLocalJoin locJoinStartCaches=" + locJoinStartCaches + + ", registeredCacheGrps=" + registeredCacheGrps + ", registeredCaches=" + registeredCaches); + locJoinCachesCtx = new LocalJoinCachesContext( locJoinStartCaches, new HashMap<>(registeredCacheGrps), @@ -1751,6 +1765,7 @@ public ClusterCachesReconnectResult onReconnected(boolean active, boolean transi } if (locJoinCachesCtx != null) { + log.info("onReconnected survivedCacheGrps=" + survivedCacheGrps + ", survivedCaches=" + survivedCaches); locJoinCachesCtx.removeSurvivedCacheGroups(survivedCacheGrps); locJoinCachesCtx.removeSurvivedCaches(survivedCaches); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index bcb30b9255dfc..803b341d6b567 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -118,6 +118,7 @@ import org.jetbrains.annotations.Nullable; import java.util.concurrent.ConcurrentHashMap; +import static java.util.Arrays.stream; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT; @@ -441,6 +442,11 @@ private void onDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) { if (locJoinCtx != null) { exchActs = new ExchangeActions(); + StackTraceElement[] arr = Thread.currentThread().getStackTrace(); + String msg = stream(arr).map(e -> e.getClassName() + "." + e.getMethodName() + ":" + e.getLineNumber() + "\n") + .collect(Collectors.joining(",", "[", "]")); + log.info("initCachesOnLocalJoin set locJoinCtx=" + locJoinCtx + ", msg=" + msg); + exchActs.localJoinContext(locJoinCtx); } } From 5af630a4b2474e0459c3ee0b985ceb1cd6fa1389 Mon Sep 17 00:00:00 2001 From: Maxim Muzafarov Date: Sat, 7 Apr 2018 01:34:42 +0300 Subject: [PATCH 9/9] IGNITE-7791: fix local caches context --- .../internal/processors/cache/CacheAffinitySharedManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 7f5348fdf02c7..822c82c12776b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -230,7 +230,8 @@ public void initCachesOnLocalJoin( // Clean-up in case of client reconnect. caches.clear(); - caches.init(cacheGroupDescriptors, cacheDescriptors); +// caches.init(cacheGroupDescriptors, cacheDescriptors); + caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors()); StackTraceElement[] arr = Thread.currentThread().getStackTrace(); String msg = stream(arr).map(e -> e.getClassName() + "." + e.getMethodName() + ":" + e.getLineNumber() + "\n")