diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml
index ac445e1e35..c0ddc050ae 100644
--- a/google-cloud-spanner/clirr-ignored-differences.xml
+++ b/google-cloud-spanner/clirr-ignored-differences.xml
@@ -422,4 +422,25 @@
com/google/cloud/spanner/spi/v1/SpannerRpc$StreamingCall
com.google.api.gax.rpc.ApiCallContext getCallContext()
+
+
+ 7012
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ com.google.api.gax.retrying.RetrySettings getReadRetrySettings()
+
+
+ 7012
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ com.google.api.gax.retrying.RetrySettings getExecuteQueryRetrySettings()
+
+
+ 7012
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ java.util.Set getReadRetryableCodes()
+
+
+ 7012
+ com/google/cloud/spanner/spi/v1/SpannerRpc
+ java.util.Set getExecuteQueryRetryableCodes()
+
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
index e19ace944a..d5b1abe0b5 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
@@ -664,7 +664,12 @@ ResultSet executeQueryInternalWithOptions(
getExecuteSqlRequestBuilder(
statement, queryMode, options, /* withTransactionSelector = */ false);
ResumableStreamIterator stream =
- new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
+ new ResumableStreamIterator(
+ MAX_BUFFERED_CHUNKS,
+ SpannerImpl.QUERY,
+ span,
+ rpc.getExecuteQueryRetrySettings(),
+ rpc.getExecuteQueryRetryableCodes()) {
@Override
CloseableIterator startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
@@ -798,7 +803,12 @@ ResultSet readInternalWithOptions(
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
- new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.READ, span) {
+ new ResumableStreamIterator(
+ MAX_BUFFERED_CHUNKS,
+ SpannerImpl.READ,
+ span,
+ rpc.getReadRetrySettings(),
+ rpc.getReadRetryableCodes()) {
@Override
CloseableIterator startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
index 37024bd267..b0d5ab2bba 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
@@ -24,8 +24,10 @@
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
@@ -65,6 +67,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -1082,10 +1085,12 @@ public void onError(SpannerException e) {
@VisibleForTesting
abstract static class ResumableStreamIterator extends AbstractIterator
implements CloseableIterator {
- private static final RetrySettings STREAMING_RETRY_SETTINGS =
+ private static final RetrySettings DEFAULT_STREAMING_RETRY_SETTINGS =
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings();
+ private final RetrySettings streamingRetrySettings;
+ private final Set retryableCodes;
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
- private final BackOff backOff = newBackOff();
+ private final BackOff backOff;
private final LinkedList buffer = new LinkedList<>();
private final int maxBufferSize;
private final Span span;
@@ -1099,24 +1104,58 @@ abstract static class ResumableStreamIterator extends AbstractIterator retryableCodes) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan();
- }
-
- private static ExponentialBackOff newBackOff() {
+ this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
+ this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
+ this.backOff = newBackOff();
+ }
+
+ private ExponentialBackOff newBackOff() {
+ if (Objects.equals(streamingRetrySettings, DEFAULT_STREAMING_RETRY_SETTINGS)) {
+ return new ExponentialBackOff.Builder()
+ .setMultiplier(streamingRetrySettings.getRetryDelayMultiplier())
+ .setInitialIntervalMillis(
+ Math.max(10, (int) streamingRetrySettings.getInitialRetryDelay().toMillis()))
+ .setMaxIntervalMillis(
+ Math.max(1000, (int) streamingRetrySettings.getMaxRetryDelay().toMillis()))
+ .setMaxElapsedTimeMillis(
+ Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned.
+ .build();
+ }
return new ExponentialBackOff.Builder()
- .setMultiplier(STREAMING_RETRY_SETTINGS.getRetryDelayMultiplier())
+ .setMultiplier(streamingRetrySettings.getRetryDelayMultiplier())
+ // All of these values must be > 0.
.setInitialIntervalMillis(
- Math.max(10, (int) STREAMING_RETRY_SETTINGS.getInitialRetryDelay().toMillis()))
+ Math.max(
+ 1,
+ (int)
+ Math.min(
+ streamingRetrySettings.getInitialRetryDelay().toMillis(),
+ Integer.MAX_VALUE)))
.setMaxIntervalMillis(
- Math.max(1000, (int) STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis()))
- .setMaxElapsedTimeMillis(Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned.
+ Math.max(
+ 1,
+ (int)
+ Math.min(
+ streamingRetrySettings.getMaxRetryDelay().toMillis(), Integer.MAX_VALUE)))
+ .setMaxElapsedTimeMillis(
+ Math.max(
+ 1,
+ (int)
+ Math.min(
+ streamingRetrySettings.getTotalTimeout().toMillis(), Integer.MAX_VALUE)))
.build();
}
- private static void backoffSleep(Context context, BackOff backoff) throws SpannerException {
+ private void backoffSleep(Context context, BackOff backoff) throws SpannerException {
backoffSleep(context, nextBackOffMillis(backoff));
}
@@ -1128,7 +1167,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
}
}
- private static void backoffSleep(Context context, long backoffMillis) throws SpannerException {
+ private void backoffSleep(Context context, long backoffMillis) throws SpannerException {
tracer
.getCurrentSpan()
.addAnnotation(
@@ -1145,7 +1184,7 @@ private static void backoffSleep(Context context, long backoffMillis) throws Spa
try {
if (backoffMillis == BackOff.STOP) {
// Highly unlikely but we handle it just in case.
- backoffMillis = STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis();
+ backoffMillis = streamingRetrySettings.getMaxRetryDelay().toMillis();
}
if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) {
// Woken by context cancellation.
@@ -1233,11 +1272,12 @@ protected PartialResultSet computeNext() {
return null;
}
}
- } catch (SpannerException e) {
- if (safeToRetry && e.isRetryable()) {
+ } catch (SpannerException spannerException) {
+ if (safeToRetry && isRetryable(spannerException)) {
span.addAnnotation(
- "Stream broken. Safe to retry", TraceUtil.getExceptionAnnotations(e));
- logger.log(Level.FINE, "Retryable exception, will sleep and retry", e);
+ "Stream broken. Safe to retry",
+ TraceUtil.getExceptionAnnotations(spannerException));
+ logger.log(Level.FINE, "Retryable exception, will sleep and retry", spannerException);
// Truncate any items in the buffer before the last retry token.
while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) {
buffer.removeLast();
@@ -1245,7 +1285,7 @@ protected PartialResultSet computeNext() {
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
stream = null;
try (Scope s = tracer.withSpan(span)) {
- long delay = e.getRetryDelayInMillis();
+ long delay = spannerException.getRetryDelayInMillis();
if (delay != -1) {
backoffSleep(context, delay);
} else {
@@ -1256,8 +1296,8 @@ protected PartialResultSet computeNext() {
continue;
}
span.addAnnotation("Stream broken. Not safe to retry");
- TraceUtil.setWithFailure(span, e);
- throw e;
+ TraceUtil.setWithFailure(span, spannerException);
+ throw spannerException;
} catch (RuntimeException e) {
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, e);
@@ -1265,6 +1305,12 @@ protected PartialResultSet computeNext() {
}
}
}
+
+ boolean isRetryable(SpannerException spannerException) {
+ return spannerException.isRetryable()
+ || retryableCodes.contains(
+ GrpcStatusCode.of(spannerException.getErrorCode().getGrpcStatusCode()).getCode());
+ }
}
static double valueProtoToFloat64(com.google.protobuf.Value proto) {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
index 1afea7676d..3e6fc5fbcb 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
@@ -45,6 +45,7 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
+import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
@@ -230,6 +231,10 @@ public class GapicSpannerRpc implements SpannerRpc {
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
+ private final RetrySettings executeQueryRetrySettings;
+ private final Set executeQueryRetryableCodes;
+ private final RetrySettings readRetrySettings;
+ private final Set readRetryableCodes;
private final SpannerStub partitionedDmlStub;
private final RetrySettings partitionedDmlRetrySettings;
private final InstanceAdminStub instanceAdminStub;
@@ -368,6 +373,14 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.build());
+ this.readRetrySettings =
+ options.getSpannerStubSettings().streamingReadSettings().getRetrySettings();
+ this.readRetryableCodes =
+ options.getSpannerStubSettings().streamingReadSettings().getRetryableCodes();
+ this.executeQueryRetrySettings =
+ options.getSpannerStubSettings().executeStreamingSqlSettings().getRetrySettings();
+ this.executeQueryRetryableCodes =
+ options.getSpannerStubSettings().executeStreamingSqlSettings().getRetryableCodes();
partitionedDmlRetrySettings =
options
.getSpannerStubSettings()
@@ -472,6 +485,10 @@ public UnaryCallable createUnaryCalla
this.databaseAdminStub = null;
this.instanceAdminStub = null;
this.spannerStub = null;
+ this.readRetrySettings = null;
+ this.readRetryableCodes = null;
+ this.executeQueryRetrySettings = null;
+ this.executeQueryRetryableCodes = null;
this.partitionedDmlStub = null;
this.databaseAdminStubSettings = null;
this.spannerWatchdog = null;
@@ -1585,6 +1602,16 @@ public ApiFuture asyncDeleteSession(String sessionName, @Nullable Map getReadRetryableCodes() {
+ return readRetryableCodes;
+ }
+
@Override
public StreamingCall read(
ReadRequest request,
@@ -1599,6 +1626,16 @@ public StreamingCall read(
return new GrpcStreamingCall(context, responseObserver.getController());
}
+ @Override
+ public RetrySettings getExecuteQueryRetrySettings() {
+ return executeQueryRetrySettings;
+ }
+
+ @Override
+ public Set getExecuteQueryRetryableCodes() {
+ return executeQueryRetryableCodes;
+ }
+
@Override
public ResultSet executeQuery(
ExecuteSqlRequest request, @Nullable Map