Skip to content

Commit

Permalink
Added retry logic if client request returns an error code that is con…
Browse files Browse the repository at this point in the history
…figured for retrying

Closes #4408

Added additional tests for coverage

Included example of comma separated values for retry-error-codes config parameter

Simplified retry loop logic

Moved retry configurations into constructor and removed setters

Signed-off-by: Allan Clements <criminosis@gmail.com>
  • Loading branch information
criminosis authored and porunov committed Apr 27, 2024
1 parent 2dba430 commit 4bddfb4
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 12 deletions.
4 changes: 4 additions & 0 deletions docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ Elasticsearch index configuration
| index.[X].elasticsearch.enable_index_names_cache | Enables cache for generated index store names. It is recommended to always enable index store names cache unless you have more then 50000 indexes per index store. | Boolean | true | MASKABLE |
| index.[X].elasticsearch.health-request-timeout | When JanusGraph initializes its ES backend, JanusGraph waits up to this duration for the ES cluster health to reach at least yellow status. This string should be formatted as a natural number followed by the lowercase letter "s", e.g. 3s or 60s. | String | 30s | MASKABLE |
| index.[X].elasticsearch.interface | Interface for connecting to Elasticsearch. TRANSPORT_CLIENT and NODE were previously supported, but now are required to migrate to REST_CLIENT. See the JanusGraph upgrade instructions for more details. | String | REST_CLIENT | MASKABLE |
| index.[X].elasticsearch.retry-error-codes | Comma separated list of Elasticsearch REST client ResponseException error codes to retry. E.g. "408,429" | String[] | | LOCAL |
| index.[X].elasticsearch.retry-initial-wait | Sets the initial retry wait time (in milliseconds) before exponential backoff. | Long | 1 | LOCAL |
| index.[X].elasticsearch.retry-limit | Sets the number of attempts for configured retryable error codes. | Integer | 0 | LOCAL |
| index.[X].elasticsearch.retry-max-wait | Sets the max retry wait time (in milliseconds). | Long | 1000 | LOCAL |
| index.[X].elasticsearch.retry_on_conflict | Specify how many times should the operation be retried when a conflict occurs. | Integer | 0 | MASKABLE |
| index.[X].elasticsearch.scroll-keep-alive | How long (in seconds) elasticsearch should keep alive the scroll context. | Integer | 60 | GLOBAL_OFFLINE |
| index.[X].elasticsearch.setup-max-open-scroll-contexts | Whether JanusGraph should setup max_open_scroll_context to maximum value for the cluster or not. | Boolean | true | MASKABLE |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,26 @@ public class ElasticSearchIndex implements IndexProvider {
"Sets the maximum socket timeout (in milliseconds).", ConfigOption.Type.MASKABLE,
Integer.class, RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS);

public static final ConfigOption<Integer> RETRY_LIMIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-limit",
"Sets the number of attempts for configured retryable error codes.", ConfigOption.Type.LOCAL,
Integer.class, 0);

public static final ConfigOption<Long> RETRY_INITIAL_WAIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-initial-wait",
"Sets the initial retry wait time (in milliseconds) before exponential backoff.",
ConfigOption.Type.LOCAL, Long.class, 1L);

public static final ConfigOption<Long> RETRY_MAX_WAIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-max-wait",
"Sets the max retry wait time (in milliseconds).", ConfigOption.Type.LOCAL,
Long.class, 1000L);

public static final ConfigOption<String[]> RETRY_ERROR_CODES =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-error-codes",
"Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " +
"E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]);

public static final int HOST_PORT_DEFAULT = 9200;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_HOSTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_PORT;
Expand Down Expand Up @@ -73,7 +76,13 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
final int scrollKeepAlive = config.get(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE);
Preconditions.checkArgument(scrollKeepAlive >= 1, "Scroll keep-alive should be greater than or equal to 1");
final boolean useMappingTypesForES7 = config.get(ElasticSearchIndex.USE_MAPPING_FOR_ES7);
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7);
int retryLimit = config.getOrDefault(ElasticSearchIndex.RETRY_LIMIT);
long retryInitialWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_INITIAL_WAIT);
long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
.mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
}
Expand Down Expand Up @@ -104,8 +113,11 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {
return RestClient.builder(hosts);
}

protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7);
protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
retryInitialWaitMs, retryMaxWaitMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -110,14 +112,28 @@ public class RestElasticSearchClient implements ElasticSearchClient {
private Integer retryOnConflict;

private final String retryOnConflictKey;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7) {

private final int retryAttemptLimit;

private final Set<Integer> retryOnErrorCodes;

private final long retryInitialWaitMs;

private final long retryMaxWaitMs;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
this.delegate = delegate;
majorVersion = getMajorVersion();
this.scrollKeepAlive = scrollKeepAlive+"s";
esVersion7 = ElasticMajorVersion.SEVEN.equals(majorVersion);
useMappingTypes = majorVersion.getValue() < 7 || (useMappingTypesForES7 && esVersion7);
retryOnConflictKey = majorVersion.getValue() >= 7 ? "retry_on_conflict" : "_retry_on_conflict";
this.retryAttemptLimit = retryAttemptLimit;
this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
this.retryInitialWaitMs = retryInitialWaitMs;
this.retryMaxWaitMs = retryMaxWaitMs;
}

@Override
Expand Down Expand Up @@ -546,13 +562,35 @@ private Response performRequest(String method, String path, byte[] requestData)
return performRequest(new Request(method, path), requestData);
}

private Response performRequestWithRetry(Request request) throws IOException {
int retryCount = 0;
while (true) {
try {
return delegate.performRequest(request);
} catch (ResponseException e) {
if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) {
throw e;
}
//Wait before trying again
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
}
}
retryCount++;
}
}

private Response performRequest(Request request, byte[] requestData) throws IOException {

final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null;

request.setEntity(entity);

final Response response = delegate.performRequest(request);
final Response response = performRequestWithRetry(request);

if (response.getStatusLine().getStatusCode() >= 400) {
throw new IOException("Error executing request: " + response.getStatusLine().getReasonPhrase());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2024 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.es.rest;

import com.google.common.collect.Sets;
import org.apache.http.StatusLine;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class RestClientRetryTest {
@Mock
private RestClient restClientMock;

@Mock
private ResponseException responseException;

@Mock
private Response response;

@Mock
private StatusLine statusLine;

@Captor
private ArgumentCaptor<Request> requestCaptor;

RestElasticSearchClient createClient(int retryAttemptLimit, Set<Integer> retryErrorCodes) throws IOException {
//Just throw an exception when there's an attempt to look up the ES version during instantiation
when(restClientMock.performRequest(any())).thenThrow(new IOException());

RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false,
retryAttemptLimit, retryErrorCodes, 0, 0);
//There's an initial query to get the ES version we need to accommodate, and then reset for the actual test
Mockito.reset(restClientMock);
return clientUnderTest;
}

@Test
public void testRetryOnConfiguredErrorStatus() throws IOException {
Integer retryCode = 429;
int expectedNumberOfRequestAttempts = 2;
doReturn(retryCode).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
//Just throw an expected exception the second time to confirm the retry occurred
//rather than mock out a parsable response
IOException expectedFinalException = new IOException("Expected");

try (RestElasticSearchClient restClientUnderTest = createClient(expectedNumberOfRequestAttempts - 1,
Sets.newHashSet(retryCode))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
.thenThrow(responseException)
.thenThrow(expectedFinalException);
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception actualException) {
Assertions.assertSame(expectedFinalException, actualException);
}
verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture());
}

@Test
public void testRetriesExhaustedReturnsLastRetryException() throws IOException {
Integer retryCode = 429;
int expectedNumberOfRequestAttempts = 2;
doReturn(retryCode).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
ResponseException initialRetryException = mock(ResponseException.class);
doReturn(response).when(initialRetryException).getResponse();

try (RestElasticSearchClient restClientUnderTest = createClient(expectedNumberOfRequestAttempts - 1,
Sets.newHashSet(retryCode))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
//first throw a different retry exception instance, then make sure it's the latter one
//that was retained and then thrown
.thenThrow(initialRetryException)
.thenThrow(responseException);


restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception e) {
Assertions.assertSame(responseException, e);
}
verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture());
}

@Test
public void testNonRetryErrorCodeException() throws IOException {
doReturn(503).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
try (RestElasticSearchClient restClientUnderTest = createClient(0,
//Other retry error code is configured
Sets.newHashSet(429))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
.thenThrow(responseException);
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception");
} catch (Exception e) {
Assertions.assertSame(responseException, e);
}
verify(restClientMock, times(1)).performRequest(requestCaptor.capture());
}

@Test
public void testNonResponseExceptionErrorThrown() throws IOException {
IOException differentExceptionType = new IOException();
when(restClientMock.performRequest(any()))
.thenThrow(differentExceptionType);
try (RestElasticSearchClient restClientUnderTest = createClient(0, Collections.emptySet())) {
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception");
} catch (Exception e) {
Assertions.assertSame(differentExceptionType, e);
}
verify(restClientMock, times(1)).performRequest(requestCaptor.capture());
}
}
Loading

1 comment on commit 4bddfb4

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 4bddfb4 Previous: c8792aa Ratio
org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete 12908.204133962428 ms/op 12933.219068971923 ms/op 1.00
org.janusgraph.GraphCentricQueryBenchmark.getVertices 914.1221574072703 ms/op 929.0686750649414 ms/op 0.98
org.janusgraph.MgmtOlapJobBenchmark.runClearIndex 215.84311695942034 ms/op 215.98180543333334 ms/op 1.00
org.janusgraph.MgmtOlapJobBenchmark.runReindex 340.8255733954761 ms/op 348.8265452326923 ms/op 0.98
org.janusgraph.JanusGraphSpeedBenchmark.basicCount 245.5829632924528 ms/op 227.8074437066411 ms/op 1.08
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 4679.91571303061 ms/op 4880.398383755226 ms/op 0.96
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps 16120.292173029271 ms/op 17909.154248616192 ms/op 0.90
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch 19827.186690320406 ms/op 18799.44054118485 ms/op 1.05
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching 57104.95668296667 ms/op 55455.590322899996 ms/op 1.03
org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex 8064.758032637933 ms/op 8339.218499689177 ms/op 0.97
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion 374.8262527686136 ms/op 371.31223395833683 ms/op 1.01
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch 4067.836286447487 ms/op 4227.4425002369935 ms/op 0.96
org.janusgraph.CQLMultiQueryBenchmark.getNames 8108.184501725952 ms/op 8528.923591561108 ms/op 0.95
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 5440.028451566197 ms/op 5366.416493988495 ms/op 1.01
org.janusgraph.CQLMultiQueryBenchmark.getLabels 6894.927179507316 ms/op 7119.612218557372 ms/op 0.97
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep 414.80562095654705 ms/op 416.6330074327548 ms/op 1.00
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex 12784.83637231603 ms/op 13040.1543998325 ms/op 0.98
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage 351.2481271894102 ms/op 366.6865794040481 ms/op 0.96
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 14270.975220842462 ms/op 14169.280107965806 ms/op 1.01
org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection 243.28901882020082 ms/op 243.11426684096145 ms/op 1.00
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch 14855.148389887285 ms/op 14724.027508337518 ms/op 1.01
org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames 7969.189853083388 ms/op 8379.61509861012 ms/op 0.95
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps 8616.567888758142 ms/op 9197.912189783667 ms/op 0.94
org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts 8398.589858403062 ms/op 8786.588147997933 ms/op 0.96

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.