From 3209defc81ec0871fd229594c1b44a3c4eec57ae Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Sun, 29 Oct 2017 17:27:57 +0700 Subject: [PATCH 01/12] Added TcpDiscoverySplitTest, updated IgniteCacheTopologySplitAbstractTest Early previous node fail with more reliable connection check (keep-alive) algorithm Fast node failed message transmission, reduced split detection & exchange delay --- .../ignite/spi/discovery/tcp/ServerImpl.java | 160 ++++++++++--- .../tcp/internal/TcpDiscoveryNodesRing.java | 65 ++++++ .../IgniteCacheTopologySplitAbstractTest.java | 67 ++++-- .../discovery/tcp/TcpDiscoverySplitTest.java | 217 ++++++++++++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 2 + 5 files changed, 469 insertions(+), 42 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySplitTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 1c3ec2ecf0680..b085bb3e6aea9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -55,6 +55,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLException; import javax.net.ssl.SSLServerSocket; @@ -193,6 +194,8 @@ class ServerImpl extends TcpDiscoveryImpl { @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private RingMessageWorker msgWorker; + private ConnChecker connChecker; + /** Client message workers. 
*/ protected ConcurrentMap clientMsgWorkers = new ConcurrentHashMap8<>(); @@ -244,6 +247,9 @@ class ServerImpl extends TcpDiscoveryImpl { private final ConcurrentMap>> pingMap = new ConcurrentHashMap8<>(); + /** Time when the last message from previous node was received. */ + private final AtomicLong lastTimePrevNodeRcvd = new AtomicLong(); + /** * @param adapter Adapter. */ @@ -331,6 +337,9 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorker = new RingMessageWorker(); msgWorker.start(); + connChecker = new ConnChecker(); + connChecker.start(); + if (tcpSrvr == null) tcpSrvr = new TcpServer(); @@ -454,6 +463,9 @@ else if (log.isInfoEnabled()) { U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); + U.interrupt(connChecker); + U.join(connChecker, log); + U.interrupt(msgWorker); U.join(msgWorker, log); @@ -2031,6 +2043,16 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { } } + /** */ + protected void updateLastTimePrevNodeRcvd(TcpDiscoveryAbstractMessage msg) { + if (msg.senderNodeId() != null) { + TcpDiscoveryNode prevNode = ring.prevNode(); + + if (prevNode != null && prevNode.id().equals(msg.senderNodeId())) + lastTimePrevNodeRcvd.set(U.currentTimeMillis()); + } + } + /** * @param obj Object. * @param ver Security serialize version. @@ -2508,6 +2530,9 @@ private class RingMessageWorker extends MessageWorkerAdapter>> Simulating split"); + log.info("Simulating split"); - long topVer = grid(0).cluster().topologyVersion(); + splitTopVer = grid(0).cluster().topologyVersion(); // Trigger segmentation. segmented = true; @@ -84,7 +87,14 @@ protected void splitAndWait() throws InterruptedException, IgniteCheckedExceptio comm.blockMessages(new SegmentBlocker(ignite.cluster().localNode())); } + } + /** + * Wait for segmentation results. Should be called after {@link #split()} + * + * @throws InterruptedException If interrupted while waiting. 
+ */ + protected void awaitSegmentation() throws InterruptedException { Collection seg0 = F.view(G.allGrids(), new IgnitePredicate() { @Override public boolean apply(Ignite ignite) { return segment(ignite.cluster().localNode()) == 0; @@ -97,26 +107,51 @@ protected void splitAndWait() throws InterruptedException, IgniteCheckedExceptio } }); - for (Ignite grid : seg0) - ((IgniteKernal)grid).context().discovery().topologyFuture(topVer + seg1.size()).get(); + try { + for (Ignite grid : seg0) + ((IgniteKernal)grid).context().discovery().topologyFuture(splitTopVer + seg1.size()).get(); - for (Ignite grid : seg1) - ((IgniteKernal)grid).context().discovery().topologyFuture(topVer + seg0.size()).get(); + for (Ignite grid : seg1) + ((IgniteKernal)grid).context().discovery().topologyFuture(splitTopVer + seg0.size()).get(); - // awaitPartitionMapExchange won't work because coordinator is wrong for second segment. - for (Ignite grid : G.allGrids()) - ((IgniteKernal)grid).context().cache().context().exchange().lastTopologyFuture().get(); + // awaitPartitionMapExchange won't work because coordinator is wrong for second segment. + for (Ignite grid : G.allGrids()) + ((IgniteKernal)grid).context().cache().context().exchange().lastTopologyFuture().get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for exchange", e); + } if (log.isInfoEnabled()) - log.info(">>> Finished waiting for split"); + log.info("Finished waiting for split"); + } + + /** + * Trigger segmentation and wait for results. Should be called on stable topology. + * + * @throws InterruptedException If interrupted while waiting. + * @throws IgniteCheckedException On error. 
+ */ + protected void splitAndWait() throws InterruptedException, IgniteCheckedException { + split(); + awaitSegmentation(); } /** * Restore initial state */ protected void unsplit() { + unsplit(true); + } + + /** + * Restore initial state + * + * @param sendMessages Send blocked messages after restoring from a split + */ + protected void unsplit(boolean sendMessages) { if (log.isInfoEnabled()) - log.info(">>> Restoring from split"); + log.info("Restoring from split [sendMessages=" + sendMessages + ']'); segmented = false; @@ -124,7 +159,7 @@ protected void unsplit() { TestRecordingCommunicationSpi comm = (TestRecordingCommunicationSpi) ignite.configuration().getCommunicationSpi(); - comm.stopBlock(); + comm.stopBlock(sendMessages); } } @@ -156,6 +191,7 @@ protected boolean segmented() { * Discovery SPI which can simulate network split. */ protected class SplitTcpDiscoverySpi extends TcpDiscoverySpi { + /** * @param sockAddr Remote socket address. * @return Segmented status. @@ -246,6 +282,7 @@ protected void checkSegmented(InetSocketAddress sockAddr, long timeout) throws S /** */ protected class SegmentBlocker implements IgniteBiPredicate { + /** */ private final ClusterNode locNode; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySplitTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySplitTest.java new file mode 100644 index 0000000000000..1ac287d42b849 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySplitTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.spi.discovery.tcp; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTopologySplitAbstractTest; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT; + +/** + * {@link TcpDiscoverySpi} test with splitting + */ +public class TcpDiscoverySplitTest extends IgniteCacheTopologySplitAbstractTest { + + /** */ + private static final int SEG_0_SIZE = 4; + + /** */ + private static final long DISCO_TIMEOUT = 1000L; + + /** */ + private static final long SPLIT_TIME = 3 * DISCO_TIMEOUT + DISCO_TIMEOUT / 2; + + /** */ + private static final String NODE_IDX_ATTR = "nodeIdx"; + + /** */ + private static int getDiscoPort(int gridIdx) { + return DFLT_PORT + gridIdx; + } + + /** */ + private static boolean isDiscoPort(int port) { + return port >= DFLT_PORT && port <= (DFLT_PORT + TcpDiscoverySpi.DFLT_PORT_RANGE); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 120_000L; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + int idx = getTestIgniteInstanceIndex(gridName); + + SplitTcpDiscoverySpi disco 
= (SplitTcpDiscoverySpi)cfg.getDiscoverySpi(); + + disco.setLocalPort(getDiscoPort(idx)); + + disco.setSocketTimeout(DISCO_TIMEOUT); + + cfg.setUserAttributes(Collections.singletonMap(NODE_IDX_ATTR, idx)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected boolean isBlocked(int locPort, int rmtPort) { + return isDiscoPort(locPort) && isDiscoPort(rmtPort) && segment(locPort) != segment(rmtPort); + } + + /** */ + private int segment(int discoPort) { + return (discoPort - DFLT_PORT) < SEG_0_SIZE ? 0 : 1; + } + + /** {@inheritDoc} */ + @Override protected int segment(ClusterNode node) { + return ((Integer)node.attribute(NODE_IDX_ATTR)) < SEG_0_SIZE ? 0 : 1; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** */ + @SuppressWarnings("unchecked") + protected void testSplitRestore(int[] startSeq, long splitTime) throws Exception { + if (log.isInfoEnabled()) + log.info("Start sequence [size=" + startSeq.length + ", indices=" + Arrays.toString(startSeq) + + ", splitTime=" + splitTime); + + IgniteEx[] grids = new IgniteEx[startSeq.length]; + + try { + for (int i = 0; i < startSeq.length; i++) { + int idx = startSeq[i]; + + grids[i] = startGrid(idx); + + awaitPartitionMapExchange(); + } + + long beforeSplitTime = U.currentTimeMillis(); + + split(); + + Thread.sleep(splitTime); + + unsplit(false); + + awaitSegmentation(); + + long exchangeEndTime = U.currentTimeMillis(); + + if (log.isInfoEnabled()) + log.info("Split with exchange finished in " + (exchangeEndTime - beforeSplitTime) + " ms"); + + Set[] segs = {new HashSet(), new HashSet()}; + + for (int i = 0; i < startSeq.length; i++) { + int idx = startSeq[i]; + + int segIdx = idx < SEG_0_SIZE ? 
0 : 1; + + try { + IgniteEx g = grids[i]; + + if (!g.context().isStopping()) + segs[segIdx].add(idx); + } + catch (Exception e) { + U.warn(log, "Error checking grid is live [idx=" + idx + ']', e); + } + } + if (log.isInfoEnabled()) + for (int i = 0; i < segs.length; ++i) { + Set seg = segs[i]; + + log.info(seg.isEmpty() ? "No live grids [segment=" + i + ']' : + "Live grids [segment=" + i + ", size=" + seg.size() + ", indices=" + seg + ']'); + } + int[] liveExp = startSeq; + + for (int idx : liveExp) { + int segIdx = idx < SEG_0_SIZE ? 0 : 1; + + if (!segs[segIdx].contains(idx)) + fail("Grid is stopped, but expected to live [idx=" + idx + ']'); + } + } + finally { +// for (int i = 0; i < startSeq.length; ++i) { +// int idx = startSeq[i]; +// if (grids[i] != null) +// try { +// stopGrid(idx); +// } +// catch (Throwable e) { +// U.warn(log, "Stop grid error", e); +// } +// } + } + } + + /** */ + public void testFullSplit() throws Exception { + int[] startSeq = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}; + testSplitRestore(startSeq, startSeq.length * DISCO_TIMEOUT + DISCO_TIMEOUT / 2); + } + + /** */ + public void testConsecutiveCoordSeg0() throws Exception { + testSplitRestore(new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, SPLIT_TIME); + } + + /** */ + public void testConsecutiveCoordSeg1() throws Exception { + testSplitRestore(new int[] {4, 5, 6, 7, 8, 9, 10, 11, 0, 1, 2, 3}, SPLIT_TIME); + } + + /** */ + public void testMixedCoordSeg0() throws Exception { + testSplitRestore(new int[] {0, 1, 4, 5, 6, 7, 2, 3, 8, 9, 10, 11}, SPLIT_TIME); + } + + /** */ + public void testMixedCoordSeg1() throws Exception { + testSplitRestore(new int[] {4, 5, 6, 7, 0, 1, 8, 9, 10, 11, 2, 3}, SPLIT_TIME); + } + + /** */ + public void testShuffledCoordSeg0() throws Exception { + testSplitRestore(new int[] {0, 4, 5, 1, 6, 7, 2, 8, 9, 3, 10, 11}, SPLIT_TIME); + } + + /** */ + public void testShuffledCoordSeg1() throws Exception { + testSplitRestore(new int[] {4, 5, 0, 6, 7, 1, 8, 9, 2, 10, 
11, 3}, SPLIT_TIME); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index ff4c9c1ada130..a3700255c65d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -39,6 +39,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiFailureTimeoutSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiStartStopSelfTest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySplitTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSecuredUnsecuredTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedSelfTest; @@ -78,6 +79,7 @@ public static TestSuite suite() throws Exception { suite.addTest(new TestSuite(TcpDiscoverySpiConfigSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoveryMarshallerCheckSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoverySnapshotHistoryTest.class)); + suite.addTest(new TestSuite(TcpDiscoverySplitTest.class)); suite.addTest(new TestSuite(GridTcpSpiForwardingSelfTest.class)); From c7b48f1d307575396eb5b36573c3fee569ea3bad Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Mon, 30 Oct 2017 00:21:45 +0700 Subject: [PATCH 02/12] Added TcpDiscoverySplitTest, updated IgniteCacheTopologySplitAbstractTest Early previous node fail with more reliable connection check (keep-alive) algorithm Fast node failed message transmission, reduced split detection & exchange delay --- .../ignite/spi/discovery/tcp/ServerImpl.java | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index b085bb3e6aea9..e397903ea441b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2707,7 +2707,15 @@ private void initConnectionCheckFrequency() { spi.stats.onMessageProcessingStarted(msg); - if (msg instanceof TcpDiscoveryJoinRequestMessage) + if (!(msg instanceof TcpDiscoveryNodeAddedMessage) && + !(msg instanceof TcpDiscoveryJoinRequestMessage) && + msg.senderNodeId() != null && ring.node(msg.senderNodeId()) == null) { + + U.warn(ServerImpl.this.log, "Ignore message from unknown node [senderNodeId=" + msg.senderNodeId() + + ", msg=" + msg + ']'); + } + + else if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); else if (msg instanceof TcpDiscoveryClientReconnectMessage) { @@ -5844,14 +5852,16 @@ else if (log.isDebugEnabled()) this.nodeId = nodeId; - for (TcpDiscoveryNode n : failedNodes.keySet()) { - if (n.id().equals(nodeId)) { - if (log.isInfoEnabled()) - log.info("Ignore handshake request from failed node [nodeId=" + nodeId + ']'); + synchronized (mux) { + for (TcpDiscoveryNode n : failedNodes.keySet()) { + if (n.id().equals(nodeId)) { + if (log.isInfoEnabled()) + log.info("Ignore handshake request from failed node [nodeId=" + nodeId + ']'); - U.closeQuiet(sock); + U.closeQuiet(sock); - return ; + return; + } } } @@ -6185,15 +6195,6 @@ else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) { TcpDiscoveryClientMetricsUpdateMessage metricsUpdateMsg = null; - if (!(msg instanceof TcpDiscoveryNodeAddedMessage) && - ring.node(msg.senderNodeId()) == null) { - - if (log.isInfoEnabled()) - log.info("Ignore message from unknown node [msg=" + msg + - ", senderNodeId=" + msg.senderNodeId() + ']'); - break; - } - 
if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) metricsUpdateMsg = (TcpDiscoveryClientMetricsUpdateMessage)msg; else { From 02b69a85aab8e2107d91e7634b309ab439d25b0e Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Mon, 30 Oct 2017 03:58:41 +0700 Subject: [PATCH 03/12] Added TcpDiscoverySplitTest, updated IgniteCacheTopologySplitAbstractTest Early previous node fail with more reliable connection check (keep-alive) algorithm Fast node failed message transmission, reduced split detection & exchange delay Remember recently failed nodes and reject them on handshake --- .../apache/ignite/IgniteSystemProperties.java | 3 ++ .../ignite/spi/discovery/tcp/ServerImpl.java | 43 +++++++++++++------ 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 4294c713960f5..8d0712c2412f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -458,6 +458,9 @@ public final class IgniteSystemProperties { /** Maximum size for discovery messages history. */ public static final String IGNITE_DISCOVERY_HISTORY_SIZE = "IGNITE_DISCOVERY_HISTORY_SIZE"; + /** Maximum size for recently failed nodes history. */ + public static final String IGNITE_FAILED_NODES_HISTORY_SIZE = "IGNITE_FAILED_NODES_HISTORY_SIZE"; + /** Maximum number of discovery message history used to support client reconnect. 
*/ public static final String IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE = "IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE"; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index e397903ea441b..6303bff3b7025 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -139,6 +139,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_FAILED_NODES_HISTORY_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE; import static org.apache.ignite.IgniteSystemProperties.getInteger; @@ -207,6 +208,9 @@ class ServerImpl extends TcpDiscoveryImpl { @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private StatisticsPrinter statsPrinter; + private final Set recentFailedNodeIds = new GridBoundedLinkedHashSet<>(getInteger( + IGNITE_FAILED_NODES_HISTORY_SIZE, 1000)); + /** Failed nodes (but still in topology). */ private final Map failedNodes = new HashMap<>(); @@ -530,6 +534,7 @@ else if (log.isInfoEnabled()) { // Clear stored data. 
leavingNodes.clear(); failedNodes.clear(); + recentFailedNodeIds.clear(); spiState = DISCONNECTED; } @@ -2007,13 +2012,18 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { } synchronized (mux) { + if (recentFailedNodeIds.contains(sndId)) { + if (log.isDebugEnabled()) + log.debug("Ignore message failed nodes, sender node was recently failed [nodeId=" + + sndId + ']'); + return; + } for (TcpDiscoveryNode failedNode : failedNodes.keySet()) { if (failedNode.id().equals(sndId)) { if (log.isDebugEnabled()) { - log.debug("Ignore message failed nodes, sender node is in fail list [nodeId=" + sndId + - ", failedNodes=" + msgFailedNodes + ']'); + log.debug("Ignore message failed nodes, sender node is in fail list [nodeId=" + + sndId + ", failedNodes=" + msgFailedNodes + ']'); } - return; } } @@ -2028,6 +2038,8 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { boolean added = false; synchronized (mux) { + recentFailedNodeIds.add(failedNode.id()); + if (!failedNodes.containsKey(failedNode)) { failedNodes.put(failedNode, msg.senderNodeId() != null ? 
msg.senderNodeId() : getLocalNodeId()); @@ -2707,15 +2719,7 @@ private void initConnectionCheckFrequency() { spi.stats.onMessageProcessingStarted(msg); - if (!(msg instanceof TcpDiscoveryNodeAddedMessage) && - !(msg instanceof TcpDiscoveryJoinRequestMessage) && - msg.senderNodeId() != null && ring.node(msg.senderNodeId()) == null) { - - U.warn(ServerImpl.this.log, "Ignore message from unknown node [senderNodeId=" + msg.senderNodeId() - + ", msg=" + msg + ']'); - } - - else if (msg instanceof TcpDiscoveryJoinRequestMessage) + if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); else if (msg instanceof TcpDiscoveryClientReconnectMessage) { @@ -3295,6 +3299,8 @@ else if (e instanceof SocketTimeoutException || synchronized (mux) { for (TcpDiscoveryNode failedNode : failedNodes) { + recentFailedNodeIds.add(failedNode.id()); + if (!ServerImpl.this.failedNodes.containsKey(failedNode)) ServerImpl.this.failedNodes.put(failedNode, locNodeId); } @@ -4742,7 +4748,8 @@ private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) { assert creatorId != null : msg; synchronized (mux) { - contains = failedNodes.containsKey(sndNode) || ring.node(creatorId) == null; + contains = recentFailedNodeIds.contains(sndId) || failedNodes.containsKey(sndNode) || + ring.node(creatorId) == null; } if (contains) { @@ -4775,6 +4782,8 @@ private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) { if (!skipUpdateFailedNodes) { synchronized (mux) { + recentFailedNodeIds.add(failedNodeId); + if (!failedNodes.containsKey(failedNode)) failedNodes.put(failedNode, msg.senderNodeId() != null ? 
msg.senderNodeId() : getLocalNodeId()); } @@ -5853,6 +5862,14 @@ else if (log.isDebugEnabled()) this.nodeId = nodeId; synchronized (mux) { + if (recentFailedNodeIds.contains(nodeId)) { + if (log.isInfoEnabled()) + log.info("Ignore handshake request from recently failed node [nodeId=" + nodeId + ']'); + + U.closeQuiet(sock); + + return; + } for (TcpDiscoveryNode n : failedNodes.keySet()) { if (n.id().equals(nodeId)) { if (log.isInfoEnabled()) From 9f6f89cfe6c21e0dd11f66c095c2e38efc7e4afd Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Tue, 31 Oct 2017 01:52:35 +0700 Subject: [PATCH 04/12] Added TcpDiscoverySplitTest, updated IgniteCacheTopologySplitAbstractTest Early previous node fail with more reliable connection check (keep-alive) algorithm Fast node failed message transmission, reduced split detection & exchange delay Remember recently failed nodes and say them that the next is failed upon handshake --- .../ignite/spi/discovery/tcp/ClientImpl.java | 8 +++ .../ignite/spi/discovery/tcp/ServerImpl.java | 61 +++++++++++++++---- 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 139c11049ecc5..734d530bab435 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -57,6 +57,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -636,6 +637,13 @@ else if (addrs.isEmpty()) { 
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); + if (res.failedNodes() != null) { + if (log.isDebugEnabled()) + log.debug("Handshake response from failed node: " + res); + + throw new IgniteNeedReconnectException(locNode, null); + } + UUID rmtNodeId = res.creatorNodeId(); assert rmtNodeId != null; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 6303bff3b7025..935e9dff98337 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1201,10 +1201,12 @@ else if (U.currentTimeMillis() - noResStart > spi.joinTimeout) TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( ackTimeout0)); + processMessageFailedNodes(res); + if (msg instanceof TcpDiscoveryJoinRequestMessage) { boolean ignore = false; - synchronized (failedNodes) { + synchronized (mux) { for (TcpDiscoveryNode failedNode : failedNodes.keySet()) { if (failedNode.id().equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -2001,12 +2003,16 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { if (msgFailedNodes != null) { UUID sndId = msg.senderNodeId(); + if (sndId == null) + sndId = msg.creatorNodeId(); + if (sndId != null) { if (ring.node(sndId) == null) { if (log.isDebugEnabled()) { log.debug("Ignore message failed nodes, sender node is not alive [nodeId=" + sndId + ", failedNodes=" + msgFailedNodes + ']'); } + msg.failedNodes(null); return; } @@ -2016,6 +2022,9 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { if (log.isDebugEnabled()) log.debug("Ignore message failed nodes, sender node was recently failed [nodeId=" + sndId + ']'); + + msg.failedNodes(null); + return; } for (TcpDiscoveryNode failedNode : failedNodes.keySet()) { @@ 
-2024,6 +2033,9 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { log.debug("Ignore message failed nodes, sender node is in fail list [nodeId=" + sndId + ", failedNodes=" + msgFailedNodes + ']'); } + + msg.failedNodes(null); + return; } } @@ -2043,6 +2055,10 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { if (!failedNodes.containsKey(failedNode)) { failedNodes.put(failedNode, msg.senderNodeId() != null ? msg.senderNodeId() : getLocalNodeId()); + if (msg instanceof TcpDiscoveryHandshakeResponse) + msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), + failedNode.id(), failedNode.internalOrder())); + added = true; } } @@ -2969,6 +2985,15 @@ else if (log.isTraceEnabled()) TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + processMessageFailedNodes(res); + + if (res.failedNodes() != null) { + if (log.isDebugEnabled()) + log.debug("Handshake response from failed node: " + res); + + break addr; + } + if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) log.debug("Handshake response from local node: " + res); @@ -4928,6 +4953,14 @@ private void processStatusCheckMessage(final TcpDiscoveryStatusCheckMessage msg) sendMessageAcrossRing(msg); } else { + synchronized (mux) { + if (recentFailedNodeIds.contains(msg.creatorNodeId())) { + if (log.isDebugEnabled()) + log.debug("Status check message discarded (creator node was recently failed)."); + + return; + } + } // Sender is not in topology, it should reconnect. 
msg.status(STATUS_RECON); @@ -5861,30 +5894,34 @@ else if (log.isDebugEnabled()) this.nodeId = nodeId; + boolean failed = false; + synchronized (mux) { if (recentFailedNodeIds.contains(nodeId)) { if (log.isInfoEnabled()) - log.info("Ignore handshake request from recently failed node [nodeId=" + nodeId + ']'); - - U.closeQuiet(sock); + log.info("Handshake request from recently failed node [nodeId=" + nodeId + ']'); - return; + failed = true; } - for (TcpDiscoveryNode n : failedNodes.keySet()) { - if (n.id().equals(nodeId)) { - if (log.isInfoEnabled()) - log.info("Ignore handshake request from failed node [nodeId=" + nodeId + ']'); + if (!failed) + for (TcpDiscoveryNode n : failedNodes.keySet()) { + if (n.id().equals(nodeId)) { + if (log.isInfoEnabled()) + log.info("Handshake request from failed node [nodeId=" + nodeId + ']'); - U.closeQuiet(sock); + failed = true; - return; + break; + } } - } } TcpDiscoveryHandshakeResponse res = new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder()); + if (failed) + res.addFailedNode(locNodeId); + if (req.client()) res.clientAck(true); From 0f1c960a22d6e66d257008218fd40c37caca8e3c Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Tue, 31 Oct 2017 11:06:16 +0700 Subject: [PATCH 05/12] Fix connection check and node add finished race --- .../org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 935e9dff98337..6b20288f968ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3011,6 +3011,13 @@ else if (log.isTraceEnabled()) long nextOrder = res.order(); + if (nextOrder == 0 && msg instanceof TcpDiscoveryConnectionCheckMessage) { + if (log.isDebugEnabled()) + 
log.debug("Skip connection check, next node is still initializing"); + + return; + } + if (!next.id().equals(nextId)) { // Node with different ID has bounded to the same port. if (log.isDebugEnabled()) From 934c10b691abec8b2426326162c2b2d9b64bfd5e Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Wed, 1 Nov 2017 12:12:48 +0700 Subject: [PATCH 06/12] Added TcpDiscoverySplitTest, updated IgniteCacheTopologySplitAbstractTest Early previous node fail with more reliable connection check (keep-alive) algorithm Fast node failed message transmission, reduced split detection & exchange delay Remember recently failed nodes and simulate the local node failure for the node being connected upon handshake Detect the local node freeze and do not fail previous node in that case of connection check timeout --- .../ignite/spi/discovery/tcp/ClientImpl.java | 7 -- .../ignite/spi/discovery/tcp/ServerImpl.java | 66 ++++++++++++++----- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 734d530bab435..408e26fca9ae6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -637,13 +637,6 @@ else if (addrs.isEmpty()) { TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); - if (res.failedNodes() != null) { - if (log.isDebugEnabled()) - log.debug("Handshake response from failed node: " + res); - - throw new IgniteNeedReconnectException(locNode, null); - } - UUID rmtNodeId = res.creatorNodeId(); assert rmtNodeId != null; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 6b20288f968ba..580de56788507 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1661,6 +1661,9 @@ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { U.interrupt(tmp); U.joinThreads(tmp, log); + U.interrupt(connChecker); + U.join(connChecker, log); + U.interrupt(msgWorker); U.join(msgWorker, log); @@ -2050,7 +2053,8 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { boolean added = false; synchronized (mux) { - recentFailedNodeIds.add(failedNode.id()); + if (!failedNode.isClient()) + recentFailedNodeIds.add(failedNode.id()); if (!failedNodes.containsKey(failedNode)) { failedNodes.put(failedNode, msg.senderNodeId() != null ? msg.senderNodeId() : getLocalNodeId()); @@ -2597,6 +2601,9 @@ private class RingMessageWorker extends MessageWorkerAdapter Date: Thu, 2 Nov 2017 12:32:00 +0700 Subject: [PATCH 08/12] CacheLateAffinityAssignmentTest.checkCaches() fix --- .../cache/distributed/CacheLateAffinityAssignmentTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index ab07611792c7a..e493b5266f4d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -104,6 +104,8 @@ import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.processors.cache.ExchangeContext.IGNITE_EXCHANGE_COMPATIBILITY_VER_1; +import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * @@ -2469,7 +2471,9 @@ private void checkCaches() { cache.put(key, val); - assertEquals(val, cache.get(key)); + try(Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + assertEquals(val, cache.get(key)); + } cache.remove(key); From e182d5e8eeec8a73655966847a8c44d8eb6fa181 Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Thu, 2 Nov 2017 14:09:44 +0700 Subject: [PATCH 09/12] CacheLateAffinityAssignmentTest.checkCaches() fix --- .../cache/distributed/CacheLateAffinityAssignmentTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index e493b5266f4d3..ec4cad4993180 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -2477,7 +2477,9 @@ private void checkCaches() { cache.remove(key); - assertNull(cache.get(key)); + try(Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + assertNull(cache.get(key)); + } } } catch (Exception e) { From 15578ee8bdf0db6b877306e272c81bddd5069f46 Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Tue, 7 Nov 2017 03:16:15 +0700 Subject: [PATCH 10/12] Independent asynchronous connection checkers for the previous node and the next one. 
--- .../spi/IgniteSpiOperationTimeoutHelper.java | 34 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 388 +++++++++++++----- 2 files changed, 304 insertions(+), 118 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java index b2432cea99790..2163af82aa3dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java @@ -32,6 +32,12 @@ public class IgniteSpiOperationTimeoutHelper { /** */ private long lastOperStartTs; + /** */ + private long lastOperFinishTs; + + /** */ + private boolean overtime; + /** */ private long timeout; @@ -65,16 +71,20 @@ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp) * this {@code IgniteSpiOperationTimeoutController}. */ public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutException { - if (!failureDetectionTimeoutEnabled) + if (!failureDetectionTimeoutEnabled) { + lastOperFinishTs = U.currentTimeMillis() + dfltTimeout; + return dfltTimeout; + } + + long curTs = U.currentTimeMillis(); if (lastOperStartTs == 0) { timeout = failureDetectionTimeout; - lastOperStartTs = U.currentTimeMillis(); + + lastOperStartTs = curTs; } else { - long curTs = U.currentTimeMillis(); - timeout = timeout - (curTs - lastOperStartTs); lastOperStartTs = curTs; @@ -84,10 +94,26 @@ public long nextTimeoutChunk(long dfltTimeout) throws IgniteSpiOperationTimeoutE "'failureDetectionTimeout' configuration property [failureDetectionTimeout=" + failureDetectionTimeout + ']'); } + lastOperFinishTs = curTs + timeout; return timeout; } + /** + * Reports whether the current time has passed the expected finish time of the last operation by more than the threshold. + * + * @param threshold Tolerated excess over the expected finish time, in milliseconds. + * @return {@code true} if overtime was detected by this call or by any earlier one.
+ */ + public boolean checkOvertime(long threshold) { + long overtime = U.currentTimeMillis() - lastOperFinishTs; + + if (overtime > threshold) + this.overtime = true; + + return this.overtime; + } + /** * Checks whether the given {@link Exception} is generated because failure detection timeout has been reached. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 580de56788507..c3ae1fb524758 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -55,7 +55,6 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLException; import javax.net.ssl.SSLServerSocket; @@ -195,7 +194,11 @@ class ServerImpl extends TcpDiscoveryImpl { @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private RingMessageWorker msgWorker; - private ConnChecker connChecker; + /** Continuously checks connection from a previous node */ + private PrevNodeConnChecker prevNodeConnChecker; + + /** Keeps connection to a next node be alive */ + private NextNodeConnChecker nextNodeConnChecker; /** Client message workers. */ protected ConcurrentMap clientMsgWorkers = new ConcurrentHashMap8<>(); @@ -251,8 +254,17 @@ class ServerImpl extends TcpDiscoveryImpl { private final ConcurrentMap>> pingMap = new ConcurrentHashMap8<>(); - /** Time when the last message from previous node was received. */ - private final AtomicLong lastTimePrevNodeRcvd = new AtomicLong(); + /** Connection check frequency. */ + private long connCheckFreq; + + /** Connection check threshold. 
*/ + private long connCheckThreshold; + + /** Last time when a message from the previous node was received. */ + private long lastPrevNodeTime; + + /** Last time when a message to the next node was sent. */ + private volatile long lastNextNodeTime; /** * @param adapter Adapter. @@ -338,11 +350,16 @@ class ServerImpl extends TcpDiscoveryImpl { fromAddrs.clear(); noResAddrs.clear(); + initConnectionCheckFrequency(); + msgWorker = new RingMessageWorker(); msgWorker.start(); - connChecker = new ConnChecker(); - connChecker.start(); + prevNodeConnChecker = new PrevNodeConnChecker(); + prevNodeConnChecker.start(); + + nextNodeConnChecker = new NextNodeConnChecker(); + nextNodeConnChecker.start(); if (tcpSrvr == null) tcpSrvr = new TcpServer(); @@ -467,8 +484,11 @@ else if (log.isInfoEnabled()) { U.interrupt(ipFinderCleaner); U.join(ipFinderCleaner, log); - U.interrupt(connChecker); - U.join(connChecker, log); + U.interrupt(prevNodeConnChecker); + U.join(prevNodeConnChecker, log); + + U.interrupt(nextNodeConnChecker); + U.join(nextNodeConnChecker, log); U.interrupt(msgWorker); U.join(msgWorker, log); @@ -1661,8 +1681,11 @@ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { U.interrupt(tmp); U.joinThreads(tmp, log); - U.interrupt(connChecker); - U.join(connChecker, log); + U.interrupt(prevNodeConnChecker); + U.join(prevNodeConnChecker, log); + + U.interrupt(nextNodeConnChecker); + U.join(nextNodeConnChecker, log); U.interrupt(msgWorker); U.join(msgWorker, log); @@ -2053,9 +2076,6 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { boolean added = false; synchronized (mux) { - if (!failedNode.isClient()) - recentFailedNodeIds.add(failedNode.id()); - if (!failedNodes.containsKey(failedNode)) { failedNodes.put(failedNode, msg.senderNodeId() != null ? 
msg.senderNodeId() : getLocalNodeId()); @@ -2075,13 +2095,40 @@ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { } } + /** + * Initializes connection check threshold (failure detection timeout or a socket timeout fallback) and frequency. + */ + private void initConnectionCheckFrequency() { + if (spi.failureDetectionTimeoutEnabled()) + connCheckThreshold = spi.failureDetectionTimeout(); + else + connCheckThreshold = Math.min(spi.getSocketTimeout(), spi.metricsUpdateFreq); + + for (int i = 3; i > 0; i--) { + connCheckFreq = connCheckThreshold / i; + + if (connCheckFreq > 10) + break; + } + + assert connCheckFreq > 0; + + if (log.isDebugEnabled()) + log.debug("Connection check frequency is calculated: " + connCheckFreq); + } + /** */ - protected void updateLastTimePrevNodeRcvd(TcpDiscoveryAbstractMessage msg) { + private void updatePrevNodeTime(TcpDiscoveryAbstractMessage msg) { if (msg.senderNodeId() != null) { - TcpDiscoveryNode prevNode = ring.prevNode(); + synchronized (mux) { + TcpDiscoveryNode prevNode = ring.prevNode(failedNodes.keySet()); + + if (prevNode != null && prevNode.id().equals(msg.senderNodeId())) { + lastPrevNodeTime = U.currentTimeMillis(); - if (prevNode != null && prevNode.id().equals(msg.senderNodeId())) - lastTimePrevNodeRcvd.set(U.currentTimeMillis()); + mux.notifyAll(); + } + } } } @@ -2592,24 +2639,13 @@ private class RingMessageWorker extends MessageWorkerAdapter 0; i--) { - connCheckFreq = connCheckThreshold / i; - - if (connCheckFreq > 10) - break; - } - - assert connCheckFreq > 0; - - if (log.isDebugEnabled()) - log.debug("Connection check frequency is calculated: " + connCheckFreq); - } - /** * @param msg Message to process.
*/ @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { spi.startMessageProcess(msg); - checkConnection(); - sendMetricsUpdateMessage(); DebugLogger log = messageLogger(msg); @@ -2717,12 +2729,8 @@ private void initConnectionCheckFrequency() { boolean ensured = spi.ensured(msg); - if (!locNode.id().equals(msg.senderNodeId())) { - updateLastTimePrevNodeRcvd(msg); - - if (ensured) - lastRingMsgTime = U.currentTimeMillis(); - } + if (!locNode.id().equals(msg.senderNodeId()) && ensured) + lastRingMsgTime = U.currentTimeMillis(); if (locNode.internalOrder() == 0) { boolean proc = false; @@ -2742,6 +2750,8 @@ private void initConnectionCheckFrequency() { spi.stats.onMessageProcessingStarted(msg); + processMessageFailedNodes(msg); + if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); @@ -2881,8 +2891,6 @@ private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { private void sendMessageAcrossRing0(TcpDiscoveryAbstractMessage msg) { assert msg != null; - assert ring.hasRemoteNodes(); - for (IgniteInClosure msgLsnr : spi.sndMsgLsnrs) msgLsnr.apply(msg); @@ -2989,9 +2997,13 @@ else if (log.isTraceEnabled()) spi.writeToSocket(sock, out, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + timeoutHelper.checkOvertime(connCheckFreq); + TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + timeoutHelper.checkOvertime(connCheckFreq); + processMessageFailedNodes(res); if (res.failedNodes() != null) { @@ -3088,6 +3100,14 @@ else if (log.isTraceEnabled()) onException("Failed to connect to next node [node=" + next + ", msg=" + msg + ", err=" + e.getMessage() + ']', e); + if (timeoutHelper.checkOvertime(connCheckFreq)) { + U.warn(log, "Local node was frozen. 
Will reconnect to the next node"); + + timeoutHelper = null; + + continue; + } + if (!openSock) break; // Don't retry if we can not establish connection. @@ -3155,6 +3175,8 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof try { spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( spi.getSocketTimeout())); + + timeoutHelper.checkOvertime(connCheckFreq); } finally { clearNodeAddedMessage(pendingMsg); @@ -3164,6 +3186,8 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + timeoutHelper.checkOvertime(connCheckFreq); + spi.stats.onMessageSent(pendingMsg, tstamp0 - tstamp); if (log.isDebugEnabled()) @@ -3213,10 +3237,14 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + timeoutHelper.checkOvertime(connCheckFreq); + long tstamp0 = U.currentTimeMillis(); int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + timeoutHelper.checkOvertime(connCheckFreq); + if (latencyCheck && log.isInfoEnabled()) log.info("Latency check message has been acked: " + msg.id()); @@ -3263,6 +3291,14 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']', e); + if (timeoutHelper.checkOvertime(connCheckFreq)) { + U.warn(log, "Local node was frozen. Will reconnect to the next node"); + + timeoutHelper = null; + + continue; + } + if (timeoutHelper.checkFailureTimeoutReached(e)) break; @@ -3322,6 +3358,8 @@ else if (e instanceof SocketTimeoutException || break; } + lastNextNodeTime = sent ? 
U.currentTimeMillis() : 0; + synchronized (mux) { failedNodes.removeAll(ServerImpl.this.failedNodes.keySet()); } @@ -3338,8 +3376,6 @@ else if (e instanceof SocketTimeoutException || synchronized (mux) { for (TcpDiscoveryNode failedNode : failedNodes) { - recentFailedNodeIds.add(failedNode.id()); - if (!ServerImpl.this.failedNodes.containsKey(failedNode)) ServerImpl.this.failedNodes.put(failedNode, locNodeId); } @@ -4092,7 +4128,8 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { UUID locNodeId = getLocalNodeId(); - if (isLocalNodeCoordinator()) { + boolean coord = isLocalNodeCoordinator(); + if (coord) { if (msg.verified()) { spi.stats.onRingMessageReceived(msg); @@ -4154,7 +4191,7 @@ else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { joiningNodes.add(node.id()); } - if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) { + if (!coord && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) { boolean authFailed = true; try { @@ -4240,6 +4277,15 @@ else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { if (topChanged) { assert !node.visible() : "Added visible node [node=" + node + ", locNode=" + locNode + ']'; + if (coord) { + lastPrevNodeTime = 0; + + if (log.isDebugEnabled()) + log.debug("Waiting for the first message from the newly added node"); + } + else if (node.equals(ring.nextNode())) + lastNextNodeTime = 0; + DiscoveryDataPacket dataPacket = msg.gridDiscoveryData(); assert dataPacket != null : msg; @@ -4826,9 +4872,6 @@ private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) { if (!skipUpdateFailedNodes) { synchronized (mux) { - if (!failedNode.isClient()) - recentFailedNodeIds.add(failedNodeId); - if (!failedNodes.containsKey(failedNode)) failedNodes.put(failedNode, msg.senderNodeId() != null ? 
msg.senderNodeId() : getLocalNodeId()); } @@ -4919,8 +4962,15 @@ private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) { spi.stats.onNodeFailed(); } - if (sendMessageToRemotes(msg)) + if (sendMessageToRemotes(msg)) { sendMessageAcrossRing(msg); + + synchronized (mux) { + recentFailedNodeIds.add(failedNodeId); + + mux.notifyAll(); + } + } else { if (log.isDebugEnabled()) log.debug("Unable to send message across the ring (topology has no remote nodes): " + msg); @@ -5598,71 +5648,179 @@ private void checkConnection() { failureThresholdReached = true; } + } + } + + /** Continuously checks connection of the local node */ + protected abstract class ConnChecker extends IgniteSpiThread { + + /** Next time when connection to the previous node should be checked. */ + protected long nextCheckTime; + + /** + **/ + protected ConnChecker(String name) { + super(spi.ignite().name(), name, log); + } - long lastTimeConn = lastTimeConnectionChecked; + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + try { + while (!isInterrupted()) { + if (exiting()) + return; - lastTimeConnectionChecked = U.currentTimeMillis(); + if (nextCheckTime == 0) + nextCheckTime = U.currentTimeMillis(); - if (lastTimeConn > 0) { - long elapsed = U.currentTimeMillis() - lastTimeConn; + nextCheckTime = checkConnection(); - if (elapsed > connCheckThreshold && spiStateCopy() == CONNECTED && ring.hasRemoteServerNodes()) { - lastTimePrevNodeRcvd.set(0); + long remain = nextCheckTime == 0 ? 
connCheckFreq : (nextCheckTime - U.currentTimeMillis()); - U.warn(log, "Local node operation was frozen and might be failed [elapsed=" + elapsed + - ", connCheckThreshold=" + connCheckThreshold); + if (remain > 0) + synchronized (mux) { + mux.wait(remain); + } + } + } + catch (InterruptedException ignore) { + } + finally { + if (exiting()) { + if (log.isDebugEnabled()) + log.debug(getName() + " thread exiting"); } + else + U.error(log, getName() + " thread exit abnormally"); } + } + + /** + * Whether the thread should exit or not + * + * @return thread exiting flag + **/ + private boolean exiting() { + return spiStateCopy() == DISCONNECTING || spi.isNodeStopping0(); + } + + /** + * Perform connection checking + * + * @return time when the next check should be performed + */ + protected abstract long checkConnection(); + } + + /** Continuously checks connection from a previous node */ + private class PrevNodeConnChecker extends ConnChecker { + + /** + **/ + public PrevNodeConnChecker() { + super("tcp-disco-prev-node-conn-checker"); + } + + /** {@inheritDoc} */ + @Override protected long checkConnection() { + synchronized (mux) { + if (lastPrevNodeTime > 0) { + TcpDiscoveryNode prevNode = ring.prevNode(failedNodes.keySet()); + + if (prevNode == null) { + lastPrevNodeTime = 0; + + return 0; + } + long currTimeMillis = U.currentTimeMillis(); + + boolean checkLost = currTimeMillis - nextCheckTime > connCheckFreq; + + if (checkLost) { + U.warn(log, "Local node was frozen and has lost connection check from the previous one " + + "(status check will be initiated)"); + + msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(spi.locNode, null)); + } + + long elapsed = currTimeMillis - lastPrevNodeTime; - long lastTimeRcvd = lastTimePrevNodeRcvd.get(); + if (elapsed > connCheckThreshold) { + if (checkLost) { + lastPrevNodeTime = currTimeMillis; - if (lastTimeRcvd > 0) { - long elapsed = U.currentTimeMillis() - lastTimeRcvd; + return currTimeMillis + connCheckThreshold; + } 
+ U.warn(log, "No message from previous node in configured timeout [prevNode=" + + prevNode + " ,timeout=" + connCheckThreshold + ']'); - if (elapsed > connCheckThreshold) { + TcpDiscoveryNodeFailedMessage msg = new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), prevNode.id(), + prevNode.internalOrder()); - TcpDiscoveryNode prevNode = ring.prevNode(); + // Eliminating race between processing NodeFailed and receiving a message from the failed prev node. + if (!failedNodes.containsKey(prevNode)) { + failedNodes.put(prevNode, getLocalNodeId()); - if (prevNode != null) { - lastTimePrevNodeRcvd.set(0); + if (log.isDebugEnabled()) + log.debug("Added previous node to failed nodes list [node=" + prevNode + ", msg=" + msg + ']'); + } - U.warn(log, "No message from previous node in configured timeout [prevNode=" + prevNode + - " ,timeout=" + connCheckThreshold + ']'); + lastPrevNodeTime = 0; + + msgWorker.addMessage(msg); - msgWorker.addMessage(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), prevNode.id(), - prevNode.internalOrder())); + return 0; } + return lastPrevNodeTime + connCheckThreshold; } } + + return 0; } } /** Keeps connection to a next node be alive */ - private class ConnChecker extends IgniteSpiThread { + private class NextNodeConnChecker extends ConnChecker { /** **/ - public ConnChecker() { - super(spi.ignite().name(), "tcp-disco-conn-checker", log); + public NextNodeConnChecker() { + super("tcp-disco-next-node-conn-checker"); } /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - while (!isInterrupted()) { - Thread.sleep(msgWorker.connCheckFreq); + @Override protected long checkConnection() { + long nextNodeTime = lastNextNodeTime; - if (spiStateCopy() == DISCONNECTING || spi.isNodeStopping0()) - return; + if (nextNodeTime > 0) { + long currentTimeMillis = U.currentTimeMillis(); - if (ring.hasRemoteServerNodes()) - msgWorker.sendMessageAcrossRing(new TcpDiscoveryConnectionCheckMessage(locNode)); - } - if 
(spiStateCopy() == DISCONNECTING || spi.isNodeStopping0()) { - if (log.isDebugEnabled()) - log.debug("ConnChecker thread exiting"); + if (currentTimeMillis - nextNodeTime > connCheckThreshold) { + U.warn(log, "Local node was frozen for a long time and will be failed"); + + notifyDiscovery(EVT_NODE_SEGMENTED, ring.topologyVersion(), locNode); + + lastNextNodeTime = 0; + + return 0; + } + long nextCheckTime = nextNodeTime + connCheckFreq; + + long remain = nextCheckTime - currentTimeMillis; + + if (remain >= 100) + return nextCheckTime; + + if (!ring.hasRemoteServerNodes()) { + lastNextNodeTime = 0; + + return 0; + } + + msgWorker.sendMessageAcrossRing(new TcpDiscoveryConnectionCheckMessage(locNode)); } - else - U.error(log, "ConnChecker thread exit abnormally, the local node will be failed by the next node"); + + return 0; } } @@ -5929,7 +6087,7 @@ else if (log.isDebugEnabled()) this.nodeId = nodeId; - boolean failed = false; + boolean rmtNodeFailed = false; if (srvSock) { synchronized (mux) { @@ -5937,15 +6095,19 @@ else if (log.isDebugEnabled()) if (log.isInfoEnabled()) log.info("Handshake request from recently failed node [nodeId=" + nodeId + ']'); - failed = true; + rmtNodeFailed = true; } - if (!failed) + if (!rmtNodeFailed) for (TcpDiscoveryNode n : failedNodes.keySet()) { if (n.id().equals(nodeId)) { if (log.isInfoEnabled()) log.info("Handshake request from failed node [nodeId=" + nodeId + ']'); - failed = true; + rmtNodeFailed = true; + + // Ensure the next node knows about the failed node + while(!recentFailedNodeIds.contains(nodeId)) + mux.wait(); break; } @@ -5956,7 +6118,7 @@ else if (log.isDebugEnabled()) TcpDiscoveryHandshakeResponse res = new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder()); - if (failed) + if (rmtNodeFailed) res.addFailedNode(locNodeId); if (req.client()) @@ -6098,9 +6260,10 @@ else if (e.hasCause(ObjectStreamException.class) || if (debugMode && recordable(msg)) debugLog(msg, "Message has been received: " + msg); - 
if (msg instanceof TcpDiscoveryConnectionCheckMessage) { - updateLastTimePrevNodeRcvd(msg); + if(srvSock && locNode.internalOrder() > 0) + updatePrevNodeTime(msg); + if (msg instanceof TcpDiscoveryConnectionCheckMessage) { spi.writeToSocket(msg, sock, RES_OK, sockTimeout); continue; @@ -6288,11 +6451,8 @@ else if (msg instanceof TcpDiscoveryRingLatencyCheckMessage) { if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) metricsUpdateMsg = (TcpDiscoveryClientMetricsUpdateMessage)msg; - else { - processMessageFailedNodes(msg); - + else msgWorker.addMessage(msg); - } // Send receipt back. if (clientMsgWrk != null) { From 993ccbeaf141c89a67560010458ffe0864809254 Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Tue, 7 Nov 2017 05:10:13 +0700 Subject: [PATCH 11/12] Independent asynchronous connection checkers for the previous node and the next one. --- .../ignite/spi/discovery/tcp/ServerImpl.java | 59 ++++++++++--------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index c3ae1fb524758..d138ccb637d0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2102,7 +2102,7 @@ private void initConnectionCheckFrequency() { if (spi.failureDetectionTimeoutEnabled()) connCheckThreshold = spi.failureDetectionTimeout(); else - connCheckThreshold = Math.min(spi.getSocketTimeout(), spi.metricsUpdateFreq); + connCheckThreshold = spi.getSocketTimeout(); for (int i = 3; i > 0; i--) { connCheckFreq = connCheckThreshold / i; @@ -2640,7 +2640,7 @@ private class RingMessageWorker extends MessageWorkerAdapter 0) + if (elapsed < metricsCheckFreq) return; msgWorker.addMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); - lastTimeStatusMsgSent = U.currentTimeMillis(); + 
lastMessageUpdateTime = U.currentTimeMillis(); } /** From a86c26b29c0c34e1c9205a18b783c223daf558ef Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Tue, 7 Nov 2017 06:37:11 +0700 Subject: [PATCH 12/12] TcpDiscoverySelfTest update --- .../ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index d6d484ca5956c..0f09b0562ee9a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -2264,12 +2264,6 @@ else if (locNode.order() == errNodeOrder) { } log.info("Stop sleep on message send: " + msg); - - if (node.equals(errNext)) { - log.info("Fail write after sleep [node=" + node.id() + ", msg=" + msg + ']'); - - throw new SocketTimeoutException(); - } } } } @@ -2605,7 +2599,7 @@ public TestMessageWorkerFailureSpi1(int failureMode) { throw new RuntimeException("Failing ring message worker explicitly"); else { try { - Thread.sleep(5_000); + Thread.sleep(timeout); } catch (InterruptedException ignored) { // No-op.