From 8427a4df8a52f0cc4e3eeac80776f221af8b9f1a Mon Sep 17 00:00:00 2001 From: Evgeny Stanilovskiy Date: Fri, 21 Dec 2018 15:11:13 +0300 Subject: [PATCH 1/6] IGNITE-9493 Communication error resolver shouldn`t be called on client node failed. --- .../zk/internal/ZookeeperDiscoveryImpl.java | 3 + .../internal/ZookeeperDiscoverySpiTest.java | 79 ++++++++++++++++++- 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index fa218fff4f2aa..66e6db16dd187 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -300,6 +300,9 @@ void clearCommunicationErrorProcessFuture(ZkCommunicationErrorProcessFuture fut) * @param err Connect error. */ public void resolveCommunicationError(ClusterNode node0, Exception err) { + if (node0.isClient()) + return; + ZookeeperClusterNode node = node(node0.id()); if (node == null) diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index b47d51ec978f1..5ebea5c2b9716 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -204,6 +204,9 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { /** */ private boolean failCommSpi; + /** */ + private boolean blockCommSpi; + /** */ private long sesTimeout; @@ -400,6 +403,13 @@ public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest { if (failCommSpi) cfg.setCommunicationSpi(new PeerToPeerCommunicationFailureSpi()); + if (blockCommSpi) { + cfg.setCommunicationSpi(new TcpBlockCommunicationSpi(igniteInstanceName.contains("block")) + .setUsePairedConnections(true)); + + cfg.setNetworkTimeout(500); + } + if (commFailureRslvr != null) cfg.setCommunicationFailureResolver(commFailureRslvr.apply()); @@ -3584,6 +3594,28 @@ public void testCommunicationFailureResolve_ConcurrentMultinode() throws Excepti }, 30, "test-resolve-failure"); } + /** + * @throws Exception If failed. + */ + @Test + public void testClientReconnects() throws Exception { + blockCommSpi = true; + + Ignite srv1 = startGrid("server1-block"); + + clientModeThreadLocal(true); + + IgniteEx cli = startGrid("client"); + + IgniteCache cache = cli.getOrCreateCache(DEFAULT_CACHE_NAME); + + cache.put(1, 1); + + assertEquals(cache.get(1), 1); + + assertEquals(1, srv1.cluster().forClients().nodes().size()); + } + /** * @throws Exception If failed. */ @@ -5612,7 +5644,7 @@ static class TestFastStopProcessCustomMessageAck implements DiscoveryCustomMessa } /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { + @Override public @Nullable DiscoveryCustomMessage ackMessage() { return null; } @@ -5655,4 +5687,49 @@ static class TestFastStopProcessCustomMessageAck implements DiscoveryCustomMessa return S.toString(TestFastStopProcessCustomMessageAck.class, this); } } + + /** + * Block communications. + */ + private class TcpBlockCommunicationSpi extends TcpCommunicationSpi { + /** + * Whether this instance should actually block. + */ + private final boolean isBlocking; + + /** Blocked once. */ + private boolean alreadyBlocked; + + /** + * @param isBlocking Whether this instance should actually block. + */ + public TcpBlockCommunicationSpi(boolean isBlocking) { + this.isBlocking = isBlocking; + } + + /** {@inheritDoc} */ + @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) + throws IgniteCheckedException { + if (node.isClient() && blockHandshakeOnce(node.id())) { + ZookeeperDiscoverySpi spi = spi(ignite()); + + spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test")); + + return null; + } + + return super.createTcpClient(node, connIdx); + } + + /** Check if this connection is blocked. */ + private boolean blockHandshakeOnce(UUID nodeId) { + if (isBlocking && !alreadyBlocked) { + alreadyBlocked = true; + + return true; + } + + return false; + } + } } From 89594f711567d70ffdb94b3f223235d5015a3579 Mon Sep 17 00:00:00 2001 From: Evgeny Stanilovskiy Date: Mon, 24 Dec 2018 12:32:52 +0300 Subject: [PATCH 2/6] wop --- .../spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java | 6 ++++-- .../discovery/zk/internal/ZookeeperDiscoverySpiTest.java | 8 ++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 66e6db16dd187..1c895a6ef4ecf 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -300,7 +300,9 @@ void clearCommunicationErrorProcessFuture(ZkCommunicationErrorProcessFuture fut) * @param err Connect error. */ public void resolveCommunicationError(ClusterNode node0, Exception err) { - if (node0.isClient()) + ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); + + if (node0.isClient() && fut == null) return; ZookeeperClusterNode node = node(node0.id()); @@ -313,7 +315,7 @@ public void resolveCommunicationError(ClusterNode node0, Exception err) { for (;;) { checkState(); - ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); + fut = commErrProcFut.get(); if (fut == null || fut.isDone()) { ZkCommunicationErrorProcessFuture newFut = ZkCommunicationErrorProcessFuture.createOnCommunicationError( diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index 5ebea5c2b9716..faf0db68e4477 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -3595,6 +3595,10 @@ public void testCommunicationFailureResolve_ConcurrentMultinode() throws Excepti } /** + * Test reproduces failure in case of client resolution failure + * {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi#createTcpClient} from server side, further + * client reconnect and proper grid work. + * * @throws Exception If failed. */ @Test @@ -3605,7 +3609,7 @@ public void testClientReconnects() throws Exception { clientModeThreadLocal(true); - IgniteEx cli = startGrid("client"); + IgniteEx cli = startGrid("client-block"); IgniteCache cache = cli.getOrCreateCache(DEFAULT_CACHE_NAME); @@ -5691,7 +5695,7 @@ static class TestFastStopProcessCustomMessageAck implements DiscoveryCustomMessa /** * Block communications. */ - private class TcpBlockCommunicationSpi extends TcpCommunicationSpi { + private static class TcpBlockCommunicationSpi extends TcpCommunicationSpi { /** * Whether this instance should actually block. */ From 3fd8f6a4e2bc47a4ced94614ac8e4f7853e1c030 Mon Sep 17 00:00:00 2001 From: Evgeny Stanilovskiy Date: Mon, 24 Dec 2018 14:39:55 +0300 Subject: [PATCH 3/6] wip --- .../spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 1c895a6ef4ecf..d5c281c834f57 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -300,9 +300,7 @@ void clearCommunicationErrorProcessFuture(ZkCommunicationErrorProcessFuture fut) * @param err Connect error. */ public void resolveCommunicationError(ClusterNode node0, Exception err) { - ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); - - if (node0.isClient() && fut == null) + if (node0.isClient()) return; ZookeeperClusterNode node = node(node0.id()); @@ -315,7 +313,7 @@ public void resolveCommunicationError(ClusterNode node0, Exception err) { for (;;) { checkState(); - fut = commErrProcFut.get(); + ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); if (fut == null || fut.isDone()) { ZkCommunicationErrorProcessFuture newFut = ZkCommunicationErrorProcessFuture.createOnCommunicationError( From 405137355af0d795cfb3241fcca961ee1ce759d5 Mon Sep 17 00:00:00 2001 From: Evgeny Stanilovskiy Date: Mon, 24 Dec 2018 16:35:05 +0300 Subject: [PATCH 4/6] add metrics check --- .../discovery/zk/ZookeeperDiscoverySpi.java | 9 +++++++-- .../zk/ZookeeperDiscoverySpiMBean.java | 8 ++++++++ .../zk/internal/ZookeeperDiscoveryImpl.java | 5 ++++- .../ZookeeperDiscoveryStatistics.java | 20 +++++++++++++++---- .../internal/ZookeeperDiscoverySpiTest.java | 13 ++++++++++++ 5 files changed, 48 insertions(+), 7 deletions(-) diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 222b73bb10e58..287bd4502af11 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -595,12 +595,17 @@ public ZookeeperDiscoverySpiMBeanImpl(IgniteSpiAdapter spiAdapter) { } /** {@inheritDoc} */ - @Nullable @Override public UUID getCoordinator() { + @Override public long getCommErrorProcNum() { + return stats.commErrorCount(); + } + + /** {@inheritDoc} */ + @Override public @Nullable UUID getCoordinator() { return impl.getCoordinator(); } /** {@inheritDoc} */ - @Nullable @Override public String getCoordinatorNodeFormatted() { + @Override public @Nullable String getCoordinatorNodeFormatted() { return String.valueOf(impl.node(impl.getCoordinator())); } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java index 1eed0b41d098d..05a3dc26778a3 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiMBean.java @@ -57,6 +57,14 @@ public interface ZookeeperDiscoverySpiMBean extends IgniteSpiManagementMBean, Di @MXBeanDescription("Zk Session Id.") public String getZkSessionId(); + /** + * Gets number of communication resolver called. + * + * @return Number of communication resolved oparations. + */ + @MXBeanDescription("Communication error resolver call count.") + public long getCommErrorProcNum(); + /** * Gets root path in ZooKeeper cluster Zk client uses to put data to. * diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index d5c281c834f57..31475943d1ed8 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.typedef.T2; @@ -310,10 +311,12 @@ public void resolveCommunicationError(ClusterNode node0, Exception err) { IgniteInternalFuture nodeStatusFut; + stats.onCommunicationError(); + for (;;) { checkState(); - ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); + ZkCommunicationErrorProcessFuture fut = commErrProcFut.get(); if (fut == null || fut.isDone()) { ZkCommunicationErrorProcessFuture newFut = ZkCommunicationErrorProcessFuture.createOnCommunicationError( diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java index 678cf11cf1505..af5c238e69662 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryStatistics.java @@ -23,21 +23,28 @@ */ public class ZookeeperDiscoveryStatistics { /** */ - private int joinedNodesCnt; + private long joinedNodesCnt; /** */ - private int failedNodesCnt; + private long failedNodesCnt; + + /** Communication error count. */ + private long commErrCnt; /** */ - public int joinedNodesCnt() { + public long joinedNodesCnt() { return joinedNodesCnt; } /** */ - public int failedNodesCnt() { + public long failedNodesCnt() { return failedNodesCnt; } + public long commErrorCount() { + return commErrCnt; + } + /** */ public void onNodeJoined() { joinedNodesCnt++; @@ -48,6 +55,11 @@ public void onNodeFailed() { failedNodesCnt++; } + /** */ + public void onCommunicationError() { + commErrCnt++; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ZookeeperDiscoveryStatistics.class, this); diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java index faf0db68e4477..72e835ecea1d9 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java @@ -3618,6 +3618,19 @@ public void testClientReconnects() throws Exception { assertEquals(cache.get(1), 1); assertEquals(1, srv1.cluster().forClients().nodes().size()); + + MBeanServer srv = ManagementFactory.getPlatformMBeanServer(); + + IgniteEx ignite = grid("server1-block"); + + ObjectName spiName = U.makeMBeanName(ignite.context().igniteInstanceName(), "SPIs", + ZookeeperDiscoverySpi.class.getSimpleName()); + + ZookeeperDiscoverySpiMBean bean = JMX.newMBeanProxy(srv, spiName, ZookeeperDiscoverySpiMBean.class); + + assertNotNull(bean); + + assertEquals(0, bean.getCommErrorProcNum()); } /** From 15b53635390a67442e313027f44459150c0c01a7 Mon Sep 17 00:00:00 2001 From: Evgeny Stanilovskiy Date: Mon, 24 Dec 2018 16:43:47 +0300 Subject: [PATCH 5/6] wip --- .../spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 31475943d1ed8..cea426678f7b2 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -311,8 +311,6 @@ public void resolveCommunicationError(ClusterNode node0, Exception err) { IgniteInternalFuture nodeStatusFut; - stats.onCommunicationError(); - for (;;) { checkState(); @@ -323,6 +321,8 @@ public void resolveCommunicationError(ClusterNode node0, Exception err) { this, node.sessionTimeout() + 1000); + stats.onCommunicationError(); + if (commErrProcFut.compareAndSet(fut, newFut)) { fut = newFut; From 4c23d5caf32b34fcce6770854f1d3ccd17a12379 Mon Sep 17 00:00:00 2001 From: Evgeny Stanilovskiy Date: Mon, 24 Dec 2018 17:02:08 +0300 Subject: [PATCH 6/6] fix --- .../ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index cea426678f7b2..d57c8d65f0bb7 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -64,7 +64,6 @@ import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.typedef.T2;