Skip to content
Permalink
Browse files
Allow optional query retry
Patch by Yifan Cai; reviewed by marcuse for CASSANDRA-16125
  • Loading branch information
yifan-c authored and krummas committed Sep 21, 2020
1 parent 0e5f319 commit 4c9bc4f4e3fd7d23b1284c89266ffbf10b8f0183
Showing 15 changed files with 409 additions and 20 deletions.
@@ -0,0 +1,102 @@
package org.apache.cassandra.diff;

import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;

import static org.apache.cassandra.diff.ExponentialRetryStrategyProvider.ExponentialRetryStrategy.BASE_DELAY_MS_KEY;
import static org.apache.cassandra.diff.ExponentialRetryStrategyProvider.ExponentialRetryStrategy.DEFAULT_BASE_DELAY_MS;
import static org.apache.cassandra.diff.ExponentialRetryStrategyProvider.ExponentialRetryStrategy.DEFAULT_TOTAL_DELAY_MS;
import static org.apache.cassandra.diff.ExponentialRetryStrategyProvider.ExponentialRetryStrategy.TOTAL_DELAY_MS_KEY;

/**
 * Provider of {@link ExponentialRetryStrategy} instances.
 * The base and total delays are read from the retry options under the keys
 * {@code base_delay_ms} and {@code total_delay_ms}; missing keys fall back to 1 second and 30 minutes.
 */
public class ExponentialRetryStrategyProvider extends RetryStrategyProvider {
    public ExponentialRetryStrategyProvider(JobConfiguration.RetryOptions retryOptions) {
        super(retryOptions);
    }

    /**
     * Builds a fresh strategy (with its own attempt counter) from the configured delays.
     * A value that is not a valid long makes {@link Long#parseLong(String)} throw.
     */
    @Override
    public RetryStrategy get() {
        long configuredBaseMs = Long.parseLong(retryOptions.getOrDefault(BASE_DELAY_MS_KEY, DEFAULT_BASE_DELAY_MS));
        long configuredTotalMs = Long.parseLong(retryOptions.getOrDefault(TOTAL_DELAY_MS_KEY, DEFAULT_TOTAL_DELAY_MS));
        return new ExponentialRetryStrategy(configuredBaseMs, configuredTotalMs);
    }

    /**
     * Retry strategy that sleeps for an exponentially growing pause before each retry
     * and stops retrying once {@link Exponential} reports that no positive pause is left.
     */
    static class ExponentialRetryStrategy extends RetryStrategy {
        public static final String BASE_DELAY_MS_KEY = "base_delay_ms";
        public static final String TOTAL_DELAY_MS_KEY = "total_delay_ms";
        static final String DEFAULT_BASE_DELAY_MS = String.valueOf(TimeUnit.SECONDS.toMillis(1));
        static final String DEFAULT_TOTAL_DELAY_MS = String.valueOf(TimeUnit.MINUTES.toMillis(30));

        private final Exponential exponential;
        // number of retries granted so far; it is the index of the next pause to request
        private int attempts = 0;

        public ExponentialRetryStrategy(long baseDelayMs, long totalDelayMs) {
            this.exponential = new Exponential(baseDelayMs, totalDelayMs);
        }

        /**
         * Asks for the next pause; when it is positive, sleeps for that long
         * (interrupts do not cut the sleep short) and grants the retry.
         * @return true after sleeping, false when the pause is zero or negative.
         */
        @Override
        protected boolean shouldRetry() {
            long pauseMs = exponential.get(attempts);
            if (pauseMs <= 0) {
                return false;
            }
            Uninterruptibles.sleepUninterruptibly(pauseMs, TimeUnit.MILLISECONDS);
            attempts++;
            return true;
        }

        @Override
        public String toString() {
            String strategyName = this.getClass().getSimpleName();
            return String.format("%s(baseDelayMs: %s, totalDelayMs: %s, currentAttempts: %s)",
                                 strategyName, exponential.baseDelayMs, exponential.totalDelayMs, attempts);
        }
    }

    /**
     * Computes pause times that double with every attempt: attempt n (0-based) asks for baseDelayMs * 2^n.
     * The pause is trimmed so that the sum of all pauses handed out never goes beyond totalDelayMs.
     */
    static class Exponential {
        // pause for the very first retry; doubled for each subsequent one
        private final long baseDelayMs;
        // upper bound for the sum of all pauses
        private final long totalDelayMs;

        Exponential(long baseDelayMs, long totalDelayMs) {
            Preconditions.checkArgument(baseDelayMs <= totalDelayMs, "baseDelayMs cannot be greater than totalDelayMs");
            this.baseDelayMs = baseDelayMs;
            this.totalDelayMs = totalDelayMs;
        }

        /**
         * Calculates the pause that precedes the retry with the given index.
         * For attempt 0 the base delay is returned unchanged. For later attempts the doubled value is
         * trimmed to whatever is left of the total budget.
         * @param attempts number of retries already performed, starting at 0.
         * @return the next pause time in milliseconds, or -1 once the earlier pauses have used up the budget.
         */
        long get(int attempts) {
            // The shift may overflow for large attempts; in that case pausedInTotal() reports
            // the budget as exhausted, so the overflowed value is never returned.
            long candidate = baseDelayMs << attempts;
            if (attempts == 0) {
                return candidate;
            }
            long alreadyPaused = pausedInTotal(attempts);
            if (alreadyPaused >= totalDelayMs) {
                return -1;
            }
            // the last permitted pause may be shorter than the exponential value
            return Math.min(totalDelayMs - alreadyPaused, candidate);
        }

        /**
         * Sum of the pauses for attempts [0, attempts), capped at totalDelayMs.
         * Uncapped, that sum is base * (2^0 + 2^1 + ... + 2^(attempts-1)) = (base << attempts) - base.
         */
        private long pausedInTotal(int attempts) {
            // Shifting by at least the number of leading zeros would push set bits into or past
            // the sign bit, so treat such attempts as having consumed the whole budget.
            if (attempts >= Long.numberOfLeadingZeros(baseDelayMs)) {
                return totalDelayMs;
            }
            long uncapped = (baseDelayMs << attempts) - baseDelayMs;
            return Math.min(totalDelayMs, uncapped);
        }
    }
}
@@ -20,6 +20,7 @@
package org.apache.cassandra.diff;

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -86,6 +87,17 @@ default SpecificTokens specificTokens() {

MetadataKeyspaceOptions metadataOptions();

/**
 * Options that select and configure the retry strategy used when retrieving data at the application level.
 * This is separate from the Cassandra Java driver's {@link com.datastax.driver.core.policies.RetryPolicy},
 * which is evaluated on the Netty worker threads.
 * NOTE(review): the YAML-backed implementation returns its field as-is, so callers should
 * presumably be prepared for null when the section is omitted - confirm.
 */
RetryOptions retryOptions();

Map<String, String> clusterConfig(String identifier);

// Just an alias: a String-to-String HashMap given its own type name for the retry options.
// It declares no members of its own; all behavior comes from HashMap.
public static class RetryOptions extends HashMap<String, String> {
}

}
@@ -0,0 +1,45 @@
package org.apache.cassandra.diff;

import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for application-level retry strategies.
 * Subclasses only decide whether another attempt is allowed via {@link #shouldRetry()};
 * the retry loop itself lives in {@link #retry(Callable)} and cannot be overridden.
 */
public abstract class RetryStrategy {
    private final static Logger logger = LoggerFactory.getLogger(RetryStrategy.class);

    /**
     * Decide whether retry is desired or not.
     * It is called once for every failed execution, so implementations may keep state
     * (and may block, e.g. to back off) between calls.
     * @return true to retry, see {@link #retry(Callable)}.
     *         return false to re-throw the exception.
     */
    protected abstract boolean shouldRetry();

    /**
     * Runs {@code retryable} until it returns normally or {@link #shouldRetry()} declines another attempt.
     * @param retryable the work to execute, possibly several times.
     * @return the value produced by the first successful execution.
     * @throws Exception the exception of the last failed execution, once no more retries are permitted.
     */
    public final <T> T retry(Callable<T> retryable) throws Exception {
        while (true) {
            try {
                return retryable.call();
            }
            catch (Exception exception) {
                if (!shouldRetry()) {
                    throw exception;
                }
                // The exception is passed as the trailing argument so SLF4J records why the retry
                // happened; otherwise the failure that triggered a successful retry is lost.
                logger.warn("Retry with {}", this, exception);
            }
        }
    }

    /**
     * Strategy that never retries: the first failure is re-thrown to the caller.
     */
    public static class NoRetry extends RetryStrategy {
        public final static RetryStrategy INSTANCE = new NoRetry();

        @Override
        public boolean shouldRetry() {
            return false;
        }

        @Override
        public String toString() {
            return this.getClass().getSimpleName();
        }
    }
}
@@ -0,0 +1,46 @@
package org.apache.cassandra.diff;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides new RetryStrategy instances.
 * Use abstract class instead of interface in order to retain the reference to retryOptions.
 */
public abstract class RetryStrategyProvider {
    public final static String IMPLEMENTATION_KEY = "impl";
    private final static Logger logger = LoggerFactory.getLogger(RetryStrategyProvider.class);

    protected final JobConfiguration.RetryOptions retryOptions;

    public RetryStrategyProvider(JobConfiguration.RetryOptions retryOptions) {
        this.retryOptions = retryOptions;
    }

    /**
     * Create a new instance of RetryStrategy.
     */
    public abstract RetryStrategy get();

    /**
     * Create a RetryStrategyProvider based on {@code retryOptions}.
     * The implementation class is named by the {@code impl} option and must expose a public
     * constructor taking {@link JobConfiguration.RetryOptions}.
     * This method never throws: when no implementation is configured, or when it cannot be
     * instantiated, a provider that always hands out {@link RetryStrategy.NoRetry} is returned.
     * @param retryOptions the configured options; may be null when retry is not configured.
     * @return the configured provider, or the NoRetry provider as fallback.
     */
    public static RetryStrategyProvider create(JobConfiguration.RetryOptions retryOptions) {
        try {
            String implClass = retryOptions == null ? null : retryOptions.get(IMPLEMENTATION_KEY);
            if (implClass == null) {
                // Retry is optional. An absent configuration is not an error, so do not report it
                // as a failure with a NullPointerException stack trace.
                logger.info("No retry strategy implementation is configured (option '{}' is absent). " +
                            "Use the default provider, NoRetry.", IMPLEMENTATION_KEY);
                return noRetryProvider(retryOptions);
            }
            return (RetryStrategyProvider) Class.forName(implClass)
                                                .getConstructor(JobConfiguration.RetryOptions.class)
                                                .newInstance(retryOptions);
        } catch (Exception ex) {
            // Covers unknown classes, missing constructors, constructor failures and wrong types.
            logger.warn("Unable to create RetryStrategyProvider. Use the default provider, NoRetry.", ex);
            return noRetryProvider(retryOptions);
        }
    }

    // Builds the fallback provider whose strategies never retry.
    private static RetryStrategyProvider noRetryProvider(JobConfiguration.RetryOptions retryOptions) {
        return new RetryStrategyProvider(retryOptions) {
            @Override
            public RetryStrategy get() {
                return RetryStrategy.NoRetry.INSTANCE;
            }
        };
    }
}
@@ -47,6 +47,7 @@ public class YamlJobConfiguration implements JobConfiguration {
public Map<String, Map<String, String>> cluster_config;
public String specific_tokens = null;
public String disallowed_tokens = null;
public RetryOptions retry_options;

public static YamlJobConfiguration load(InputStream inputStream) {
Yaml yaml = new Yaml(new CustomClassLoaderConstructor(YamlJobConfiguration.class,
@@ -102,6 +103,10 @@ public MetadataKeyspaceOptions metadataOptions() {
return metadata_options;
}

/**
 * Returns the {@code retry_options} field exactly as stored; no default is substituted here.
 * NOTE(review): the field is declared without an initializer, so this presumably returns null
 * when the YAML omits the section - confirm against the loader.
 */
public RetryOptions retryOptions() {
return retry_options;
}

/**
 * Looks up the settings stored under {@code identifier} in {@code cluster_config}.
 * @param identifier key of the cluster whose settings are wanted.
 * @return the map stored for that key, or null when the key is absent.
 * NOTE(review): {@code cluster_config} itself is dereferenced without a null check - confirm it is always set.
 */
public Map<String, String> clusterConfig(String identifier) {
return cluster_config.get(identifier);
}
@@ -0,0 +1,100 @@
package org.apache.cassandra.diff;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import static org.apache.cassandra.diff.ExponentialRetryStrategyProvider.Exponential;
import static org.apache.cassandra.diff.ExponentialRetryStrategyProvider.ExponentialRetryStrategy;

/**
 * Tests for the exponential pause calculation ({@code Exponential}), the retry loop driven by
 * {@code ExponentialRetryStrategy}, and the option parsing done by {@code ExponentialRetryStrategyProvider}.
 */
public class ExponentialRetryStrategyTest {
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /**
     * Replays the expected sequence (doubling pauses, last one trimmed, then -1 forever)
     * and compares it with what Exponential produces for attempts 0..99.
     */
    @Test
    public void testPauseTimeIncreaseExponentially() {
        long base = 10;
        long total = 1000;
        Exponential exponential = new Exponential(base, total);
        long totalSoFar = 0;
        for (int i = 0; i < 100; i++) {
            long actual = exponential.get(i);
            long expected = base << i;
            if (totalSoFar >= total) {
                expected = -1;
            } else {
                if (totalSoFar + expected > total) {
                    expected = total - totalSoFar; // adjust the pause time for the last valid pause.
                }
                totalSoFar += expected;
            }
            Assert.assertEquals("Exponential generates unexpected sequence at iteration#" + i, expected, actual);
        }
        Assert.assertEquals("The total pause time is not capped at totalDelayMs", total, totalSoFar);
    }

    /** A base delay larger than the total delay must be rejected at construction time. */
    @Test
    public void testWrongArguments() {
        expectedException.expect(IllegalArgumentException.class);
        expectedException.expectMessage("baseDelayMs cannot be greater than totalDelayMs");
        new Exponential(10, 1);
    }

    /** With empty options the provider falls back to the defaults: 1 second base, 30 minutes total. */
    @Test
    public void testToString() {
        ExponentialRetryStrategyProvider provider = new ExponentialRetryStrategyProvider(new JobConfiguration.RetryOptions());
        String output = provider.get().toString();
        Assert.assertEquals("ExponentialRetryStrategy(baseDelayMs: 1000, totalDelayMs: 1800000, currentAttempts: 0)",
                            output);
    }

    /** Explicitly configured delays must be parsed and handed to the strategy instead of the defaults. */
    @Test
    public void testToStringWithConfiguredOptions() {
        ExponentialRetryStrategyProvider provider = new ExponentialRetryStrategyProvider(retryOptions(5, 50));
        String output = provider.get().toString();
        Assert.assertEquals("ExponentialRetryStrategy(baseDelayMs: 5, totalDelayMs: 50, currentAttempts: 0)",
                            output);
    }

    /** Two failures followed by a success: the callable runs three times and its value is returned. */
    @Test
    public void testSuccessAfterRetry() throws Exception {
        AtomicInteger retryCount = new AtomicInteger(0);
        ExponentialRetryStrategy strategy = new ExponentialRetryStrategy(1, 1000);
        int result = strategy.retry(() -> {
            if (retryCount.getAndIncrement() < 2) {
                throw new RuntimeException("fail");
            }
            return 1;
        });
        Assert.assertEquals(1, result);
        Assert.assertEquals(3, retryCount.get());
    }

    /** When every execution fails, the exception of the last execution is the one re-thrown. */
    @Test
    public void testFailureAfterAllRetries() throws Exception {
        AtomicInteger execCount = new AtomicInteger(0);
        ExponentialRetryStrategy strategy = new ExponentialRetryStrategy(1, 2);
        expectedException.expect(RuntimeException.class);
        expectedException.expectMessage("fail at execution#2"); // 0 based
        // base 1 and total 2 permit two pauses of 1ms each, so the lambda runs 3 times and then fails
        strategy.retry(() -> {
            throw new RuntimeException("fail at execution#" + execCount.getAndIncrement());
        });
    }

    /**
     * For a base with {@code leadingZeros} leading zero bits, shifting by {@code leadingZeros - 1}
     * is the last shift that stays positive; from {@code leadingZeros} on, -1 must be returned.
     */
    @Test
    public void testOverflowPrevention() {
        Random rand = new Random();
        for (int i = 0; i < 1000; i++) {
            long base = rand.nextInt(100000) + 1; // [1, 100000]
            int leadingZeros = Long.numberOfLeadingZeros(base);
            Exponential exponential = new Exponential(base, Long.MAX_VALUE);
            Assert.assertTrue("The last attempt that still generate valid pause time. Failed with base: " + base,
                              exponential.get(leadingZeros - 1) > 0);
            Assert.assertEquals("Failed with base: " + base, -1, exponential.get(leadingZeros));
        }
    }

    // Builds retry options carrying the two delay settings as strings, the form the provider parses.
    // A plain instance is populated instead of double-brace initialization, which would create
    // an anonymous subclass for every call.
    private JobConfiguration.RetryOptions retryOptions(long baseDelayMs, long totalDelayMs) {
        JobConfiguration.RetryOptions options = new JobConfiguration.RetryOptions();
        options.put(ExponentialRetryStrategy.BASE_DELAY_MS_KEY, String.valueOf(baseDelayMs));
        options.put(ExponentialRetryStrategy.TOTAL_DELAY_MS_KEY, String.valueOf(totalDelayMs));
        return options;
    }
}
@@ -0,0 +1,26 @@
package org.apache.cassandra.diff;

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Verifies that the NoRetry strategy declines every retry, so a failing callable
 * is executed exactly once and its exception reaches the caller.
 */
public class NoRetryStrategyTest {
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Test
    public void testNoRetry() throws Exception {
        RetryStrategy noRetry = RetryStrategy.NoRetry.INSTANCE;
        boolean wantsRetry = noRetry.shouldRetry();
        Assert.assertFalse("NoRetry should always not retry", wantsRetry);

        AtomicInteger executions = new AtomicInteger(0);
        expectedException.expect(RuntimeException.class);
        // index 0 in the message proves the callable ran only once, i.e. no retry
        expectedException.expectMessage("fail at execution#0");
        noRetry.retry(() -> {
            int executionIndex = executions.getAndIncrement();
            throw new RuntimeException("fail at execution#" + executionIndex);
        });
    }
}

0 comments on commit 4c9bc4f

Please sign in to comment.