Skip to content

Commit

Permalink
feat: add 'Received Rst Stream' to automatic retry strings (#419)
Browse files Browse the repository at this point in the history
* Add 'Received Rst Stream' to automatic retry strings

* Run 'mvn com.coveo:fmt-maven-plugin:format'
  • Loading branch information
kmjung committed Jul 15, 2020
1 parent 5a27c5f commit 1584bdb
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private boolean isRetryableStatus(Status status) {
return status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& (status.getDescription().contains("Received unexpected EOS on DATA frame from server")
|| status.getDescription().contains("Received Rst Stream"));
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
if (isRetryableStatus(status)) {
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
Expand All @@ -54,9 +59,7 @@ public TimedAttemptSettings createNextAttempt(
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
if (isRetryableStatus(status)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private boolean isRetryableStatus(Status status) {
return status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& (status.getDescription().contains("Received unexpected EOS on DATA frame from server")
|| status.getDescription().contains("Received Rst Stream"));
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
if (isRetryableStatus(status)) {
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
Expand All @@ -54,9 +59,7 @@ public TimedAttemptSettings createNextAttempt(
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
if (isRetryableStatus(status)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private boolean isRetryableStatus(Status status) {
return status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& (status.getDescription().contains("Received unexpected EOS on DATA frame from server")
|| status.getDescription().contains("Received Rst Stream"));
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
if (isRetryableStatus(status)) {
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
Expand All @@ -54,9 +59,7 @@ public TimedAttemptSettings createNextAttempt(
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
if (isRetryableStatus(status)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.grpc.testing.MockStreamObserver;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StatusCode;
import com.google.protobuf.AbstractMessage;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -165,10 +169,38 @@ public void readRowsExceptionTest() throws Exception {

@Test
@SuppressWarnings("all")
public void readRowsRetryingExceptionTest() throws ExecutionException, InterruptedException {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server"));
public void readRowsRetryingEOSExceptionTest() throws ExecutionException, InterruptedException {
ApiException exception =
new InternalException(
new StatusRuntimeException(
Status.INTERNAL.withDescription(
"Received unexpected EOS on DATA frame from server")),
GrpcStatusCode.of(Code.INTERNAL),
/* retryable = */ false);
mockBigQueryRead.addException(exception);
long rowCount = 1340416618L;
ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build();
mockBigQueryRead.addResponse(expectedResponse);
ReadRowsRequest request = ReadRowsRequest.newBuilder().build();

MockStreamObserver<ReadRowsResponse> responseObserver = new MockStreamObserver<>();

ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> callable = client.readRowsCallable();
callable.serverStreamingCall(request, responseObserver);
List<ReadRowsResponse> actualResponses = responseObserver.future().get();
Assert.assertEquals(1, actualResponses.size());
}

@Test
@SuppressWarnings("all")
public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, InterruptedException {
ApiException exception =
new InternalException(
new StatusRuntimeException(
Status.INTERNAL.withDescription(
"HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")),
GrpcStatusCode.of(Code.INTERNAL),
/* retryable = */ false);
mockBigQueryRead.addException(exception);
long rowCount = 1340416618L;
ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.grpc.testing.MockStreamObserver;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StatusCode;
Expand All @@ -40,6 +43,7 @@
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -295,10 +299,38 @@ public void splitReadStreamExceptionTest() throws Exception {

@Test
@SuppressWarnings("all")
public void readRowsRetryingExceptionTest() throws ExecutionException, InterruptedException {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server"));
public void readRowsRetryingEOSExceptionTest() throws ExecutionException, InterruptedException {
ApiException exception =
new InternalException(
new StatusRuntimeException(
Status.INTERNAL.withDescription(
"Received unexpected EOS on DATA frame from server")),
GrpcStatusCode.of(Code.INTERNAL),
/* retryable = */ false);
mockBigQueryStorage.addException(exception);
long rowCount = 1340416618L;
ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build();
mockBigQueryStorage.addResponse(expectedResponse);
ReadRowsRequest request = ReadRowsRequest.newBuilder().build();

MockStreamObserver<ReadRowsResponse> responseObserver = new MockStreamObserver<>();

ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> callable = client.readRowsCallable();
callable.serverStreamingCall(request, responseObserver);
List<ReadRowsResponse> actualResponses = responseObserver.future().get();
Assert.assertEquals(1, actualResponses.size());
}

@Test
@SuppressWarnings("all")
public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, InterruptedException {
ApiException exception =
new InternalException(
new StatusRuntimeException(
Status.INTERNAL.withDescription(
"HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")),
GrpcStatusCode.of(Code.INTERNAL),
/* retryable = */ false);
mockBigQueryStorage.addException(exception);
long rowCount = 1340416618L;
ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.grpc.testing.MockStreamObserver;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StatusCode;
import com.google.protobuf.AbstractMessage;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -165,10 +169,38 @@ public void readRowsExceptionTest() throws Exception {

@Test
@SuppressWarnings("all")
public void readRowsRetryingExceptionTest() throws ExecutionException, InterruptedException {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server"));
public void readRowsRetryingEOSExceptionTest() throws ExecutionException, InterruptedException {
ApiException exception =
new InternalException(
new StatusRuntimeException(
Status.INTERNAL.withDescription(
"Received unexpected EOS on DATA frame from server")),
GrpcStatusCode.of(Code.INTERNAL),
/* retryable = */ false);
mockBigQueryRead.addException(exception);
long rowCount = 1340416618L;
ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build();
mockBigQueryRead.addResponse(expectedResponse);
ReadRowsRequest request = ReadRowsRequest.newBuilder().build();

MockStreamObserver<ReadRowsResponse> responseObserver = new MockStreamObserver<>();

ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> callable = client.readRowsCallable();
callable.serverStreamingCall(request, responseObserver);
List<ReadRowsResponse> actualResponses = responseObserver.future().get();
Assert.assertEquals(1, actualResponses.size());
}

@Test
@SuppressWarnings("all")
public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, InterruptedException {
ApiException exception =
new InternalException(
new StatusRuntimeException(
Status.INTERNAL.withDescription(
"HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")),
GrpcStatusCode.of(Code.INTERNAL),
/* retryable = */ false);
mockBigQueryRead.addException(exception);
long rowCount = 1340416618L;
ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build();
Expand Down

0 comments on commit 1584bdb

Please sign in to comment.