Skip to content

Commit

Permalink
IGNITE-11 (Discovery takes a lot of time on Windows if DescoverySpi c…
Browse files Browse the repository at this point in the history
…ontains several hosts and several ports per host)
  • Loading branch information
sevdokimov-gg committed Feb 19, 2015
1 parent c93d86f commit a9ecd99
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 55 deletions.
Expand Up @@ -1395,65 +1395,83 @@ private boolean sendJoinRequestMessage() throws IgniteSpiException {
return false;

boolean retry = false;
IgniteCheckedException errs = null;
Collection<Exception> errs = new ArrayList<>();

for (InetSocketAddress addr : addrs) {
try {
Integer res = sendMessageDirectly(joinReq, addr);
SocketMultiConnector multiConnector = new SocketMultiConnector(addrs, 2);

assert res != null;
try {
GridTuple3<InetSocketAddress, Socket, Exception> tuple;

noResAddrs.remove(addr);
while ((tuple = multiConnector.next()) != null) {
InetSocketAddress addr = tuple.get1();
Socket sock = tuple.get2();
Exception ex = tuple.get3();

// Address is responsive, reset period start.
noResStart = 0;
if (ex == null) {
assert sock != null;

switch (res) {
case RES_WAIT:
// Concurrent startup, try sending join request again or wait if no success.
retry = true;
try {
Integer res = sendMessageDirectly(joinReq, addr, sock);

break;
case RES_OK:
if (log.isDebugEnabled())
log.debug("Join request message has been sent to address [addr=" + addr +
", req=" + joinReq + ']');
assert res != null;

// Join request sending succeeded, wait for response from topology.
return true;
noResAddrs.remove(addr);

default:
// Concurrent startup, try next node.
if (res == RES_CONTINUE_JOIN) {
if (!fromAddrs.contains(addr))
// Address is responsive, reset period start.
noResStart = 0;

switch (res) {
case RES_WAIT:
// Concurrent startup, try sending join request again or wait if no success.
retry = true;
}
else {
if (log.isDebugEnabled())
log.debug("Unexpected response to join request: " + res);

retry = true;
}
break;
case RES_OK:
if (log.isDebugEnabled())
log.debug("Join request message has been sent to address [addr=" + addr +
", req=" + joinReq + ']');

break;
}
}
catch (IgniteSpiException e) {
if (errs == null)
errs = new IgniteCheckedException("Multiple connection attempts failed.");
// Join request sending succeeded, wait for response from topology.
return true;

errs.addSuppressed(e);
default:
// Concurrent startup, try next node.
if (res == RES_CONTINUE_JOIN) {
if (!fromAddrs.contains(addr))
retry = true;
}
else {
if (log.isDebugEnabled())
log.debug("Unexpected response to join request: " + res);

if (log.isDebugEnabled()) {
IOException ioe = X.cause(e, IOException.class);
retry = true;
}

log.debug("Failed to send join request message [addr=" + addr +
", msg=" + ioe != null ? ioe.getMessage() : e.getMessage() + ']');
break;
}
}
catch (IgniteSpiException e) {
ex = e;
}
}

noResAddrs.add(addr);
if (ex != null) {
errs.add(ex);

if (log.isDebugEnabled()) {
IOException ioe = X.cause(ex, IOException.class);

log.debug("Failed to send join request message [addr=" + addr +
", msg=" + ioe != null ? ioe.getMessage() : ex.getMessage() + ']');
}

noResAddrs.add(addr);
}
}
}
finally {
multiConnector.close();
}

if (retry) {
if (log.isDebugEnabled())
Expand All @@ -1467,7 +1485,16 @@ private boolean sendJoinRequestMessage() throws IgniteSpiException {
}
}
else if (!ipFinder.isShared() && !ipFinderHasLocAddr) {
if (errs != null && X.hasCause(errs, ConnectException.class))
IgniteCheckedException e = null;

if (!errs.isEmpty()) {
e = new IgniteCheckedException("Multiple connection attempts failed.");

for (Exception err : errs)
e.addSuppressed(err);
}

if (e != null && X.hasCause(e, ConnectException.class))
LT.warn(log, null, "Failed to connect to any address from IP finder " +
"(make sure IP finder addresses are correct and firewalls are disabled on all host machines): " +
addrs);
Expand All @@ -1480,14 +1507,14 @@ else if (U.currentTimeMillis() - noResStart > joinTimeout)
"Failed to connect to any address from IP finder within join timeout " +
"(make sure IP finder addresses are correct, and operating system firewalls are disabled " +
"on all host machines, or consider increasing 'joinTimeout' configuration property): " +
addrs, errs);
addrs, e);
}

try {
U.sleep(2000);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteSpiException("Thread has been interrupted.", e);
catch (IgniteInterruptedCheckedException ex) {
throw new IgniteSpiException("Thread has been interrupted.", ex);
}
}
else
Expand All @@ -1503,17 +1530,15 @@ else if (U.currentTimeMillis() - noResStart > joinTimeout)
* @param msg Message to send.
* @param addr Address to send message to.
* @return Response read from the recipient or {@code null} if no response is supposed.
* @throws org.apache.ignite.spi.IgniteSpiException If an error occurs.
* @throws IgniteSpiException If an error occurs.
*/
@Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr)
@Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, Socket sock)
throws IgniteSpiException {
assert msg != null;
assert addr != null;

Collection<Throwable> errs = null;

Socket sock = null;

long ackTimeout0 = ackTimeout;

int connectAttempts = 1;
Expand All @@ -1532,7 +1557,8 @@ else if (U.currentTimeMillis() - noResStart > joinTimeout)
try {
long tstamp = U.currentTimeMillis();

sock = openSocket(addr);
if (sock == null)
sock = openSocket(addr);

openSock = true;

Expand Down Expand Up @@ -1612,6 +1638,8 @@ else if (U.currentTimeMillis() - noResStart > joinTimeout)
}
finally {
U.closeQuiet(sock);

sock = null;
}
}

Expand All @@ -1634,7 +1662,7 @@ else if (U.currentTimeMillis() - noResStart > joinTimeout)
* Marshalls credentials with discovery SPI marshaller (will replace attribute value).
*
* @param node Node to marshall credentials for.
* @throws org.apache.ignite.spi.IgniteSpiException If marshalling failed.
* @throws IgniteSpiException If marshalling failed.
*/
private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
try {
Expand All @@ -1656,7 +1684,7 @@ private void marshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException
*
* @param node Node to unmarshall credentials for.
* @return Security credentials.
* @throws org.apache.ignite.spi.IgniteSpiException If unmarshal fails.
* @throws IgniteSpiException If unmarshal fails.
*/
private GridSecurityCredentials unmarshalCredentials(TcpDiscoveryNode node) throws IgniteSpiException {
try {
Expand Down Expand Up @@ -3337,7 +3365,7 @@ else if (ring.hasRemoteNodes())
*
* @param node Node to send message to.
* @param msg Message.
* @throws org.apache.ignite.spi.IgniteSpiException Last failure if all attempts failed.
* @throws IgniteSpiException Last failure if all attempts failed.
*/
private void trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage msg)
throws IgniteSpiException {
Expand All @@ -3358,7 +3386,7 @@ private void trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractM

for (InetSocketAddress addr : getNodeAddresses(node, U.sameMacs(locNode, node))) {
try {
sendMessageDirectly(msg, addr);
sendMessageDirectly(msg, addr, null);

ex = null;

Expand Down Expand Up @@ -4385,7 +4413,7 @@ private class TcpServer extends IgniteSpiThread {
/**
* Constructor.
*
* @throws org.apache.ignite.spi.IgniteSpiException In case of error.
* @throws IgniteSpiException In case of error.
*/
TcpServer() throws IgniteSpiException {
super(ignite.name(), "tcp-disco-srvr", log);
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.io.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
Expand Down Expand Up @@ -1005,4 +1006,113 @@ protected final void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
TcpDiscoverySpiAdapter.this.writeToSocket(sock, msg, bout);
}
}

/**
*
*/
protected class SocketMultiConnector {
/** */
private int connInProgress;

/** */
private boolean closed;

/** */
private final ExecutorService executor;

/** */
private final Queue<GridTuple3<InetSocketAddress, Socket, Exception>> queue = new LinkedList<>();

/**
* @param addrs Addresses.
* @param retryCnt Retry count.
*/
public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) {
connInProgress = addrs.size();

executor = new ThreadPoolExecutor(0, 10, 1L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>());

for (final InetSocketAddress addr : addrs) {
executor.execute(new Runnable() {
@Override public void run() {
Exception ex = null;
Socket sock = null;

for (int i = 0; i < retryCnt; i++) {
synchronized (SocketMultiConnector.this) {
if (closed)
return;
}

try {
sock = openSocket(addr);

break;
}
catch (Exception e) {
ex = e;
}
}

synchronized (SocketMultiConnector.this) {
if (closed)
U.closeQuiet(sock);
else
queue.add(new GridTuple3<>(addr, sock, ex));

connInProgress--;

SocketMultiConnector.this.notifyAll();
}
}
});
}
}

/**
*
*/
@Nullable public synchronized GridTuple3<InetSocketAddress, Socket, Exception> next() {
try {
do {
if (closed)
return null;

GridTuple3<InetSocketAddress, Socket, Exception> res = queue.poll();

if (res != null)
return res;

if (connInProgress == 0)
return null;

wait();
}
while (true);
}
catch (InterruptedException e) {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
}

/**
*
*/
public void close() {
synchronized (this) {
if (closed)
return;

closed = true;

notifyAll();
}

executor.shutdown();

for (GridTuple3<InetSocketAddress, Socket, Exception> tuple : queue)
U.closeQuiet(tuple.get2());
}
}
}

0 comments on commit a9ecd99

Please sign in to comment.