Skip to content

Commit

Permalink
fix: More robust STREAM_RST logic (#1102)
Browse files Browse the repository at this point in the history
* fix: more robust STREAM_RST logic

* Update google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java

Co-authored-by: Stephanie Wang <stephaniewang526@users.noreply.github.com>

* Update google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java

Co-authored-by: Stephanie Wang <stephaniewang526@users.noreply.github.com>

Co-authored-by: Stephanie Wang <stephaniewang526@users.noreply.github.com>
  • Loading branch information
emkornfield and stephaniewang526 committed May 31, 2021
1 parent b2e3489 commit dd67534
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.util;

import io.grpc.Status;

/** Static utility methods for working with Errors returned from the service. */
public class Errors {
private Errors() {};

/**
* Returns true iff the Status indicates and internal error that is retryable.
*
* <p>Generally, internal errors are not considered retryable, however there are certain transient
* network issues that appear as internal but are in fact retryable.
*/
public static boolean isRetryableInternalStatus(Status status) {
String description = status.getDescription();
return status.getCode() == Status.Code.INTERNAL
&& description != null
&& (description.contains("Received unexpected EOS on DATA frame from server")
|| description.contains(" Rst ")
|| description.contains("RST_STREAM")
|| description.contains("Connection closed with unknown cause")
|| description.contains("HTTP/2 error code: INTERNAL_ERROR"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.storage.util.Errors;
import io.grpc.Status;
import org.threeten.bp.Duration;

Expand All @@ -29,19 +30,12 @@ public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private boolean isRetryableStatus(Status status) {
return status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& (status.getDescription().contains("Received unexpected EOS on DATA frame from server")
|| status.getDescription().contains("Received Rst Stream"));
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (isRetryableStatus(status)) {
if (Errors.isRetryableInternalStatus(status)) {
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
Expand All @@ -59,7 +53,7 @@ public TimedAttemptSettings createNextAttempt(
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (isRetryableStatus(status)) {
if (Errors.isRetryableInternalStatus(status)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.storage.util.Errors;
import io.grpc.Status;
import org.threeten.bp.Duration;

Expand All @@ -29,19 +30,12 @@ public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private boolean isRetryableStatus(Status status) {
return status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& (status.getDescription().contains("Received unexpected EOS on DATA frame from server")
|| status.getDescription().contains("Received Rst Stream"));
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (isRetryableStatus(status)) {
if (Errors.isRetryableInternalStatus(status)) {
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
Expand All @@ -59,7 +53,7 @@ public TimedAttemptSettings createNextAttempt(
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (isRetryableStatus(status)) {
if (Errors.isRetryableInternalStatus(status)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.storage.util.Errors;
import io.grpc.Status;
import org.threeten.bp.Duration;

Expand All @@ -29,19 +30,12 @@ public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private boolean isRetryableStatus(Status status) {
return status.getCode() == Status.Code.INTERNAL
&& status.getDescription() != null
&& (status.getDescription().contains("Received unexpected EOS on DATA frame from server")
|| status.getDescription().contains("Received Rst Stream"));
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (isRetryableStatus(status)) {
if (Errors.isRetryableInternalStatus(status)) {
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
Expand All @@ -59,7 +53,7 @@ public TimedAttemptSettings createNextAttempt(
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (isRetryableStatus(status)) {
if (Errors.isRetryableInternalStatus(status)) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.util;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import io.grpc.Status;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ErrorsTest {

@Test
public void testRetryableInternalForRstErrors() {
assertTrue(
Errors.isRetryableInternalStatus(
Status.INTERNAL.withDescription(
"HTTP/2 error code: INTERNAL_ERROR\nReceived Rst stream")));
assertTrue(
Errors.isRetryableInternalStatus(
Status.INTERNAL.withDescription(
"RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR")));
}

@Test
public void testNonRetryableInternalError() {
assertFalse(Errors.isRetryableInternalStatus(Status.INTERNAL));
assertFalse(Errors.isRetryableInternalStatus(Status.INTERNAL.withDescription("Server error.")));
}

@Test
public void testNonRetryableOtherError() {
assertFalse(
Errors.isRetryableInternalStatus(
Status.DATA_LOSS.withDescription(
"RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR")));
}
}

0 comments on commit dd67534

Please sign in to comment.