Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 5290cec

Browse files
author
Praful Makani
authored
fix: add retry logic for readrows v1beta1 (#314)
1 parent cf1ab06 commit 5290cec

File tree

6 files changed

+569
-4
lines changed

6 files changed

+569
-4
lines changed

google-cloud-bigquerystorage/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@
169169
<dependency>
170170
<groupId>com.google.api.grpc</groupId>
171171
<artifactId>grpc-google-cloud-bigquerystorage-v1beta1</artifactId>
172-
<scope>test</scope>
173172
</dependency>
174173
<dependency>
175174
<groupId>com.google.api.grpc</groupId>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,20 @@
1717

1818
import com.google.api.core.InternalApi;
1919
import com.google.api.gax.core.BackgroundResource;
20+
import com.google.api.gax.grpc.GrpcCallSettings;
21+
import com.google.api.gax.grpc.GrpcRawCallableFactory;
22+
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
23+
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
24+
import com.google.api.gax.retrying.StreamingRetryAlgorithm;
25+
import com.google.api.gax.rpc.Callables;
2026
import com.google.api.gax.rpc.ClientContext;
27+
import com.google.api.gax.rpc.RequestParamsExtractor;
28+
import com.google.api.gax.rpc.ServerStreamingCallSettings;
2129
import com.google.api.gax.rpc.ServerStreamingCallable;
2230
import com.google.api.gax.rpc.UnaryCallable;
31+
import com.google.api.gax.tracing.SpanName;
32+
import com.google.api.gax.tracing.TracedServerStreamingCallable;
33+
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc;
2334
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest;
2435
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsResponse;
2536
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
@@ -29,8 +40,12 @@
2940
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
3041
import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamRequest;
3142
import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse;
43+
import com.google.cloud.bigquery.storage.v1beta1.stub.readrows.ApiResultRetryAlgorithm;
44+
import com.google.cloud.bigquery.storage.v1beta1.stub.readrows.ReadRowsRetryingCallable;
45+
import com.google.common.collect.ImmutableMap;
3246
import com.google.protobuf.Empty;
3347
import java.io.IOException;
48+
import java.util.Map;
3449
import java.util.concurrent.TimeUnit;
3550

3651
/**
@@ -40,7 +55,10 @@
4055
*/
4156
public class EnhancedBigQueryStorageStub implements BackgroundResource {
4257

58+
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
4359
private final GrpcBigQueryStorageStub stub;
60+
private final BigQueryStorageStubSettings stubSettings;
61+
private final ClientContext context;
4462

4563
public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSettings settings)
4664
throws IOException {
@@ -89,20 +107,67 @@ public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSett
89107
BigQueryStorageStubSettings baseSettings = baseSettingsBuilder.build();
90108
ClientContext clientContext = ClientContext.create(baseSettings);
91109
GrpcBigQueryStorageStub stub = new GrpcBigQueryStorageStub(baseSettings, clientContext);
92-
return new EnhancedBigQueryStorageStub(stub);
110+
return new EnhancedBigQueryStorageStub(stub, baseSettings, clientContext);
93111
}
94112

95113
@InternalApi("Visible for testing")
96-
EnhancedBigQueryStorageStub(GrpcBigQueryStorageStub stub) {
114+
EnhancedBigQueryStorageStub(
115+
GrpcBigQueryStorageStub stub,
116+
BigQueryStorageStubSettings stubSettings,
117+
ClientContext context) {
97118
this.stub = stub;
119+
this.stubSettings = stubSettings;
120+
this.context = context;
98121
}
99122

100123
public UnaryCallable<CreateReadSessionRequest, ReadSession> createReadSessionCallable() {
101124
return stub.createReadSessionCallable();
102125
}
103126

104127
public ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> readRowsCallable() {
105-
return stub.readRowsCallable();
128+
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> innerCallable =
129+
GrpcRawCallableFactory.createServerStreamingCallable(
130+
GrpcCallSettings.<ReadRowsRequest, ReadRowsResponse>newBuilder()
131+
.setMethodDescriptor(BigQueryStorageGrpc.getReadRowsMethod())
132+
.setParamsExtractor(
133+
new RequestParamsExtractor<ReadRowsRequest>() {
134+
@Override
135+
public Map<String, String> extract(ReadRowsRequest request) {
136+
return ImmutableMap.of(
137+
"read_position.stream.name",
138+
String.valueOf(request.getReadPosition().getStream().getName()));
139+
}
140+
})
141+
.build(),
142+
stubSettings.readRowsSettings().getRetryableCodes());
143+
ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> callSettings =
144+
stubSettings.readRowsSettings();
145+
146+
StreamingRetryAlgorithm<Void> retryAlgorithm =
147+
new StreamingRetryAlgorithm<>(
148+
new ApiResultRetryAlgorithm<Void>(),
149+
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));
150+
151+
ScheduledRetryingExecutor<Void> retryingExecutor =
152+
new ScheduledRetryingExecutor<>(retryAlgorithm, context.getExecutor());
153+
154+
if (context.getStreamWatchdog() != null) {
155+
innerCallable = Callables.watched(innerCallable, callSettings, context);
156+
}
157+
158+
ReadRowsRetryingCallable outerCallable =
159+
new ReadRowsRetryingCallable(
160+
context.getDefaultCallContext(),
161+
innerCallable,
162+
retryingExecutor,
163+
callSettings.getResumptionStrategy());
164+
165+
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> traced =
166+
new TracedServerStreamingCallable<>(
167+
outerCallable,
168+
context.getTracerFactory(),
169+
SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows"));
170+
return traced.withDefaultCallContext(context.getDefaultCallContext());
106171
}
107172

108173
public UnaryCallable<BatchCreateReadSessionStreamsRequest, BatchCreateReadSessionStreamsResponse>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigquery.storage.v1beta1.stub.readrows;
18+
19+
import com.google.api.core.InternalApi;
20+
import com.google.api.gax.retrying.ResultRetryAlgorithm;
21+
import com.google.api.gax.retrying.TimedAttemptSettings;
22+
import com.google.api.gax.rpc.ApiException;
23+
import io.grpc.Status;
24+
import org.threeten.bp.Duration;
25+
26+
/** For internal use, public for technical reasons. */
27+
@InternalApi
28+
public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<ResponseT> {
29+
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
30+
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);
31+
32+
@Override
33+
public TimedAttemptSettings createNextAttempt(
34+
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
35+
if (prevThrowable != null) {
36+
Status status = Status.fromThrowable(prevThrowable);
37+
if (status.getCode() == Status.Code.INTERNAL
38+
&& status.getDescription() != null
39+
&& status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
40+
return TimedAttemptSettings.newBuilder()
41+
.setGlobalSettings(prevSettings.getGlobalSettings())
42+
.setRetryDelay(prevSettings.getRetryDelay())
43+
.setRpcTimeout(prevSettings.getRpcTimeout())
44+
.setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
45+
.setAttemptCount(prevSettings.getAttemptCount() + 1)
46+
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
47+
.build();
48+
}
49+
}
50+
return null;
51+
}
52+
53+
@Override
54+
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
55+
if (prevThrowable != null) {
56+
Status status = Status.fromThrowable(prevThrowable);
57+
if (status.getCode() == Status.Code.INTERNAL
58+
&& status.getDescription() != null
59+
&& status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
60+
return true;
61+
}
62+
}
63+
return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable();
64+
}
65+
}

0 commit comments

Comments
 (0)