Skip to content

Commit

Permalink
ignite-5860 Try process TcpDiscoveryClientReconnectMessage from socke…
Browse files Browse the repository at this point in the history
…t reader instead of always processing it on coordinator.
  • Loading branch information
dmekhanikov authored and sboikov committed Oct 27, 2017
1 parent 717c549 commit 56a63f8
Show file tree
Hide file tree
Showing 5 changed files with 467 additions and 195 deletions.
Expand Up @@ -26,6 +26,7 @@
import java.net.SocketTimeoutException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -470,17 +471,18 @@ else if (state == DISCONNECTED) {
}

/**
* @param recon {@code True} if reconnects.
* @param prevAddr If reconnect is in progress, then previous address of the router the client was connected to
* and {@code null} otherwise.
* @param timeout Timeout.
* @return Opened socket or {@code null} if timeout.
* @throws InterruptedException If interrupted.
* @throws IgniteSpiException If failed.
* @see TcpDiscoverySpi#joinTimeout
*/
@SuppressWarnings("BusyWait")
@Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout)
@Nullable private T2<SocketStream, Boolean> joinTopology(InetSocketAddress prevAddr, long timeout)
throws IgniteSpiException, InterruptedException {
Collection<InetSocketAddress> addrs = null;
List<InetSocketAddress> addrs = null;

long startTime = U.currentTimeMillis();

Expand All @@ -489,7 +491,7 @@ else if (state == DISCONNECTED) {
throw new InterruptedException();

while (addrs == null || addrs.isEmpty()) {
addrs = spi.resolvedAddresses();
addrs = new ArrayList<>(spi.resolvedAddresses());

if (!F.isEmpty(addrs)) {
if (log.isDebugEnabled())
Expand All @@ -509,22 +511,30 @@ else if (state == DISCONNECTED) {
}
}

Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
// Process failed node last.
if (prevAddr != null) {
int idx = addrs.indexOf(prevAddr);

Iterator<InetSocketAddress> it = addrs.iterator();
if (idx != -1)
Collections.swap(addrs, idx, 0);
}

Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);

boolean wait = false;

while (it.hasNext()) {
for (int i = addrs.size() - 1; i >= 0; i--) {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();

InetSocketAddress addr = it.next();
InetSocketAddress addr = addrs.get(i);

boolean recon = prevAddr != null;

T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);

if (sockAndRes == null) {
it.remove();
addrs.remove(i);

continue;
}
Expand Down Expand Up @@ -852,8 +862,8 @@ private NavigableSet<ClusterNode> allVisibleNodes() {
}

/** {@inheritDoc} */
@Override protected IgniteSpiThread workerThread() {
return msgWorker;
@Override protected Collection<IgniteSpiThread> threads() {
return Arrays.asList(sockWriter, msgWorker);
}

/**
Expand Down Expand Up @@ -1336,15 +1346,20 @@ private class Reconnector extends IgniteSpiThread {
private boolean clientAck;

/** */
private boolean join;
private final boolean join;

/** */
private final InetSocketAddress prevAddr;

/**
* @param join {@code True} if reconnects during join.
* @param prevAddr Address of the node, that this client was previously connected to.
*/
protected Reconnector(boolean join) {
protected Reconnector(boolean join, InetSocketAddress prevAddr) {
super(spi.ignite().name(), "tcp-client-disco-reconnector", log);

this.join = join;
this.prevAddr = prevAddr;
}

/**
Expand Down Expand Up @@ -1374,7 +1389,7 @@ public void cancel() {

try {
while (true) {
T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout);
T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout);

if (joinRes == null) {
if (join) {
Expand Down Expand Up @@ -1609,6 +1624,10 @@ else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
}
else if (msg instanceof SocketClosedMessage) {
if (((SocketClosedMessage)msg).sock == currSock) {
Socket sock = currSock.sock;

InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort());

currSock = null;

boolean join = joinLatch.getCount() > 0;
Expand Down Expand Up @@ -1637,8 +1656,7 @@ else if (msg instanceof SocketClosedMessage) {

assert reconnector == null;

final Reconnector reconnector = new Reconnector(join);
this.reconnector = reconnector;
reconnector = new Reconnector(join, prevAddr);
reconnector.start();
}
}
Expand Down Expand Up @@ -1811,7 +1829,7 @@ private void tryJoin() throws InterruptedException {
T2<SocketStream, Boolean> joinRes;

try {
joinRes = joinTopology(false, spi.joinTimeout);
joinRes = joinTopology(null, spi.joinTimeout);
}
catch (IgniteSpiException e) {
joinError(e);
Expand Down

0 comments on commit 56a63f8

Please sign in to comment.