Skip to content

Commit

Permalink
Drop source from CSTaskListener failure handlers (#82642)
Browse files Browse the repository at this point in the history
Today `ClusterStateTaskListener#onFailure` and
`ClusterStateTaskListener#onNoLongerMaster` take the `source` of the
failing task as an argument in order to describe the task in logs and
exception messages. In practice every implementation knows the source of
the task for which it's listening, so there's no need for the master
service to pass this data in. Moreover, this argument stands in the way
of some future simplifications: for instance, it makes it impossible to
treat a `ClusterStateTaskListener` as an `ActionListener`.

This commit removes the unnecessary `source` argument from these
methods.
  • Loading branch information
DaveCTurner committed Jan 16, 2022
1 parent 93777b4 commit 16cd22e
Show file tree
Hide file tree
Showing 89 changed files with 748 additions and 758 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
throw new AssertionError(source, e);
public void onFailure(Exception e) {
throw new AssertionError("update state", e);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,9 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
completionFuture.onFailure(e);
throw new AssertionError(source, e);
throw new AssertionError("looping task", e);
}

@Override
Expand Down Expand Up @@ -384,9 +384,9 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
completionFuture.onFailure(e);
throw new AssertionError(source, e);
throw new AssertionError("looping task", e);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
failure.set(e);
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
throw new AssertionError(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {}
public void onFailure(Exception e) {}
}, ClusterStateTaskExecutor.unbatched());
ensureGreen(index);
// remove the extra node
Expand All @@ -123,7 +123,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

@Override
public void onFailure(String source, Exception e) {}
public void onFailure(Exception e) {}
}, ClusterStateTaskExecutor.unbatched());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package org.elasticsearch.cluster.service;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -80,8 +79,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

@Override
public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("failed to execute callback in test {}", source), e);
public void onFailure(Exception e) {
logger.error("failed to execute callback in test", e);
onFailure.set(true);
latch.countDown();
}
Expand Down Expand Up @@ -137,8 +136,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

@Override
public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("failed to execute callback in test {}", source), e);
public void onFailure(Exception e) {
logger.error("failed to execute callback in test", e);
onFailure.set(true);
latch.countDown();
}
Expand Down Expand Up @@ -197,8 +196,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

@Override
public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("failed to execute callback in test {}", source), e);
public void onFailure(Exception e) {
logger.error("failed to execute callback in test", e);
onFailure.set(true);
latch.countDown();
}
Expand Down Expand Up @@ -257,8 +256,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

@Override
public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("failed to execute callback in test {}", source), e);
public void onFailure(Exception e) {
logger.error("failed to execute callback in test", e);
onFailure.set(true);
latch.countDown();
}
Expand Down Expand Up @@ -297,7 +296,7 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
invoked1.countDown();
fail();
}
Expand All @@ -312,7 +311,7 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
fail();
}

Expand Down Expand Up @@ -367,7 +366,7 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
invoked3.countDown();
fail();
}
Expand All @@ -382,7 +381,7 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
fail();
}
}, ClusterStateTaskExecutor.unbatched());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.discovery;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
Expand Down Expand Up @@ -242,8 +241,8 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("failure [{}]", source), e);
public void onFailure(Exception e) {
logger.warn("failure [sneaky-update]", e);
}
}, ClusterStateTaskExecutor.unbatched());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
throw new AssertionError("unexpected", e);
}
}, ClusterStateTaskExecutor.unbatched());
Expand Down Expand Up @@ -193,7 +193,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
throw new AssertionError("unexpected", e);
}
}, ClusterStateTaskExecutor.unbatched());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private void waitForEventsAndExecuteHealth(
final long endTimeRelativeMillis
) {
assert request.waitForEvents() != null;
final String source = "cluster_health (wait_for_events [" + request.waitForEvents() + "])";
if (request.local()) {
new LocalMasterServiceTask(request.waitForEvents()) {
@Override
Expand All @@ -125,63 +126,55 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
listener.onFailure(e);
}
}.submit(clusterService.getMasterService(), "cluster_health (wait_for_events [" + request.waitForEvents() + "])");
}.submit(clusterService.getMasterService(), source);
} else {
final TimeValue taskTimeout = TimeValue.timeValueMillis(Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()));
clusterService.submitStateUpdateTask(
"cluster_health (wait_for_events [" + request.waitForEvents() + "])",
new ClusterStateUpdateTask(request.waitForEvents(), taskTimeout) {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(request.waitForEvents(), taskTimeout) {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
request.timeout(newTimeout);
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis());
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
request.timeout(newTimeout);

// we must use the state from the applier service, because if the state-not-recovered block is in place then the
// applier service has a different view of the cluster state from the one supplied here
final ClusterState appliedState = clusterService.state();
assert newState.stateUUID().equals(appliedState.stateUUID())
: newState.stateUUID() + " vs " + appliedState.stateUUID();
executeHealth(
request,
appliedState,
listener,
waitCount,
observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis)
);
}
// we must use the state from the applier service, because if the state-not-recovered block is in place then the
// applier service has a different view of the cluster state from the one supplied here
final ClusterState appliedState = clusterService.state();
assert newState.stateUUID().equals(appliedState.stateUUID()) : newState.stateUUID() + " vs " + appliedState.stateUUID();
executeHealth(
request,
appliedState,
listener,
waitCount,
observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis)
);
}

@Override
public void onNoLongerMaster(String source) {
logger.trace(
"stopped being master while waiting for events with priority [{}]. retrying.",
request.waitForEvents()
);
// TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]"));
}
@Override
public void onNoLongerMaster() {
logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", request.waitForEvents());
// TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]"));
}

@Override
public void onFailure(String source, Exception e) {
if (e instanceof ProcessClusterEventTimeoutException) {
listener.onResponse(getResponse(request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT));
} else {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
listener.onFailure(e);
}
@Override
public void onFailure(Exception e) {
if (e instanceof ProcessClusterEventTimeoutException) {
listener.onResponse(getResponse(request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT));
} else {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
listener.onFailure(e);
}
},
ClusterStateTaskExecutor.unbatched()
);
}
}, ClusterStateTaskExecutor.unbatched());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
logger.warn("Failed to remove repository cleanup task [{}] from cluster state", repositoryCleanupInProgress);
}
},
Expand Down Expand Up @@ -219,7 +219,7 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
after(e, null);
}

Expand Down Expand Up @@ -268,7 +268,7 @@ public ClusterState execute(ClusterState currentState) {
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
if (failure != null) {
e.addSuppressed(failure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand Down Expand Up @@ -159,9 +158,11 @@ private void verifyThenSubmitUpdate(
);
}

private static final String TASK_SOURCE = "cluster_reroute (api)";

private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener<ClusterRerouteResponse> listener) {
clusterService.submitStateUpdateTask(
"cluster_reroute (api)",
TASK_SOURCE,
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request, listener.map(response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
Expand Down Expand Up @@ -205,9 +206,9 @@ public void onAckTimeout() {
}

@Override
public void onFailure(String source, Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to perform [{}]", source), e);
super.onFailure(source, e);
public void onFailure(Exception e) {
logger.debug("failed to perform [" + TASK_SOURCE + "]", e);
super.onFailure(e);
}

@Override
Expand Down

0 comments on commit 16cd22e

Please sign in to comment.