Skip to content

Commit

Permalink
CR: fix naming
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Apr 4, 2019
1 parent 2897e0d commit 1f0a804
Show file tree
Hide file tree
Showing 23 changed files with 125 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,15 @@ public void onFailure(Exception e) {
*/
void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask,
ActionListener<GetTaskResponse> listener) {
getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (l, e) -> {
getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (delegatedListener, e) -> {
/*
* We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
* the error isn't a 404 then we'll just throw it back to the user.
*/
if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) {
l.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
delegatedListener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
} else {
l.onFailure(e);
delegatedListener.onFailure(e);
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, Clus
protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.unregisterRepository(
request, ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new AcknowledgedResponse(r.isAcknowledged()))));
request, ActionListener.delegateFailure(listener,
(delegatedListener, unregisterRepositoryResponse) ->
delegatedListener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected ClusterBlockException checkBlock(PutRepositoryRequest request, Cluster
@Override
protected void masterOperation(final PutRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.registerRepository(
request, ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new AcknowledgedResponse(r.isAcknowledged()))));
repositoriesService.registerRepository(request, ActionListener.delegateFailure(listener,
(delegatedListener, response) -> delegatedListener.onResponse(new AcknowledgedResponse(response.isAcknowledged()))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, Clus
protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state,
final ActionListener<VerifyRepositoryResponse> listener) {
repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener,
(l, r) -> l.onResponse(new VerifyRepositoryResponse(r.toArray(new DiscoveryNode[0])))));
(delegatedListener, verifyResponse) ->
delegatedListener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ protected ClusterBlockException checkBlock(RestoreSnapshotRequest request, Clust
@Override
protected void masterOperation(final RestoreSnapshotRequest request, final ClusterState state,
final ActionListener<RestoreSnapshotResponse> listener) {
restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener, (l, r) -> {
if (r.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, r, l);
} else {
l.onResponse(new RestoreSnapshotResponse(r.getRestoreInfo()));
}
}));
restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener,
(delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener);
} else {
delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ protected void masterOperation(final Task task,
.masterNodeTimeout(request.masterNodeTimeout())
.waitForActiveShards(request.waitForActiveShards())
.indices(concreteIndices);
indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (l, t) -> {
indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t);
delegatedListener.onFailure(t);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ protected void masterOperation(final ResizeRequest resizeRequest, final ClusterS
final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(
ActionListener.delegateFailure(listener, (l, r) -> {
ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
i -> {
IndexShardStats shard = r.getIndex(sourceIndex).getIndexShards().get(i);
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest, ActionListener.map(l,
updateRequest, ActionListener.map(delegatedListener,
response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void onFailure(Exception e) {

private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest,
ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(upgradeResponse)));
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, ActionListener.delegateFailure(
listener, (delegatedListener, updateSettingsResponse) -> delegatedListener.onResponse(upgradeResponse)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -658,13 +658,14 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
return ActionListener.map(actionListener,
response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis));
} else {
return ActionListener.delegateFailure(actionListener, (l, r) -> {
BulkItemResponse[] items = r.getItems();
return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots[i], r.getItems()[i]);
itemResponses.add(originalSlots[i], response.getItems()[i]);
}
l.onResponse(
new BulkResponse(itemResponses.toArray(new BulkItemResponse[0]), r.getTook().getMillis(), ingestTookInMillis));
delegatedListener.onResponse(
new BulkResponse(
itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis));
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ protected void doStart(ClusterState clusterState) {
});
}
} else {
ActionListener<Response> delegate = ActionListener.delegateResponse(listener, (l, t) -> {
ActionListener<Response> delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
"stepped down before publishing action [{}], scheduling a retry", actionName), t);
retry(t, masterChangePredicate);
} else {
l.onFailure(t);
delegatedListener.onFailure(t);
}
});
threadPool.executor(executor).execute(new ActionRunnable<Response>(delegate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,13 @@ private void fetch(TermsLookup termsLookup, Client client, ActionListener<List<O
? new GetRequest(termsLookup.index(), termsLookup.id())
: new GetRequest(termsLookup.index(), termsLookup.type(), termsLookup.id());
getRequest.preference("_local").routing(termsLookup.routing());
client.get(getRequest, ActionListener.delegateFailure(actionListener, (l, r) -> {
client.get(getRequest, ActionListener.delegateFailure(actionListener, (delegatedListener, getResponse) -> {
List<Object> terms = new ArrayList<>();
if (r.isSourceEmpty() == false) { // extract terms only if the doc source exists
List<Object> extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), r.getSourceAsMap());
if (getResponse.isSourceEmpty() == false) { // extract terms only if the doc source exists
List<Object> extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap());
terms.addAll(extractedValues);
}
l.onResponse(terms);
delegatedListener.onResponse(terms);
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ protected void asyncShardOperation(T request, ShardId shardId, final ActionListe
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.acquirePrimaryOperationPermit(
ActionListener.delegateFailure(listener, (l, r) -> {
try (Releasable ignore = r) {
doRetentionLeaseAction(indexShard, request, l);
ActionListener.delegateFailure(listener, (delegatedListener, releasable) -> {
try (Releasable ignore = releasable) {
doRetentionLeaseAction(indexShard, request, delegatedListener);
}
}),
ThreadPool.Names.SAME,
Expand Down
45 changes: 23 additions & 22 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2688,29 +2688,30 @@ private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
// primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed
// in the order submitted, combining both operations ensure that the term is updated before the operation is
// executed. It also has the side effect of acquiring all the permits one time instead of two.
final ActionListener<Releasable> operationListener = ActionListener.delegateFailure(onPermitAcquired, (l, r) -> {
if (opPrimaryTerm < getOperationPrimaryTerm()) {
r.close();
final String message = String.format(
Locale.ROOT,
"%s operation primary term [%d] is too old (current [%d])",
shardId,
opPrimaryTerm,
getOperationPrimaryTerm());
l.onFailure(new IllegalStateException(message));
} else {
assert assertReplicationTarget();
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
r.close();
l.onFailure(e);
return;
final ActionListener<Releasable> operationListener = ActionListener.delegateFailure(onPermitAcquired,
(delegatedListener, releasable) -> {
if (opPrimaryTerm < getOperationPrimaryTerm()) {
releasable.close();
final String message = String.format(
Locale.ROOT,
"%s operation primary term [%d] is too old (current [%d])",
shardId,
opPrimaryTerm,
getOperationPrimaryTerm());
delegatedListener.onFailure(new IllegalStateException(message));
} else {
assert assertReplicationTarget();
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
releasable.close();
delegatedListener.onFailure(e);
return;
}
delegatedListener.onResponse(releasable);
}
l.onResponse(r);
}
});
});

if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
synchronized (mutex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception,
ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r))));
ActionListener.delegateFailure(listener,
(delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.removePersistentTask(
request.taskId, ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r))));
request.taskId, ActionListener.delegateFailure(listener,
(delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.taskId, request.taskName, request.params,
ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r))));
ActionListener.delegateFailure(listener,
(delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ protected final void masterOperation(final Request request,
final ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state,
ActionListener.delegateFailure(listener, (l, r) -> l.onResponse(new PersistentTaskResponse(r))));
ActionListener.delegateFailure(listener,
(delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
}
}
}
Loading

0 comments on commit 1f0a804

Please sign in to comment.