From bfb00b6e61f9709718c30971997aeb0ac79e86b4 Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Fri, 18 Nov 2016 23:12:28 +0300 Subject: [PATCH 1/9] IgniteTcpCommunicationBigClusterTest added --- .../IgniteTcpCommunicationBigClusterTest.java | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100755 modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java new file mode 100755 index 0000000000000..9d99f9f14a3b2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java @@ -0,0 +1,100 @@ +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Testing {@link TcpCommunicationSpi} under big cluster conditions (long DiscoverySpi delivery) + * + * @author Alexandr Kuramshin + */ +public class IgniteTcpCommunicationBigClusterTest extends GridCommonAbstractTest { + + public static final int IGNITE_NODES_NUMBER = 5; + + public static final long NODE_ADDED_MESSAGE_DELAY = 1_000L; + + public static final long BROADCAST_PERIOD = 100L; + + /** */ + private static IgniteConfiguration config(String gridName) { + IgniteConfiguration cfg = new IgniteConfiguration(); + cfg.setGridName(gridName); + cfg.setPeerClassLoadingEnabled(false); + + TcpDiscoverySpi discovery = new SlowTcpDiscoverySpi(); + TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder(); + ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47510")); + discovery.setIpFinder(ipFinder); + cfg.setDiscoverySpi(discovery); + + /*CacheConfiguration cacheCache = new CacheConfiguration(); + cacheCache.setName("cache"); + cacheCache.setCacheMode(CacheMode.PARTITIONED); + cacheCache.setBackups(0); + cacheCache.setAtomicityMode(CacheAtomicityMode.ATOMIC);*/ + + /** ONHEAP_TIERED + cacheCache.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED); + cacheCache.setOffHeapMaxMemory(0); */ + + /** OFFHEAP_TIERED + cacheCache.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); + cacheCache.setOffHeapMaxMemory(512L << 20); */ + + // cfg.setCacheConfiguration(cacheCache); + return cfg; + } + + public void testBigCluster() throws Exception { + final ExecutorService executorService = Executors.newCachedThreadPool(); + for (int i = 0; i < IGNITE_NODES_NUMBER; ++i) { + final int nodeIndex = i; + executorService.execute(() -> { + startNode("testBigClusterNode-" + nodeIndex); + }); + } + } + + private void startNode(String name) { + try (final Ignite ignite = Ignition.start(config(name))) { + try { + for (; ; ) { + Thread.sleep(BROADCAST_PERIOD); + ignite.compute().broadcast(() -> { + // no-op + }); + } + } + catch (Throwable ex) { + System.err.printf("Node thread exit on error: node = %s%d", name); + ex.printStackTrace(); + } + } + } + + private static class SlowTcpDiscoverySpi extends TcpDiscoverySpi { + @Override protected boolean ensured(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { + try { + Thread.sleep(NODE_ADDED_MESSAGE_DELAY); + } + catch (InterruptedException ex) { + System.err.println("Long delivery of TcpDiscoveryNodeAddFinishedMessage interrupted"); + ex.printStackTrace(); + } + } + return super.ensured(msg); + } + } +} From 02dd92e605b9b53f5a16c7ec5f8e7b5698b15ba4 Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Sat, 19 Nov 2016 00:55:37 +0300 Subject: [PATCH 2/9] IgniteTcpCommunicationBigClusterTest update --- .../IgniteTcpCommunicationBigClusterTest.java | 127 ++++++++++++++---- 1 file changed, 102 insertions(+), 25 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java index 9d99f9f14a3b2..55046db7fc69d 100755 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java @@ -1,17 +1,25 @@ package org.apache.ignite.spi.communication.tcp; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.Ignition; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import java.text.MessageFormat; import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Testing {@link TcpCommunicationSpi} under big cluster conditions (long DiscoverySpi delivery) @@ -20,11 +28,24 @@ */ public class IgniteTcpCommunicationBigClusterTest extends GridCommonAbstractTest { - public static final int IGNITE_NODES_NUMBER = 5; + /** */ + private static final int IGNITE_NODES_NUMBER = 10; + + /** */ + private static final long RUNNING_TIMESPAN = 10_000L; + + /** */ + private static final long MESSAGE_DELAY = 5_000L; + + /** */ + private static final long BROADCAST_PERIOD = 1000L; + + /** */ + private static final Logger LOGGER = Logger.getLogger(IgniteTcpCommunicationBigClusterTest.class.getName()); - public static final long NODE_ADDED_MESSAGE_DELAY = 1_000L; + private static final Level LOG_LEVEL = Level.SEVERE; - public static final long BROADCAST_PERIOD = 100L; + private CountDownLatch startLatch; /** */ private static IgniteConfiguration config(String gridName) { @@ -56,44 +77,100 @@ private static IgniteConfiguration config(String gridName) { return cfg; } - public void testBigCluster() throws Exception { - final ExecutorService executorService = Executors.newCachedThreadPool(); + /** */ + private static void println(String str) { + LOGGER.log(LOG_LEVEL, str); + } + + /** */ + private static void println(String str, Throwable ex) { + LOGGER.log(LOG_LEVEL, str, ex); + } + + /** */ + private static void printf(String format, Object... args) { + LOGGER.log(LOG_LEVEL, MessageFormat.format(format, args)); + } + + /** */ + private static void printf(String format, Throwable ex, Object... args) { + LOGGER.log(LOG_LEVEL, MessageFormat.format(format, args), ex); + } + + /** */ + public synchronized void testBigCluster() throws Exception { + startLatch = new CountDownLatch(IGNITE_NODES_NUMBER); + final ExecutorService execSvc = Executors.newCachedThreadPool(); for (int i = 0; i < IGNITE_NODES_NUMBER; ++i) { - final int nodeIndex = i; - executorService.execute(() -> { - startNode("testBigClusterNode-" + nodeIndex); + final String name = "testBigClusterNode-" + i; + execSvc.submit(() -> { + startNode(name); }); } + startLatch.await(); + println("All nodes running"); + Thread.sleep(RUNNING_TIMESPAN); + println("Stopping all nodes"); + execSvc.shutdownNow(); + execSvc.awaitTermination(1, TimeUnit.MINUTES); + println("Stopped all nodes"); } + /** */ private void startNode(String name) { + printf("Starting node = {0}", name); try (final Ignite ignite = Ignition.start(config(name))) { - try { - for (; ; ) { - Thread.sleep(BROADCAST_PERIOD); - ignite.compute().broadcast(() -> { - // no-op - }); - } - } - catch (Throwable ex) { - System.err.printf("Node thread exit on error: node = %s%d", name); - ex.printStackTrace(); + printf("Started node = {0}", name); + startLatch.countDown(); + nodeWork(ignite); + printf("Stopping node = {0}", name); + } + printf("Stopped node = {0}", name); + } + + /** */ + private void nodeWork(final Ignite ignite) { + try { + int count = 0; + for (; ; ) { + Thread.sleep(BROADCAST_PERIOD); + Collection results = ignite.compute().broadcast(() -> { + return "ignite"; + }); + for (String result : results) + if (!"ignite".equals(result)) + throw new IllegalArgumentException("Wrong answer from node: " + result); + if (count != results.size()) + printf("Computed results: node = {0}, count = {1}", ignite.name(), count = results.size()); } } + catch (InterruptedException | IgniteInterruptedException ex) { + printf("Node thread interrupted: node = {0}", ignite.name()); + } + catch (Throwable ex) { + printf("Node thread exit on error: node = {0}", ex, ignite.name()); + } } + /** */ private static class SlowTcpDiscoverySpi extends TcpDiscoverySpi { + + /** */ @Override protected boolean ensured(TcpDiscoveryAbstractMessage msg) { - if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { + if (msg instanceof TcpDiscoveryNodeAddedMessage + || msg instanceof TcpDiscoveryNodeAddFinishedMessage) try { - Thread.sleep(NODE_ADDED_MESSAGE_DELAY); + Thread.sleep(MESSAGE_DELAY); } - catch (InterruptedException ex) { - System.err.println("Long delivery of TcpDiscoveryNodeAddFinishedMessage interrupted"); - ex.printStackTrace(); + catch (InterruptedException | IgniteInterruptedException ex) { + println("Long delivery of TcpDiscoveryNodeAddFinishedMessage interrupted"); + throw ex instanceof IgniteInterruptedException ? (IgniteInterruptedException)ex + : new IgniteInterruptedException((InterruptedException)ex); + } + catch (Throwable ex) { + println("Long delivery of TcpDiscoveryNodeAddFinishedMessage error", ex); + throw ex instanceof RuntimeException ? (RuntimeException)ex : new RuntimeException(ex); } - } return super.ensured(msg); } } From 6acf193a3d356d1bad4c02a53ac76833ed1008d0 Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Sat, 19 Nov 2016 12:55:45 +0300 Subject: [PATCH 3/9] Have got TcpCommunicationSpi error --- .../IgniteTcpCommunicationBigClusterTest.java | 37 +++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java index 55046db7fc69d..86a84f334b215 100755 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java @@ -8,7 +8,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import java.text.MessageFormat; @@ -32,13 +31,15 @@ public class IgniteTcpCommunicationBigClusterTest extends GridCommonAbstractTest private static final int IGNITE_NODES_NUMBER = 10; /** */ - private static final long RUNNING_TIMESPAN = 10_000L; + private static final long RUNNING_TIMESPAN = 1_000L; /** */ - private static final long MESSAGE_DELAY = 5_000L; + private static final long ADDED_MESSAGE_DELAY = 1_000L; /** */ - private static final long BROADCAST_PERIOD = 1000L; + private static final long BROADCAST_PERIOD = 100L; + + private static final String CONTROL_ANSWER = "ignite"; /** */ private static final Logger LOGGER = Logger.getLogger(IgniteTcpCommunicationBigClusterTest.class.getName()); @@ -59,21 +60,12 @@ private static IgniteConfiguration config(String gridName) { discovery.setIpFinder(ipFinder); cfg.setDiscoverySpi(discovery); - /*CacheConfiguration cacheCache = new CacheConfiguration(); - cacheCache.setName("cache"); - cacheCache.setCacheMode(CacheMode.PARTITIONED); - cacheCache.setBackups(0); - cacheCache.setAtomicityMode(CacheAtomicityMode.ATOMIC);*/ - - /** ONHEAP_TIERED - cacheCache.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED); - cacheCache.setOffHeapMaxMemory(0); */ - - /** OFFHEAP_TIERED - cacheCache.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); - cacheCache.setOffHeapMaxMemory(512L << 20); */ + TcpCommunicationSpi communication = new TcpCommunicationSpi(); + communication.setConnectTimeout(500L); + communication.setMaxConnectTimeout(500L); + communication.setReconnectCount(1); + cfg.setCommunicationSpi(communication); - // cfg.setCacheConfiguration(cacheCache); return cfg; } @@ -135,10 +127,10 @@ private void nodeWork(final Ignite ignite) { for (; ; ) { Thread.sleep(BROADCAST_PERIOD); Collection results = ignite.compute().broadcast(() -> { - return "ignite"; + return CONTROL_ANSWER; }); for (String result : results) - if (!"ignite".equals(result)) + if (!CONTROL_ANSWER.equals(result)) throw new IllegalArgumentException("Wrong answer from node: " + result); if (count != results.size()) printf("Computed results: node = {0}, count = {1}", ignite.name(), count = results.size()); @@ -157,10 +149,9 @@ private static class SlowTcpDiscoverySpi extends TcpDiscoverySpi { /** */ @Override protected boolean ensured(TcpDiscoveryAbstractMessage msg) { - if (msg instanceof TcpDiscoveryNodeAddedMessage - || msg instanceof TcpDiscoveryNodeAddFinishedMessage) + if (ADDED_MESSAGE_DELAY > 0 && msg instanceof TcpDiscoveryNodeAddFinishedMessage) try { - Thread.sleep(MESSAGE_DELAY); + Thread.sleep(ADDED_MESSAGE_DELAY); } catch (InterruptedException | IgniteInterruptedException ex) { println("Long delivery of TcpDiscoveryNodeAddFinishedMessage interrupted"); From 4fd39653d24f62f19f70b4dffba8497185cc46fb Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Sat, 19 Nov 2016 19:39:10 +0300 Subject: [PATCH 4/9] Some discovery have been done --- .../spi/communication/tcp/TcpCommunicationSpi.java | 2 ++ .../tcp/IgniteTcpCommunicationBigClusterTest.java | 9 ++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) mode change 100644 => 100755 modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java old mode 100644 new mode 100755 index 1fe437cc710c0..afd4db21008db --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -501,6 +501,7 @@ private void onFirstMessage(GridNioSession ses, Message msg) { GridTcpNioCommunicationClient client = connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); + // here the RecoveryLastReceivedMessage is in the session queue fut.onDone(client); } finally { @@ -2746,6 +2747,7 @@ else if (log.isDebugEnabled()) i += read; } + // will got here rcvCnt = 0 if read timeout have been occurred and the channel was closed rcvCnt = buf.getLong(1); } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java index 86a84f334b215..a0e545eb61a49 100755 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java @@ -34,7 +34,10 @@ public class IgniteTcpCommunicationBigClusterTest extends GridCommonAbstractTest private static final long RUNNING_TIMESPAN = 1_000L; /** */ - private static final long ADDED_MESSAGE_DELAY = 1_000L; + private static final long COMMUNICATION_TIMEOUT = 1_000L; + + /** Should be about of the COMMUNICATION_TIMEOUT value to get the error */ + private static final long ADDED_MESSAGE_DELAY = COMMUNICATION_TIMEOUT; /** */ private static final long BROADCAST_PERIOD = 100L; @@ -61,8 +64,8 @@ private static IgniteConfiguration config(String gridName) { cfg.setDiscoverySpi(discovery); TcpCommunicationSpi communication = new TcpCommunicationSpi(); - communication.setConnectTimeout(500L); - communication.setMaxConnectTimeout(500L); + communication.setConnectTimeout(COMMUNICATION_TIMEOUT); + communication.setMaxConnectTimeout(COMMUNICATION_TIMEOUT); communication.setReconnectCount(1); cfg.setCommunicationSpi(communication); From c2c181922c7c24ea457577e32d2af897c8bec87f Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Sat, 19 Nov 2016 23:11:28 +0300 Subject: [PATCH 5/9] Prove that problem is not in the onFirstMessage hang --- .../spi/communication/tcp/TcpCommunicationSpi.java | 12 ++++++++++++ .../tcp/IgniteTcpCommunicationBigClusterTest.java | 14 ++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index afd4db21008db..ae276dd4b5730 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -46,6 +46,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; import org.apache.ignite.Ignite; @@ -238,6 +239,12 @@ @IgniteSpiConsistencyChecked(optional = false) public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi, TcpCommunicationSpiMBean { + + /** */ + public static final AtomicInteger FIRST_MESSAGE_STARTED_COUNT = new AtomicInteger(); + /** */ + public static final AtomicInteger FIRST_MESSAGE_ACTIVE_COUNT = new AtomicInteger(); + /** IPC error message. */ public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " + "(switching to TCP, may be slower)."; @@ -547,7 +554,12 @@ private void onFirstMessage(GridNioSession ses, Message msg) { } try { + log.error("onFirstMessage enter, node = " + getLocalNodeId()); + FIRST_MESSAGE_STARTED_COUNT.incrementAndGet(); + FIRST_MESSAGE_ACTIVE_COUNT.incrementAndGet(); onFirstMessage(ses, msg); + FIRST_MESSAGE_ACTIVE_COUNT.decrementAndGet(); + log.error("onFirstMessage exit, node = " + getLocalNodeId()); } finally { connectGate.leave(); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java index a0e545eb61a49..a97c85898778f 100755 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java @@ -42,13 +42,16 @@ public class IgniteTcpCommunicationBigClusterTest extends GridCommonAbstractTest /** */ private static final long BROADCAST_PERIOD = 100L; + /** */ private static final String CONTROL_ANSWER = "ignite"; /** */ private static final Logger LOGGER = Logger.getLogger(IgniteTcpCommunicationBigClusterTest.class.getName()); + /** */ private static final Level LOG_LEVEL = Level.SEVERE; + /** */ private CountDownLatch startLatch; /** */ @@ -75,21 +78,32 @@ private static IgniteConfiguration config(String gridName) { /** */ private static void println(String str) { LOGGER.log(LOG_LEVEL, str); + logFirstMessageCount(); } /** */ private static void println(String str, Throwable ex) { LOGGER.log(LOG_LEVEL, str, ex); + logFirstMessageCount(); } /** */ private static void printf(String format, Object... args) { LOGGER.log(LOG_LEVEL, MessageFormat.format(format, args)); + logFirstMessageCount(); } /** */ private static void printf(String format, Throwable ex, Object... args) { LOGGER.log(LOG_LEVEL, MessageFormat.format(format, args), ex); + logFirstMessageCount(); + } + + /** */ + private static void logFirstMessageCount() { + LOGGER.log(LOG_LEVEL, MessageFormat.format("onFirstMessage: started = {0}, active = {1}", + TcpCommunicationSpi.FIRST_MESSAGE_STARTED_COUNT.get(), + TcpCommunicationSpi.FIRST_MESSAGE_ACTIVE_COUNT.get())); } /** */ From f8076edba097f6077229b2090ee3ff1a3369878c Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Sat, 19 Nov 2016 23:26:37 +0300 Subject: [PATCH 6/9] Revert: Prove that problem is not in the onFirstMessage hang --- .../spi/communication/tcp/TcpCommunicationSpi.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index ae276dd4b5730..afd4db21008db 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -46,7 +46,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; import org.apache.ignite.Ignite; @@ -239,12 +238,6 @@ @IgniteSpiConsistencyChecked(optional = false) public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi, TcpCommunicationSpiMBean { - - /** */ - public static final AtomicInteger FIRST_MESSAGE_STARTED_COUNT = new AtomicInteger(); - /** */ - public static final AtomicInteger FIRST_MESSAGE_ACTIVE_COUNT = new AtomicInteger(); - /** IPC error message. */ public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " + "(switching to TCP, may be slower)."; @@ -554,12 +547,7 @@ private void onFirstMessage(GridNioSession ses, Message msg) { } try { - log.error("onFirstMessage enter, node = " + getLocalNodeId()); - FIRST_MESSAGE_STARTED_COUNT.incrementAndGet(); - FIRST_MESSAGE_ACTIVE_COUNT.incrementAndGet(); onFirstMessage(ses, msg); - FIRST_MESSAGE_ACTIVE_COUNT.decrementAndGet(); - log.error("onFirstMessage exit, node = " + getLocalNodeId()); } finally { connectGate.leave(); From 6e1f2dfc2acb3dbb8f24aa51ed67b2ee447b4585 Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Mon, 21 Nov 2016 11:55:09 +0300 Subject: [PATCH 7/9] Revert: pushing unnecessary changes to the master --- .../ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index afd4db21008db..1fe437cc710c0 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -501,7 +501,6 @@ private void onFirstMessage(GridNioSession ses, Message msg) { GridTcpNioCommunicationClient client = connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); - // here the RecoveryLastReceivedMessage is in the session queue fut.onDone(client); } finally { @@ -2747,7 +2746,6 @@ else if (log.isDebugEnabled()) i += read; } - // will got here rcvCnt = 0 if read timeout have been occurred and the channel was closed rcvCnt = buf.getLong(1); } From ed794ca815f6bb1471af15779279d287576b39cc Mon Sep 17 00:00:00 2001 From: Alexandr Kuramshin Date: Mon, 21 Nov 2016 12:08:00 +0300 Subject: [PATCH 8/9] Revert: pushing unnecessary changes to the master --- .../IgniteTcpCommunicationBigClusterTest.java | 185 ------------------ 1 file changed, 185 deletions(-) delete mode 100755 modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java deleted file mode 100755 index a97c85898778f..0000000000000 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationBigClusterTest.java +++ /dev/null @@ -1,185 +0,0 @@ -package org.apache.ignite.spi.communication.tcp; - -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteInterruptedException; -import org.apache.ignite.Ignition; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -import java.text.MessageFormat; -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Testing {@link TcpCommunicationSpi} under big cluster conditions (long DiscoverySpi delivery) - * - * @author Alexandr Kuramshin - */ -public class IgniteTcpCommunicationBigClusterTest extends GridCommonAbstractTest { - - /** */ - private static final int IGNITE_NODES_NUMBER = 10; - - /** */ - private static final long RUNNING_TIMESPAN = 1_000L; - - /** */ - private static final long COMMUNICATION_TIMEOUT = 1_000L; - - /** Should be about of the COMMUNICATION_TIMEOUT value to get the error */ - private static final long ADDED_MESSAGE_DELAY = COMMUNICATION_TIMEOUT; - - /** */ - private static final long BROADCAST_PERIOD = 100L; - - /** */ - private static final String CONTROL_ANSWER = "ignite"; - - /** */ - private static final Logger LOGGER = Logger.getLogger(IgniteTcpCommunicationBigClusterTest.class.getName()); - - /** */ - private static final Level LOG_LEVEL = Level.SEVERE; - - /** */ - private CountDownLatch startLatch; - - /** */ - private static IgniteConfiguration config(String gridName) { - IgniteConfiguration cfg = new IgniteConfiguration(); - cfg.setGridName(gridName); - cfg.setPeerClassLoadingEnabled(false); - - TcpDiscoverySpi discovery = new SlowTcpDiscoverySpi(); - TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder(); - ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47510")); - discovery.setIpFinder(ipFinder); - cfg.setDiscoverySpi(discovery); - - TcpCommunicationSpi communication = new TcpCommunicationSpi(); - communication.setConnectTimeout(COMMUNICATION_TIMEOUT); - communication.setMaxConnectTimeout(COMMUNICATION_TIMEOUT); - communication.setReconnectCount(1); - cfg.setCommunicationSpi(communication); - - return cfg; - } - - /** */ - private static void println(String str) { - LOGGER.log(LOG_LEVEL, str); - logFirstMessageCount(); - } - - /** */ - private static void println(String str, Throwable ex) { - LOGGER.log(LOG_LEVEL, str, ex); - logFirstMessageCount(); - } - - /** */ - private static void printf(String format, Object... args) { - LOGGER.log(LOG_LEVEL, MessageFormat.format(format, args)); - logFirstMessageCount(); - } - - /** */ - private static void printf(String format, Throwable ex, Object... args) { - LOGGER.log(LOG_LEVEL, MessageFormat.format(format, args), ex); - logFirstMessageCount(); - } - - /** */ - private static void logFirstMessageCount() { - LOGGER.log(LOG_LEVEL, MessageFormat.format("onFirstMessage: started = {0}, active = {1}", - TcpCommunicationSpi.FIRST_MESSAGE_STARTED_COUNT.get(), - TcpCommunicationSpi.FIRST_MESSAGE_ACTIVE_COUNT.get())); - } - - /** */ - public synchronized void testBigCluster() throws Exception { - startLatch = new CountDownLatch(IGNITE_NODES_NUMBER); - final ExecutorService execSvc = Executors.newCachedThreadPool(); - for (int i = 0; i < IGNITE_NODES_NUMBER; ++i) { - final String name = "testBigClusterNode-" + i; - execSvc.submit(() -> { - startNode(name); - }); - } - startLatch.await(); - println("All nodes running"); - Thread.sleep(RUNNING_TIMESPAN); - println("Stopping all nodes"); - execSvc.shutdownNow(); - execSvc.awaitTermination(1, TimeUnit.MINUTES); - println("Stopped all nodes"); - } - - /** */ - private void startNode(String name) { - printf("Starting node = {0}", name); - try (final Ignite ignite = Ignition.start(config(name))) { - printf("Started node = {0}", name); - startLatch.countDown(); - nodeWork(ignite); - printf("Stopping node = {0}", name); - } - printf("Stopped node = {0}", name); - } - - /** */ - private void nodeWork(final Ignite ignite) { - try { - int count = 0; - for (; ; ) { - Thread.sleep(BROADCAST_PERIOD); - Collection results = ignite.compute().broadcast(() -> { - return CONTROL_ANSWER; - }); - for (String result : results) - if (!CONTROL_ANSWER.equals(result)) - throw new IllegalArgumentException("Wrong answer from node: " + result); - if (count != results.size()) - printf("Computed results: node = {0}, count = {1}", ignite.name(), count = results.size()); - } - } - catch (InterruptedException | IgniteInterruptedException ex) { - printf("Node thread interrupted: node = {0}", ignite.name()); - } - catch (Throwable ex) { - printf("Node thread exit on error: node = {0}", ex, ignite.name()); - } - } - - /** */ - private static class SlowTcpDiscoverySpi extends TcpDiscoverySpi { - - /** */ - @Override protected boolean ensured(TcpDiscoveryAbstractMessage msg) { - if (ADDED_MESSAGE_DELAY > 0 && msg instanceof TcpDiscoveryNodeAddFinishedMessage) - try { - Thread.sleep(ADDED_MESSAGE_DELAY); - } - catch (InterruptedException | IgniteInterruptedException ex) { - println("Long delivery of TcpDiscoveryNodeAddFinishedMessage interrupted"); - throw ex instanceof IgniteInterruptedException ? (IgniteInterruptedException)ex - : new IgniteInterruptedException((InterruptedException)ex); - } - catch (Throwable ex) { - println("Long delivery of TcpDiscoveryNodeAddFinishedMessage error", ex); - throw ex instanceof RuntimeException ? (RuntimeException)ex : new RuntimeException(ex); - } - return super.ensured(msg); - } - } -} From e919ae57a6226d0e3d12b2c3c2c9bdfe6d2e1da4 Mon Sep 17 00:00:00 2001 From: dkarachentsev Date: Thu, 8 Dec 2016 10:31:18 +0300 Subject: [PATCH 9/9] IGNITE-4395 - Test that leads to grid hang. --- .../IgniteComputeManyAsyncJobsTest.java | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeManyAsyncJobsTest.java diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeManyAsyncJobsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeManyAsyncJobsTest.java new file mode 100644 index 0000000000000..5070d292dfda1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeManyAsyncJobsTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.LinkedList; +import java.util.Queue; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Submits many async tasks that do cache operations. + */ +public class IgniteComputeManyAsyncJobsTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(gridName); + + // Commented code resolves hang. +// if (gridName.endsWith("1")) +// ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setMessageQueueLimit(0); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected boolean isMultiJvm() { + // Not necessary, but hangs faster. + return true; + } + + /** + * @throws Exception If failed. + */ + public void testManyJobs() throws Exception { + final IgniteEx ignite0 = startGrid(0); + startGrid(1); + + final IgniteCompute compute = ignite0.compute(ignite0.cluster().forRemotes()).withAsync(); + + Queue futs = new LinkedList<>(); + + for (int i = 0; i < 20_000; i++) { + compute.run(new TestJob(i)); + + futs.add(compute.future()); + } + + IgniteFuture fut; + + do { + fut = futs.poll(); + + if (fut != null) + fut.get(); + } while (fut != null); + } + + /** + * + */ + private static class TestJob implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0L; + /** */ + @IgniteInstanceResource + private transient Ignite ignite; + + /** */ + private final int i; + + /** + * @param i Integer. + */ + private TestJob(final int i) { + this.i = i; + } + + /** {@inheritDoc} */ + @Override public void run() { + final IgniteCache cache = ignite.getOrCreateCache("cache"); + + System.out.println(">> Executed " + i); + + cache.get(i); + cache.put(i, String.valueOf(i)); + } + } +}