Skip to content

Commit

Permalink
Tidy up ClusterApplierService (#76837)
Browse files Browse the repository at this point in the history
This commit cleans up some cruft left over from older versions of the
`ClusterApplierService`:

- `UpdateTask` doesn't need to implement lots of interfaces and give
  access to its internals, it can just pass appropriate arguments to
  `runTasks()`.
- No need for the `runOnApplierThread` override with a default priority,
  just have callers be explicit about the priority.
- `submitStateUpdateTask` takes a config which never has a timeout, may
  as well just pass the priority and remove the dead code
- `SafeClusterApplyListener` doesn't need to be a
  `ClusterApplyListener`, may as well just be an `ActionListener<Void>`.
- No implementations of `ClusterApplyListener` care about the source
  argument, may as well drop it.
- Adds assertions to prevent `ClusterApplyListener` implementations from
  throwing exceptions since we just swallow them.
- No need to override getting the current time in the
  `ClusterApplierService`, we can control this from the `ThreadPool`.
  • Loading branch information
DaveCTurner committed Aug 31, 2021
1 parent 627c0ee commit ead0020
Show file tree
Hide file tree
Showing 16 changed files with 324 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,12 @@ public void testShardActiveElseWhere() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> newState, new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
public void onSuccess() {
latch.countDown();
}

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
latch.countDown();
throw new AssertionError("Expected a proper response", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ public interface ClusterStateTaskListener {
/**
* A callback for when task execution fails.
*
* Implementations of this callback should not throw exceptions: an exception thrown here is logged by the master service at {@code
* ERROR} level and otherwise ignored. If log-and-ignore is the right behaviour then implementations should do so themselves, typically
* using a more specific logger and at a less dramatic log level.
* Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
* level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
* implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
*/
void onFailure(String source, Exception e);

/**
* A callback for when the task was rejected because the processing node is no longer the elected master.
*
* Implementations of this callback should not throw exceptions: an exception thrown here is logged by the master service at {@code
* ERROR} level and otherwise ignored. If log-and-ignore is the right behaviour then implementations should do so themselves, typically
* using a more specific logger and at a less dramatic log level.
* Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
* level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
* implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
*/
default void onNoLongerMaster(String source) {
onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
Expand All @@ -35,9 +35,9 @@ default void onNoLongerMaster(String source) {
* Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed
* properly by all listeners.
*
* Implementations of this callback should not throw exceptions: an exception thrown here is logged by the master service at {@code
* ERROR} level and otherwise ignored. If log-and-ignore is the right behaviour then implementations should do so themselves, typically
* using a more specific logger and at a less dramatic log level.
* Implementations of this callback must not throw exceptions: an exception thrown here is logged by the master service at {@code ERROR}
* level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the right behaviour then
* implementations must do so themselves, typically using a more specific logger and at a less dramatic log level.
*/
default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,12 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionList
new ClusterApplyListener() {

@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
applyListener.onFailure(e);
}

@Override
public void onSuccess(String source) {
public void onSuccess() {
applyListener.onResponse(null);
}
});
Expand Down Expand Up @@ -532,7 +532,7 @@ void becomeCandidate(String method) {

if (applierState.nodes().getMasterNodeId() != null) {
applierState = clusterStateWithNoMasterBlock(applierState);
clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, e -> {
});
}
}
Expand Down Expand Up @@ -1382,7 +1382,7 @@ public void onResponse(Void ignore) {
clusterApplier.onNewClusterState(CoordinatorPublication.this.toString(), () -> applierState,
new ClusterApplyListener() {
@Override
public void onFailure(String source, Exception e) {
public void onFailure(Exception e) {
synchronized (mutex) {
removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
}
Expand All @@ -1392,7 +1392,7 @@ public void onFailure(String source, Exception e) {
}

@Override
public void onSuccess(String source) {
public void onSuccess() {
clusterStatePublicationEvent.setMasterApplyElapsedMillis(
transportService.getThreadPool().rawRelativeTimeInMillis() - completionTimeMillis);
synchronized (mutex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,26 @@ public interface ClusterApplier {
*/
interface ClusterApplyListener {
/**
* Called on successful cluster state application
* @param source information where the cluster state came from
* Called on successful cluster state application.
*
* Implementations of this callback must not throw exceptions: an exception thrown here is logged by the cluster applier service at
* {@code ERROR} level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the
* right behaviour then implementations must do so themselves, typically using a more specific logger and at a less dramatic log
* level.
*/
default void onSuccess(String source) {
default void onSuccess() {
}

/**
* Called on failure during cluster state application
* @param source information where the cluster state came from
* Called on failure during cluster state application.
*
* Implementations of this callback must not throw exceptions: an exception thrown here is logged by the cluster applier service at
* {@code ERROR} level and otherwise ignored, except in tests where it raises an {@link AssertionError}. If log-and-ignore is the
* right behaviour then implementations must do so themselves, typically using a more specific logger and at a less dramatic log
* level.
*
* @param e exception that occurred
*/
void onFailure(String source, Exception e);
void onFailure(Exception e);
}
}

0 comments on commit ead0020

Please sign in to comment.