Skip to content
Permalink
Browse files
fix: Partitioned DML timeout was not always respected (#203)
* fix: Partitioned DML timeout was not always respected

Setting a timeout value for Partitioned DML would not be respected if the timeout
value was higher than the timeout value set for the ExecuteSql RPC on the SpannerStub.
Lower timeout values would be respected.

Fixes #199

* fix: add ignored changes + InternalApi

* tests: add test for retry on UNAVAILABLE
  • Loading branch information
olavloite committed May 14, 2020
1 parent 50cb174 commit 13cb37e55ddfd1ff4ec22b1dcdc20c4832eee444
@@ -159,4 +159,16 @@
<method>com.google.longrunning.Operation getOperation(java.lang.String)</method>
</difference>

<!-- Fix Partitioned DML timeout settings. -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executePartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/spi/v1/GapicSpannerRpc</className>
<method>com.google.spanner.v1.ResultSet executePartitionedDml(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map, org.threeten.bp.Duration)</method>
</difference>

</differences>
@@ -29,7 +29,6 @@
import com.google.spanner.v1.TransactionSelector;
import java.util.Map;
import java.util.concurrent.Callable;
import org.threeten.bp.Duration;

/** Partitioned DML transaction for bulk updates and deletes. */
class PartitionedDMLTransaction implements SessionTransaction {
@@ -63,7 +62,7 @@ private ByteString initTransaction() {
* Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the
* transaction was aborted.
*/
long executePartitionedUpdate(final Statement statement, final Duration timeout) {
long executePartitionedUpdate(final Statement statement) {
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
Callable<com.google.spanner.v1.ResultSet> callable =
new Callable<com.google.spanner.v1.ResultSet>() {
@@ -84,7 +83,7 @@ public com.google.spanner.v1.ResultSet call() throws Exception {
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
return rpc.executePartitionedDml(builder.build(), session.getOptions(), timeout);
return rpc.executePartitionedDml(builder.build(), session.getOptions());
}
};
com.google.spanner.v1.ResultSet resultSet =
@@ -105,7 +105,7 @@ public String getName() {
public long executePartitionedUpdate(Statement stmt) {
setActive(null);
PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc());
return txn.executePartitionedUpdate(stmt, spanner.getOptions().getPartitionedDmlTimeout());
return txn.executePartitionedUpdate(stmt);
}

@Override
@@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
@@ -54,6 +55,7 @@
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
@@ -153,6 +155,7 @@
import org.threeten.bp.Duration;

/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
@InternalApi
public class GapicSpannerRpc implements SpannerRpc {
/**
* {@link ExecutorProvider} that keeps track of the executors that are created and shuts these
@@ -207,6 +210,7 @@ private synchronized void shutdown() {
private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final SpannerStub partitionedDmlStub;
private final InstanceAdminStub instanceAdminStub;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final DatabaseAdminStub databaseAdminStub;
@@ -326,6 +330,22 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.build());
SpannerStubSettings.Builder pdmlSettings = options.getSpannerStubSettings().toBuilder();
pdmlSettings
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.executeSqlSettings()
.setRetrySettings(
options
.getSpannerStubSettings()
.executeSqlSettings()
.getRetrySettings()
.toBuilder()
.setInitialRpcTimeout(options.getPartitionedDmlTimeout())
.setMaxRpcTimeout(options.getPartitionedDmlTimeout())
.build());
this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build());

this.instanceAdminStub =
GrpcInstanceAdminStub.create(
@@ -1029,9 +1049,9 @@ public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?

@Override
public ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout) {
GrpcCallContext context = newCallContext(options, request.getSession(), timeout);
return get(spannerStub.executeSqlCallable().futureCall(request, context));
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context = newCallContext(options, request.getSession());
return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context));
}

@Override
@@ -1191,19 +1211,11 @@ private static <T> T get(final Future<T> future) throws SpannerException {

@VisibleForTesting
GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String resource) {
return newCallContext(options, resource, null);
}

private GrpcCallContext newCallContext(
@Nullable Map<Option, ?> options, String resource, Duration timeout) {
GrpcCallContext context = GrpcCallContext.createDefault();
if (options != null) {
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
}
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
if (timeout != null) {
context = context.withTimeout(timeout);
}
if (callCredentialsProvider != null) {
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();
if (callCredentials != null) {
@@ -57,7 +57,6 @@
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Abstracts remote calls to the Cloud Spanner service. Typically end-consumer code will never use
@@ -282,8 +281,7 @@ StreamingCall read(

ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);

ResultSet executePartitionedDml(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, Duration timeout);
ResultSet executePartitionedDml(ExecuteSqlRequest request, @Nullable Map<Option, ?> options);

StreamingCall executeQuery(
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

0 comments on commit 13cb37e

Please sign in to comment.