Improve logging in LeaderChecker (#78883)
Today if the `LeaderChecker` decides it's time to restart discovery then
we log a verbose and confusing message that looks something like this:

    [instance-0000000006] master node [...] failed, restarting discovery
    org.elasticsearch.ElasticsearchException: node [...] failed [3] consecutive checks
        at org.elasticsearch.cluster.coordination.LeaderChecker$CheckScheduler$1.handleException(LeaderChecker.java:275) ~[elasticsearch-7.14.1.jar:7.14.1]
        ...
        at java.lang.Thread.run(Thread.java:831) [?:?]
    Caused by: org.elasticsearch.transport.RemoteTransportException: [...][internal:coordination/fault_detection/leader_check]
    Caused by: org.elasticsearch.cluster.coordination.CoordinationStateRejectedException: rejecting leader check since [...] has been removed from the cluster
        at org.elasticsearch.cluster.coordination.LeaderChecker.handleLeaderCheck(LeaderChecker.java:181) ~[elasticsearch-7.14.1.jar:7.14.1]
    ...
        at java.lang.Thread.run(Thread.java:831) ~[?:?]

There are quite a few problems with this:

- We use `DiscoveryNode#toString` which is far too chatty.
- There's basically nothing useful in these stack traces.
- It's easy to miss the `RemoteTransportException` in the middle.
- It's also easy to miss the root cause below it.
- We say the master node failed which sounds very bad but, well, you
  know, that's just like, uh, our opinion. The master node is often
  fine, it just rejected our checks for some reason.
- Reports of unstable clusters include these messages because they're
  noisy and look important, but don't include the more informative ones
  from the master because the master logs look quieter.

This commit reworks the logging in this area to avoid these problems:

- We use `DiscoveryNode#descriptionWithoutAttributes` throughout.
- We suppress the full stack traces unless `DEBUG` logging is on.
- The `LeaderChecker` now provides the message to be logged, rather than
  putting all the details into an exception that wraps around the root
  cause.
- The message describes the root cause rather than just saying that the
  "master node failed".
- We distinguish timeouts from rejections and report the count of each.
- The message guides towards checking the master node logs too; an
  illustrative example of the new format follows this list.
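
For illustration, a failed run of leader checks that would previously have produced the
stack trace above is now reported as a single `INFO` line built from the message
templates in the diff below. With made-up counts and the node details elided, it might
look something like this:

    [instance-0000000006] [3] consecutive checks of the master node [...] were unsuccessful ([2] rejected, [1] timed out), restarting discovery; more details may be available in the master node logs [last unsuccessful check: rejecting check since [...] has been removed from the cluster]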
DaveCTurner committed Oct 15, 2021
1 parent 3eadef4 commit fdae6f1
Showing 3 changed files with 179 additions and 56 deletions.
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
@@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.MessageSupplier;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@@ -251,11 +252,15 @@ private ClusterFormationState getClusterFormationState() {
.collect(Collectors.toList()), getCurrentTerm(), electionStrategy, nodeHealthService.getHealth());
}

private void onLeaderFailure(Exception e) {
private void onLeaderFailure(MessageSupplier message, Exception e) {
synchronized (mutex) {
if (mode != Mode.CANDIDATE) {
assert lastKnownLeader.isPresent();
logger.info(new ParameterizedMessage("master node [{}] failed, restarting discovery", lastKnownLeader.get()), e);
if (logger.isDebugEnabled()) {
logger.info(message, e);
} else {
logger.info(message);
}
}
becomeCandidate("onLeaderFailure");
}
server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java
@@ -11,23 +11,25 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.MessageSupplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.zen.MasterFaultDetection;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
@@ -42,9 +44,7 @@
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;

@@ -80,21 +80,25 @@ public class LeaderChecker {
private final TimeValue leaderCheckTimeout;
private final int leaderCheckRetryCount;
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
private final LeaderFailureListener leaderFailureListener;
private final NodeHealthService nodeHealthService;

private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();
private final AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();

private volatile DiscoveryNodes discoveryNodes;

LeaderChecker(final Settings settings, final TransportService transportService, final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
LeaderChecker(
final Settings settings,
final TransportService transportService,
final LeaderFailureListener leaderFailureListener,
final NodeHealthService nodeHealthService
) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings);
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.leaderFailureListener = leaderFailureListener;
this.nodeHealthService = nodeHealthService;

transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, false, false, LeaderCheckRequest::new,
@@ -167,18 +171,17 @@ private void handleLeaderCheck(LeaderCheckRequest request) {
assert discoveryNodes != null;
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
final String message = "rejecting leader check from [" + request.getSender() + "] " +
"since node is unhealthy [" + statusInfo.getInfo() + "]";
logger.debug(message);
throw new NodeHealthCheckFailureException(message);
logger.debug("this node is unhealthy [{}], rejecting leader check: {}", statusInfo.getInfo(), request);
throw new NodeHealthCheckFailureException(statusInfo.getInfo());
} else if (discoveryNodes.isLocalNodeElectedMaster() == false) {
logger.debug("rejecting leader check on non-master {}", request);
throw new CoordinationStateRejectedException(
"rejecting leader check from [" + request.getSender() + "] sent to a node that is no longer the master");
logger.debug("rejecting leader check on non-master: {}", request);
throw new CoordinationStateRejectedException("no longer the elected master");
} else if (discoveryNodes.nodeExists(request.getSender()) == false) {
logger.debug("rejecting leader check from removed node: {}", request);
throw new CoordinationStateRejectedException(
"rejecting leader check since [" + request.getSender() + "] has been removed from the cluster");
"rejecting check since [" +
request.getSender().descriptionWithoutAttributes() +
"] has been removed from the cluster");
} else {
logger.trace("handling {}", request);
}
Expand All @@ -193,11 +196,14 @@ private void handleDisconnectedNode(DiscoveryNode discoveryNode) {
}
}

private static final String RESTARTING_DISCOVERY_TEXT = "restarting discovery; more details may be available in the master node logs";

private class CheckScheduler implements Releasable {

private final AtomicBoolean isClosed = new AtomicBoolean();
private final AtomicLong failureCountSinceLastSuccess = new AtomicLong();
private final DiscoveryNode leader;
private final AtomicBoolean isClosed = new AtomicBoolean();
private int rejectedCountSinceLastSuccess;
private int timeoutCountSinceLastSuccess;

CheckScheduler(final DiscoveryNode leader) {
this.leader = leader;
@@ -244,7 +250,8 @@ public void handleResponse(TransportResponse.Empty response) {
return;
}

failureCountSinceLastSuccess.set(0);
rejectedCountSinceLastSuccess = 0;
timeoutCountSinceLastSuccess = 0;
scheduleNextWakeUp(); // logs trace message indicating success
}

@@ -256,23 +263,55 @@ public void handleException(TransportException exp) {
}

if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.debug(new ParameterizedMessage(
"leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
leaderFailed(
() -> new ParameterizedMessage(
"master node [{}] disconnected, restarting discovery [{}]",
leader.descriptionWithoutAttributes(),
ExceptionsHelper.unwrapCause(exp).getMessage()),
exp);
return;
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.debug(new ParameterizedMessage(
"leader [{}] health check failed", leader), exp);
leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp));
logger.debug(new ParameterizedMessage("leader [{}] health check failed", leader), exp);
leaderFailed(
() -> new ParameterizedMessage(
"master node [{}] reported itself as unhealthy [{}], {}",
leader.descriptionWithoutAttributes(),
exp.getCause().getMessage(),
RESTARTING_DISCOVERY_TEXT),
exp);
return;
}
long failureCount = failureCountSinceLastSuccess.incrementAndGet();

if (exp instanceof ReceiveTimeoutTransportException) {
timeoutCountSinceLastSuccess += 1;
} else {
rejectedCountSinceLastSuccess += 1;
}

long failureCount = rejectedCountSinceLastSuccess + timeoutCountSinceLastSuccess;
if (failureCount >= leaderCheckRetryCount) {
logger.debug(new ParameterizedMessage(
"leader [{}] has failed {} consecutive checks (limit [{}] is {}); last failure was:",
leader, failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount), exp);
leaderFailed(new ElasticsearchException(
"node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp));
logger.debug(
new ParameterizedMessage(
"leader [{}] failed {} consecutive checks (rejected [{}], timed out [{}], limit [{}] is {})",
leader,
failureCount,
rejectedCountSinceLastSuccess,
timeoutCountSinceLastSuccess,
LEADER_CHECK_RETRY_COUNT_SETTING.getKey(),
leaderCheckRetryCount),
exp);
leaderFailed(
() -> new ParameterizedMessage(
"[{}] consecutive checks of the master node [{}] were unsuccessful ([{}] rejected, [{}] timed out), " +
"{} [last unsuccessful check: {}]",
failureCount,
leader.descriptionWithoutAttributes(),
rejectedCountSinceLastSuccess,
timeoutCountSinceLastSuccess,
RESTARTING_DISCOVERY_TEXT,
ExceptionsHelper.unwrapCause(exp).getMessage()),
exp);
return;
}

@@ -283,12 +322,12 @@ public void handleException(TransportException exp) {
});
}

void leaderFailed(Exception e) {
void leaderFailed(MessageSupplier messageSupplier, Exception e) {
if (isClosed.compareAndSet(false, true)) {
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
onLeaderFailure.accept(e);
leaderFailureListener.onLeaderFailure(messageSupplier, e);
}

@Override
@@ -304,7 +343,11 @@ public String toString() {
void handleDisconnectedNode(DiscoveryNode discoveryNode) {
if (discoveryNode.equals(leader)) {
logger.debug("leader [{}] disconnected", leader);
leaderFailed(new NodeDisconnectedException(discoveryNode, "disconnected"));
leaderFailed(
() -> new ParameterizedMessage(
"master node [{}] disconnected, restarting discovery",
leader.descriptionWithoutAttributes()),
new NodeDisconnectedException(discoveryNode, "disconnected"));
}
}

@@ -367,5 +410,18 @@ public String toString() {
'}';
}
}

@FunctionalInterface
interface LeaderFailureListener {
/**
* Called when a leader failure is detected. Checking the leader health is somewhat asynchronous, so this method may report a leader
* failure after the node has already decided there's no known leader for some other reason. This method is called on the {@code
* GENERIC} thread pool.
*
* @param messageSupplier The message to log if prior to this failure there was a known master in the cluster.
* @param exception An exception that gives more detail of the leader failure.
*/
void onLeaderFailure(MessageSupplier messageSupplier, Exception exception);
}
}
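
As a rough, self-contained sketch of the new contract (not part of this commit: the
class, method bodies and names below are hypothetical and simply mirror the
`Coordinator` change above), a `LeaderFailureListener`-style callback can log the
lazily-built message on its own at `INFO`, and attach the exception — and therefore the
stack trace — only when `DEBUG` is enabled:

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.message.ParameterizedMessage;
    import org.apache.logging.log4j.util.MessageSupplier;

    // Illustration only: the real LeaderFailureListener is the package-private
    // interface defined inside LeaderChecker in the diff above.
    class LeaderFailureLoggingSketch {

        private static final Logger logger = LogManager.getLogger(LeaderFailureLoggingSketch.class);

        // Same shape as LeaderFailureListener#onLeaderFailure: the checker builds the
        // message lazily, the listener decides whether the stack trace is worth logging.
        void onLeaderFailure(MessageSupplier messageSupplier, Exception exception) {
            if (logger.isDebugEnabled()) {
                logger.info(messageSupplier, exception); // full stack trace only at DEBUG
            } else {
                logger.info(messageSupplier);            // concise single line otherwise
            }
        }

        void example() {
            onLeaderFailure(
                () -> new ParameterizedMessage(
                    "master node [{}] disconnected, restarting discovery", "hypothetical-node"),
                new RuntimeException("simulated failure"));
        }
    }

Because the message is passed as a supplier, nothing is formatted unless the listener
actually decides to log it.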
