Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Create next attempt after first attempt to initialize exponential backoff settings. #2316

Merged
merged 16 commits into from Nov 14, 2023
Expand Up @@ -971,13 +971,15 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r
try {
requestWrapper.retryCount++;
if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) {
// Trigger exponential backoff in append loop when request is resent for quota errors
if (requestWrapper.attemptSettings == null) {
requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createFirstAttempt();
} else {
requestWrapper.attemptSettings =
requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings);
}
// Trigger exponential backoff in append loop when request is resent for quota errors.
// createNextAttempt correctly initiales the retry delay; createfirstAttempt does not
egreco12 marked this conversation as resolved.
Show resolved Hide resolved
// include a positive delay, just 0.
requestWrapper.attemptSettings =
requestWrapper.attemptSettings == null
? requestWrapper.retryAlgorithm.createFirstAttempt()
: requestWrapper.attemptSettings;
requestWrapper.attemptSettings =
egreco12 marked this conversation as resolved.
Show resolved Hide resolved
requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings);
requestWrapper.blockMessageSendDeadline =
Instant.now().plusMillis(requestWrapper.attemptSettings.getRetryDelay().toMillis());
}
Expand Down
Expand Up @@ -19,6 +19,8 @@
import com.google.protobuf.AbstractMessage;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -128,4 +130,8 @@ public void setReturnErrorDuringExclusiveStreamRetry(boolean retryOnError) {
public void setVerifyOffset(boolean verifyOffset) {
serviceImpl.setVerifyOffset(verifyOffset);
}

public ArrayList<Instant> getLatestRequestReceivedInstants() {
return serviceImpl.getLatestRequestReceivedInstants();
}
}
Expand Up @@ -20,6 +20,7 @@
import com.google.rpc.Code;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -73,6 +74,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
private final Map<StreamObserver<AppendRowsResponse>, Boolean> connectionToFirstRequest =
new ConcurrentHashMap<>();
private Status failedStatus = Status.ABORTED;
private ArrayList<Instant> requestReceivedInstants = new ArrayList<>();

/** Class used to save the state of a possible response. */
public static class Response {
Expand Down Expand Up @@ -111,6 +113,10 @@ public String toString() {
}
}

public ArrayList<Instant> getLatestRequestReceivedInstants() {
return requestReceivedInstants;
}

@Override
public void getWriteStream(
GetWriteStreamRequest request, StreamObserver<WriteStream> responseObserver) {
Expand Down Expand Up @@ -197,6 +203,7 @@ public StreamObserver<AppendRowsRequest> appendRows(
new StreamObserver<AppendRowsRequest>() {
@Override
public void onNext(AppendRowsRequest value) {
requestReceivedInstants.add(Instant.now());
recordCount++;
requests.add(value);
long offset = value.getOffset().getValue();
Expand Down
Expand Up @@ -54,6 +54,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -86,12 +87,15 @@ public class StreamWriterTest {
private static final String EXPLICIT_STREAM = "projects/p/datasets/d1/tables/t1/streams/s1";
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private static final int MAX_RETRY_NUM_ATTEMPTS = 3;
private static final long INITIAL_RETRY_MILLIS = 500;
private static final double RETRY_MULTIPLIER = 1.3;
private static final int MAX_RETRY_DELAY_MINUTES = 5;
private static final RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS))
.setRetryDelayMultiplier(RETRY_MULTIPLIER)
.setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS)
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5))
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES))
.build();
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
Expand Down Expand Up @@ -2002,6 +2006,45 @@ public void testExclusiveAppendSuccessAndQuotaErrorRetryMaxRetry() throws Except
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
}

@Test
public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Exception {
testBigQueryWrite.setReturnErrorDuringExclusiveStreamRetry(true);
StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled();

testBigQueryWrite.addResponse(
new DummyResponseSupplierWillFailThenSucceed(
new FakeBigQueryWriteImpl.Response(createAppendResponse(0)),
/* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1,
com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()));

ApiFuture<AppendRowsResponse> future =
writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0);

ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> {
future.get();
});
assertEquals(
Status.Code.RESOURCE_EXHAUSTED,
((StatusRuntimeException) ex.getCause()).getStatus().getCode());

ArrayList<Instant> instants = testBigQueryWrite.getLatestRequestReceivedInstants();
Instant previousInstant = instants.get(0);
// Include initial attempt
assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1);
double minExpectedDelay = INITIAL_RETRY_MILLIS;
for (int i = 1; i < instants.size(); i++) {
Instant currentInstant = instants.get(i);
double differenceInMillis =
java.time.Duration.between(previousInstant, currentInstant).toMillis();
assertThat(differenceInMillis).isGreaterThan(minExpectedDelay);
minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER;
previousInstant = currentInstant;
}
}

@Test
public void testAppendSuccessAndNonRetryableError() throws Exception {
StreamWriter writer = getTestStreamWriterRetryEnabled();
Expand Down