Increase concurrent request of opening point-in-time (#96959)
* Increase concurrent request of opening point-in-time (#96782) (#96957)

Today, we mistakenly throttle the open point-in-time API to one concurrent
shard request per node. As a result, opening a point-in-time across a large
cluster can take a significant amount of time and may eventually fail because
target shards have been relocated or target indices have been deleted by ILM
in the meantime. Ideally, we should batch the requests per node and eliminate
this throttle completely; however, that requires all clusters to be on the
latest version.

This PR increases the number of concurrent shard requests per node from 1 to 5,
matching the default used by search (a usage sketch follows the changed-files
summary below).

* Fix tests

* Fix tests
dnhatn committed Jun 20, 2023
1 parent 1fe30f5 commit e1995a7
Showing 8 changed files with 163 additions and 3 deletions.
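For context, a minimal sketch of how a caller can use the new option (illustration only, not part of the diff; the index name and the client variable are placeholders):

OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest("my-index").keepAlive(TimeValue.timeValueMinutes(1));
// New in this change: cap how many shard-level requests are sent concurrently to each node
// (previously hard-wired to 1 for the open point-in-time phase).
pitRequest.maxConcurrentShardRequests(5);
OpenPointInTimeResponse pitResponse = client.execute(OpenPointInTimeAction.INSTANCE, pitRequest).actionGet();
String pitId = pitResponse.getPointInTimeId();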
5 changes: 5 additions & 0 deletions docs/changelog/96782.yaml
@@ -0,0 +1,5 @@
pr: 96782
summary: Increase concurrent request of opening point-in-time
area: Search
type: bug
issues: []
@@ -12,16 +12,20 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchHit;
@@ -32,10 +32,14 @@
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -53,6 +61,11 @@

public class PointInTimeIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
@@ -433,6 +446,55 @@ public void testCloseInvalidPointInTime() {
assertThat(tasks, empty());
}

public void testOpenPITConcurrentShardRequests() throws Exception {
DiscoveryNode dataNode = randomFrom(clusterService().state().nodes().getDataNodes().values());
int numShards = randomIntBetween(5, 10);
int maxConcurrentRequests = randomIntBetween(2, 5);
assertAcked(
client().admin()
.indices()
.prepareCreate("test")
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards)
.put("index.routing.allocation.require._id", dataNode.getId())
.build()
)
);
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
dataNode.getName()
);
try {
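// Hold every shard-level open-reader-context request on readyLatch, handing it off to a separate thread so the
// transport worker is not blocked; sentLatch only trips once maxConcurrentRequests requests have arrived, which
// shows the open point-in-time action dispatched that many shard requests concurrently.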
CountDownLatch sentLatch = new CountDownLatch(maxConcurrentRequests);
CountDownLatch readyLatch = new CountDownLatch(1);
transportService.addRequestHandlingBehavior(
TransportOpenPointInTimeAction.OPEN_SHARD_READER_CONTEXT_NAME,
(handler, request, channel, task) -> {
sentLatch.countDown();
Thread thread = new Thread(() -> {
try {
assertTrue(readyLatch.await(1, TimeUnit.MINUTES));
handler.messageReceived(request, channel, task);
} catch (Exception e) {
throw new AssertionError(e);
}
});
thread.start();
}
);
OpenPointInTimeRequest request = new OpenPointInTimeRequest("test").keepAlive(TimeValue.timeValueMinutes(1));
request.maxConcurrentShardRequests(maxConcurrentRequests);
PlainActionFuture<OpenPointInTimeResponse> future = new PlainActionFuture<>();
client().execute(OpenPointInTimeAction.INSTANCE, request, future);
assertTrue(sentLatch.await(1, TimeUnit.MINUTES));
readyLatch.countDown();
closePointInTime(future.actionGet().getPointInTimeId());
} finally {
transportService.clearAllRules();
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int size, SortBuilder<?>... sorts) throws Exception {
Set<String> seen = new HashSet<>();
@@ -26,10 +26,11 @@
import static org.elasticsearch.action.ValidateActions.addValidationError;

public final class OpenPointInTimeRequest extends ActionRequest implements IndicesRequest.Replaceable {

private String[] indices;
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
private TimeValue keepAlive;

private int maxConcurrentShardRequests = SearchRequest.DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS;
@Nullable
private String routing;
@Nullable
@@ -123,6 +124,27 @@ public OpenPointInTimeRequest preference(String preference) {
return this;
}

/**
* Similar to {@link SearchRequest#getMaxConcurrentShardRequests()}, this returns the number of shard requests that should be
* executed concurrently on a single node. This value should be used as a protection mechanism to reduce the number of shard
* requests fired per open point-in-time request. The default is {@code 5}.
*/
public int maxConcurrentShardRequests() {
return maxConcurrentShardRequests;
}

/**
* Similar to {@link SearchRequest#setMaxConcurrentShardRequests(int)}, this sets the number of shard requests that should be
* executed concurrently on a single node. This value should be used as a protection mechanism to reduce the number of shard
* requests fired per open point-in-time request.
*/
public void maxConcurrentShardRequests(int maxConcurrentShardRequests) {
if (maxConcurrentShardRequests < 1) {
throw new IllegalArgumentException("maxConcurrentShardRequests must be >= 1");
}
this.maxConcurrentShardRequests = maxConcurrentShardRequests;
}

@Override
public boolean allowsRemoteIndices() {
return true;
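A quick sketch of how the new accessor pair behaves (illustration only, not part of the diff; the index pattern is a placeholder):

OpenPointInTimeRequest pit = new OpenPointInTimeRequest("logs-*");
int limit = pit.maxConcurrentShardRequests(); // 5, i.e. SearchRequest.DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS
pit.maxConcurrentShardRequests(3);            // accepted
// pit.maxConcurrentShardRequests(0);         // rejected with IllegalArgumentException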
@@ -42,6 +42,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
openRequest.routing(request.param("routing"));
openRequest.preference(request.param("preference"));
openRequest.keepAlive(TimeValue.parseTimeValue(request.param("keep_alive"), null, "keep_alive"));
if (request.hasParam("max_concurrent_shard_requests")) {
final int maxConcurrentShardRequests = request.paramAsInt(
"max_concurrent_shard_requests",
openRequest.maxConcurrentShardRequests()
);
openRequest.maxConcurrentShardRequests(maxConcurrentShardRequests);
}
return channel -> client.execute(OpenPointInTimeAction.INSTANCE, openRequest, new RestToXContentListener<>(channel));
}
}
@@ -86,6 +86,7 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
private int batchedReduceSize = DEFAULT_BATCHED_REDUCE_SIZE;

private int maxConcurrentShardRequests = 0;
public static final int DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS = 5;

private Integer preFilterShardSize;

@@ -717,7 +718,7 @@ public int getBatchedReduceSize() {
* cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}
*/
public int getMaxConcurrentShardRequests() {
- return maxConcurrentShardRequests == 0 ? 5 : maxConcurrentShardRequests;
+ return maxConcurrentShardRequests == 0 ? DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS : maxConcurrentShardRequests;
}

/**
@@ -88,6 +88,7 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
.routing(request.routing())
.allowPartialSearchResults(false);
searchRequest.setCcsMinimizeRoundtrips(false);
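// Carry the per-node shard-request concurrency limit from the open point-in-time request into the wrapped search request.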
searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests());
transportSearchAction.executeRequest(
(SearchTask) task,
searchRequest,
@@ -334,7 +334,7 @@ public AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
clusterState,
task,
new ArraySearchPhaseResults<>(shardsIts.size()),
- 1,
+ searchRequest.getMaxConcurrentShardRequests(),
clusters
) {
@Override
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.search;

import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.test.rest.RestActionTestCase;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;

public class RestOpenPointInTimeActionTests extends RestActionTestCase {

public void testMaxConcurrentSearchRequests() {
RestOpenPointInTimeAction action = new RestOpenPointInTimeAction();
controller().registerHandler(action);
Queue<OpenPointInTimeRequest> transportRequests = ConcurrentCollections.newQueue();
verifyingClient.setExecuteVerifier(((actionType, transportRequest) -> {
assertThat(transportRequest, instanceOf(OpenPointInTimeRequest.class));
transportRequests.add((OpenPointInTimeRequest) transportRequest);
return new OpenPointInTimeResponse("n/a");
}));
{
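// Without the max_concurrent_shard_requests parameter, the transport request should carry the default of 5.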
Map<String, String> params = new HashMap<>();
params.put("keep_alive", "1m");
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
.withPath("/_pit")
.withParams(params)
.build();
dispatchRequest(request);
assertThat(transportRequests, hasSize(1));
OpenPointInTimeRequest transportRequest = transportRequests.remove();
assertThat(transportRequest.maxConcurrentShardRequests(), equalTo(5));
}
{
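// An explicit max_concurrent_shard_requests value should be forwarded to the transport request unchanged.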
int maxConcurrentRequests = randomIntBetween(1, 100);
Map<String, String> params = new HashMap<>();
params.put("max_concurrent_shard_requests", Integer.toString(maxConcurrentRequests));
params.put("keep_alive", "1m");
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
.withPath("/_pit")
.withParams(params)
.build();
dispatchRequest(request);
assertThat(transportRequests, hasSize(1));
OpenPointInTimeRequest transportRequest = transportRequests.remove();
assertThat(transportRequest.maxConcurrentShardRequests(), equalTo(maxConcurrentRequests));
}
}
}
