Skip to content
Permalink
Browse files
feat: add async api (#81)
* feat: add async api

* feat: session pool is non-blocking

* tests: fix integration tests that assumed tx was blocking

Some integration tests started transactions without executing a query,
and expected these transactions to fail. However, as the client is
now non-blocking up until the first call to ResultSet#next(), no
exception would occur.

* feat: add read methods support

* tests: test async runner

* feat: create async runner

* tests: centralize some commonly used test objects

* feat: keep session checked out until async finishes

* fix: fix span test cases after rebase

* fix: fix async runner tests

* fix: make async runner wait for async operations

* examples: add example integration test

* examples: add more examples

* tests: fix flaky tests

* rebase: rebase on current master

* fix: run code formatter

* feat: add support for poller

* tests: support more param types

* fix: fix race conditions

* feat: return ApiFuture to monitor end of AsyncResultSet

* feat: add helper method for create test result sets

* feat: add batchUpdateAsync

* fix: add ignored interface differences

* refactor: use future as waiter in SessionPool

* format: run code formatter

* tests: fix test case + remove commented code

* fix: AsyncResultSet should throw Cancelled

* feat: expose DatabaseId.of(String name)

* deps: set version to 1.53 to match bom

* feat: steps to add async support for tx manager

* review: process review comments

* chore: remove unused code

* clirr: add ignored differences to clirr

* fix: call listeners after all rows have been consumed

* feat: towards AsyncTransactionManager

* fix: session leaks + code format

* fix: more session leak fixes

* feat: further work on AsyncTransactionManager

* fix: fix test failures

* fix: fix several race conditions

* tests: increase test timeout

* feat: further towards AsyncTransactionManager

* feat: require executor for transaction functions

* revert: remove async connection api from branch

* chore: run code formatter

* chore: fix flaky test case

* tests: fix ITs for emulator

* fix: SpannerOptions.toBuilder().host should override emulatorHost

* tests: fix potentially hanging test
  • Loading branch information
olavloite committed Jun 30, 2020
1 parent 965e95e commit 462839b625e58e235581b8ba10b398e1d222eaaf
Showing with 9,895 additions and 687 deletions.
  1. +82 −1 google-cloud-spanner/clirr-ignored-differences.xml
  2. +156 −1 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
  3. +6 −6 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
  4. +226 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java
  5. +586 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java
  6. +59 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java
  7. +81 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java
  8. +203 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java
  9. +167 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
  10. +5 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java
  11. +121 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
  12. +26 −4 google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
  13. +1 −1 google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseId.java
  14. +65 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingAsyncResultSet.java
  15. +14 −7 google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java
  16. +101 −45 google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingStructReader.java
  17. +34 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
  18. +5 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java
  19. +31 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/ReadContext.java
  20. +27 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java
  21. +91 −22 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
  22. +880 −296 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
  23. +216 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java
  24. +19 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java
  25. +4 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/Spanner.java
  26. +17 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
  27. +82 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
  28. +1 −1 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java
  29. +24 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
  30. +258 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContextFutureImpl.java
  31. +12 −3 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java
  32. +373 −45 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
  33. +36 −8 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
  34. +14 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java
  35. +140 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractAsyncTransactionTest.java
  36. +2 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java
  37. +464 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplStressTest.java
  38. +443 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java
  39. +618 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java
  40. +1,045 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java
  41. +1 −1 google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java
  42. +522 −29 google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
  43. +15 −14 ...le-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java
  44. +2 −3 google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockDatabaseAdminServiceImpl.java
  45. +270 −61 google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
  46. +151 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java
  47. +166 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/RandomResultSetGenerator.java
  48. +510 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadAsyncTest.java
  49. +34 −1 google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java
  50. +7 −4 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
  51. +11 −10 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java
  52. +48 −45 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java
  53. +61 −38 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java
  54. +1 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java
  55. +5 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerExceptionFactoryTest.java
  56. +7 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java
  57. +37 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerMatchers.java
  58. +21 −16 google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java
  59. +32 −17 google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java
  60. +30 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadOnlyTransactionTest.java
  61. +30 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java
  62. +309 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java
  63. +550 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java
  64. +1 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseTest.java
  65. +12 −6 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadOnlyTxnTest.java
  66. +2 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadTest.java
  67. +318 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerAsyncTest.java
  68. +6 −1 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java
  69. +1 −1 versions.txt
@@ -93,7 +93,6 @@
<className>com/google/cloud/spanner/DatabaseAdminClient</className>
<method>com.google.cloud.spanner.Backup updateBackup(java.lang.String, java.lang.String, com.google.cloud.Timestamp)</method>
</difference>

<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
@@ -147,6 +146,88 @@
<method>com.google.api.gax.paging.Page listDatabases()</method>
</difference>

<!-- Add Async API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>* runAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>* transactionManagerAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/Spanner</className>
<method>* getAsyncExecutorProvider(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* executeQueryAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readRowAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readUsingIndexAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readRowUsingIndexAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>* batchUpdateAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>* executeUpdateAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* beginTransactionAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* commitAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* rollbackAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* executeBatchDmlAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>* executeQueryAsync(*)</method>
</difference>

<!-- Adding operation RPCs to InstanceAdminClient. -->
<difference>
<differenceType>7012</differenceType>
@@ -21,16 +21,24 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AbstractResultSet.GrpcResultSet;
import com.google.cloud.spanner.AbstractResultSet.GrpcStreamIterator;
import com.google.cloud.spanner.AbstractResultSet.ResumableStreamIterator;
import com.google.cloud.spanner.AsyncResultSet.CallbackResponse;
import com.google.cloud.spanner.AsyncResultSet.ReadyCallback;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
@@ -62,6 +70,7 @@
private Span span = Tracing.getTracer().getCurrentSpan();
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private ExecutorProvider executorProvider;

Builder() {}

@@ -95,9 +104,25 @@ B setDefaultQueryOptions(QueryOptions defaultQueryOptions) {
return self();
}

B setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
return self();
}

abstract T build();
}

/**
* {@link AsyncResultSet} that supports adding listeners that are called when all rows from the
* underlying result stream have been fetched.
*/
interface ListenableAsyncResultSet extends AsyncResultSet {
/** Adds a listener to this {@link AsyncResultSet}. */
void addListener(Runnable listener);

void removeListener(Runnable listener);
}

/**
* A {@code ReadContext} for standalone reads. This can only be used for a single operation, since
* each standalone read may see a different timestamp of Cloud Spanner data.
@@ -350,7 +375,8 @@ void initTransaction() {
final Object lock = new Object();
final SessionImpl session;
final SpannerRpc rpc;
final Span span;
final ExecutorProvider executorProvider;
Span span;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

@@ -374,6 +400,12 @@ void initTransaction() {
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
}

@Override
public void setSpan(Span span) {
this.span = span;
}

long getSeqNo() {
@@ -386,12 +418,38 @@ public final ResultSet read(
return readInternal(table, null, keys, columns, options);
}

@Override
public ListenableAsyncResultSet readAsync(
String table, KeySet keys, Iterable<String> columns, ReadOption... options) {
Options readOptions = Options.fromReadOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider, readInternal(table, null, keys, columns, options), bufferRows);
}

@Override
public final ResultSet readUsingIndex(
String table, String index, KeySet keys, Iterable<String> columns, ReadOption... options) {
return readInternal(table, checkNotNull(index), keys, columns, options);
}

@Override
public ListenableAsyncResultSet readUsingIndexAsync(
String table, String index, KeySet keys, Iterable<String> columns, ReadOption... options) {
Options readOptions = Options.fromReadOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
readInternal(table, checkNotNull(index), keys, columns, options),
bufferRows);
}

@Nullable
@Override
public final Struct readRow(String table, Key key, Iterable<String> columns) {
@@ -400,6 +458,13 @@ public final Struct readRow(String table, Key key, Iterable<String> columns) {
}
}

@Override
public final ApiFuture<Struct> readRowAsync(String table, Key key, Iterable<String> columns) {
try (AsyncResultSet resultSet = readAsync(table, KeySet.singleKey(key), columns)) {
return consumeSingleRowAsync(resultSet);
}
}

@Nullable
@Override
public final Struct readRowUsingIndex(
@@ -409,12 +474,35 @@ public final Struct readRowUsingIndex(
}
}

@Override
public final ApiFuture<Struct> readRowUsingIndexAsync(
String table, String index, Key key, Iterable<String> columns) {
try (AsyncResultSet resultSet =
readUsingIndexAsync(table, index, KeySet.singleKey(key), columns)) {
return consumeSingleRowAsync(resultSet);
}
}

@Override
public final ResultSet executeQuery(Statement statement, QueryOption... options) {
return executeQueryInternal(
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options);
}

@Override
public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
executeQueryInternal(
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
bufferRows);
}

@Override
public final ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode readContextQueryMode) {
switch (readContextQueryMode) {
@@ -666,4 +754,71 @@ private Struct consumeSingleRow(ResultSet resultSet) {
}
return row;
}

static ApiFuture<Struct> consumeSingleRowAsync(AsyncResultSet resultSet) {
final SettableApiFuture<Struct> result = SettableApiFuture.create();
// We can safely use a directExecutor here, as we will only be consuming one row, and we will
// not be doing any blocking stuff in the handler.
final SettableApiFuture<Struct> row = SettableApiFuture.create();
ApiFutures.addCallback(
resultSet.setCallback(MoreExecutors.directExecutor(), ConsumeSingleRowCallback.create(row)),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
result.setException(t);
}

@Override
public void onSuccess(Void input) {
try {
result.set(row.get());
} catch (Throwable t) {
result.setException(t);
}
}
},
MoreExecutors.directExecutor());
return result;
}

/**
* {@link ReadyCallback} for returning the first row in a result set as a future {@link Struct}.
*/
private static class ConsumeSingleRowCallback implements ReadyCallback {
private final SettableApiFuture<Struct> result;
private Struct row;

static ConsumeSingleRowCallback create(SettableApiFuture<Struct> result) {
return new ConsumeSingleRowCallback(result);
}

private ConsumeSingleRowCallback(SettableApiFuture<Struct> result) {
this.result = result;
}

@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
try {
switch (resultSet.tryNext()) {
case DONE:
result.set(row);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
if (row != null) {
throw newSpannerException(
ErrorCode.INTERNAL, "Multiple rows returned for single key");
}
row = resultSet.getCurrentRowAsStruct();
return CallbackResponse.CONTINUE;
default:
throw new IllegalStateException();
}
} catch (Throwable t) {
result.setException(t);
return CallbackResponse.DONE;
}
}
}
}
@@ -495,7 +495,7 @@ private static Struct decodeStructValue(Type structType, ListValue structValue)
return new GrpcStruct(structType, fields);
}

private static Object decodeArrayValue(Type elementType, ListValue listValue) {
static Object decodeArrayValue(Type elementType, ListValue listValue) {
switch (elementType.getCode()) {
case BOOL:
// Use a view: element conversion is virtually free.
@@ -1009,7 +1009,7 @@ protected PartialResultSet computeNext() {
}
}

private static double valueProtoToFloat64(com.google.protobuf.Value proto) {
static double valueProtoToFloat64(com.google.protobuf.Value proto) {
if (proto.getKindCase() == KindCase.STRING_VALUE) {
switch (proto.getStringValue()) {
case "-Infinity":
@@ -1037,7 +1037,7 @@ private static double valueProtoToFloat64(com.google.protobuf.Value proto) {
return proto.getNumberValue();
}

private static NullPointerException throwNotNull(int columnIndex) {
static NullPointerException throwNotNull(int columnIndex) {
throw new NullPointerException(
"Cannot call array getter for column " + columnIndex + " with null elements");
}
@@ -1048,7 +1048,7 @@ private static NullPointerException throwNotNull(int columnIndex) {
* {@code BigDecimal} respectively. Rather than construct new wrapper objects for each array
* element, we use primitive arrays and a {@code BitSet} to track nulls.
*/
private abstract static class PrimitiveArray<T, A> extends AbstractList<T> {
abstract static class PrimitiveArray<T, A> extends AbstractList<T> {
private final A data;
private final BitSet nulls;
private final int size;
@@ -1103,7 +1103,7 @@ A toPrimitiveArray(int columnIndex) {
}
}

private static class Int64Array extends PrimitiveArray<Long, long[]> {
static class Int64Array extends PrimitiveArray<Long, long[]> {
Int64Array(ListValue protoList) {
super(protoList);
}
@@ -1128,7 +1128,7 @@ Long get(long[] array, int i) {
}
}

private static class Float64Array extends PrimitiveArray<Double, double[]> {
static class Float64Array extends PrimitiveArray<Double, double[]> {
Float64Array(ListValue protoList) {
super(protoList);
}

0 comments on commit 462839b

Please sign in to comment.