Skip to content

Commit

Permalink
Remove CSTL#onNoLongerMaster (#83420)
Browse files Browse the repository at this point in the history
Implementations of `ClusterStateTaskListener#onNoLongerMaster` almost
universally do the same thing as `CSTL#onFailure` except for differences
in logging. In most cases for logging purposes we should treat not being
the master the same as failing to commit the cluster state, but we don't
do so and this can generate a lot of log noise on a master failure. In
other places we rely on the default implementation of `onNoLongerMaster`
and react to the type of the exception in the `onFailure` handler as
needed.

To simplify the situation, this commit combines `onNoLongerMaster` and
`onFailure` everywhere and harmonises the logging behaviour for
`FailedToCommitClusterStateException` and `NotMasterException`.
  • Loading branch information
DaveCTurner committed May 9, 2022
1 parent 23831e2 commit 4af21df
Show file tree
Hide file tree
Showing 16 changed files with 95 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.admin.cluster.health;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -158,19 +159,15 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
);
}

@Override
public void onNoLongerMaster() {
logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", request.waitForEvents());
// TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]"));
}

@Override
public void onFailure(Exception e) {
if (e instanceof ProcessClusterEventTimeoutException) {
listener.onResponse(getResponse(request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT));
} else {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
final Level level = e instanceof NotMasterException ? Level.TRACE : Level.ERROR;
assert e instanceof NotMasterException : e; // task cannot fail, nor will it trigger a publication which fails
logger.log(level, () -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
// TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
listener.onFailure(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -207,7 +206,7 @@ public void onResponse(ClusterState clusterState) {
@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failed to perform [{}]", REROUTE_TASK_SOURCE), e);
if (e instanceof NotMasterException || e instanceof FailedToCommitClusterStateException) {
if (MasterService.isPublishFailureException(e)) {
listener.onResponse(
new ClusterUpdateSettingsResponse(
updateSettingsAcked,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
Expand Down Expand Up @@ -196,7 +195,7 @@ protected void doStart(ClusterState clusterState) {
}
} else {
ActionListener<Response> delegate = listener.delegateResponse((delegatedListener, t) -> {
if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
if (MasterService.isPublishFailureException(t)) {
logger.debug(
() -> new ParameterizedMessage(
"master could not publish cluster state or "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,30 @@
*/
package org.elasticsearch.cluster;

import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.service.MasterService;

import java.util.List;

public interface ClusterStateTaskListener {

/**
* A callback for when task execution fails.
*
* A callback for when task execution fails. May receive a {@link NotMasterException} if this node stopped being the master before this
* task was executed or a {@link ProcessClusterEventTimeoutException} if the task timed out before it was executed. If the task fails
* during execution then this method receives the corresponding exception. If the task executes successfully but the resulting cluster
* state publication fails then this method receives a {@link FailedToCommitClusterStateException}. If publication fails then a new
* master is elected and the update might or might not take effect, depending on whether or not the newly-elected master accepted the
* published state that failed to be committed.
* <p>
* Use {@link MasterService#isPublishFailureException} to detect the "expected" master failure cases if needed.
* <p>
* Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
* level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
* implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
*/
void onFailure(Exception e);

/**
* A callback for when the task was rejected because the processing node is no longer the elected master.
*
* Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
* level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
* implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
*/
default void onNoLongerMaster() {
onFailure(new NotMasterException("no longer master"));
}

/**
* Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} method have been processed properly by all
* listeners.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -47,7 +50,14 @@ public ClusterStateUpdateTask(Priority priority, TimeValue timeout) {
public abstract ClusterState execute(ClusterState currentState) throws Exception;

/**
* A callback for when task execution fails.
* A callback for when task execution fails. May receive a {@link NotMasterException} if this node stopped being the master before this
* task was executed or a {@link ProcessClusterEventTimeoutException} if the task timed out before it was executed. If the task fails
* during execution then this method receives the corresponding exception. If the task executes successfully but the resulting cluster
* state publication fails then this method receives a {@link FailedToCommitClusterStateException}. If publication fails then a new
* master is elected and the update might or might not take effect, depending on whether or not the newly-elected master accepted the
* published state that failed to be committed.
* <p>
* Use {@link MasterService#isPublishFailureException} to detect the "expected" master failure cases if needed.
* <p>
* Implementations of this callback should not throw exceptions: an exception thrown here is logged by the master service at {@code
* ERROR} level and otherwise ignored. If log-and-ignore is the right behaviour then implementations should do so themselves, typically
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
import java.io.IOException;

/**
* Thrown when a node join request or a master ping reaches a node which is not
* currently acting as a master or when a cluster state update task is to be executed
* on a node that is no longer master.
* Exception which indicates that an operation failed because the node stopped being the elected master.
*/
public class NotMasterException extends ElasticsearchException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.cluster.action.shard;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand All @@ -34,6 +35,7 @@
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.StaleShard;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -557,13 +559,11 @@ public void onFailure(Exception e) {

@Override
public void onFailure(Exception e) {
if (e instanceof NotMasterException) {
logger.debug(() -> new ParameterizedMessage("{} no longer master while failing shard [{}]", entry.shardId, entry));
} else if (e instanceof FailedToCommitClusterStateException) {
logger.debug(() -> new ParameterizedMessage("{} unexpected failure while failing shard [{}]", entry.shardId, entry), e);
} else {
logger.error(() -> new ParameterizedMessage("{} unexpected failure while failing shard [{}]", entry.shardId, entry), e);
}
logger.log(
MasterService.isPublishFailureException(e) ? Level.DEBUG : Level.ERROR,
() -> new ParameterizedMessage("{} unexpected failure while failing shard [{}]", entry.shardId, entry),
e
);
listener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
import java.io.IOException;

/**
* Thrown when failing to publish a cluster state. See {@link ClusterStatePublisher} for more details.
* Thrown when a cluster state publication fails to commit the new cluster state. If publication fails then a new master is elected but the
* update might or might not take effect, depending on whether or not the newly-elected master accepted the published state that failed to
* be committed.
*
* See {@link ClusterStatePublisher} for more details.
*/
public class FailedToCommitClusterStateException extends ElasticsearchException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RerouteService;
Expand Down Expand Up @@ -188,8 +187,7 @@ void logNow() {
static Level getLogLevel(TransportException e) {
Throwable cause = e.unwrapCause();
if (cause instanceof CoordinationStateRejectedException
|| cause instanceof FailedToCommitClusterStateException
|| cause instanceof NotMasterException) {
|| (cause instanceof Exception causeException && MasterService.isPublishFailureException(causeException))) {
return Level.DEBUG;
}
return Level.INFO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
Expand All @@ -16,6 +17,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;

import java.util.List;
Expand All @@ -30,12 +32,7 @@ public record Task(DiscoveryNode node, String reason, Runnable onClusterStatePro

@Override
public void onFailure(final Exception e) {
logger.error("unexpected failure during [node-left]", e);
}

@Override
public void onNoLongerMaster() {
logger.debug("no longer master while processing node removal [node-left]");
logger.log(MasterService.isPublishFailureException(e) ? Level.DEBUG : Level.ERROR, "unexpected failure during [node-left]", e);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
Expand Down Expand Up @@ -119,17 +120,6 @@ public ClusterState execute(ClusterState currentState) {
}
}

@Override
public void onNoLongerMaster() {
synchronized (mutex) {
if (pendingRerouteListeners == currentListeners) {
pendingRerouteListeners = null;
}
}
ActionListener.onFailure(currentListeners, new NotMasterException("delayed reroute [" + reason + "] cancelled"));
// no big deal, the new master will reroute again
}

@Override
public void onFailure(Exception e) {
synchronized (mutex) {
Expand All @@ -138,7 +128,13 @@ public void onFailure(Exception e) {
}
}
final ClusterState state = clusterService.state();
if (logger.isTraceEnabled()) {
if (MasterService.isPublishFailureException(e)) {
logger.debug(
() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}", source, state),
e
);
// no big deal, the new master will reroute again
} else if (logger.isTraceEnabled()) {
logger.error(
() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}", source, state),
e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand Down Expand Up @@ -186,14 +187,6 @@ public void onFailure(Exception e) {
}
}

public void onNoLongerMaster() {
try (ThreadContext.StoredContext ignore = threadContextSupplier.get()) {
listener.onNoLongerMaster();
} catch (Exception e) {
logger.error("exception thrown by listener while notifying no longer master", e);
}
}

@Nullable
public ContextPreservingAckListener wrapInTaskContext(@Nullable ClusterStateAckListener clusterStateAckListener) {
return clusterStateAckListener == null
Expand Down Expand Up @@ -243,7 +236,7 @@ private void runTasks(

if (previousClusterState.nodes().isLocalNodeElectedMaster() == false && executor.runOnlyOnMaster()) {
logger.debug("failing [{}]: local node is no longer master", summary);
updateTasks.forEach(Batcher.UpdateTask::onNoLongerMaster);
updateTasks.forEach(t -> t.onFailure(new NotMasterException("no longer master, failing [" + t.source() + "]")));
return;
}

Expand Down Expand Up @@ -1098,4 +1091,7 @@ synchronized ClusterStateUpdateStats getStatistics() {
}
}

public static boolean isPublishFailureException(Exception e) {
return e instanceof NotMasterException || e instanceof FailedToCommitClusterStateException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.gateway;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand All @@ -22,6 +23,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -224,15 +226,13 @@ public void clusterStateProcessed(final ClusterState oldState, final ClusterStat
rerouteService.reroute("state recovered", Priority.NORMAL, ActionListener.wrap(GatewayService.this::resetRecoveredFlags));
}

@Override
public void onNoLongerMaster() {
logger.debug("stepped down as master before recovering state [{}]", TASK_SOURCE);
resetRecoveredFlags();
}

@Override
public void onFailure(final Exception e) {
logger.info(() -> new ParameterizedMessage("unexpected failure during [{}]", TASK_SOURCE), e);
logger.log(
MasterService.isPublishFailureException(e) ? Level.DEBUG : Level.INFO,
() -> new ParameterizedMessage("unexpected failure during [{}]", TASK_SOURCE),
e
);
resetRecoveredFlags();
}
}
Expand Down

0 comments on commit 4af21df

Please sign in to comment.