CC-1059 Changed ES connector to use exponential backoff with jitter #116

Merged: 4 commits, Sep 8, 2017. Changes from 3 commits.
2 changes: 1 addition & 1 deletion docs/configuration_options.rst
@@ -55,7 +55,7 @@ Connector
* Importance: low

``retry.backoff.ms``
How long to wait in milliseconds before attempting the first retry of a failed indexing request. This connector uses exponential backoff with jitter, which means that upon additional failures it may wait up to twice as long as the previous wait, up to the maximum number of retries. This avoids retrying in a tight loop under failure scenarios.

* Type: long
* Default: 100
57 changes: 57 additions & 0 deletions docs/elasticsearch_connector.rst
@@ -241,6 +241,63 @@ The following change is not allowed:

As mappings are more flexible, schema compatibility should be enforced when writing data to Kafka.

Automatic Retries
-----------------
The Elasticsearch connector may experience problems writing to the Elasticsearch endpoint, such as when
the Elasticsearch service is temporarily overloaded. In many cases, the connector will retry the request
a number of times before failing. To avoid further overloading the Elasticsearch service, the connector
uses an exponential backoff technique to give the Elasticsearch service time to recover. The technique
adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, where large
numbers of requests from many tasks are submitted concurrently and overwhelm the service. Randomness spreads out
the retries from many tasks and should reduce the overall time required to complete all outstanding requests
compared to simple exponential backoff.
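
As an illustration only (this standalone sketch is not the connector's code; the ``RetryUtil`` class added in
this pull request implements the actual logic), the wait before retry ``N`` can be thought of as a uniform
random value between the initial backoff and the exponentially growing maximum:

.. code-block:: java

    // Illustrative sketch of exponential backoff with jitter; assumes retry >= 1 and
    // initialBackoffMs >= 0, and ignores the overflow guard the connector applies.
    static long jitteredBackoffMs(int retry, long initialBackoffMs) {
        long maxBackoffMs = initialBackoffMs << (retry - 1);   // exponential ceiling for this retry
        return java.util.concurrent.ThreadLocalRandom.current()
                .nextLong(initialBackoffMs, maxBackoffMs + 1); // random value in [initial, max]
    }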

The number of retries is dictated by the ``max.retries`` connector configuration property, which defaults
to 5 attempts. The backoff time, which is the amount of time to wait before retrying, is a function of the
retry attempt number and the initial backoff time specified in the ``retry.backoff.ms`` connector configuration
property, which defaults to 100 milliseconds. For example, with ``retry.backoff.ms`` set to 500 milliseconds,
the following table shows the possible wait times before submitting each of the 5 retry attempts:

.. table:: Range of backoff times for each retry with ``retry.backoff.ms`` set to 500 milliseconds
:widths: auto

===== ===================== ===================== ==============================================
Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
===== ===================== ===================== ==============================================
1 0.5 0.5 0.5
2 0.5 1.0 1.5
3 0.5 2.0 3.5
4 0.5 4.0 7.5
5 0.5 8.0 15.5
===== ===================== ===================== ==============================================

Note how the maximum wait time is simply the normal exponential backoff, calculated as ``${retry.backoff.ms} * 2 ^ (retry-1)``.
Increasing the maximum number of retries adds more backoff:

.. table:: Range of backoff times for additional retries
:widths: auto

===== ===================== ===================== ==============================================
Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
===== ===================== ===================== ==============================================
6 0.5 16.0 31.5
7 0.5 32.0 63.5
8 0.5 64.0 127.5
9     0.5                   128.0                 255.5
10 0.5 256.0 511.5
11 0.5 512.0 1023.5
12 0.5 1024.0 2047.5
13 0.5 2048.0 4095.5
===== ===================== ===================== ==============================================

By increasing ``max.retries`` to 10 in this example, the connector may take up to 511.5 seconds, or a little over 8.5 minutes,
to successfully send a batch of records when the Elasticsearch service is overloaded. Increasing the value
to 13 raises the maximum potential time to submit a batch of records to just over 1 hour and 8 minutes.

You can adjust both the ``max.retries`` and ``retry.backoff.ms`` connector configuration properties to achieve
the desired backoff and retry characteristics.
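
The backoff ranges above follow directly from the formula. As an illustrative check only (this standalone
program is not part of the connector), the following snippet reproduces the minimum, maximum, and cumulative
maximum backoff values shown in the tables for an initial backoff of 500 milliseconds:

.. code-block:: java

    // Illustrative only: recompute the backoff tables above for retry.backoff.ms = 500.
    public class BackoffTable {
        public static void main(String[] args) {
            long initialBackoffMs = 500L;                             // example value used in the tables
            long totalMaxMs = 0L;
            for (int retry = 1; retry <= 13; retry++) {
                long maxBackoffMs = initialBackoffMs << (retry - 1);  // retry.backoff.ms * 2^(retry-1)
                totalMaxMs += maxBackoffMs;
                System.out.printf("retry %2d: min %.1f s, max %.1f s, total %.1f s%n",
                        retry, initialBackoffMs / 1000.0, maxBackoffMs / 1000.0, totalMaxMs / 1000.0);
            }
        }
    }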


Reindexing
----------
In some cases, the way to index a set of documents may need to be changed. For example, the analyzer,
@@ -78,7 +78,8 @@ protected static ConfigDef baseConfigDef() {
+ "If the retry attempts are exhausted the task will fail.",
group, ++order, Width.SHORT, "Max Retries")
.define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, Importance.LOW,
"How long to wait in milliseconds before attempting to retry a failed indexing request. "
"How long to wait in milliseconds before attempting the first retry of a failed indexing request. "
+ "Upon a failure, this connector may wait up to twice as long as the previous wait, up to the maximum number of retries. "
+ "This avoids retrying in a tight loop under failure scenarios.",
group, ++order, Width.SHORT, "Retry Backoff (ms)");
}
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
@@ -74,6 +75,16 @@ public void start(Map<String, String> props, JestClient client) {
long retryBackoffMs = config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
int maxRetry = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);

// Calculate the maximum possible backoff time ...
long maxRetryBackoffMs = RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
log.warn("This connector uses exponential backoff with jitter for retries, and using '{}={}' and '{}={}' " +
"results in an impractical but possible maximum backoff time greater than {} hours.",
ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG, maxRetry,
ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
TimeUnit.MILLISECONDS.toHours(maxRetryBackoffMs));
}

if (client != null) {
this.client = client;
} else {
73 changes: 73 additions & 0 deletions src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java
@@ -0,0 +1,73 @@
/**
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
**/
package io.confluent.connect.elasticsearch;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
* Utility to compute the retry times for a given attempt, using exponential backoff.
* <p>
* The purpose of using exponential backoff is to give the ES service time to recover when it becomes overwhelmed.
* Adding jitter attempts to prevent a thundering herd, where large numbers of requests from many tasks overwhelm the
* ES service, and without randomization all tasks retry at the same time. Randomization should spread the retries
* out and should reduce the overall time required to complete all attempts.
* See <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">this blog post</a> for details.
*/
public class RetryUtil {

/**
* An arbitrary absolute maximum practical retry time.
*/
public static final long MAX_RETRY_TIME_MS = TimeUnit.HOURS.toMillis(24);

/**
* Compute the time to sleep using exponential backoff with jitter. This method computes the normal exponential backoff
* as {@code initialRetryBackoffMs << retryAttempts}, and then chooses a random value between {@code initialRetryBackoffMs}
* and that value.
*
* @param retryAttempts the number of previous retry attempts; must be non-negative
* @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 if value is negative
* @return the non-negative time in milliseconds to wait before the next retry attempt, or 0 if {@code initialRetryBackoffMs} is negative
*/
public static long computeRandomRetryWaitTimeInMillis(int retryAttempts, long initialRetryBackoffMs) {
if (initialRetryBackoffMs <= 0) return 0;  // also guards against nextLong(origin, bound) failing when origin == bound
if (retryAttempts <= 0) return initialRetryBackoffMs;
long maxRetryTime = computeRetryWaitTimeInMillis(retryAttempts, initialRetryBackoffMs);
return ThreadLocalRandom.current().nextLong(initialRetryBackoffMs, maxRetryTime);
}

/**
* Compute the time to sleep using exponential backoff. This method computes the normal exponential backoff
* as {@code initialRetryBackoffMs << retryAttempts}, bounded to be no greater than {@link #MAX_RETRY_TIME_MS}.
*
* @param retryAttempts the number of previous retry attempts; must be non-negative
* @param initialRetryBackoffMs the initial time to wait before retrying; assumed to be 0 if value is negative
* @return the non-negative time in milliseconds to wait before the next retry attempt, or 0 if {@code initialRetryBackoffMs} is negative
*/
public static long computeRetryWaitTimeInMillis(int retryAttempts, long initialRetryBackoffMs) {
if (initialRetryBackoffMs < 0) return 0;
if (retryAttempts <= 0) return initialRetryBackoffMs;
if (retryAttempts > 32) {
// This would overflow the exponential algorithm ...
return MAX_RETRY_TIME_MS;
}
long result = initialRetryBackoffMs << retryAttempts;
return result < 0L ? MAX_RETRY_TIME_MS : Math.min(MAX_RETRY_TIME_MS, result);
}


}
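
For reference, a minimal usage sketch (hypothetical; the connector's actual retry loop is in BulkProcessor.execute() shown below) of how the computed wait time is typically applied:

// Hypothetical usage sketch only, assuming a placeholder tryIndexRequest() call.
static void indexWithRetries(int maxRetries, long retryBackoffMs) throws InterruptedException {
  for (int retryAttempts = 0; ; retryAttempts++) {
    if (tryIndexRequest()) {          // placeholder for the actual indexing request
      return;                         // success
    }
    if (retryAttempts >= maxRetries) {
      throw new RuntimeException("Retries exhausted after " + (retryAttempts + 1) + " attempts");
    }
    // Wait with exponential backoff and jitter before the next attempt.
    Thread.sleep(RetryUtil.computeRandomRetryWaitTimeInMillis(retryAttempts, retryBackoffMs));
  }
}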
@@ -15,6 +15,7 @@
**/
package io.confluent.connect.elasticsearch.bulk;

import io.confluent.connect.elasticsearch.RetryUtil;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
@@ -341,28 +342,34 @@ private BulkResponse execute() throws Exception {
log.error("Failed to create bulk request from batch {} of {} records", batchId, batch.size(), e);
throw e;
}
final int maxAttempts = maxRetries + 1;
for (int attempts = 1, retryAttempts = 0; true; ++attempts, ++retryAttempts) {
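// 'attempts' is 1-based and counts every execution of the bulk request; 'retryAttempts' (= attempts - 1) is passed to RetryUtil to grow the backoff.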
boolean retriable = true;
try {
log.trace("Executing batch {} of {} records", batchId, batch.size());
log.trace("Executing batch {} of {} records with attempt {}/{}", batchId, batch.size(), attempts, maxAttempts);
final BulkResponse bulkRsp = bulkClient.execute(bulkReq);
if (bulkRsp.isSucceeded()) {
if (attempts > 1) {
// We only logged failures, so log the success immediately after a failure ...
log.debug("Completed batch {} of {} records with attempt {}/{}", batchId, batch.size(), attempts, maxAttempts);
}
return bulkRsp;
}
retriable = bulkRsp.isRetriable();
throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
} catch (Exception e) {
if (retriable && attempts < maxAttempts) {
long sleepTimeMs = RetryUtil.computeRandomRetryWaitTimeInMillis(retryAttempts, retryBackoffMs);
log.warn("Failed to execute batch {} of {} records with attempt {}/{}, will attempt retry after {} ms. Failure reason: {}",
batchId, batch.size(), attempts, maxAttempts, sleepTimeMs, e.getMessage());
time.sleep(sleepTimeMs);
} else {
log.error("Failed to execute batch {} of {} records", batchId, batch.size(), e);
log.error("Failed to execute batch {} of {} records after total of {} attempt(s)", batchId, batch.size(), attempts, e);
throw e;
}
}
}
}

}

private synchronized void onBatchCompletion(int batchSize) {
@@ -0,0 +1,59 @@
/**
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
**/
package io.confluent.connect.elasticsearch;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class RetryUtilTest {
@Test
public void computeRetryBackoffForValidRanges() {
assertComputeRetryInRange(10, 10L);
assertComputeRetryInRange(10, 100L);
assertComputeRetryInRange(10, 1000L);
assertComputeRetryInRange(100, 1000L);
}

@Test
public void computeRetryBackoffForNegativeRetryTimes() {
assertComputeRetryInRange(1, -100L);
assertComputeRetryInRange(10, -100L);
assertComputeRetryInRange(100, -100L);
}

@Test
public void computeNonRandomRetryTimes() {
assertEquals(100L, RetryUtil.computeRetryWaitTimeInMillis(0, 100L));
assertEquals(200L, RetryUtil.computeRetryWaitTimeInMillis(1, 100L));
assertEquals(400L, RetryUtil.computeRetryWaitTimeInMillis(2, 100L));
assertEquals(800L, RetryUtil.computeRetryWaitTimeInMillis(3, 100L));
assertEquals(1600L, RetryUtil.computeRetryWaitTimeInMillis(4, 100L));
assertEquals(3200L, RetryUtil.computeRetryWaitTimeInMillis(5, 100L));
}

protected void assertComputeRetryInRange(int retryAttempts, long initialRetryBackoffMs) {
for (int retries = 0; retries <= retryAttempts; ++retries) {
long result = RetryUtil.computeRetryWaitTimeInMillis(retries, initialRetryBackoffMs);
if (initialRetryBackoffMs < 0) {
assertEquals(0, result);
} else {
assertTrue(result >= initialRetryBackoffMs);
}
}
}
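
// Hypothetical additional test sketch (not part of this change): the computed backoff is capped
// at RetryUtil.MAX_RETRY_TIME_MS for very large retry counts.
@Test
public void computeRetryTimesIsBoundedByMaxRetryTime() {
  assertEquals(RetryUtil.MAX_RETRY_TIME_MS, RetryUtil.computeRetryWaitTimeInMillis(32, 100L));
  assertEquals(RetryUtil.MAX_RETRY_TIME_MS, RetryUtil.computeRetryWaitTimeInMillis(33, 100L));
  assertEquals(RetryUtil.MAX_RETRY_TIME_MS, RetryUtil.computeRetryWaitTimeInMillis(Integer.MAX_VALUE, 100L));
}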
}
@@ -233,6 +233,43 @@ public void retriableErrors() throws InterruptedException, ExecutionException {
assertTrue(bulkProcessor.submitBatchWhenReady().get().succeeded);
}

@Test
public void retriableErrorsHitMaxRetries() throws InterruptedException, ExecutionException {
final int maxBufferedRecords = 100;
final int maxInFlightBatches = 5;
final int batchSize = 2;
final int lingerMs = 5;
final int maxRetries = 2;
final int retryBackoffMs = 1;
final String errorInfo = "a final retriable error again";

client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retiable error"));
client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retriable error again"));
client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, errorInfo));

final BulkProcessor<Integer, ?> bulkProcessor = new BulkProcessor<>(
new SystemTime(),
client,
maxBufferedRecords,
maxInFlightBatches,
batchSize,
lingerMs,
maxRetries,
retryBackoffMs
);

final int addTimeoutMs = 10;
bulkProcessor.add(42, addTimeoutMs);
bulkProcessor.add(43, addTimeoutMs);

try {
bulkProcessor.submitBatchWhenReady().get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause().getMessage().contains(errorInfo));
}
}

@Test
public void unretriableErrors() throws InterruptedException {
final int maxBufferedRecords = 100;