feat!: async connection API #392

Merged
merged 41 commits into from Oct 8, 2020

@olavloite olavloite commented Aug 19, 2020

Description

This PR adds async methods for the Connection API. Note that the Connection API, including the async methods, still assumes the following:

  1. A Connection instance is not thread safe.
  2. At most one statement will be actively executed at any time.

Technical Implementation

  1. The PR adds Async versions for all execute and transaction methods of the Connection interface.
  2. The ConnectionImpl class already has a single-threaded executor that executes the statements on the backend. It was implemented this way in preparation for a future async version of the API, and because it makes it possible to cancel statements and to give individual statements a specific timeout.
  3. All Async methods are executed using the executor from item 2 and are essentially the same as the current synchronous versions, except that the API no longer waits for the statement to finish but returns the Future that is returned by the executor. Some extra logic is added for retrying statements in case of an AbortedException.
  4. Statements with a timeout used to implement the timeout by calling Future.get() with a timeout. That is now replaced with the new per-RPC timeout feature that was recently added to the client library, which also makes timeouts usable for asynchronous statements.
  5. DDL statements cannot use the new timeout feature in the client library, because they execute RPCs on the DatabaseAdmin interface instead of the Spanner interface. They therefore still implement the timeout by calling Future.get() with a timeout.
  6. The UnitOfWork interface is the base interface for all transactions in the Connection API. All the methods in this interface have been changed from sync to async, and the synchronous methods in the Connection interface use the async methods under the hood. This ensures that there is only one implementation of each method. It also ensures that all existing (integration) tests also invoke the new async methods.
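
To illustrate items 2, 3 and 6, here is a minimal sketch of a synchronous method that delegates to its async counterpart on a single-threaded executor. It uses only JDK types: `CompletableFuture` stands in for `ApiFuture`, and `SketchConnection` and `fakeBackendCall` are hypothetical names that do not exist in the client library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a connection; not the real ConnectionImpl. */
final class SketchConnection implements AutoCloseable {
  // A single worker thread means at most one statement is executed at any time.
  private final ExecutorService executor =
      Executors.newSingleThreadExecutor(
          r -> {
            Thread t = new Thread(r, "sketch-statement-executor");
            t.setDaemon(true);
            return t;
          });

  /** Async variant: submits the work and returns the future without waiting. */
  CompletableFuture<Long> executeUpdateAsync(String sql) {
    return CompletableFuture.supplyAsync(() -> fakeBackendCall(sql), executor);
  }

  /** Sync variant: the only implementation is the async one, we just block on it. */
  long executeUpdate(String sql) {
    try {
      return executeUpdateAsync(sql).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  // Placeholder for the backend RPC; returns a made-up update count.
  private static long fakeBackendCall(String sql) {
    return sql.length();
  }

  @Override
  public void close() {
    executor.shutdown();
  }
}
```

Because the sync method is a thin wrapper, any test that calls `executeUpdate` also exercises `executeUpdateAsync`, which is the property item 6 relies on.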

olavloite added 3 commits Aug 10, 2020
The Spanner client allows a user to set custom timeouts while creating a
SpannerOptions instance, but these timeouts are static and are applied to
all invocations of the RPCs. This change introduces the possibility to set
custom timeouts and other call options on a per-RPC basis.

Fixes #378
@google-cla google-cla bot added the cla: yes label Aug 19, 2020
@product-auto-label product-auto-label bot added the api: spanner label Aug 21, 2020
@olavloite olavloite changed the title from "[WIP] feat: async connection API" to "feat: async connection API (WIP)" Aug 25, 2020
@olavloite olavloite changed the title from "feat: async connection API (WIP)" to "feat: async connection API" Sep 8, 2020
@olavloite olavloite marked this pull request as ready for review Sep 8, 2020
@@ -89,7 +89,7 @@ static ErrorCode valueOf(String name, ErrorCode defaultValue) {
/**
* Returns the error code corresponding to a gRPC status, or {@code UNKNOWN} if not recognized.
*/
- static ErrorCode fromGrpcStatus(Status status) {
+ public static ErrorCode fromGrpcStatus(Status status) {
@olavloite olavloite Sep 8, 2020

Changed to public to be accessible from the com.google.cloud.spanner.connection package.

- ResultSet delegate, ExecutorProvider executorProvider) {
- return new AsyncResultSetImpl(executorProvider, delegate, 100);
+ ResultSet delegate, ExecutorProvider executorProvider, QueryOption... options) {
+ Options readOptions = Options.fromQueryOptions(options);
@olavloite olavloite Sep 8, 2020

Allow specifying a buffer size instead of always using a default size.

private volatile UnitOfWorkState state = UnitOfWorkState.STARTED;
private volatile AbortedException abortedException;
@olavloite olavloite Sep 8, 2020

Keeps track of whether this transaction has already aborted, and stops allowing any further statements on the transaction until it has been rolled back. Otherwise, a list of 'blind' async statements could lead to multiple unnecessary round-trips to the backend:

connection.executeUpdateAsync(statement1);
connection.executeUpdateAsync(statement2);
connection.commitAsync();

If the first statement in the example above aborts, the following statements will not be sent to the backend, but will return the aborted error of the first statement.
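
The fail-fast behaviour described above could be sketched roughly as follows. This is an illustration only, using JDK types: `AbortTrackingTransaction`, `AbortedSketchException` and `backendCalls` are hypothetical names, and the statement runs inline rather than on an executor to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical transaction that remembers the first abort it sees. */
final class AbortTrackingTransaction {
  /** Stand-in for AbortedException. */
  static final class AbortedSketchException extends RuntimeException {
    AbortedSketchException(String message) {
      super(message);
    }
  }

  private volatile AbortedSketchException abortedException;
  /** Counts how many statements actually reached the (fake) backend. */
  final AtomicInteger backendCalls = new AtomicInteger();

  CompletableFuture<Long> executeUpdateAsync(Supplier<Long> statement) {
    CompletableFuture<Long> result = new CompletableFuture<>();
    AbortedSketchException previous = abortedException;
    if (previous != null) {
      // The transaction already aborted: return the same error, skip the round-trip.
      result.completeExceptionally(previous);
      return result;
    }
    backendCalls.incrementAndGet();
    try {
      result.complete(statement.get());
    } catch (AbortedSketchException e) {
      abortedException = e; // remember the abort for all following statements
      result.completeExceptionally(e);
    }
    return result;
  }
}
```

With this shape, a second blind statement submitted after an abort completes exceptionally with the first statement's error and `backendCalls` stays at 1.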

ConnectionPreconditions.checkState(
- state == UnitOfWorkState.STARTED,
+ this.state == UnitOfWorkState.STARTED || this.state == UnitOfWorkState.ABORTED,
@olavloite olavloite Sep 8, 2020

We allow submitting a statement on an aborted transaction, as the aborted error will be returned through the ApiFuture that is returned. This guarantees that multiple blind async statements in a row will not cause unexpected exceptions.

throw e;

ApiFuture<ResultSet> res;
if (retryAbortsInternally) {
@olavloite olavloite Sep 8, 2020

try-catch block removed, as the exception is returned by the ApiFuture.

* continue to block on the timed out/cancelled statement, a new {@link ExecutorService} is
* created.
*/
void recreate() {
@olavloite olavloite Sep 8, 2020

This is no longer needed thanks to the new feature in the client library for setting a timeout per RPC.

@@ -39,9 +41,11 @@

enum UnitOfWorkState {
STARTED,
COMMITTING,
@olavloite olavloite Sep 8, 2020

This state was added because commitAsync initiates a commit but returns before the outcome of the commit is known. Setting this state is needed in order to know that the following statement should be executed in a new transaction.
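
A rough sketch of why an intermediate state is needed: once a commit has been initiated, the next statement has to land in a fresh transaction even though the commit result is not yet known. `TransactionSlot`, `Tx` and the three-value enum are hypothetical simplifications, not the real UnitOfWorkState handling.

```java
/** Hypothetical holder that decides which transaction the next statement uses. */
final class TransactionSlot {
  enum State { STARTED, COMMITTING, COMMITTED }

  static final class Tx {
    final int id;
    State state = State.STARTED;

    Tx(int id) {
      this.id = id;
    }
  }

  private Tx current;
  private int nextId = 1;

  /** Returns the active transaction, or starts a new one if none accepts statements. */
  Tx transactionForNextStatement() {
    if (current == null || current.state != State.STARTED) {
      current = new Tx(nextId++);
    }
    return current;
  }

  /** Marks the commit as initiated and returns before its outcome is known. */
  Tx commitAsync() {
    Tx tx = transactionForNextStatement();
    tx.state = State.COMMITTING;
    return tx;
  }
}
```

Without the COMMITTING value, the slot could not distinguish "commit in flight" from "still accepting statements" until the commit future completed.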

@@ -408,6 +409,7 @@ private StatusRuntimeException getException() {
private final int randomExecutionTime;
private final Queue<Exception> exceptions;
private final boolean stickyException;
private final Queue<Long> streamIndices;
@olavloite olavloite Sep 8, 2020

This property is used to trigger an error halfway through a stream of PartialResultSets.
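
As an illustration of the idea (not the mock server's actual implementation), a test stream that fails at a configured index might look like this. `FailingStream` and `failAtIndex` are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical iterator that throws once it reaches a configured element index. */
final class FailingStream<T> implements Iterator<T> {
  private final List<T> items;
  private final long failAtIndex;
  private int position;

  FailingStream(List<T> items, long failAtIndex) {
    this.items = items;
    this.failAtIndex = failAtIndex;
  }

  @Override
  public boolean hasNext() {
    return position < items.size();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    if (position == failAtIndex) {
      // Simulates a transport error in the middle of the stream.
      throw new IllegalStateException("simulated stream error at index " + position);
    }
    return items.get(position++);
  }
}
```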

mockDatabaseAdmin.reset();
mockInstanceAdmin.reset();

futureParentHandlers = Logger.getLogger(AbstractFuture.class.getName()).getUseParentHandlers();
@olavloite olavloite Sep 8, 2020

Disable logging of certain warnings that may be ignored during these tests. These are generated because we force errors halfway through streams, among other things.
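
A small sketch of the save-and-restore pattern with `java.util.logging`, assuming the test wants to silence one logger for its duration and then put the original setting back. `QuietLogger` is a hypothetical helper; only `Logger.getUseParentHandlers()` and `Logger.setUseParentHandlers(boolean)` are real JDK calls.

```java
import java.util.logging.Logger;

/** Hypothetical helper that mutes a logger until close() is called. */
final class QuietLogger implements AutoCloseable {
  private final Logger logger;
  private final boolean previousUseParentHandlers;

  QuietLogger(String loggerName) {
    this.logger = Logger.getLogger(loggerName);
    // Remember the current setting so it can be restored after the test.
    this.previousUseParentHandlers = logger.getUseParentHandlers();
    logger.setUseParentHandlers(false);
  }

  @Override
  public void close() {
    logger.setUseParentHandlers(previousUseParentHandlers);
  }
}
```

In a JUnit test this would typically be created in a setup method and closed in the matching teardown method.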

fail("missing expected exception");
} catch (IllegalStateException e) {
}
}

@olavloite olavloite Sep 8, 2020

These tests have been replaced by tests using a mock server.

@@ -275,7 +282,7 @@ public void run() {
switch (response) {
case DONE:
state = State.DONE;
closeDelegateResultSet();
@olavloite olavloite Sep 11, 2020

The delegate ResultSet should not be closed by the callback executor, but only by the executor that fetches and buffers the rows (in the ProduceRowsCallable at line 380).

@codecov codecov bot commented Sep 11, 2020

Codecov Report

Merging #392 into master will increase coverage by 1.84%.
The diff coverage is 87.62%.

@@             Coverage Diff              @@
##             master     #392      +/-   ##
============================================
+ Coverage     82.19%   84.04%   +1.84%     
- Complexity     2470     2538      +68     
============================================
  Files           138      140       +2     
  Lines         13646    13868     +222     
  Branches       1314     1327      +13     
============================================
+ Hits          11217    11655     +438     
+ Misses         1900     1670     -230     
- Partials        529      543      +14     
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| .../main/java/com/google/cloud/spanner/ErrorCode.java | 82.85% <ø> (ø) | 7.00 <0.00> (ø) |
| ...om/google/cloud/spanner/connection/Connection.java | 100.00% <ø> (ø) | 0.00 <0.00> (ø) |
| .../cloud/spanner/connection/StatementResultImpl.java | 85.48% <0.00%> (-14.52%) | 20.00 <0.00> (ø) |
| ...le/cloud/spanner/connection/StatementExecutor.java | 79.16% <56.25%> (-11.95%) | 7.00 <3.00> (-1.00) |
| ...om/google/cloud/spanner/TransactionRunnerImpl.java | 86.91% <66.66%> (+0.82%) | 9.00 <0.00> (ø) |
| ...m/google/cloud/spanner/connection/SpannerPool.java | 85.87% <66.66%> (-1.11%) | 30.00 <0.00> (ø) |
| ...d/spanner/connection/AsyncStatementResultImpl.java | 79.06% <79.06%> (ø) | 12.00 <12.00> (?) |
| ...a/com/google/cloud/spanner/AsyncResultSetImpl.java | 92.27% <86.66%> (+1.61%) | 32.00 <2.00> (+1.00) |
| ...cloud/spanner/connection/ReadWriteTransaction.java | 80.89% <88.82%> (+18.52%) | 55.00 <12.00> (+11.00) |
| ...oogle/cloud/spanner/connection/ConnectionImpl.java | 84.84% <90.21%> (+5.15%) | 175.00 <23.00> (+21.00) |
| ... and 35 more | | |

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c99294b...b6e175a. Read the comment docs.

@olavloite olavloite requested review from skuruppu and thiagotnunes Sep 12, 2020
@olavloite olavloite added the kokoro:force-run label Sep 12, 2020
@yoshi-kokoro yoshi-kokoro removed the kokoro:force-run label Sep 12, 2020
@olavloite olavloite requested review from as code owners Oct 5, 2020
@@ -89,7 +91,7 @@ private State(boolean shouldStop) {
private final BlockingDeque<Struct> buffer;
private Struct currentRow;
/** The underlying synchronous {@link ResultSet} that is producing the rows. */
- private final ResultSet delegateResultSet;
+ private final Supplier<ResultSet> delegateResultSet;
@olavloite olavloite Oct 6, 2020

The executeQueryAsync method will produce a Future<ResultSet> that should be used as the underlying delegate of an AsyncResultSet.
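
One way to picture the change from a direct delegate to a `Supplier`: the delegate is only resolved when the row-producing side first asks for it. The sketch below uses a JDK `CompletableFuture` in place of the library's future type, and `LazyDelegate` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical supplier that resolves its value from a future on first use. */
final class LazyDelegate<T> implements Supplier<T> {
  private final CompletableFuture<T> future;

  LazyDelegate(CompletableFuture<T> future) {
    this.future = future;
  }

  @Override
  public T get() {
    // Blocks until the future completes; failures surface as unchecked exceptions.
    return future.join();
  }
}
```

Constructing the wrapper never blocks, so an async result set could be handed back to the caller before the query has produced its underlying ResultSet.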

@@ -261,7 +268,7 @@ public void run() {
// we'll keep the cancelled state.
return;
}
- executionException = SpannerExceptionFactory.newSpannerException(e);
+ executionException = SpannerExceptionFactory.asSpannerException(e);
@olavloite olavloite Oct 6, 2020

Returns the same exception instance if e is already a SpannerException, instead of wrapping a SpannerException in another SpannerException.
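
The difference between the two factory styles can be sketched with a hypothetical exception type. `SketchExceptions`, `SketchSpannerException`, `newException` and `asException` are illustrative names, not the SpannerExceptionFactory API.

```java
/** Hypothetical factory contrasting "always wrap" with "wrap only if needed". */
final class SketchExceptions {
  static final class SketchSpannerException extends RuntimeException {
    SketchSpannerException(Throwable cause) {
      super(cause);
    }
  }

  /** Always creates a new wrapper, even around an exception of the same type. */
  static SketchSpannerException newException(Throwable t) {
    return new SketchSpannerException(t);
  }

  /** Returns the same instance if it already has the target type, else wraps it. */
  static SketchSpannerException asException(Throwable t) {
    if (t instanceof SketchSpannerException) {
      return (SketchSpannerException) t;
    }
    return newException(t);
  }
}
```

The "as" variant avoids nested wrappers, so callers see the original error rather than a wrapper around it.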

return getOrNull(Preconditions.checkNotNull(future));
}

public static <T> T getOrNull(ApiFuture<T> future) throws SpannerException {
@olavloite olavloite Oct 6, 2020

Convenience method that will catch and wrap common exceptions.
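
A minimal sketch of such a helper, written against the JDK `Future` interface. Which exceptions the real method wraps, and into what, is not shown in this conversation, so the mapping below (unchecked causes rethrown as-is, everything else wrapped in `IllegalStateException`) is an assumption, and `FutureHelper.getOrNullSketch` is a hypothetical name.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical helper that turns checked future failures into unchecked ones. */
final class FutureHelper {
  /** Returns the future's value, which may be null, or throws an unchecked exception. */
  static <T> T getOrNullSketch(Future<T> future) {
    try {
      return future.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause; // keep the original unchecked error
      }
      throw new IllegalStateException(cause);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      throw new IllegalStateException("interrupted while waiting", e);
    } catch (CancellationException e) {
      throw new IllegalStateException("future was cancelled", e);
    }
  }
}
```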

*/
- void commit();
+ ApiFuture<Void> commitAsync();
@olavloite olavloite Oct 6, 2020

All methods in the UnitOfWork interface have been changed from sync to async. The sync methods in the Connection interface now use the async versions in UnitOfWork under the hood.

@@ -107,13 +113,7 @@ public Boolean call() throws Exception {
@Override
public boolean next() {
// Call next() with retry.
boolean res = transaction.runWithRetry(nextCallable);
@olavloite olavloite Oct 6, 2020

This has been moved to the NextCallable to make it retriable.

ConnectionPreconditions.checkState(
state == UnitOfWorkState.STARTED, "The batch is no longer active and cannot be ran");
try {
@olavloite olavloite Oct 6, 2020

This block of code has been moved to the Callable below, which can be used as input for the generic executeStatementAsync method.

@@ -236,14 +237,17 @@ public int hashCode() {
@GuardedBy("this")
private final Map<SpannerPoolKey, Long> lastConnectionClosedAt = new HashMap<>();

private final Ticker ticker;
@olavloite olavloite Oct 6, 2020

Use a Ticker to measure time to make it easier to test.
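
To show why an injectable time source helps testing, here is a sketch that uses a JDK `LongSupplier` as a stand-in for a ticker that returns nanoseconds. `IdleTracker` and its methods are hypothetical and are not part of SpannerPool.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical tracker that measures idle time through an injected nano ticker. */
final class IdleTracker {
  private final LongSupplier nanoTicker;
  private long lastClosedAtNanos;

  IdleTracker(LongSupplier nanoTicker) {
    this.nanoTicker = nanoTicker;
    this.lastClosedAtNanos = nanoTicker.getAsLong();
  }

  /** Records the moment the last connection was closed. */
  void markClosed() {
    lastClosedAtNanos = nanoTicker.getAsLong();
  }

  /** Returns true if more than the given duration has passed since markClosed(). */
  boolean idleLongerThan(long duration, TimeUnit unit) {
    return nanoTicker.getAsLong() - lastClosedAtNanos > unit.toNanos(duration);
  }
}
```

Production code would pass `System::nanoTime`, while a test can pass a supplier backed by a variable and advance it manually instead of sleeping.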

- private static ExecutorService createExecutorService() {
- return new ThreadPoolExecutor(
- 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), THREAD_FACTORY);
+ private static ListeningExecutorService createExecutorService() {
@olavloite olavloite Oct 6, 2020

Create a ListeningExecutorService. The ListenableFutures that are returned by this can be converted to ApiFutures.

COMMITTED,
COMMIT_FAILED,
ROLLED_BACK,
RUNNING,
@olavloite olavloite Oct 6, 2020

This state is added to indicate that a batch is running but not yet finished, and that subsequent statements should not be included in the batch.

@thiagotnunes thiagotnunes left a comment

LGTM

@olavloite olavloite changed the title from "feat: async connection API" to "feat!: async connection API" Oct 7, 2020
@olavloite olavloite merged commit 3dd0675 into master Oct 8, 2020
19 checks passed
@olavloite olavloite deleted the async-connection branch Oct 8, 2020
@release-please release-please bot mentioned this pull request Oct 8, 2020