Skip to content

Commit

Permalink
Discovery: add a finalize round to multicast pinging
Browse files Browse the repository at this point in the history
When sending a multicast ping, there is no way to determine how long it will take before all nodes respond. Currently we send two pings (one at start, one after half the timeout) and wait until the ping timeout has passed for all responses to come back. However, if all nodes are fast to respond, there is a relatively large gap between the moment the pings were gathered and the election that is based on them. This commit adds a last ping round (at timeout) where we know the number of nodes we expect to receive answers from. Once all nodes have responded, we complete the pinging.

Closes elastic#7924
  • Loading branch information
bleskes committed Oct 2, 2014
1 parent 3ee7661 commit d5dfd90
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 10 deletions.
Expand Up @@ -213,5 +213,9 @@ public synchronized PingResponse[] toArray() {
return pings.values().toArray(new PingResponse[pings.size()]);
}

/** Returns how many nodes currently have a known ping in this collection. */
public synchronized int size() {
    final int knownNodes = pings.size();
    return knownNodes;
}
}
}
Expand Up @@ -188,22 +188,51 @@ public void run() {
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
try {
sendPingRequest(id);
} catch (Exception e) {
logger.warn("[{}] failed to send second ping request", e, id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
public void run() {
try {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has passed
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new Runnable() {
public void run() {
try {
finalizePingCycle(id, listener);
} catch (Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
}
});
} catch (Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
}
});
} catch (Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
}
});
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
PingCollection responses = receivedResponses.remove(id);
listener.onPing(responses.toArray());
}
});

}

/**
 * Hands every ping collected for the given id over to the given listener.
 * The collection is removed from {@code receivedResponses} before the listener is notified,
 * so this method is safe to call multiple times: a call that finds no collection
 * registered for the id does nothing.
 */
private void finalizePingCycle(int id, final PingListener listener) {
    final PingCollection collected = receivedResponses.remove(id);
    if (collected == null) {
        // nothing registered under this id (e.g. it was already finalized)
        return;
    }
    listener.onPing(collected.toArray());
}

private void sendPingRequest(int id) {
Expand Down Expand Up @@ -233,6 +262,58 @@ private void sendPingRequest(int id) {
}
}

/**
 * A {@link PingCollection} wrapper used for the last ping round. All pings are stored in the
 * wrapped collection; in addition, every ping accepted through {@link #addPing(PingResponse)}
 * is counted, and once the count reaches {@code expectedResponses} the ping cycle for
 * {@code id} is finalized without waiting for the timeout.
 */
class FinalizingPingCollection extends PingCollection {
    private final PingCollection internalCollection;
    private final int expectedResponses;
    private final AtomicInteger responseCount;
    private final PingListener listener;
    private final int id;

    /**
     * @param id                 the id of the ping cycle this collection belongs to
     * @param internalCollection the collection that actually stores the pings
     * @param expectedResponses  number of accepted pings after which the cycle is finalized
     * @param listener           the listener to hand the collected pings to
     */
    public FinalizingPingCollection(int id, PingCollection internalCollection, int expectedResponses, PingListener listener) {
        this.id = id;
        this.internalCollection = internalCollection;
        this.expectedResponses = expectedResponses;
        this.responseCount = new AtomicInteger();
        this.listener = listener;
    }

    /**
     * Adds the ping to the wrapped collection. If the wrapped collection accepted it, the
     * response counter is incremented and, once it reaches the expected number of responses,
     * finalization is triggered.
     *
     * @return whatever the wrapped collection's {@code addPing} returned
     */
    @Override
    public synchronized boolean addPing(PingResponse ping) {
        if (internalCollection.addPing(ping)) {
            if (responseCount.incrementAndGet() >= expectedResponses) {
                logger.trace("[{}] all nodes responded", id);
                finish();
            }
            return true;
        }
        return false;
    }

    /** Adds the pings to the wrapped collection. Pings added this way are not counted as responses. */
    @Override
    public synchronized void addPings(PingResponse[] pings) {
        internalCollection.addPings(pings);
    }

    /** Returns the pings held by the wrapped collection. */
    @Override
    public synchronized PingResponse[] toArray() {
        return internalCollection.toArray();
    }

    /**
     * Returns the size of the wrapped collection. Without this override the inherited
     * implementation would report this wrapper's own (unused) state rather than the pings
     * that were actually collected.
     */
    @Override
    public synchronized int size() {
        return internalCollection.size();
    }

    /** Finalizes the ping cycle for {@code id} asynchronously on the generic thread pool. */
    void finish() {
        // spawn another thread as we may be running on a network thread
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    finalizePingCycle(id, listener);
                } catch (Throwable t) {
                    logger.warn("failed to call ping listener", t);
                }
            }
        });
    }
}

class MulticastPingResponseRequestHandler extends BaseTransportRequestHandler<MulticastPingResponse> {

@Override
Expand Down

0 comments on commit d5dfd90

Please sign in to comment.