Skip to content

Commit

Permalink
IGNITE-2951 - Stability fixes for cluster with many clients
Browse files Browse the repository at this point in the history
  • Loading branch information
vkulichenko committed Apr 7, 2016
1 parent da47901 commit 5e26615
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 86 deletions.
Expand Up @@ -931,7 +931,7 @@ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExc
locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());

m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);

m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
}
}
Expand Down
Expand Up @@ -393,7 +393,18 @@ public void unlockStopping() {
/** {@inheritDoc} */
@Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos);
Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos0 = U.newHashMap(clientInfos.size());

for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> e : clientInfos.entrySet()) {
Map<UUID, LocalRoutineInfo> copy = U.newHashMap(e.getValue().size());

for (Map.Entry<UUID, LocalRoutineInfo> e0 : e.getValue().entrySet())
copy.put(e0.getKey(), e0.getValue());

clientInfos0.put(e.getKey(), copy);
}

DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0);

// Collect listeners information (will be sent to joining node during discovery process).
for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
Expand Down
Expand Up @@ -1062,7 +1062,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
try {
if (ack) {
synchronized (mux) {
assert unackedMsg == null : unackedMsg;
assert unackedMsg == null : "Unacked=" + unackedMsg + ", received=" + msg;

unackedMsg = msg;
}
Expand Down
Expand Up @@ -18,9 +18,11 @@
package org.apache.ignite.spi.discovery.tcp;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectStreamException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.ConnectException;
import java.net.InetAddress;
Expand Down Expand Up @@ -74,7 +76,6 @@
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
Expand Down Expand Up @@ -2134,6 +2135,9 @@ private class RingMessageWorker extends MessageWorkerAdapter {
/** Socket. */
private Socket sock;

/** Output stream. */
private OutputStream out;

/** Last time status message has been sent. */
private long lastTimeStatusMsgSent;

Expand Down Expand Up @@ -2470,10 +2474,12 @@ else if (log.isDebugEnabled())

sock = spi.openSocket(addr, timeoutHelper);

out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize());

openSock = true;

// Handshake.
writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId),
spi.writeToSocket(sock, out, new TcpDiscoveryHandshakeRequest(locNodeId),
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));

TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
Expand Down Expand Up @@ -2627,7 +2633,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);

try {
writeToSocket(sock, pendingMsg, timeoutHelper.nextTimeoutChunk(
spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk(
spi.getSocketTimeout()));
}
finally {
Expand Down Expand Up @@ -2679,7 +2685,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
}
}

writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));

spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);

Expand Down Expand Up @@ -3999,7 +4005,7 @@ private void processNodeLeftMessage(TcpDiscoveryNodeLeftMessage msg) {
}
else if (leftNode.equals(next) && sock != null) {
try {
writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() : spi.getSocketTimeout());

if (log.isDebugEnabled())
Expand Down Expand Up @@ -5617,6 +5623,9 @@ private class ClientMessageWorker extends MessageWorkerAdapter {
/** Socket. */
private final Socket sock;

/** Output stream. */
private final OutputStream out;

/** Current client metrics. */
private volatile ClusterMetrics metrics;

Expand All @@ -5630,11 +5639,13 @@ private class ClientMessageWorker extends MessageWorkerAdapter {
* @param sock Socket.
* @param clientNodeId Node ID.
*/
protected ClientMessageWorker(Socket sock, UUID clientNodeId) {
protected ClientMessageWorker(Socket sock, UUID clientNodeId) throws IOException {
super("tcp-disco-client-message-worker", 2000);

this.sock = sock;
this.clientNodeId = clientNodeId;

out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize());
}

/**
Expand Down Expand Up @@ -5681,7 +5692,7 @@ else if (log.isDebugEnabled())
log.debug("Sending message ack to client [sock=" + sock + ", locNodeId="
+ getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');

writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() : spi.getSocketTimeout());
}
}
Expand All @@ -5692,7 +5703,7 @@ else if (log.isDebugEnabled())

assert topologyInitialized(msg) : msg;

writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() : spi.getSocketTimeout());
}
}
Expand Down Expand Up @@ -5799,9 +5810,6 @@ public boolean ping(IgniteSpiOperationTimeoutHelper timeoutHelper) throws Interr
* Base class for message workers.
*/
protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
/** Pre-allocated output stream (100K). */
private final GridByteArrayOutputStream bout = new GridByteArrayOutputStream(100 * 1024);

/** Message queue. */
private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();

Expand Down Expand Up @@ -5883,20 +5891,6 @@ void addMessage(TcpDiscoveryAbstractMessage msg) {
protected void noMessageLoop() {
// No-op.
}

/**
* Writes the given discovery message to the socket by delegating to
* {@code spi.writeToSocket(sock, msg, bout, timeout)}, passing this worker's own
* pre-allocated {@code bout} stream so a new buffer is not created on every call.
*
* @param sock Socket.
* @param msg Message.
* @param timeout Socket timeout.
* @throws IOException If IO failed.
* @throws IgniteCheckedException If marshalling failed.
*/
protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
throws IOException, IgniteCheckedException {
// Reset the reused buffer before handing it to the SPI
// (presumably discards bytes left over from the previously written message - confirm).
bout.reset();

spi.writeToSocket(sock, msg, bout, timeout);
}
}

/**
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.spi.discovery.tcp;

import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -51,7 +52,6 @@
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
Expand Down Expand Up @@ -1346,45 +1346,38 @@ private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOExce
*/
protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
IgniteCheckedException {
writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K.
writeToSocket(sock, new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize()), msg, timeout);
}

/**
* Writes message to the socket.
*
* @param sock Socket.
* @param out Stream to write to.
* @param msg Message.
* @param bout Byte array output stream.
* @param timeout Timeout.
* @throws IOException If IO failed or write timed out.
* @throws IgniteCheckedException If marshalling failed.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
protected void writeToSocket(Socket sock,
OutputStream out,
TcpDiscoveryAbstractMessage msg,
GridByteArrayOutputStream bout,
long timeout) throws IOException, IgniteCheckedException {
assert sock != null;
assert msg != null;
assert bout != null;

// Marshal the message first so that only the socket write happens afterwards.
marsh.marshal(msg, bout);
assert out != null;

SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);

addTimeoutObject(obj);

IOException err = null;
IgniteCheckedException err = null;

try {
OutputStream out = sock.getOutputStream();

bout.writeTo(out);

out.flush();
marsh.marshal(msg, out);
}
catch (IOException e) {
catch (IgniteCheckedException e) {
err = e;
}
finally {
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -384,7 +385,7 @@ public static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
volatile CountDownLatch writeLatch;

/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
@Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout)
throws IOException, IgniteCheckedException {
if (msg instanceof TcpDiscoveryJoinRequestMessage) {
CountDownLatch writeLatch0 = writeLatch;
Expand All @@ -396,7 +397,7 @@ public static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
}
}

super.writeToSocket(sock, msg, timeout);
super.writeToSocket(sock, out, msg, timeout);
}
}

Expand Down Expand Up @@ -464,4 +465,4 @@ public void print() {
log.error(s);
}
}
}
}
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.tcp;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
Expand All @@ -44,7 +45,6 @@
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.typedef.CIX2;
Expand Down Expand Up @@ -2158,8 +2158,8 @@ private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
}

/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
@Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
waitFor(writeLock);

boolean fail = false;
Expand All @@ -2184,7 +2184,7 @@ else if (msg instanceof TcpDiscoveryClientReconnectMessage)
sock.close();
}

super.writeToSocket(sock, msg, bout, timeout);
super.writeToSocket(sock, out, msg, timeout);

if (afterWrite != null)
afterWrite.apply(msg, sock);
Expand Down

0 comments on commit 5e26615

Please sign in to comment.