Skip to content

Commit

Permalink
Discovery: add a finalize round to multicast pinging
Browse files Browse the repository at this point in the history
When sending a multicast ping, there is no way to determine how long it will take before all nodes will respond. Currently we send two pings (one at start, one after half timeout) and wait until the ping timeout has passed for all responses to come back. However, if all nodes are fast to respond, there is a gap relatively large between the moment that pings were gathered and the election that is based on them. This commits adds a last ping round (at timeout) where we know the number of nodes we expect to receive answers from. Once all nodes responded, we complete the pinging.

Closes #7924
  • Loading branch information
bleskes committed Oct 2, 2014
1 parent f13169c commit 3153773
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 12 deletions.
Expand Up @@ -213,5 +213,9 @@ public synchronized PingResponse[] toArray() {
return pings.values().toArray(new PingResponse[pings.size()]);
}

/** the number of nodes for which there are known pings */
public synchronized int size() {
return pings.size();
}
}
}
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand Down Expand Up @@ -187,23 +188,58 @@ public void run() {
sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void run() {
try {
sendPingRequest(id);
} catch (Exception e) {
logger.warn("[{}] failed to send second ping request", e, id);
}
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
});
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {

@Override
public void run() {
PingCollection responses = receivedResponses.remove(id);
listener.onPing(responses.toArray());
public void doRun() {
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}

@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}

@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});

}

/**
* takes all pings collected for a given id and pass them to the given listener.
* this method is safe to call multiple times as is guaranteed to only finalize once.
*/
private void finalizePingCycle(int id, final PingListener listener) {
PingCollection responses = receivedResponses.remove(id);
if (responses != null) {
listener.onPing(responses.toArray());
}
}

private void sendPingRequest(int id) {
Expand Down Expand Up @@ -233,6 +269,59 @@ private void sendPingRequest(int id) {
}
}

class FinalizingPingCollection extends PingCollection {
final private PingCollection internalCollection;
final private int expectedResponses;
final private AtomicInteger responseCount;
final private PingListener listener;
final private int id;

public FinalizingPingCollection(int id, PingCollection internalCollection, int expectedResponses, PingListener listener) {
this.id = id;
this.internalCollection = internalCollection;
this.expectedResponses = expectedResponses;
this.responseCount = new AtomicInteger();
this.listener = listener;
}

@Override
public synchronized boolean addPing(PingResponse ping) {
if (internalCollection.addPing(ping)) {
if (responseCount.incrementAndGet() >= expectedResponses) {
logger.trace("[{}] all nodes responded", id);
finish();
}
return true;
}
return false;
}

@Override
public synchronized void addPings(PingResponse[] pings) {
internalCollection.addPings(pings);
}

@Override
public synchronized PingResponse[] toArray() {
return internalCollection.toArray();
}

void finish() {
// spawn another thread as we may be running on a network thread
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.error("failed to call ping listener", t);
}

@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
}

class MulticastPingResponseRequestHandler extends BaseTransportRequestHandler<MulticastPingResponse> {

@Override
Expand Down

0 comments on commit 3153773

Please sign in to comment.