Skip to content

Commit

Permalink
Rename ActionListener#wrap to #running (#93829)
Browse files Browse the repository at this point in the history
There are three overloads of `ActionListener#wrap`, two of which "wrap"
the `onResponse` handler in a `try...catch` which feeds exceptions into
the `onFailure` handler. The overload which accepts a bare `Runnable`
does not have this behaviour, so it's kind of confusing to use the same
terminology. Following the pattern started by `releasing(Releasable)`
this commit renames this method to `running(Runnable)`.
  • Loading branch information
DaveCTurner committed Feb 16, 2023
1 parent 7c79595 commit 405a53b
Show file tree
Hide file tree
Showing 68 changed files with 151 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void testJoinWaitsForClusterApplier() throws Exception {
} catch (Exception e) {
throw new AssertionError(e);
}
}, ActionListener.wrap(() -> {}));
}, ActionListener.noop());
barrier.await(10, TimeUnit.SECONDS);

// drop the victim from the cluster with a network disruption
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public void testBackgroundRetentionLeaseSync() throws Exception {
// put a new lease
currentRetentionLeases.put(
id,
primary.addRetentionLease(id, retainingSequenceNumber, source, ActionListener.wrap(latch::countDown))
primary.addRetentionLease(id, retainingSequenceNumber, source, ActionListener.running(latch::countDown))
);
latch.await();
// now renew all existing leases; we expect to see these synced to the replicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void testPendingRefreshWithIntervalChange() throws Exception {
assertTrue(shard.isSearchIdle());
CountDownLatch refreshLatch = new CountDownLatch(1);
// async on purpose to make sure it happens concurrently
client().admin().indices().prepareRefresh().execute(ActionListener.wrap(refreshLatch::countDown));
client().admin().indices().prepareRefresh().execute(ActionListener.running(refreshLatch::countDown));
assertHitCount(client().prepareSearch().get(), 1);
client().prepareIndex("test").setId("1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
assertFalse(shard.scheduledRefresh());
Expand All @@ -153,7 +153,7 @@ public void testPendingRefreshWithIntervalChange() throws Exception {
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build())
.execute(ActionListener.wrap(updateSettingsLatch::countDown));
.execute(ActionListener.running(updateSettingsLatch::countDown));
assertHitCount(client().prepareSearch().get(), 2);
// wait for both to ensure we don't have in-flight operations
updateSettingsLatch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private Releasable interceptVerifyShardBeforeCloseActions(final String indexPatt
if (Glob.globMatch(indexPattern, index)) {
logger.info("request {} intercepted for index {}", requestId, index);
onIntercept.run();
release.addListener(ActionListener.wrap(() -> {
release.addListener(ActionListener.running(() -> {
logger.info("request {} released for index {}", requestId, index);
try {
connection.sendRequest(requestId, action, request, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testEnableAssignmentAfterRestart() throws Exception {
"task_" + i,
TestPersistentTasksExecutor.NAME,
new TestParams(randomAlphaOfLength(10)),
ActionListener.wrap(latch::countDown)
ActionListener.running(latch::countDown)
);
}
latch.await();
Expand Down
96 changes: 53 additions & 43 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,41 +147,6 @@ public <T> ActionListener<T> map(CheckedFunction<T, Response, Exception> fn) {
}
}

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding consumer when the response (or failure) is received.
*
* @param onResponse the checked consumer of the response, when the listener receives one
* @param onFailure the consumer of the failure, when the listener receives one
* @param <Response> the type of the response
* @return a listener that listens for responses and invokes the consumer when received
*/
static <Response> ActionListener<Response> wrap(
CheckedConsumer<Response, ? extends Exception> onResponse,
Consumer<Exception> onFailure
) {
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
onResponse.accept(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}

@Override
public String toString() {
return "WrappedActionListener{" + onResponse + "}{" + onFailure + "}";
}
};
}

/**
* Creates a listener that delegates all responses it receives to this instance.
*
Expand Down Expand Up @@ -260,18 +225,17 @@ public String toString() {
* Creates a listener which releases the given resource on completion (whether success or failure)
*/
static <Response> ActionListener<Response> releasing(Releasable releasable) {
return assertOnce(wrap(runnableFromReleasable(releasable)));
return assertOnce(running(runnableFromReleasable(releasable)));
}

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding runnable when the response (or failure) is received.
* Creates a listener that executes the given runnable on completion (whether successful or otherwise).
*
* @param runnable the runnable that will be called in event of success or failure
* @param <Response> the type of the response
* @return a listener that listens for responses and invokes the runnable when received
* @param runnable the runnable that will be called in event of success or failure. This must not throw.
* @param <Response> the type of the response, which is ignored.
* @return a listener that executes the given runnable on completion (whether successful or otherwise).
*/
static <Response> ActionListener<Response> wrap(Runnable runnable) {
static <Response> ActionListener<Response> running(Runnable runnable) {
return new ActionListener<>() {
@Override
public void onResponse(Response response) {
Expand Down Expand Up @@ -301,9 +265,55 @@ public String toString() {
};
}

/**
* @deprecated in favour of {@link #running(Runnable)} because this implementation doesn't "wrap" exceptions from {@link #onResponse}
* into {@link #onFailure}.
*/
@Deprecated(forRemoval = true)
static <Response> ActionListener<Response> wrap(Runnable runnable) {
return running(runnable);
}

/**
* Creates a listener that executes the appropriate consumer when the response (or failure) is received. This listener is "wrapped" in
* the sense that an exception from the {@code onResponse} consumer is passed into the {@code onFailure} consumer.
*
* @param onResponse the checked consumer of the response, executed when the listener is completed successfully. If it throws an
* exception, the exception is passed to the {@code onFailure} consumer.
* @param onFailure the consumer of the failure, executed when the listener is completed with an exception (or it is completed
* successfully but the {@code onResponse} consumer threw an exception).
* @param <Response> the type of the response
* @return a listener that executes the appropriate consumer when the response (or failure) is received.
*/
static <Response> ActionListener<Response> wrap(
CheckedConsumer<Response, ? extends Exception> onResponse,
Consumer<Exception> onFailure
) {
return new ActionListener<>() {
@Override
public void onResponse(Response response) {
try {
onResponse.accept(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}

@Override
public String toString() {
return "WrappedActionListener{" + onResponse + "}{" + onFailure + "}";
}
};
}

/**
* Adds a wrapper around a listener which catches exceptions thrown by its {@link #onResponse} method and feeds them to its
* {@link #onFailure}.
* {@link #onFailure} method.
*/
static <DelegateResponse, Response extends DelegateResponse> ActionListener<Response> wrap(ActionListener<DelegateResponse> delegate) {
return new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
requireNonNullElse(request.getTimeout(), DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT),
ThreadPool.Names.SAME
);
future.addListener(ActionListener.wrap(failByTimeout::cancel));
future.addListener(ActionListener.running(failByTimeout::cancel));
}
} else {
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ protected void processTasks(ListTasksRequest request, Consumer<Task> operation,
requireNonNullElse(request.getTimeout(), DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT),
ThreadPool.Names.SAME
);
future.addListener(ActionListener.wrap(cancellable::cancel));
future.addListener(ActionListener.running(cancellable::cancel));
}
} else {
super.processTasks(request, operation, nodeOperation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ private boolean assertRefreshInvariant() {
private class RefreshScheduler {

ActionListener<ClusterInfo> getListener() {
return ActionListener.wrap(() -> {
return ActionListener.running(() -> {
if (shouldRefresh()) {
threadPool.scheduleUnlessShuttingDown(updateFrequency, ThreadPool.Names.SAME, () -> {
if (shouldRefresh()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void onFailure(Exception e) {

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
future.addListener(ActionListener.wrap(() -> ActionListener.onResponse(currentListeners, newState)));
future.addListener(ActionListener.running(() -> ActionListener.onResponse(currentListeners, newState)));
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void clusterStateProcessed(final ClusterState oldState, final ClusterStat
logger.info("recovered [{}] indices into cluster_state", newState.metadata().indices().size());
// reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
// not-recovered state, that we again do another state recovery.
rerouteService.reroute("state recovered", Priority.NORMAL, ActionListener.wrap(GatewayService.this::resetRecoveredFlags));
rerouteService.reroute("state recovered", Priority.NORMAL, ActionListener.running(GatewayService.this::resetRecoveredFlags));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) {
boolean addedOnThisCall = httpChannels.add(httpChannel);
assert addedOnThisCall : "Channel should only be added to http channel set once";
refCounted.incRef();
httpChannel.addCloseListener(ActionListener.wrap(() -> {
httpChannel.addCloseListener(ActionListener.running(() -> {
httpChannels.remove(httpChannel);
refCounted.decRef();
}));
Expand Down Expand Up @@ -484,7 +484,7 @@ private RestRequest requestWithoutFailedHeader(

private static ActionListener<Void> earlyResponseListener(HttpRequest request, HttpChannel httpChannel) {
if (HttpUtils.shouldCloseConnection(request)) {
return ActionListener.wrap(() -> CloseableChannel.closeChannel(httpChannel));
return ActionListener.running(() -> CloseableChannel.closeChannel(httpChannel));
} else {
return ActionListener.noop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void sendResponse(RestResponse restResponse) {
restResponse.getHeaders()
.forEach((key, values) -> tracer.setAttribute(traceId, "http.response.headers." + key, String.join("; ", values)));

ActionListener<Void> listener = ActionListener.wrap(() -> Releasables.close(toClose));
ActionListener<Void> listener = ActionListener.running(() -> Releasables.close(toClose));
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
httpChannel.sendResponse(httpResponse, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void addClientStats(final HttpChannel httpChannel) {
threadPool.absoluteTimeInMillis()
)
);
httpChannel.addCloseListener(ActionListener.wrap(() -> {
httpChannel.addCloseListener(ActionListener.running(() -> {
try {
final ClientStatsBuilder disconnectedClientStats = httpChannelStats.remove(httpChannel);
if (disconnectedClientStats != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void clusterChanged(ClusterChangedEvent event) {
// The failures are logged in upgradeIndexMetadata(), so we don't actually care about them here.
ActionListener<AcknowledgedResponse> listener = new GroupedActionListener<>(
descriptors.size(),
ActionListener.wrap(() -> isUpgradeInProgress.set(false))
ActionListener.running(() -> isUpgradeInProgress.set(false))
);

descriptors.forEach(descriptor -> upgradeIndexMetadata(descriptor, listener));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ private void notifyFailureOnceAllOutstandingRequestAreDone(Exception e) {
// to notify the listener about the cancellation
final CountDown pendingRequestsCountDown = new CountDown(pendingRequests.size());
for (ListenableFuture<Void> outstandingFuture : pendingRequests) {
outstandingFuture.addListener(ActionListener.wrap(() -> {
outstandingFuture.addListener(ActionListener.running(() -> {
if (pendingRequestsCountDown.countDown()) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void doCancelTaskAndDescendants(CancellableTask task, String reason, boolean wai

// We remove bans after all child tasks are completed although in theory we can do it on a per-connection basis.
completedListener.addListener(
ActionListener.wrap(
ActionListener.running(
transportService.getThreadPool()
.getThreadContext()
// If we start unbanning when the last child task completed and that child task executed with a specific user, then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ private void connectToNodeOrRetry(
try {
connectionListener.onNodeConnected(node, conn);
} finally {
conn.addCloseListener(ActionListener.wrap(() -> {
conn.addCloseListener(ActionListener.running(() -> {
connectedNodes.remove(node, conn);
connectionListener.onNodeDisconnected(node, conn);
managerRefs.decRef();
}));

conn.addCloseListener(ActionListener.wrap(() -> {
conn.addCloseListener(ActionListener.running(() -> {
if (connectingRefCounter.hasReferences() == false) {
logger.trace("connection manager shut down, closing transport connection to [{}]", node);
} else if (conn.hasReferences()) {
Expand Down Expand Up @@ -366,7 +366,7 @@ private void internalOpenConnection(
try {
connectionListener.onConnectionOpened(connection);
} finally {
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
connection.addCloseListener(ActionListener.running(() -> connectionListener.onConnectionClosed(connection)));
}
if (connection.isClosed()) {
throw new ConnectTransportException(node, "a channel closed while connecting");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, Rel
release.close();
}
}
internalSend(channel, message, networkMessage, ActionListener.wrap(release::close));
internalSend(channel, message, networkMessage, ActionListener.running(release::close));
}

private void internalSend(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle
outboundHandler.sendBytes(
channel,
new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)),
ActionListener.wrap(() -> CloseableChannel.closeChannel(channel))
ActionListener.running(() -> CloseableChannel.closeChannel(channel))
);
closeChannel = false;
}
Expand Down Expand Up @@ -791,7 +791,7 @@ protected void serverAcceptedChannel(TcpChannel channel) {
assert addedOnThisCall : "Channel should only be added to accepted channel set once";
// Mark the channel init time
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
channel.addCloseListener(ActionListener.running(() -> acceptedChannels.remove(channel)));
logger.trace(() -> format("Tcp transport channel accepted: %s", channel));
}

Expand Down Expand Up @@ -1127,7 +1127,7 @@ public void onResponse(Void v) {
nodeChannels.channels.forEach(ch -> {
// Mark the channel init time
ch.getChannelStats().markAccessed(relativeMillisTime);
ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
ch.addCloseListener(ActionListener.running(nodeChannels::close));
});
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void sendHandshake(
final HandshakeResponseHandler handler = new HandshakeResponseHandler(requestId, version, listener);
pendingHandshakes.put(requestId, handler);
channel.addCloseListener(
ActionListener.wrap(() -> handler.handleLocalException(new TransportException("handshake failed because connection reset")))
ActionListener.running(() -> handler.handleLocalException(new TransportException("handshake failed because connection reset")))
);
boolean success = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void registerNodeConnection(List<TcpChannel> nodeChannels, ConnectionProfile con

for (TcpChannel channel : nodeChannels) {
scheduledPing.addChannel(channel);
channel.addCloseListener(ActionListener.wrap(() -> scheduledPing.removeChannel(channel)));
channel.addCloseListener(ActionListener.running(() -> scheduledPing.removeChannel(channel)));
}
}

Expand Down

0 comments on commit 405a53b

Please sign in to comment.