Skip to content
Permalink
Browse files
feat: add bufferAsync methods (#1145)
* feat: add bufferAsync methods

Adds bufferAsync methods to TransactionContext. The existing buffer methods
were already non-blocking, but the async versions also return an ApiFuture,
which make them easier to use when chaining multiple async calls together.

Also changes some calls in the AsyncTransactionManagerTest to use lambdas
instead of the test helper methods.

Fixes #1126

* fix: do not take lock on async method

* build: remove custom skip tests variable

* test: add test for committing twice

* fix: synchronize buffering and committing
  • Loading branch information
olavloite committed May 18, 2021
1 parent e70b009 commit 7d6816f1fd14bcd2c7f91d814855b5d921ba970d
@@ -605,4 +605,17 @@
<className>com/google/cloud/spanner/StructReader</className>
<method>com.google.cloud.spanner.Value getValue(java.lang.String)</method>
</difference>

<!-- Adds bufferAsync to DatabaseClient -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture bufferAsync(com.google.cloud.spanner.Mutation)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture bufferAsync(java.lang.Iterable)</method>
</difference>
</differences>
@@ -18,6 +18,7 @@

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@@ -98,31 +99,21 @@ Timestamp get(long timeout, TimeUnit unit)
* <p>Example usage:
*
* <pre>{@code
* TransactionContextFuture txnFuture = manager.beginAsync();
* final String column = "FirstName";
* txnFuture.then(
* new AsyncTransactionFunction<Void, Struct>() {
* @Override
* public ApiFuture<Struct> apply(TransactionContext txn, Void input)
* throws Exception {
* return txn.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column));
* }
* })
* .then(
* new AsyncTransactionFunction<Struct, Void>() {
* @Override
* public ApiFuture<Void> apply(TransactionContext txn, Struct input)
* throws Exception {
* String name = input.getString(column);
* txn.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* }
* })
* final long singerId = 1L;
* AsyncTransactionManager manager = client.transactionManagerAsync();
* TransactionContextFuture txnFuture = manager.beginAsync();
* txnFuture
* .then((transaction, ignored) ->
* transaction.readRowAsync("Singers", Key.of(singerId), Collections.singleton(column)),
* executor)
* .then((transaction, row) ->
* transaction.bufferAsync(
* Mutation.newUpdateBuilder("Singers")
* .set(column).to(row.getString(column).toUpperCase())
* .build()),
* executor)
* .commitAsync();
* }</pre>
*/
interface AsyncTransactionStep<I, O> extends ApiFuture<O> {
@@ -431,8 +431,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* lifecycle. This API is meant for advanced users. Most users should instead use the {@link
* #runAsync()} API instead.
*
* <p>Example of using {@link AsyncTransactionManager} with lambda expressions (Java 8 and
* higher).
* <p>Example of using {@link AsyncTransactionManager}.
*
* <pre>{@code
* long singerId = 1L;
@@ -449,56 +448,11 @@ CommitResponse writeAtLeastOnceWithOptions(
* .then(
* (transaction, row) -> {
* String name = row.getString(column);
* transaction.buffer(
* return transaction.bufferAsync(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* })
* .commitAsync();
* try {
* commitTimestamp.get();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis());
* transactionFuture = manager.resetForRetryAsync();
* }
* }
* }
* }</pre>
*
* <p>Example of using {@link AsyncTransactionManager} (Java 7).
*
* <pre>{@code
* final long singerId = 1L;
* try (AsyncTransactionManager manager = client().transactionManagerAsync()) {
* TransactionContextFuture transactionFuture = manager.beginAsync();
* while (true) {
* final String column = "FirstName";
* CommitTimestampFuture commitTimestamp =
* transactionFuture.then(
* new AsyncTransactionFunction<Void, Struct>() {
* @Override
* public ApiFuture<Struct> apply(TransactionContext transaction, Void input)
* throws Exception {
* return transaction.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column));
* }
* })
* .then(
* new AsyncTransactionFunction<Struct, Void>() {
* @Override
* public ApiFuture<Void> apply(TransactionContext transaction, Struct input)
* throws Exception {
* String name = input.getString(column);
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* }
* })
* .commitAsync();
* try {
@@ -675,6 +675,11 @@ public void buffer(Mutation mutation) {
delegate.buffer(mutation);
}

@Override
public ApiFuture<Void> bufferAsync(Mutation mutation) {
return delegate.bufferAsync(mutation);
}

@Override
public Struct readRowUsingIndex(String table, String index, Key key, Iterable<String> columns) {
try {
@@ -703,6 +708,11 @@ public void buffer(Iterable<Mutation> mutations) {
delegate.buffer(mutations);
}

@Override
public ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
return delegate.bufferAsync(mutations);
}

@Override
public long executeUpdate(Statement statement, UpdateOption... options) {
try {
@@ -91,13 +91,23 @@ public interface TransactionContext extends ReadContext {
*/
void buffer(Mutation mutation);

/** Same as {@link #buffer(Mutation)}, but is guaranteed to be non-blocking. */
default ApiFuture<Void> bufferAsync(Mutation mutation) {
throw new UnsupportedOperationException("method should be overwritten");
}

/**
* Buffers mutations to be applied if the transaction commits successfully. The effects of the
* mutations will not be visible to subsequent operations in the transaction. All buffered
* mutations will be applied atomically.
*/
void buffer(Iterable<Mutation> mutations);

/** Same as {@link #buffer(Iterable)}, but is guaranteed to be non-blocking. */
default ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
throw new UnsupportedOperationException("method should be overwritten");
}

/**
* Executes the DML statement(s) and returns the number of rows modified. For non-DML statements,
* it will result in an {@code IllegalArgumentException}. The effects of the DML statement will be
@@ -54,7 +54,9 @@
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -75,6 +77,9 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
*/
private static final String TRANSACTION_CANCELLED_MESSAGE = "invalidated by a later transaction";

private static final String TRANSACTION_ALREADY_COMMITTED_MESSAGE =
"Transaction has already committed";

@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
static class Builder extends AbstractReadContext.Builder<Builder, TransactionContextImpl> {
@@ -146,7 +151,9 @@ public void removeListener(Runnable listener) {
}
}

@GuardedBy("lock")
private final Object committingLock = new Object();

@GuardedBy("committingLock")
private volatile boolean committing;

@GuardedBy("lock")
@@ -155,8 +162,7 @@ public void removeListener(Runnable listener) {
@GuardedBy("lock")
private volatile int runningAsyncOperations;

@GuardedBy("lock")
private List<Mutation> mutations = new ArrayList<>();
private final Queue<Mutation> mutations = new ConcurrentLinkedQueue<>();

@GuardedBy("lock")
private boolean aborted;
@@ -280,6 +286,16 @@ void commit() {
volatile ApiFuture<CommitResponse> commitFuture;

ApiFuture<CommitResponse> commitAsync() {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
committing = true;
if (!mutations.isEmpty()) {
Mutation.toProto(mutations, mutationsProto);
}
}
final SettableApiFuture<CommitResponse> res = SettableApiFuture.create();
final SettableApiFuture<Void> finishOps;
CommitRequest.Builder builder =
@@ -303,14 +319,8 @@ ApiFuture<CommitResponse> commitAsync() {
} else {
finishOps = finishedAsyncOperations;
}
if (!mutations.isEmpty()) {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
builder.addAllMutations(mutationsProto);
}
// Ensure that no call to buffer mutations that would be lost can succeed.
mutations = null;
}
builder.addAllMutations(mutationsProto);
finishOps.addListener(
new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor());
return res;
@@ -603,22 +613,44 @@ public void onDone(boolean withBeginTransaction) {

@Override
public void buffer(Mutation mutation) {
synchronized (lock) {
checkNotNull(mutations, "Context is closed");
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
mutations.add(checkNotNull(mutation));
}
}

@Override
public ApiFuture<Void> bufferAsync(Mutation mutation) {
// Normally, we would call the async method from the sync method, but this is also safe as
// both are non-blocking anyways, and this prevents the creation of an ApiFuture that is not
// really used when the sync method is called.
buffer(mutation);
return ApiFutures.immediateFuture(null);
}

@Override
public void buffer(Iterable<Mutation> mutations) {
synchronized (lock) {
checkNotNull(this.mutations, "Context is closed");
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
for (Mutation mutation : mutations) {
this.mutations.add(checkNotNull(mutation));
}
}
}

@Override
public ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
// Normally, we would call the async method from the sync method, but this is also safe as
// both are non-blocking anyways, and this prevents the creation of an ApiFuture that is not
// really used when the sync method is called.
buffer(mutations);
return ApiFutures.immediateFuture(null);
}

@Override
public long executeUpdate(Statement statement, UpdateOption... options) {
beforeReadOrQuery();

0 comments on commit 7d6816f

Please sign in to comment.