Skip to content

Commit

Permalink
Make field-caps tasks cancellable (#92051) (#92800)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 10, 2023
1 parent 689a2b3 commit 68d1337
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/92051.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 92051
summary: Make field-caps tasks cancellable
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -109,6 +113,26 @@ public ActionRequestValidationException validate() {
return null;
}

@Override
public String getDescription() {
final StringBuilder stringBuilder = new StringBuilder("shards[");
Strings.collectionToDelimitedStringWithLimit(shardIds, ",", "", "", 1024, stringBuilder);
stringBuilder.append("], fields[");
Strings.collectionToDelimitedStringWithLimit(Arrays.asList(fields), ",", "", "", 1024, stringBuilder);
stringBuilder.append("]");
return stringBuilder.toString();
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers) {
@Override
public String getDescription() {
return FieldCapabilitiesNodeRequest.this.getDescription();
}
};
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

Expand Down Expand Up @@ -250,4 +253,13 @@ public String getDescription() {
return stringBuilder.toString();
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers) {
@Override
public String getDescription() {
return FieldCapabilitiesRequest.this.getDescription();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
Expand Down Expand Up @@ -93,6 +94,7 @@ public TransportFieldCapabilitiesAction(
@Override
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
// retrieve the initial timestamp in case the action is a cross cluster search
final CancellableTask fieldCapTask = (CancellableTask) task;
long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis();
final ClusterState clusterState = clusterService.state();
final Map<String, OriginalIndices> remoteClusterIndices = transportService.getRemoteClusterService()
Expand Down Expand Up @@ -121,7 +123,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
final FailureCollector indexFailures = new FailureCollector();
// One for each cluster including the local cluster
final CountDown completionCounter = new CountDown(1 + remoteClusterIndices.size());
final Runnable countDown = createResponseMerger(request, completionCounter, indexResponses, indexFailures, listener);
final Runnable countDown = createResponseMerger(request, fieldCapTask, completionCounter, indexResponses, indexFailures, listener);
final RequestDispatcher requestDispatcher = new RequestDispatcher(
clusterService,
transportService,
Expand Down Expand Up @@ -170,6 +172,7 @@ private void checkIndexBlocks(ClusterState clusterState, String[] concreteIndice

private Runnable createResponseMerger(
FieldCapabilitiesRequest request,
CancellableTask task,
CountDown completionCounter,
Map<String, FieldCapabilitiesIndexResponse> indexResponses,
FailureCollector indexFailures,
Expand All @@ -186,7 +189,7 @@ private Runnable createResponseMerger(
.submit(
ActionRunnable.supply(
listener,
() -> merge(indexResponses, request.includeUnmapped(), new ArrayList<>(failures))
() -> merge(task, indexResponses, request.includeUnmapped(), new ArrayList<>(failures))
)
);
} else {
Expand Down Expand Up @@ -224,13 +227,15 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(
}

private FieldCapabilitiesResponse merge(
CancellableTask task,
Map<String, FieldCapabilitiesIndexResponse> indexResponses,
boolean includeUnmapped,
List<FieldCapabilitiesFailure> failures
) {
String[] indices = indexResponses.keySet().stream().sorted().toArray(String[]::new);
final Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder = new HashMap<>();
for (FieldCapabilitiesIndexResponse response : indexResponses.values()) {
task.ensureNotCancelled();
innerMerge(responseMapBuilder, response);
}
final Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
Expand Down Expand Up @@ -327,6 +332,7 @@ boolean isEmpty() {
private class NodeTransportHandler implements TransportRequestHandler<FieldCapabilitiesNodeRequest> {
@Override
public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChannel channel, Task task) throws Exception {
final CancellableTask cancellableTask = (CancellableTask) task;
final ActionListener<FieldCapabilitiesNodeResponse> listener = new ChannelActionListener<>(channel, ACTION_NODE_NAME, request);
ActionListener.completeWith(listener, () -> {
final List<FieldCapabilitiesIndexResponse> allResponses = new ArrayList<>();
Expand All @@ -342,6 +348,7 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
final Map<ShardId, Exception> failures = new HashMap<>();
final Set<ShardId> unmatched = new HashSet<>();
for (ShardId shardId : shardIds) {
cancellableTask.ensureNotCancelled();
final FieldCapabilitiesIndexRequest indexRequest = new FieldCapabilitiesIndexRequest(
request.fields(),
shardId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
PARSER.parse(parser, fieldRequest, null);
}
});
return channel -> client.fieldCaps(fieldRequest, new RestToXContentListener<>(channel));
return channel -> {
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
cancelClient.fieldCaps(fieldRequest, new RestToXContentListener<>(channel));
};
}

private static ParseField INDEX_FILTER_FIELD = new ParseField("index_filter");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class FieldCapabilitiesNodeRequestTests extends AbstractWireSerializingTestCase<FieldCapabilitiesNodeRequest> {

@Override
Expand Down Expand Up @@ -148,4 +151,26 @@ protected FieldCapabilitiesNodeRequest mutateInstance(FieldCapabilitiesNodeReque
throw new IllegalStateException("The test should only allow 5 parameters mutated");
}
}

public void testDescription() {
FieldCapabilitiesNodeRequest r1 = new FieldCapabilitiesNodeRequest(
Arrays.asList(new ShardId("index-1", "n/a", 0), new ShardId("index-2", "n/a", 3)),
new String[] { "field-1", "field-2" },
randomOriginalIndices(1),
null,
randomNonNegativeLong(),
Collections.emptyMap()
);
assertThat(r1.getDescription(), equalTo("shards[[index-1][0],[index-2][3]], fields[field-1,field-2]"));

FieldCapabilitiesNodeRequest r2 = new FieldCapabilitiesNodeRequest(
Arrays.asList(new ShardId("index-1", "n/a", 0)),
new String[] { "*" },
randomOriginalIndices(1),
null,
randomNonNegativeLong(),
Collections.emptyMap()
);
assertThat(r2.getDescription(), equalTo("shards[[index-1][0]], fields[*]"));
}
}

0 comments on commit 68d1337

Please sign in to comment.