Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Delegating ActionListener Wrappers #40129

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,53 @@ public void onFailure(Exception e) {
};
}

/**
* Creates a listener that delegates all responses it receives to another listener.
*
* @param delegate ActionListener to wrap and delegate any exception to
* @param bc BiConsumer invoked with delegate listener and exception
* @param <T> Type of the listener
* @return Delegating listener
*/
static <T> ActionListener<T> delegateResponse(ActionListener<T> delegate, BiConsumer<ActionListener<T>, Exception> bc) {
return new ActionListener<T>() {

@Override
public void onResponse(T r) {
delegate.onResponse(r);
}

@Override
public void onFailure(Exception e) {
bc.accept(delegate, e);
}
};
}

/**
* Creates a listener that delegates all exceptions it receives to another listener.
*
* @param delegate ActionListener to wrap and delegate any exception to
* @param bc BiConsumer invoked with delegate listener and response
* @param <T> Type of the delegating listener's response
* @param <R> Type of the wrapped listeners
* @return Delegating listener
*/
static <T, R> ActionListener<T> delegateFailure(ActionListener<R> delegate, BiConsumer<ActionListener<R>, T> bc) {
return new ActionListener<T>() {

@Override
public void onResponse(T r) {
bc.accept(delegate, r);
}

@Override
public void onFailure(Exception e) {
delegate.onFailure(e);
}
};
}

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding runnable when the response (or failure) is received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
// Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads.
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
protected void doRun() {
taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener);
}
Expand All @@ -180,26 +180,17 @@ public void onFailure(Exception e) {
*/
void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask,
ActionListener<GetTaskResponse> listener) {
getFinishedTaskFromIndex(thisTask, request, new ActionListener<GetTaskResponse>() {
@Override
public void onResponse(GetTaskResponse response) {
// We were able to load the task from the task index. Let's send that back.
listener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (l, e) -> {
/*
* We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
* the error isn't a 404 then we'll just throw it back to the user.
*/
if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) {
listener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
l.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
} else {
listener.onFailure(e);
l.onFailure(e);
}
}
});
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -69,17 +68,6 @@ protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, Clus
protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.unregisterRepository(
request,
new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) {
listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
request, ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new AcknowledgedResponse(r.isAcknowledged()))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -68,17 +67,7 @@ protected ClusterBlockException checkBlock(PutRepositoryRequest request, Cluster
@Override
protected void masterOperation(final PutRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.registerRepository(request, new ActionListener<ClusterStateUpdateResponse>() {

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
repositoriesService.registerRepository(
request, ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new AcknowledgedResponse(r.isAcknowledged()))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;

/**
* Transport action for verifying repository operation
*/
Expand Down Expand Up @@ -70,16 +68,7 @@ protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, Clus
@Override
protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state,
final ActionListener<VerifyRepositoryResponse> listener) {
repositoriesService.verifyRepository(request.name(), new ActionListener<List<DiscoveryNode>>() {
@Override
public void onResponse(List<DiscoveryNode> verifyResponse) {
listener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener,
(l, r) -> l.onResponse(new VerifyRepositoryResponse(r.toArray(new DiscoveryNode[0])))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -73,20 +72,12 @@ protected ClusterBlockException checkBlock(RestoreSnapshotRequest request, Clust
@Override
protected void masterOperation(final RestoreSnapshotRequest request, final ClusterState state,
final ActionListener<RestoreSnapshotResponse> listener) {
restoreService.restoreSnapshot(request, new ActionListener<RestoreCompletionResponse>() {
@Override
public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener);
} else {
listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}
restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener, (l, r) -> {
if (r.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, r, l);
} else {
l.onResponse(new RestoreSnapshotResponse(r.getRestoreInfo()));
}

@Override
public void onFailure(Exception t) {
listener.onFailure(t);
}
});
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,9 @@ protected void masterOperation(final Task task,
.masterNodeTimeout(request.masterNodeTimeout())
.waitForActiveShards(request.waitForActiveShards())
.indices(concreteIndices);

indexStateService.closeIndices(closeRequest, new ActionListener<CloseIndexResponse>() {

@Override
public void onResponse(final CloseIndexResponse response) {
listener.onResponse(response);
}

@Override
public void onFailure(final Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t);
}
});
indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (l, t) -> {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
Expand Down Expand Up @@ -97,25 +96,18 @@ protected void masterOperation(final ResizeRequest resizeRequest, final ClusterS
// there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code
final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(
ActionListener.delegateFailure(listener, (l, r) -> {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
(i) -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
i -> {
IndexShardStats shard = r.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest, ActionListener.map(listener,
updateRequest, ActionListener.map(l,
response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -104,7 +103,7 @@ protected UpgradeResponse newResponse(UpgradeRequest request, int totalShards, i
versions.put(index, new Tuple<>(version, luceneVersion));
}
}
Map<String, Tuple<org.elasticsearch.Version, String>> updatedVersions = new HashMap<>();
Map<String, Tuple<Version, String>> updatedVersions = new HashMap<>();
MetaData metaData = clusterState.metaData();
for (Map.Entry<String, Tuple<Version, org.apache.lucene.util.Version>> versionEntry : versions.entrySet()) {
String index = versionEntry.getKey();
Expand Down Expand Up @@ -209,16 +208,7 @@ public void onFailure(Exception e) {

private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse updateSettingsResponse) {
listener.onResponse(upgradeResponse);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest,
ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(upgradeResponse)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,14 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
return ActionListener.map(actionListener,
response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis));
} else {
return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
return ActionListener.delegateFailure(actionListener, (l, r) -> {
BulkItemResponse[] items = r.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots[i], r.getItems()[i]);
}
l.onResponse(
new BulkResponse(itemResponses.toArray(new BulkItemResponse[0]), r.getTook().getMillis(), ingestTookInMillis));
});
}
}

Expand Down Expand Up @@ -688,36 +695,4 @@ void markCurrentItemAsFailed(Exception e) {
}

}

static final class IngestBulkResponseListener implements ActionListener<BulkResponse> {

private final long ingestTookInMillis;
private final int[] originalSlots;
private final List<BulkItemResponse> itemResponses;
private final ActionListener<BulkResponse> actionListener;

IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses,
ActionListener<BulkResponse> actionListener) {
this.ingestTookInMillis = ingestTookInMillis;
this.itemResponses = itemResponses;
this.actionListener = actionListener;
this.originalSlots = originalSlots;
}

@Override
public void onResponse(BulkResponse response) {
BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots[i], response.getItems()[i]);
}
actionListener.onResponse(new BulkResponse(
itemResponses.toArray(new BulkItemResponse[itemResponses.size()]),
response.getTook().getMillis(), ingestTookInMillis));
}

@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
}
}
Loading