Skip to content

Commit

Permalink
fix: Create next attempt after first attempt to initialize exponentia…
Browse files Browse the repository at this point in the history
…l backoff settings. (#2316)

---------

Co-authored-by: Evan Greco <egreco@google.com>
  • Loading branch information
egreco12 and Evan Greco committed Nov 14, 2023
1 parent 75c2552 commit e5884cc
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 10 deletions.
Expand Up @@ -971,15 +971,22 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r
try {
requestWrapper.retryCount++;
if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) {
// Trigger exponential backoff in append loop when request is resent for quota errors
if (requestWrapper.attemptSettings == null) {
requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createFirstAttempt();
} else {
requestWrapper.attemptSettings =
requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings);
}
// Trigger exponential backoff in append loop when request is resent for quota errors.
// createNextAttempt correctly initializes the retry delay; createfirstAttempt does not
// include a positive delay, just 0.
requestWrapper.attemptSettings =
requestWrapper.retryAlgorithm.createNextAttempt(
requestWrapper.attemptSettings == null
? requestWrapper.retryAlgorithm.createFirstAttempt()
: requestWrapper.attemptSettings);
requestWrapper.blockMessageSendDeadline =
Instant.now().plusMillis(requestWrapper.attemptSettings.getRetryDelay().toMillis());
log.info(
"Messages blocked for retry for "
+ java.time.Duration.between(
java.time.Instant.now(), requestWrapper.blockMessageSendDeadline)
+ " until "
+ requestWrapper.blockMessageSendDeadline);
}

Long offset =
Expand Down
Expand Up @@ -19,6 +19,8 @@
import com.google.protobuf.AbstractMessage;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -128,4 +130,8 @@ public void setReturnErrorDuringExclusiveStreamRetry(boolean retryOnError) {
public void setVerifyOffset(boolean verifyOffset) {
serviceImpl.setVerifyOffset(verifyOffset);
}

public ArrayList<Instant> getLatestRequestReceivedInstants() {
return serviceImpl.getLatestRequestReceivedInstants();
}
}
Expand Up @@ -20,6 +20,7 @@
import com.google.rpc.Code;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -73,6 +74,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
private final Map<StreamObserver<AppendRowsResponse>, Boolean> connectionToFirstRequest =
new ConcurrentHashMap<>();
private Status failedStatus = Status.ABORTED;
private ArrayList<Instant> requestReceivedInstants = new ArrayList<>();

/** Class used to save the state of a possible response. */
public static class Response {
Expand Down Expand Up @@ -111,6 +113,10 @@ public String toString() {
}
}

public ArrayList<Instant> getLatestRequestReceivedInstants() {
return requestReceivedInstants;
}

@Override
public void getWriteStream(
GetWriteStreamRequest request, StreamObserver<WriteStream> responseObserver) {
Expand Down Expand Up @@ -197,6 +203,7 @@ public StreamObserver<AppendRowsRequest> appendRows(
new StreamObserver<AppendRowsRequest>() {
@Override
public void onNext(AppendRowsRequest value) {
requestReceivedInstants.add(Instant.now());
recordCount++;
requests.add(value);
long offset = value.getOffset().getValue();
Expand Down
Expand Up @@ -54,6 +54,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -86,12 +87,15 @@ public class StreamWriterTest {
private static final String EXPLICIT_STREAM = "projects/p/datasets/d1/tables/t1/streams/s1";
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private static final int MAX_RETRY_NUM_ATTEMPTS = 3;
private static final long INITIAL_RETRY_MILLIS = 500;
private static final double RETRY_MULTIPLIER = 1.3;
private static final int MAX_RETRY_DELAY_MINUTES = 5;
private static final RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS))
.setRetryDelayMultiplier(RETRY_MULTIPLIER)
.setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS)
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5))
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES))
.build();
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
Expand Down Expand Up @@ -2003,6 +2007,46 @@ public void testExclusiveAppendSuccessAndQuotaErrorRetryMaxRetry() throws Except
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
}

@Test
public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Exception {
testBigQueryWrite.setReturnErrorDuringExclusiveStreamRetry(true);
StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled();

testBigQueryWrite.addResponse(
new DummyResponseSupplierWillFailThenSucceed(
new FakeBigQueryWriteImpl.Response(createAppendResponse(0)),
/* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1,
com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()));

ApiFuture<AppendRowsResponse> future =
writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0);

ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> {
future.get();
});
assertEquals(
Status.Code.RESOURCE_EXHAUSTED,
((StatusRuntimeException) ex.getCause()).getStatus().getCode());

ArrayList<Instant> instants = testBigQueryWrite.getLatestRequestReceivedInstants();
Instant previousInstant = instants.get(0);
// Include initial attempt
assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1);
double minExpectedDelay = INITIAL_RETRY_MILLIS * 0.95;
for (int i = 1; i < instants.size(); i++) {
Instant currentInstant = instants.get(i);
double differenceInMillis =
java.time.Duration.between(previousInstant, currentInstant).toMillis();
assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS);
assertThat(differenceInMillis).isGreaterThan(minExpectedDelay);
minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER;
previousInstant = currentInstant;
}
}

@Test
public void testAppendSuccessAndNonRetryableError() throws Exception {
StreamWriter writer = getTestStreamWriterRetryEnabled();
Expand Down

0 comments on commit e5884cc

Please sign in to comment.