Skip to content

Commit

Permalink
Add NOOP ActionListener Singleton (#85414)
Browse files Browse the repository at this point in the history
I know there was some reservation against adding this in the past because
noop listeners are hard to debug since they don't even have any logging to them.
Then again, given that it was possible to identify `103` uses of a noop listener
within minutes using Idea I think it's better to face reality here and at least save
some objects. Also having a clean `toString` on this method will render nicer in logs
when combined with other listeners.
Last but not least it simply saves some memory and compiler work to have a
singleton for this.
  • Loading branch information
original-brownbear committed Mar 28, 2022
1 parent 31b9aff commit 55af878
Show file tree
Hide file tree
Showing 49 changed files with 133 additions and 284 deletions.
26 changes: 26 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,32 @@ public interface ActionListener<Response> {
*/
void onFailure(Exception e);

@SuppressWarnings("rawtypes")
ActionListener NOOP = new ActionListener() {
@Override
public void onResponse(Object o) {

}

@Override
public void onFailure(Exception e) {

}

@Override
public String toString() {
return "NoopActionListener";
}
};

/**
* @return a listener that does nothing
*/
@SuppressWarnings("unchecked")
static <T> ActionListener<T> noop() {
return (ActionListener<T>) NOOP;
}

/**
* Creates a listener that wraps this listener, mapping response values via the given mapping function and passing along
* exceptions to this instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,8 @@ public void sendFreeContext(Transport.Connection connection, final ShardSearchCo
FREE_CONTEXT_ACTION_NAME,
new SearchFreeContextRequest(originalIndices, contextId),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(new ActionListener<SearchFreeContextResponse>() {
@Override
public void onResponse(SearchFreeContextResponse response) {
// no need to respond if it was freed or not
}

@Override
public void onFailure(Exception e) {

}
}, SearchFreeContextResponse::new)
// no need to respond if it was freed or not
new ActionListenerResponseHandler<>(ActionListener.noop(), SearchFreeContextResponse::new)
);
}

Expand Down Expand Up @@ -671,7 +662,7 @@ public void cancelSearchTask(SearchTask task, String reason) {
CancelTasksRequest req = new CancelTasksRequest().setTargetTaskId(new TaskId(client.getLocalNodeId(), task.getId()))
.setReason("Fatal failure during search: " + reason);
// force the origin to execute the cancellation as a system user
new OriginSettingClient(client, GetTaskAction.TASKS_ORIGIN).admin().cluster().cancelTasks(req, ActionListener.wrap(() -> {}));
new OriginSettingClient(client, GetTaskAction.TASKS_ORIGIN).admin().cluster().cancelTasks(req, ActionListener.noop());
}

public NamedWriteableRegistry getNamedWriteableRegistry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ void becomeCandidate(String method) {

if (applierState.nodes().getMasterNodeId() != null) {
applierState = clusterStateWithNoMasterBlock(applierState);
clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, ActionListener.wrap(() -> {}));
clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, ActionListener.noop());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@

public abstract class AbstractHttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport {
private static final Logger logger = LogManager.getLogger(AbstractHttpServerTransport.class);
private static final ActionListener<Void> NO_OP = ActionListener.wrap(() -> {});

protected final Settings settings;
public final HttpHandlingSettings handlingSettings;
Expand Down Expand Up @@ -490,7 +489,7 @@ private static ActionListener<Void> earlyResponseListener(HttpRequest request, H
if (HttpUtils.shouldCloseConnection(request)) {
return ActionListener.wrap(() -> CloseableChannel.closeChannel(httpChannel));
} else {
return NO_OP;
return ActionListener.noop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final PeerRecoveryTargetService recoveryTargetService;
private final ShardStateAction shardStateAction;

private static final ActionListener<Void> SHARD_STATE_ACTION_LISTENER = ActionListener.wrap(() -> {});

private final Settings settings;
// a list of shards that failed during recovery
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
Expand Down Expand Up @@ -258,7 +256,7 @@ private void updateFailedShardsCache(final ClusterState state) {
if (masterNode != null) { // TODO: can we remove this? Is resending shard failures the responsibility of shardStateAction?
String message = "master " + masterNode + " has not removed previously failed shard. resending shard failure";
logger.trace("[{}] re-sending failed shard [{}], reason [{}]", matchedRouting.shardId(), matchedRouting, message);
shardStateAction.localShardFailed(matchedRouting, message, null, SHARD_STATE_ACTION_LISTENER, state);
shardStateAction.localShardFailed(matchedRouting, message, null, ActionListener.noop(), state);
}
}
}
Expand Down Expand Up @@ -670,7 +668,7 @@ private void updateShard(
+ state
+ "], mark shard as started",
shard.getTimestampRange(),
SHARD_STATE_ACTION_LISTENER,
ActionListener.noop(),
clusterState
);
}
Expand Down Expand Up @@ -735,7 +733,7 @@ public void onRecoveryDone(final RecoveryState state, ShardLongFieldRange timest
primaryTerm,
"after " + state.getRecoverySource(),
timestampMillisFieldRange,
SHARD_STATE_ACTION_LISTENER
ActionListener.noop()
);
}

Expand Down Expand Up @@ -791,7 +789,7 @@ private void sendFailShard(ShardRouting shardRouting, String message, @Nullable
failure
);
failedShardsCache.put(shardRouting.shardId(), shardRouting);
shardStateAction.localShardFailed(shardRouting, message, failure, SHARD_STATE_ACTION_LISTENER, state);
shardStateAction.localShardFailed(shardRouting, message, failure, ActionListener.noop(), state);
} catch (Exception inner) {
if (failure != null) inner.addSuppressed(failure);
logger.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void onFailure(Exception e) {
private void cancelTask(TaskId taskId) {
CancelTasksRequest req = new CancelTasksRequest().setTargetTaskId(taskId).setReason("http channel [" + httpChannel + "] closed");
// force the origin to execute the cancellation as a system user
new OriginSettingClient(client, TASKS_ORIGIN).admin().cluster().cancelTasks(req, ActionListener.wrap(() -> {}));
new OriginSettingClient(client, TASKS_ORIGIN).admin().cluster().cancelTasks(req, ActionListener.noop());
}

private class CloseListener implements ActionListener<Void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ public void onFailure(Exception e) {
@Override
protected void doRun() {
for (CancellableTask task : tasks) {
cancelTaskAndDescendants(task, "channel was closed", false, ActionListener.wrap(() -> {}));
cancelTaskAndDescendants(task, "channel was closed", false, ActionListener.noop());
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
} catch (NoSuchRemoteClusterException e) {
if (ensureConnected == false) {
// trigger another connection attempt, but don't wait for it to complete
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(() -> {}));
remoteClusterService.ensureConnected(clusterAlias, ActionListener.noop());
}
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ public void testOnFailure() {
public void testRunAfter() {
{
AtomicBoolean afterSuccess = new AtomicBoolean();
ActionListener<Object> listener = ActionListener.runAfter(ActionListener.wrap(r -> {}, e -> {}), () -> afterSuccess.set(true));
ActionListener<Object> listener = ActionListener.runAfter(ActionListener.noop(), () -> afterSuccess.set(true));
listener.onResponse(null);
assertThat(afterSuccess.get(), equalTo(true));
}
{
AtomicBoolean afterFailure = new AtomicBoolean();
ActionListener<Object> listener = ActionListener.runAfter(ActionListener.wrap(r -> {}, e -> {}), () -> afterFailure.set(true));
ActionListener<Object> listener = ActionListener.runAfter(ActionListener.noop(), () -> afterFailure.set(true));
listener.onFailure(null);
assertThat(afterFailure.get(), equalTo(true));
}
Expand All @@ -161,13 +161,13 @@ public void testRunAfter() {
public void testRunBefore() {
{
AtomicBoolean afterSuccess = new AtomicBoolean();
ActionListener<Object> listener = ActionListener.runBefore(ActionListener.wrap(r -> {}, e -> {}), () -> afterSuccess.set(true));
ActionListener<Object> listener = ActionListener.runBefore(ActionListener.noop(), () -> afterSuccess.set(true));
listener.onResponse(null);
assertThat(afterSuccess.get(), equalTo(true));
}
{
AtomicBoolean afterFailure = new AtomicBoolean();
ActionListener<Object> listener = ActionListener.runBefore(ActionListener.wrap(r -> {}, e -> {}), () -> afterFailure.set(true));
ActionListener<Object> listener = ActionListener.runBefore(ActionListener.noop(), () -> afterFailure.set(true));
listener.onFailure(null);
assertThat(afterFailure.get(), equalTo(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public void testRegisterAndExecuteChildTaskWhileParentTaskIsBeingCanceled() thro
testAction,
childRequest,
testNodes[0].transportService.getLocalNodeConnection(),
ActionTestUtils.wrapAsTaskListener(ActionListener.wrap(() -> {}))
ActionTestUtils.wrapAsTaskListener(ActionListener.noop())
)
);
assertThat(cancelledException.getMessage(), equalTo("task cancelled before starting [test]"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,7 @@ public void writeTo(StreamOutput out) throws IOException {

IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> action.doExecute(null, request, new ActionListener<ResolveIndexAction.Response>() {
@Override
public void onResponse(ResolveIndexAction.Response response) {}

@Override
public void onFailure(Exception e) {}
})
() -> action.doExecute(null, request, ActionListener.noop())
);

assertThat(ex.getMessage(), containsString("not compatible with version"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ public void testNoFailures() {

BulkRequest bulkRequest = modifier.getBulkRequest();
assertThat(bulkRequest, Matchers.sameInstance(originalBulkRequest));
ActionListener<BulkResponse> actionListener = ActionListener.wrap(() -> {});
assertThat(modifier.wrapActionListenerIfNeeded(1L, actionListener), instanceOf(ActionListener.MappedActionListener.class));
assertThat(modifier.wrapActionListenerIfNeeded(1L, ActionListener.noop()), instanceOf(ActionListener.MappedActionListener.class));
}

private static class CaptureActionListener implements ActionListener<BulkResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,7 @@ protected void doWriteTo(StreamOutput out) throws IOException {

IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> action.doExecute(null, fieldCapsRequest, new ActionListener<FieldCapabilitiesResponse>() {
@Override
public void onResponse(FieldCapabilitiesResponse response) {}

@Override
public void onFailure(Exception e) {}
})
() -> action.doExecute(null, fieldCapsRequest, ActionListener.noop())
);

assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ protected void executeShardAction(
}
};

ActionTestUtils.execute(transportAction, task, request.request(), new ActionListenerAdapter());
ActionTestUtils.execute(transportAction, task, request.request(), ActionListener.noop());
assertTrue(shardActionInvoked.get());
}

Expand Down Expand Up @@ -259,7 +259,7 @@ protected void executeShardAction(
}
};

ActionTestUtils.execute(transportAction, task, request.request(), new ActionListenerAdapter());
ActionTestUtils.execute(transportAction, task, request.request(), ActionListener.noop());
assertTrue(shardActionInvoked.get());

}
Expand Down Expand Up @@ -287,12 +287,4 @@ public Index concreteSingleIndex(ClusterState state, IndicesRequest request) {
}
}

static class ActionListenerAdapter implements ActionListener<MultiGetResponse> {

@Override
public void onResponse(MultiGetResponse response) {}

@Override
public void onFailure(Exception e) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ protected void executeShardAction(
}
};

ActionTestUtils.execute(transportAction, task, request.request(), new ActionListenerAdapter());
ActionTestUtils.execute(transportAction, task, request.request(), ActionListener.noop());
assertTrue(shardActionInvoked.get());
}

Expand Down Expand Up @@ -263,7 +263,7 @@ protected void executeShardAction(
}
};

ActionTestUtils.execute(transportAction, task, request.request(), new ActionListenerAdapter());
ActionTestUtils.execute(transportAction, task, request.request(), ActionListener.noop());
assertTrue(shardActionInvoked.get());
}

Expand All @@ -290,12 +290,4 @@ public Index concreteSingleIndex(ClusterState state, IndicesRequest request) {
}
}

static class ActionListenerAdapter implements ActionListener<MultiTermVectorsResponse> {

@Override
public void onResponse(MultiTermVectorsResponse response) {}

@Override
public void onFailure(Exception e) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void testSendUpdateMappingUsingAutoPutMappingAction() {
RootObjectMapper rootObjectMapper = new RootObjectMapper.Builder("name").build(MapperBuilderContext.ROOT);
Mapping update = new Mapping(rootObjectMapper, new MetadataFieldMapper[0], Map.of());

mua.sendUpdateMapping(new Index("name", "uuid"), update, ActionListener.wrap(() -> {}));
mua.sendUpdateMapping(new Index("name", "uuid"), update, ActionListener.noop());
verify(indicesAdminClient).execute(eq(AutoPutMappingAction.INSTANCE), any(), any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte
() -> ClusterState.builder(currentState)
.nodes(DiscoveryNodes.builder(currentState.nodes()).masterNodeId(currentState.nodes().getLocalNodeId()))
.build(),
ActionListener.wrap(() -> {})
ActionListener.noop()
);

logger.info("--> waiting for listener to be called and cluster state being blocked");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void testNotifiesOnFailure() throws InterruptedException {
return ClusterState.builder(state)
.nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(randomBoolean() ? null : state.nodes().getLocalNodeId()))
.build();
}, ActionListener.wrap(() -> {}));
}, ActionListener.noop());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.index.seqno;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodesHelper;
Expand All @@ -34,8 +33,6 @@

public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTestCase {

private static final ActionListener<ReplicationResponse> EMPTY_LISTENER = ActionListener.wrap(() -> {});

private ReplicationTracker replicationTracker;
private AtomicLong currentTimeMillis;
private Settings settings;
Expand Down Expand Up @@ -87,7 +84,7 @@ public void setUpReplicationTracker() throws InterruptedException {
replicationTracker.addPeerRecoveryRetentionLease(
routingTableWithReplica.getByAllocationId(replicaAllocationId.getId()).currentNodeId(),
randomCheckpoint(),
EMPTY_LISTENER
ActionListener.noop()
);

replicationTracker.initiateTracking(replicaAllocationId.getId());
Expand Down Expand Up @@ -135,7 +132,7 @@ public void testPeerRecoveryRetentionLeasesForAssignedCopiesDoNotEverExpire() {
public void testPeerRecoveryRetentionLeasesForUnassignedCopiesDoNotExpireImmediatelyIfShardsNotAllStarted() {
final String unknownNodeId = randomAlphaOfLength(10);
final long globalCheckpoint = randomNonNegativeLong(); // not NO_OPS_PERFORMED since this always results in file-based recovery
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER);
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, ActionListener.noop());

currentTimeMillis.set(
currentTimeMillis.get() + randomLongBetween(
Expand Down Expand Up @@ -168,7 +165,7 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireEventually()

final String unknownNodeId = randomAlphaOfLength(10);
final long globalCheckpoint = randomCheckpoint();
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER);
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, ActionListener.noop());

currentTimeMillis.set(
randomLongBetween(
Expand All @@ -195,7 +192,7 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireEventually()

public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireImmediatelyIfShardsAllStarted() {
final String unknownNodeId = randomAlphaOfLength(10);
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER);
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), ActionListener.noop());

startReplica();

Expand Down Expand Up @@ -227,7 +224,7 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireIfRetainingT

final String unknownNodeId = randomAlphaOfLength(10);
final long globalCheckpoint = randomValueOtherThan(SequenceNumbers.NO_OPS_PERFORMED, this::randomCheckpoint);
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER);
replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, ActionListener.noop());

safeCommitInfo = randomSafeCommitInfoSuitableForFileBasedRecovery(globalCheckpoint);

Expand Down

0 comments on commit 55af878

Please sign in to comment.