Skip to content
Permalink
Browse files
feat!: async connection API (#392)
* feat: support setting timeout per RPC

The Spanner client allows a user to set custom timeouts while creating a
SpannerOptions instance, but these timeouts are static and are applied to
all invocations of the RPCs. This change introduces the possibility to set
custom timeouts and other call options on a per-RPC basis.

Fixes #378

* fix: change grpc deps from test to compile scope

* feat: add async api for connection

* fix: fix test failures

* fix: move state handling from callback to callable

* fix: fix integration tests with emulator

* fix: fix timeout integration test on emulator

* fix: prevent flakiness in DDL tests

* fix: fix clirr build failures

* fix: do not set transaction state for Aborted err

* fix: set transaction state after retry

* cleanup: remove sync methods and use async instead

* cleanup: remove unused code

* feat: make ddl async

* fix: reduce timeout and remove debug info

* feat: make runBatch async

* test: set forkCount to 1 to investigate test failure

* fix: linting + clirr

* fix: prevent deadlock in DmlBatch

* fix: fix DMLBatch state handling

* tests: add tests for aborted async transactions

* test: add aborted tests

* fix: add change to clirr + more tests

* fix: require a rollback after a tx has aborted

* docs: add javadoc for new methods

* tests: add integration tests

* fix: wait for commit before select

* fix: fix handling aborted commit

* docs: document behavior -Async methods

* fix: iterating without callback could cause exception

* fix: remove todos and commented code

* feat: keep track of caller to include in stacktrace

* docs: explain why Aborted is active

* fix: use ticker for better testability

* test: increase coverage and remove unused code

* test: add additional tests

* docs: add missing @OverRide

* docs: fix comment
  • Loading branch information
olavloite committed Oct 8, 2020
1 parent 691a23c commit 3dd0675d2d7882d40a6af1e12fda3b4617019870
Showing with 5,690 additions and 1,762 deletions.
  1. +52 −0 google-cloud-spanner/clirr-ignored-differences.xml
  2. +22 −15 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java
  3. +1 −1 google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorCode.java
  4. +38 −2 google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java
  5. +43 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerApiFutures.java
  6. +26 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java
  7. +7 −3 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
  8. +119 −25 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/AbstractBaseUnitOfWork.java
  9. +10 −5 ...-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/AbstractMultiUseTransaction.java
  10. +47 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/AsyncStatementResult.java
  11. +130 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/AsyncStatementResultImpl.java
  12. +9 −9 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java
  13. +245 −2 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java
  14. +196 −80 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java
  15. +22 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java
  16. +51 −67 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DdlBatch.java
  17. +41 −27 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DmlBatch.java
  18. +15 −12 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadOnlyTransaction.java
  19. +360 −226 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
  20. +183 −208 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java
  21. +22 −6 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java
  22. +37 −36 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementExecutor.java
  23. +23 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementResultImpl.java
  24. +32 −28 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/UnitOfWork.java
  25. +73 −10 google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
  26. +137 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsTest.java
  27. +118 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerApiFuturesTest.java
  28. +60 −3 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java
  29. +99 −0 ...cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AsyncStatementResultImplTest.java
  30. +688 −0 ...loud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiAbortedTest.java
  31. +833 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiTest.java
  32. +16 −8 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java
  33. +77 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionTest.java
  34. +92 −49 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DdlBatchTest.java
  35. +16 −25 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DmlBatchTest.java
  36. +16 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ITAbstractSpannerTest.java
  37. +22 −28 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadOnlyTransactionTest.java
  38. +22 −19 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java
  39. +53 −112 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java
  40. +76 −69 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SpannerPoolTest.java
  41. +527 −667 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/StatementTimeoutTest.java
  42. +1,015 −0 ...oud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITAsyncTransactionRetryTest.java
  43. +3 −14 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITReadOnlySpannerTest.java
  44. +2 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITSqlMusicScriptTest.java
  45. +3 −1 ...le-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITTransactionRetryTest.java
  46. +11 −5 ...r/src/test/resources/com/google/cloud/spanner/connection/ITSqlScriptTest_TestStatementTimeout.sql
@@ -319,4 +319,56 @@
<className>com/google/cloud/spanner/Value</className>
<method>java.util.List getNumericArray()</method>
</difference>

<!-- Async Connection API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture beginTransactionAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture commitAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.connection.AsyncStatementResult executeAsync(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture executeBatchUpdateAsync(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture rollbackAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture runBatchAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture writeAsync(com.google.cloud.spanner.Mutation)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture writeAsync(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/ResultSets</className>
<method>com.google.cloud.spanner.AsyncResultSet toAsyncResultSet(com.google.cloud.spanner.ResultSet, com.google.api.gax.core.ExecutorProvider)</method>
</difference>
</differences>
@@ -25,6 +25,8 @@
import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -88,8 +90,8 @@ private State(boolean shouldStop) {

private final BlockingDeque<Struct> buffer;
private Struct currentRow;
/** The underlying synchronous {@link ResultSet} that is producing the rows. */
private final ResultSet delegateResultSet;
/** Supplies the underlying synchronous {@link ResultSet} that will be producing the rows. */
private final Supplier<ResultSet> delegateResultSet;

/**
* Any exception that occurs while executing the query and iterating over the result set will be
@@ -144,6 +146,11 @@ private State(boolean shouldStop) {
private volatile CountDownLatch consumingLatch = new CountDownLatch(0);

AsyncResultSetImpl(ExecutorProvider executorProvider, ResultSet delegate, int bufferSize) {
this(executorProvider, Suppliers.ofInstance(Preconditions.checkNotNull(delegate)), bufferSize);
}

AsyncResultSetImpl(
ExecutorProvider executorProvider, Supplier<ResultSet> delegate, int bufferSize) {
super(delegate);
this.executorProvider = Preconditions.checkNotNull(executorProvider);
this.delegateResultSet = Preconditions.checkNotNull(delegate);
@@ -165,7 +172,7 @@ public void close() {
return;
}
if (state == State.INITIALIZED || state == State.SYNC) {
delegateResultSet.close();
delegateResultSet.get().close();
}
this.closed = true;
}
@@ -228,7 +235,7 @@ public CursorState tryNext() throws SpannerException {

private void closeDelegateResultSet() {
try {
delegateResultSet.close();
delegateResultSet.get().close();
} catch (Throwable t) {
log.log(Level.FINE, "Ignoring error from closing delegate result set", t);
}
@@ -261,7 +268,7 @@ public void run() {
// we'll keep the cancelled state.
return;
}
executionException = SpannerExceptionFactory.newSpannerException(e);
executionException = SpannerExceptionFactory.asSpannerException(e);
cursorReturnedDoneOrException = true;
}
return;
@@ -325,10 +332,10 @@ public Void call() throws Exception {
boolean stop = false;
boolean hasNext = false;
try {
hasNext = delegateResultSet.next();
hasNext = delegateResultSet.get().next();
} catch (Throwable e) {
synchronized (monitor) {
executionException = SpannerExceptionFactory.newSpannerException(e);
executionException = SpannerExceptionFactory.asSpannerException(e);
}
}
try {
@@ -357,13 +364,13 @@ public Void call() throws Exception {
}
}
if (!stop) {
buffer.put(delegateResultSet.getCurrentRowAsStruct());
buffer.put(delegateResultSet.get().getCurrentRowAsStruct());
startCallbackIfNecessary();
hasNext = delegateResultSet.next();
hasNext = delegateResultSet.get().next();
}
} catch (Throwable e) {
synchronized (monitor) {
executionException = SpannerExceptionFactory.newSpannerException(e);
executionException = SpannerExceptionFactory.asSpannerException(e);
stop = true;
}
}
@@ -544,9 +551,9 @@ public <T> List<T> toList(Function<StructReader, T> transformer) throws SpannerE
try {
return future.get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
throw SpannerExceptionFactory.asSpannerException(e.getCause());
} catch (Throwable e) {
throw SpannerExceptionFactory.newSpannerException(e);
throw SpannerExceptionFactory.asSpannerException(e);
}
}

@@ -558,14 +565,14 @@ public boolean next() throws SpannerException {
"Cannot call next() on a result set with a callback.");
this.state = State.SYNC;
}
boolean res = delegateResultSet.next();
currentRow = res ? delegateResultSet.getCurrentRowAsStruct() : null;
boolean res = delegateResultSet.get().next();
currentRow = res ? delegateResultSet.get().getCurrentRowAsStruct() : null;
return res;
}

@Override
public ResultSetStats getStats() {
return delegateResultSet.getStats();
return delegateResultSet.get().getStats();
}

@Override
@@ -89,7 +89,7 @@ static ErrorCode valueOf(String name, ErrorCode defaultValue) {
/**
* Returns the error code corresponding to a gRPC status, or {@code UNKNOWN} if not recognized.
*/
static ErrorCode fromGrpcStatus(Status status) {
public static ErrorCode fromGrpcStatus(Status status) {
ErrorCode code = errorByRpcCode.get(status.getCode().value());
return code == null ? UNKNOWN : code;
}
@@ -16,14 +16,17 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Type.Code;
import com.google.cloud.spanner.Type.StructField;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.ResultSetStats;
@@ -65,8 +68,41 @@ public static AsyncResultSet toAsyncResultSet(ResultSet delegate) {
* ExecutorProvider}.
*/
public static AsyncResultSet toAsyncResultSet(
ResultSet delegate, ExecutorProvider executorProvider) {
return new AsyncResultSetImpl(executorProvider, delegate, 100);
ResultSet delegate, ExecutorProvider executorProvider, QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(executorProvider, delegate, bufferRows);
}

/**
* Converts the {@link ResultSet} that will be returned by the given {@link ApiFuture} to an
* {@link AsyncResultSet} using the given {@link ExecutorProvider}.
*/
public static AsyncResultSet toAsyncResultSet(
ApiFuture<ResultSet> delegate, ExecutorProvider executorProvider, QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider, new FutureResultSetSupplier(delegate), bufferRows);
}

private static class FutureResultSetSupplier implements Supplier<ResultSet> {
final ApiFuture<ResultSet> delegate;

FutureResultSetSupplier(ApiFuture<ResultSet> delegate) {
this.delegate = Preconditions.checkNotNull(delegate);
}

@Override
public ResultSet get() {
return SpannerApiFutures.get(delegate);
}
}

private static class PrePopulatedResultSet implements ResultSet {
@@ -0,0 +1,43 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.common.base.Preconditions;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

public class SpannerApiFutures {
public static <T> T get(ApiFuture<T> future) throws SpannerException {
return getOrNull(Preconditions.checkNotNull(future));
}

public static <T> T getOrNull(ApiFuture<T> future) throws SpannerException {
try {
return future == null ? null : future.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof SpannerException) {
throw (SpannerException) e.getCause();
}
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
} catch (CancellationException e) {
throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e);
}
}
}
@@ -83,6 +83,18 @@ public static SpannerException propagateTimeout(TimeoutException e) {
ErrorCode.DEADLINE_EXCEEDED, "Operation did not complete in the given time", e);
}

/**
* Converts the given {@link Throwable} to a {@link SpannerException}. If <code>t</code> is
* already a (subclass of a) {@link SpannerException}, <code>t</code> is returned unaltered.
* Otherwise, a new {@link SpannerException} is created with <code>t</code> as its cause.
*/
public static SpannerException asSpannerException(Throwable t) {
if (t instanceof SpannerException) {
return (SpannerException) t;
}
return newSpannerException(t);
}

/**
* Creates a new exception based on {@code cause}.
*
@@ -126,6 +138,20 @@ public static SpannerBatchUpdateException newSpannerBatchUpdateException(
databaseError);
}

/**
* Constructs a new {@link AbortedDueToConcurrentModificationException} that can be re-thrown for
* a transaction that had already been aborted, but that the client application tried to use for
* additional statements.
*/
public static AbortedDueToConcurrentModificationException
newAbortedDueToConcurrentModificationException(
AbortedDueToConcurrentModificationException cause) {
return new AbortedDueToConcurrentModificationException(
DoNotConstructDirectly.ALLOWED,
"This transaction has already been aborted and could not be retried due to a concurrent modification. Rollback this transaction to start a new one.",
cause);
}

/**
* Creates a new exception based on {@code cause}. If {@code cause} indicates cancellation, {@code
* context} will be inspected to establish the type of cancellation.
@@ -150,7 +150,7 @@ public void removeListener(Runnable listener) {
@GuardedBy("lock")
private long retryDelayInMillis = -1L;

private ByteString transactionId;
private volatile ByteString transactionId;
private Timestamp commitTimestamp;

private TransactionContextImpl(Builder builder) {
@@ -238,12 +238,17 @@ void commit() {
try {
commitTimestamp = commitAsync().get();
} catch (InterruptedException e) {
if (commitFuture != null) {
commitFuture.cancel(true);
}
throw SpannerExceptionFactory.propagateInterrupt(e);
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
}
}

volatile ApiFuture<CommitResponse> commitFuture;

ApiFuture<Timestamp> commitAsync() {
final SettableApiFuture<Timestamp> res = SettableApiFuture.create();
final SettableApiFuture<Void> latch;
@@ -273,8 +278,7 @@ public void run() {
span.addAnnotation("Starting Commit");
final Span opSpan =
tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan();
final ApiFuture<CommitResponse> commitFuture =
rpc.commitAsync(commitRequest, session.getOptions());
commitFuture = rpc.commitAsync(commitRequest, session.getOptions());
commitFuture.addListener(
tracer.withSpan(
opSpan,

0 comments on commit 3dd0675

Please sign in to comment.