Improve the lifecycle management of the join control thread in zen discovery. #8327

Merged
merged 1 commit on Nov 4, 2014
+125 −88
@@ -19,6 +19,7 @@
package org.elasticsearch;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.RestStatus;
@@ -184,4 +185,18 @@ public static boolean isOOM(Throwable t) {
)
);
}
/**
* Throws the specified exception. If null is specified then <code>true</code> is returned.
*/
public static boolean reThrowIfNotNull(@Nullable Throwable e) {
if (e != null) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(e);
}
}
return true;
}
}
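
For context on how the new helper is meant to be used: it returns true so it can sit behind an assert, which means a swallowed exception is rethrown only when assertions are enabled (as they are in the test infrastructure), while production code keeps its catch-and-retry behavior. A minimal, self-contained sketch of that pattern (the class and main method below are illustrative, not part of the PR):

// Hypothetical standalone sketch of the assert-rethrow pattern; the real
// helper lives in org.elasticsearch.ExceptionsHelper as shown above.
public final class AssertRethrowExample {

    /** Rethrows e if non-null; returns true so it can sit behind an assert. */
    static boolean reThrowIfNotNull(Throwable e) {
        if (e != null) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            throw new RuntimeException(e);
        }
        return true;
    }

    public static void main(String[] args) {
        try {
            throw new IllegalStateException("boom");
        } catch (Exception e) {
            System.err.println("unexpected error, trying again: " + e);
            // a no-op when assertions are disabled (production); rethrows
            // under -ea so test leak detection sees the swallowed exception
            assert reThrowIfNotNull(e);
        }
    }
}

Run with java -ea AssertRethrowExample and the exception propagates; without -ea the catch block logs and moves on.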
@@ -231,8 +231,8 @@ public void setAllocationService(AllocationService allocationService) {
@Override
protected void doStart() throws ElasticsearchException {
nodesFD.setLocalNode(clusterService.localNode());
joinThreadControl.start();
pingService.start();
// start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
@@ -249,7 +249,6 @@ public void onFailure(String source, @org.elasticsearch.common.Nullable Throwabl
logger.warn("failed to start initial join process", t);
}
});
}
@Override
@@ -344,8 +343,8 @@ public void publish(ClusterState clusterState, AckListener ackListener) {
}
/**
* returns true if there is currently a background thread active for (re)joining the cluster
* used for testing.
* returns true if zen discovery is started and there is currently a background thread active for (re)joining
* the cluster used for testing.
*/
public boolean joiningCluster() {
return joinThreadControl.joinThreadActive();
@@ -1278,21 +1277,22 @@ public void onRefreshSettings(Settings settings) {
private class JoinThreadControl {
private final ThreadPool threadPool;
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();
public JoinThreadControl(ThreadPool threadPool) {
this.threadPool = threadPool;
}
/** returns true if there is currently an active join thread */
/** returns true if join thread control is started and there is currently an active join thread */
public boolean joinThreadActive() {
Thread currentThread = currentJoinThread.get();
return currentThread != null && currentThread.isAlive();
return running.get() && currentThread != null && currentThread.isAlive();
}
/** returns true if the supplied thread is the currently active joinThread */
/** returns true if join thread control is started and the supplied thread is the currently active joinThread */
public boolean joinThreadActive(Thread joinThread) {
return joinThread.equals(currentJoinThread.get());
return running.get() && joinThread.equals(currentJoinThread.get());
}
/** cleans any running joining thread and calls {@link #rejoin} */
@@ -1302,7 +1302,7 @@ public ClusterState stopRunningThreadAndRejoin(ClusterState clusterState, String
return rejoin(clusterState, reason);
}
/** starts a new joining thread if there is no currently active one */
/** starts a new joining thread if there is no currently active one and join thread controlling is started */

@bleskes (Member) commented Nov 4, 2014

join thread controlling is started <-- love it :)

public void startNewThreadIfNotRunning() {
assertClusterStateThread();
if (joinThreadActive()) {
@@ -1315,15 +1315,18 @@ public void run() {
if (!currentJoinThread.compareAndSet(null, currentThread)) {
return;
}
while (joinThreadActive(currentThread)) {
while (running.get() && joinThreadActive(currentThread)) {
try {
innerJoinCluster();
return;
} catch (Throwable t) {
logger.error("unexpected error while joining cluster, trying again", t);
} catch (Exception e) {
logger.error("unexpected error while joining cluster, trying again", e);
// Because we catch any exception here, we want to know in
// tests if an uncaught exception got to this point and the test infra uncaught exception
// leak detection can catch this. In practice no uncaught exception should leak
assert ExceptionsHelper.reThrowIfNotNull(e);
}
}
// cleaning the current thread from currentJoinThread is done by explicit calls.
}
});
@@ -1348,16 +1351,26 @@ public boolean markThreadAsDone(Thread joinThread) {
}
public void stop() {
running.set(false);
Thread joinThread = currentJoinThread.getAndSet(null);
if (joinThread != null) {
try {
joinThread.interrupt();
} catch (Exception e) {
// ignore
}
try {
joinThread.join(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public void start() {
running.set(true);
}
private void assertClusterStateThread() {
assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME) : "not called from the cluster state update thread";
}
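
Taken together, the JoinThreadControl changes establish a simple lifecycle: nothing runs before start(), every loop iteration rechecks the running flag, and stop() both interrupts and bounded-joins the active thread. A condensed, self-contained sketch of that lifecycle (the ThreadPool dependency and the cluster-state-thread assertion are omitted here for brevity):

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Standalone sketch of the lifecycle JoinThreadControl now enforces: a
// running flag gates all join activity, and stop() interrupts and joins the
// current thread so no join attempt can outlive the discovery service.
final class JoinThreadControlSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

    boolean joinThreadActive() {
        Thread t = currentJoinThread.get();
        return running.get() && t != null && t.isAlive();
    }

    void start() {
        running.set(true);
    }

    void stop() {
        running.set(false);
        Thread joinThread = currentJoinThread.getAndSet(null);
        if (joinThread != null) {
            joinThread.interrupt();                 // wake a blocked join attempt
            try {
                joinThread.join(10000);             // bounded wait for it to exit
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
        }
    }
}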
@@ -185,51 +185,55 @@ public void run() {
return;
}
final int id = pingIdGenerator.incrementAndGet();
receivedResponses.put(id, new PingCollection());
sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
try {
receivedResponses.put(id, new PingCollection());
sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has passed
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
@Override
public void doRun() {
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});
@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has passed
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});
} catch (Exception e) {
logger.warn("failed to ping", e);
finalizePingCycle(id, listener);
}
}
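
The important structural change in this hunk is easy to miss among the nesting: the entire scheduling cascade now sits inside one try block, so a synchronous failure (for example a rejected schedule) still reaches finalizePingCycle instead of leaking the registered PingCollection. A reduced sketch of that shape, using a plain ScheduledExecutorService in place of the diff's ThreadPool and collapsing the staggered pings into two rounds:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the try/finalize shape this hunk introduces: if registering the
// ping round or scheduling the follow-up pings throws synchronously, the
// cycle is finalized immediately instead of leaving the id registered.
final class PingCycleSketch {
    private final ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();

    void runPingCycle(long timeoutMillis, Runnable finalizeCycle) {
        try {
            sendPingRequest();                              // first ping at t = 0
            threadPool.schedule(this::sendPingRequest,      // another ping halfway through
                    timeoutMillis / 2, TimeUnit.MILLISECONDS);
            threadPool.schedule(finalizeCycle,              // finalize once the timeout passes
                    timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            System.err.println("failed to ping: " + e);
            finalizeCycle.run();                            // never leak the registered cycle
        }
    }

    private void sendPingRequest() {
        // stand-in for the diff's sendPingRequest(id)
    }
}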
/**
@@ -201,44 +201,49 @@ public void onPing(PingResponse[] pings) {
@Override
public void ping(final PingListener listener, final TimeValue timeout) throws ElasticsearchException {
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet());
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
try {
sendPings(timeout, null, sendPingsHandler);
} catch (RejectedExecutionException e) {
logger.debug("Ping execution rejected", e);
// The RejectedExecutionException can come from the fact that unicastConnectExecutor is at its max down in sendPings
// But don't bail here, we can retry later on after the send ping has been scheduled.
}
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() {
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
try {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
sendPingsHandler.close();
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node);
} catch (RejectedExecutionException e) {
logger.debug("Ping execution rejected", e);
// The RejectedExecutionException can come from the fact that unicastConnectExecutor is at its max down in sendPings
// But don't bail here, we can retry later on after the send ping has been scheduled.
}
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
sendPingsHandler.close();
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node);
}
listener.onPing(sendPingsHandler.pingCollection().toArray());
}
listener.onPing(sendPingsHandler.pingCollection().toArray());
}
@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
}
@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
}
@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
} catch (Exception e) {
sendPingsHandler.close();

@bleskes (Member) commented Nov 3, 2014

we want to rethrow here, right?

throw new ElasticsearchException("Ping execution failed", e);
}
}
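
Per the review exchange above, the unicast path does not just close the handler on a synchronous failure, it also rethrows so the caller sees the error. A minimal, self-contained sketch of that close-then-rethrow shape (RuntimeException stands in for ElasticsearchException here; the handler is reduced to a plain Closeable):

import java.io.Closeable;
import java.io.IOException;

// Sketch of the close-then-rethrow shape from the unicast hunk: on a
// synchronous failure, release the handler's resources first, then surface
// the failure to the caller rather than swallowing it.
final class CloseThenRethrowSketch {
    static void ping(Closeable sendPingsHandler, Runnable sendPings) {
        try {
            sendPings.run();
        } catch (Exception e) {
            try {
                sendPingsHandler.close();   // free connections tied to this round
            } catch (IOException inner) {
                e.addSuppressed(inner);     // keep the original failure primary
            }
            throw new RuntimeException("Ping execution failed", e);
        }
    }
}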
class SendPingsHandler implements Closeable {