Skip to content

Commit

Permalink
Adding global settings for max concurrent searches/shard requests. (#…
Browse files Browse the repository at this point in the history
…17583)

* Allowing to configure max concurrent searches/shard requests.

* Simplifying constructors in tests.
  • Loading branch information
dennisoelkers committed Dec 5, 2023
1 parent c8e34b4 commit 2d4b321
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.joschi.jadconfig.util.Duration;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import org.graylog.shaded.elasticsearch7.org.apache.http.ContentTooLongException;
Expand All @@ -39,6 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import java.io.IOException;
Expand All @@ -57,17 +59,28 @@ public class ElasticsearchClient {

private final RestHighLevelClient client;
private final boolean compressionEnabled;
private final Optional<Integer> indexerMaxConcurrentSearches;
private final Optional<Integer> indexerMaxConcurrentShardRequests;
private final ObjectMapper objectMapper;

@Inject
public ElasticsearchClient(RestHighLevelClient client,
@Named("elasticsearch_compression_enabled") boolean compressionEnabled,
@Named("indexer_max_concurrent_searches") @Nullable Integer indexerMaxConcurrentSearches,
@Named("indexer_max_concurrent_shard_requests") @Nullable Integer indexerMaxConcurrentShardRequests,
ObjectMapper objectMapper) {
this.client = client;
this.compressionEnabled = compressionEnabled;
this.indexerMaxConcurrentSearches = Optional.ofNullable(indexerMaxConcurrentSearches);
this.indexerMaxConcurrentShardRequests = Optional.ofNullable(indexerMaxConcurrentShardRequests);
this.objectMapper = objectMapper;
}

@VisibleForTesting
public ElasticsearchClient(RestHighLevelClient client, ObjectMapper objectMapper) {
this(client, false, null, null, objectMapper);
}

public SearchResponse search(SearchRequest searchRequest, String errorMessage) {
final MultiSearchRequest multiSearchRequest = new MultiSearchRequest()
.add(searchRequest);
Expand All @@ -84,6 +97,10 @@ public SearchResponse singleSearch(SearchRequest searchRequest, String errorMess
public List<MultiSearchResponse.Item> msearch(List<SearchRequest> searchRequests, String errorMessage) {
final MultiSearchRequest multiSearchRequest = new MultiSearchRequest();

indexerMaxConcurrentSearches.ifPresent(multiSearchRequest::maxConcurrentSearchRequests);
indexerMaxConcurrentShardRequests.ifPresent(maxShardRequests -> searchRequests
.forEach(request -> request.setMaxConcurrentShardRequests(maxShardRequests)));

searchRequests.forEach(multiSearchRequest::add);

final MultiSearchResponse result = this.execute((c, requestOptions) -> c.msearch(multiSearchRequest, requestOptions), errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public ElasticsearchInstanceES7(final SearchVersion version, final String hostna
public ElasticsearchInstanceES7 init() {
super.init();
this.restHighLevelClient = buildRestClient();
this.elasticsearchClient = new ElasticsearchClient(this.restHighLevelClient, false, new ObjectMapperProvider().get());
this.elasticsearchClient = new ElasticsearchClient(this.restHighLevelClient, new ObjectMapperProvider().get());
this.client = new ClientES7(this.elasticsearchClient, featureFlags);
this.fixtureImporter = new FixtureImporterES7(this.elasticsearchClient);
this.adapters = new AdaptersES7(elasticsearchClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public OpenSearch13Instance(final SearchVersion version, final String hostname,
public OpenSearch13Instance init() {
super.init();
this.restHighLevelClient = buildRestClient();
this.elasticsearchClient = new ElasticsearchClient(this.restHighLevelClient, false, new ObjectMapperProvider().get());
this.elasticsearchClient = new ElasticsearchClient(this.restHighLevelClient, new ObjectMapperProvider().get());
this.client = new ClientES7(this.elasticsearchClient, featureFlags);
this.fixtureImporter = new FixtureImporterES7(this.elasticsearchClient);
this.adapters = new AdaptersES7(elasticsearchClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class RunningElasticsearchInstanceES7 implements SearchServerInstance {

public RunningElasticsearchInstanceES7(final List<String> featureFlags) {
this.restHighLevelClient = buildRestClient();
this.elasticsearchClient = new ElasticsearchClient(this.restHighLevelClient, false, new ObjectMapperProvider().get());
this.elasticsearchClient = new ElasticsearchClient(this.restHighLevelClient, new ObjectMapperProvider().get());
this.client = new ClientES7(this.elasticsearchClient, featureFlags);
this.fixtureImporter = new FixtureImporterES7(this.elasticsearchClient);
adapters = new AdaptersES7(elasticsearchClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.joschi.jadconfig.util.Duration;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import org.graylog.shaded.opensearch2.org.apache.http.ContentTooLongException;
Expand All @@ -39,6 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import java.io.IOException;
Expand All @@ -57,17 +59,28 @@ public class OpenSearchClient {

private final RestHighLevelClient client;
private final boolean compressionEnabled;
private final Optional<Integer> indexerMaxConcurrentSearches;
private final Optional<Integer> indexerMaxConcurrentShardRequests;
private final ObjectMapper objectMapper;

@Inject
public OpenSearchClient(RestHighLevelClient client,
@Named("elasticsearch_compression_enabled") boolean compressionEnabled,
@Named("indexer_max_concurrent_searches") @Nullable Integer indexerMaxConcurrentSearches,
@Named("indexer_max_concurrent_shard_requests") @Nullable Integer indexerMaxConcurrentShardRequests,
ObjectMapper objectMapper) {
this.client = client;
this.compressionEnabled = compressionEnabled;
this.indexerMaxConcurrentSearches = Optional.ofNullable(indexerMaxConcurrentSearches);
this.indexerMaxConcurrentShardRequests = Optional.ofNullable(indexerMaxConcurrentShardRequests);
this.objectMapper = objectMapper;
}

@VisibleForTesting
public OpenSearchClient(RestHighLevelClient client, ObjectMapper objectMapper) {
this(client, false, null, null, objectMapper);
}

public SearchResponse search(SearchRequest searchRequest, String errorMessage) {
final MultiSearchRequest multiSearchRequest = new MultiSearchRequest()
.add(searchRequest);
Expand All @@ -82,7 +95,11 @@ public SearchResponse singleSearch(SearchRequest searchRequest, String errorMess
}

public List<MultiSearchResponse.Item> msearch(List<SearchRequest> searchRequests, String errorMessage) {
final MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
var multiSearchRequest = new MultiSearchRequest();

indexerMaxConcurrentSearches.ifPresent(multiSearchRequest::maxConcurrentSearchRequests);
indexerMaxConcurrentShardRequests.ifPresent(maxShardRequests -> searchRequests
.forEach(request -> request.setMaxConcurrentShardRequests(maxShardRequests)));

searchRequests.forEach(multiSearchRequest::add);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void consumeContent() throws IOException {
RestStatus restStatus = RestStatus.BAD_REQUEST;
OpenSearchStatusException statusException = new OpenSearchStatusException(
"status msg", restStatus, responseException);
final OpenSearchClient openSearchClient = new OpenSearchClient(restHighLevelClient, true, new ObjectMapper());
final OpenSearchClient openSearchClient = new OpenSearchClient(restHighLevelClient, new ObjectMapper());

Exception exception = assertThrows(BatchSizeTooLargeException.class, () -> {
openSearchClient.execute((a, b) -> {throw statusException;});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public OpenSearchInstance(final SearchVersion version, final String hostname, fi
public OpenSearchInstance init() {
super.init();
RestHighLevelClient restHighLevelClient = buildRestClient();
this.openSearchClient = new OpenSearchClient(restHighLevelClient, false, new ObjectMapperProvider().get());
this.openSearchClient = new OpenSearchClient(restHighLevelClient, new ObjectMapperProvider().get());
this.client = new ClientOS2(this.openSearchClient, featureFlags);
this.fixtureImporter = new FixtureImporterOS2(this.openSearchClient);
adapters = new AdaptersOS2(openSearchClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,10 @@ public class ElasticsearchClientConfiguration {

@Parameter(value = "indexer_jwt_auth_token_expiration_duration")
Duration indexerJwtAuthTokenExpirationDuration = Duration.seconds(180);

@Parameter(value = "indexer_max_concurrent_searches")
Integer indexerMaxConcurrentSearches = null;

@Parameter(value = "indexer_max_concurrent_shard_requests")
Integer indexerMaxConcurrentShardRequests = null;
}

0 comments on commit 2d4b321

Please sign in to comment.