From 5e266153707021a8866e91dfa3f958066f80fc99 Mon Sep 17 00:00:00 2001 From: Valentin Kulichenko Date: Wed, 6 Apr 2016 18:10:45 -0700 Subject: [PATCH] IGNITE-2951 - Stability fixes for cluster with many clients --- .../GridDhtPartitionsExchangeFuture.java | 2 +- .../continuous/GridContinuousProcessor.java | 13 +++++- .../ignite/spi/discovery/tcp/ClientImpl.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 44 ++++++++----------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 23 ++++------ .../IgniteClientReconnectAbstractTest.java | 7 +-- .../tcp/TcpClientDiscoverySpiSelfTest.java | 8 ++-- .../discovery/tcp/TcpDiscoverySelfTest.java | 39 ++++++++-------- ...TcpDiscoverySpiFailureTimeoutSelfTest.java | 23 ++++------ .../discovery/tcp/TestTcpDiscoverySpi.java | 5 ++- 10 files changed, 80 insertions(+), 86 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index bbfc71a418cb7..82e9bda91783f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -931,7 +931,7 @@ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExc locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map()); m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); - + m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index abafe85881b80..d7838f3b0dd1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -393,7 +393,18 @@ public void unlockStopping() { /** {@inheritDoc} */ @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { - DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos); + Map> clientInfos0 = U.newHashMap(clientInfos.size()); + + for (Map.Entry> e : clientInfos.entrySet()) { + Map copy = U.newHashMap(e.getValue().size()); + + for (Map.Entry e0 : e.getValue().entrySet()) + copy.put(e0.getKey(), e0.getValue()); + + clientInfos0.put(e.getKey(), copy); + } + + DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0); // Collect listeners information (will be sent to joining node during discovery process). for (Map.Entry e : locInfos.entrySet()) { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 950c68096fc9a..31d614f18f298 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1062,7 +1062,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { try { if (ack) { synchronized (mux) { - assert unackedMsg == null : unackedMsg; + assert unackedMsg == null : "Unacked=" + unackedMsg + ", received=" + msg; unackedMsg = msg; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 88e34e85e3e8e..27a31c4442d36 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -18,9 +18,11 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectStreamException; +import java.io.OutputStream; import java.io.Serializable; import java.net.ConnectException; import java.net.InetAddress; @@ -74,7 +76,6 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; @@ -2134,6 +2135,9 @@ private class RingMessageWorker extends MessageWorkerAdapter { /** Socket. */ private Socket sock; + /** Output stream. */ + private OutputStream out; + /** Last time status message has been sent. */ private long lastTimeStatusMsgSent; @@ -2470,10 +2474,12 @@ else if (log.isDebugEnabled()) sock = spi.openSocket(addr, timeoutHelper); + out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()); + openSock = true; // Handshake. - writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), + spi.writeToSocket(sock, out, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, @@ -2627,7 +2633,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); try { - writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk( + spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( spi.getSocketTimeout())); } finally { @@ -2679,7 +2685,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof } } - writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -3999,7 +4005,7 @@ private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) { } else if (leftNode.equals(next) && sock != null) { try { - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); if (log.isDebugEnabled()) @@ -5617,6 +5623,9 @@ private class ClientMessageWorker extends MessageWorkerAdapter { /** Socket. */ private final Socket sock; + /** Output stream. */ + private final OutputStream out; + /** Current client metrics. */ private volatile ClusterMetrics metrics; @@ -5630,11 +5639,13 @@ private class ClientMessageWorker extends MessageWorkerAdapter { * @param sock Socket. * @param clientNodeId Node ID. */ - protected ClientMessageWorker(Socket sock, UUID clientNodeId) { + protected ClientMessageWorker(Socket sock, UUID clientNodeId) throws IOException { super("tcp-disco-client-message-worker", 2000); this.sock = sock; this.clientNodeId = clientNodeId; + + out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()); } /** @@ -5681,7 +5692,7 @@ else if (log.isDebugEnabled()) log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } @@ -5692,7 +5703,7 @@ else if (log.isDebugEnabled()) assert topologyInitialized(msg) : msg; - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } @@ -5799,9 +5810,6 @@ public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws Interr * Base class for message workers. */ protected abstract class MessageWorkerAdapter extends IgniteSpiThread { - /** Pre-allocated output stream (100K). */ - private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024); - /** Message queue. */ private final BlockingDeque queue = new LinkedBlockingDeque<>(); @@ -5883,20 +5891,6 @@ void addMessage(TcpDiscoveryAbstractMessage msg) { protected void noMessageLoop() { // No-op. } - - /** - * @param sock Socket. - * @param msg Message. - * @param timeout Socket timeout. - * @throws IOException If IO failed. - * @throws IgniteCheckedException If marshalling failed. - */ - protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) - throws IOException, IgniteCheckedException { - bout.reset(); - - spi.writeToSocket(sock, msg, bout, timeout); - } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index df152f8004cd0..d9816092b7d89 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp; +import java.io.BufferedOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -51,7 +52,6 @@ import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -1346,45 +1346,38 @@ private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOExce */ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K. + writeToSocket(sock, new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()), msg, timeout); } /** * Writes message to the socket. * * @param sock Socket. + * @param out Stream to write to. * @param msg Message. - * @param bout Byte array output stream. * @param timeout Timeout. * @throws IOException If IO failed or write timed out. * @throws IgniteCheckedException If marshalling failed. */ @SuppressWarnings("ThrowFromFinallyBlock") protected void writeToSocket(Socket sock, + OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { assert sock != null; assert msg != null; - assert bout != null; - - // Marshall message first to perform only write after. - marsh.marshal(msg, bout); + assert out != null; SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout); addTimeoutObject(obj); - IOException err = null; + IgniteCheckedException err = null; try { - OutputStream out = sock.getOutputStream(); - - bout.writeTo(out); - - out.flush(); + marsh.marshal(msg, out); } - catch (IOException e) { + catch (IgniteCheckedException e) { err = e; } finally { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 6869d1cc6ab23..4d49366e1ad23 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import java.io.IOException; +import java.io.OutputStream; import java.net.Socket; import java.util.Collection; import java.util.Collections; @@ -384,7 +385,7 @@ public static class TestTcpDiscoverySpi extends TcpDiscoverySpi { volatile CountDownLatch writeLatch; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryJoinRequestMessage) { CountDownLatch writeLatch0 = writeLatch; @@ -396,7 +397,7 @@ public static class TestTcpDiscoverySpi extends TcpDiscoverySpi { } } - super.writeToSocket(sock, msg, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -464,4 +465,4 @@ public void print() { log.error(s); } } -} \ No newline at end of file +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 7debb4150a420..e01094c1fd375 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; @@ -44,7 +45,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.typedef.CIX2; @@ -2158,8 +2158,8 @@ private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { waitFor(writeLock); boolean fail = false; @@ -2184,7 +2184,7 @@ else if (msg instanceof TcpDiscoveryClientReconnectMessage) sock.close(); } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); if (afterWrite != null) afterWrite.apply(msg, sock); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 7635f0b60970f..7efaca07ce660 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -19,6 +19,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; @@ -54,7 +55,6 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.port.GridPortRecord; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -1852,9 +1852,8 @@ private static class TestEventDiscardSpi extends TcpDiscoverySpi { private volatile boolean failed; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { boolean add = msgIds.add(msg.id()); @@ -1864,7 +1863,7 @@ private static class TestEventDiscardSpi extends TcpDiscoverySpi { failed = true; } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -1877,8 +1876,8 @@ private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, + OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { if (stopBeforeSndAck) { if (msg instanceof TcpDiscoveryCustomEventMessage) { @@ -1908,7 +1907,7 @@ private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { } } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -1940,8 +1939,8 @@ private static class TestFailedNodesSpi extends TcpDiscoverySpi { /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, + OutputStream out, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { if (stop) return; @@ -1986,7 +1985,7 @@ private static class TestFailedNodesSpi extends TcpDiscoverySpi { return; } - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2001,8 +2000,8 @@ private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySp private boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) { log.info("Stop node on custom event: " + msg); @@ -2014,7 +2013,7 @@ private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySp if (stop) return; - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2035,8 +2034,8 @@ private static class TestCustomEventRaceSpi extends TcpDiscoverySpi { private boolean debug; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryNodeAddedMessage) { if (nodeAdded1 != null) { nodeAdded1.countDown(); @@ -2063,7 +2062,7 @@ private static class TestCustomEventRaceSpi extends TcpDiscoverySpi { if (debug && msg instanceof TcpDiscoveryCustomEventMessage) log.info("--- Send custom event: " + msg); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2075,13 +2074,13 @@ private static class TestMessageWorkerFailureSpi1 extends TcpDiscoverySpi { private volatile boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (stop) throw new RuntimeException("Failing ring message worker explicitly"); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } } @@ -2093,12 +2092,12 @@ private static class TestMessageWorkerFailureSpi2 extends TcpDiscoverySpi { private volatile boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { if (stop) throw new RuntimeException("Failing ring message worker explicitly"); - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); if (msg instanceof TcpDiscoveryNodeAddedMessage) stop = true; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java index 4cf9bd02ea551..4ef984f867132 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -18,12 +18,12 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; @@ -348,10 +348,14 @@ else if (openSocketTimeoutWait) { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (!(msg instanceof TcpDiscoveryPingRequest)) { - super.writeToSocket(sock, msg, timeout); + if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) + connCheckStatusMsgCntSent++; + + super.writeToSocket(sock, out, msg, timeout); + return; } @@ -370,16 +374,7 @@ else if (openSocketTimeoutWait) { } } else - super.writeToSocket(sock, msg, timeout); - } - - /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, - GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { - if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) - connCheckStatusMsgCntSent++; - - super.writeToSocket(sock, msg, bout, timeout); + super.writeToSocket(sock, out, msg, timeout); } /** {@inheritDoc} */ @@ -405,4 +400,4 @@ private void resetState() { countConnCheckMsg = false; } } -} \ No newline at end of file +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java index dbc54bc406f57..721192fb694df 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; +import java.io.OutputStream; import java.net.Socket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -31,12 +32,12 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi { public boolean ignorePingResponse; /** {@inheritDoc} */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse) return; else - super.writeToSocket(sock, msg, timeout); + super.writeToSocket(sock, out, msg, timeout); } /** {@inheritDoc} */