Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a finalize round to multicast pinging #7924

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.discovery.zen.ping;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.LifecycleComponent;
Expand Down Expand Up @@ -190,5 +189,9 @@ public synchronized PingResponse[] toArray() {
return pings.values().toArray(new PingResponse[pings.size()]);
}

/** the number of nodes for which there are known pings */
public synchronized int size() {
return pings.size();
}
}
}
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand Down Expand Up @@ -187,23 +188,58 @@ public void run() {
sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void run() {
try {
sendPingRequest(id);
} catch (Exception e) {
logger.warn("[{}] failed to send second ping request", e, id);
}
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
});
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {

@Override
public void run() {
PingCollection responses = receivedResponses.remove(id);
listener.onPing(responses.toArray());
public void doRun() {
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}

@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}

@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});

}

/**
* takes all pings collected for a given id and pass them to the given listener.
* this method is safe to call multiple times as is guaranteed to only finalize once.
*/
private void finalizePingCycle(int id, final PingListener listener) {
PingCollection responses = receivedResponses.remove(id);
if (responses != null) {
listener.onPing(responses.toArray());
}
}

private void sendPingRequest(int id) {
Expand Down Expand Up @@ -233,6 +269,59 @@ private void sendPingRequest(int id) {
}
}

class FinalizingPingCollection extends PingCollection {
final private PingCollection internalCollection;
final private int expectedResponses;
final private AtomicInteger responseCount;
final private PingListener listener;
final private int id;

public FinalizingPingCollection(int id, PingCollection internalCollection, int expectedResponses, PingListener listener) {
this.id = id;
this.internalCollection = internalCollection;
this.expectedResponses = expectedResponses;
this.responseCount = new AtomicInteger();
this.listener = listener;
}

@Override
public synchronized boolean addPing(PingResponse ping) {
if (internalCollection.addPing(ping)) {
if (responseCount.incrementAndGet() >= expectedResponses) {
logger.trace("[{}] all nodes responded", id);
finish();
}
return true;
}
return false;
}

@Override
public synchronized void addPings(PingResponse[] pings) {
internalCollection.addPings(pings);
}

@Override
public synchronized PingResponse[] toArray() {
return internalCollection.toArray();
}

void finish() {
// spawn another thread as we may be running on a network thread
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.error("failed to call ping listener", t);
}

@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
}

class MulticastPingResponseRequestHandler extends BaseTransportRequestHandler<MulticastPingResponse> {

@Override
Expand Down