From a7e1d6358185c69c551b5927ff8d223c11ec5527 Mon Sep 17 00:00:00 2001 From: dkarachentsev Date: Fri, 22 Jun 2018 16:55:26 +0300 Subject: [PATCH 1/2] IGNITE-8858 - Client none may not stop. --- .../ignite/internal/util/IgniteUtils.java | 20 ++++++++++++++++--- .../ignite/spi/discovery/tcp/ClientImpl.java | 9 ++++++++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index e02fe4375ab2c..62d11170958f2 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -4603,11 +4603,24 @@ public static void interrupt(Iterable workers) { * @return {@code true} if thread has finished, {@code false} otherwise. */ public static boolean join(@Nullable Thread t, @Nullable IgniteLogger log) { - if (t != null) + return join(t, log, 0); + } + + /** + * Waits for completion of a given thread. If thread is {@code null} then + * this method returns immediately returning {@code true} + * + * @param t Thread to join. + * @param log Logger for logging errors. + * @param timeout Join timeout. + * @return {@code true} if thread has finished, {@code false} otherwise. + */ + public static boolean join(@Nullable Thread t, @Nullable IgniteLogger log, long timeout) { + if (t != null) { try { - t.join(); + t.join(timeout); - return true; + return !t.isAlive(); } catch (InterruptedException ignore) { warn(log, "Got interrupted while waiting for completion of a thread: " + t); @@ -4616,6 +4629,7 @@ public static boolean join(@Nullable Thread t, @Nullable IgniteLogger log) { return false; } + } return true; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index dc62bf3331fb2..a3b258b34b2df 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -326,7 +326,10 @@ class ClientImpl extends TcpDiscoveryImpl { U.join(msgWorker.runner(), log); U.join(sockWriter, log); - U.join(sockReader, log); + + // SocketReader may loose interruption, this hack is made to overcome that case. + while (!U.join(sockReader, log, 200)) + U.interrupt(sockReader); timer.cancel(); @@ -1071,6 +1074,10 @@ private void forceStopRead() throws InterruptedException { U.error(log, "Failed to read message [sock=" + sock + ", " + "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e); + // Exists possibility that exception raised on interruption. + if (X.hasCause(e, InterruptedException.class)) + interrupt(); + IOException ioEx = X.cause(e, IOException.class); if (ioEx != null) From adb7c84b9f6e3c262c0090463438b9ad3fd94aa1 Mon Sep 17 00:00:00 2001 From: dkarachentsev Date: Tue, 26 Jun 2018 12:18:52 +0300 Subject: [PATCH 2/2] IGNITE-8858 - Client none may not stop. --- .../java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index a3b258b34b2df..edb2ce879e072 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -20,6 +20,7 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.StreamCorruptedException; import java.net.InetSocketAddress; import java.net.Socket; @@ -51,6 +52,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMetrics; @@ -1075,7 +1077,8 @@ private void forceStopRead() throws InterruptedException { "locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + rmtNodeId + ']', e); // Exists possibility that exception raised on interruption. - if (X.hasCause(e, InterruptedException.class)) + if (X.hasCause(e, InterruptedException.class, InterruptedIOException.class, + IgniteInterruptedCheckedException.class, IgniteInterruptedException.class)) interrupt(); IOException ioEx = X.cause(e, IOException.class);