Skip to content
Permalink
Browse files
feat: introduce TransactionOptions and UpdateOptions (#716)
* feat: introduce TransactionOptions and UpdateOptions

Adds TransactionOptions and UpdateOptions for read/write transactions and statements.
These can be used in the future to specify options to affect how transactions and
statements will be executed.

* test: add options tests
  • Loading branch information
olavloite committed Dec 15, 2020
1 parent 0fd859d commit 5c96fab6d1c19518d52d0a7f0d634f0526066f03
Showing with 422 additions and 177 deletions.
  1. +47 −0 google-cloud-spanner/clirr-ignored-differences.xml
  2. +7 −5 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
  3. +5 −2 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
  4. +6 −5 google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java
  5. +29 −28 google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
  6. +27 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
  7. +13 −8 google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
  8. +31 −27 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
  9. +75 −66 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
  10. +9 −2 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java
  11. +5 −4 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java
  12. +6 −3 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java
  13. +35 −11 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
  14. +4 −2 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java
  15. +6 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java
  16. +57 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
  17. +38 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java
  18. +9 −3 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java
  19. +1 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java
  20. +9 −9 google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java
  21. +3 −2 google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java
@@ -406,4 +406,51 @@
<className>com/google/cloud/spanner/AbstractLazyInitializer</className>
<method>java.lang.Object initialize()</method>
</difference>

<!-- TransactionOptions and UpdateOptions -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>long executePartitionedUpdate(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.TransactionRunner readWriteTransaction()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.AsyncRunner runAsync()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.TransactionManager transactionManager()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.AsyncTransactionManager transactionManagerAsync()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>long[] batchUpdate(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture batchUpdateAsync(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>long executeUpdate(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement)</method>
</difference>
</differences>
@@ -554,7 +554,8 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
return builder.build();
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, QueryMode queryMode) {
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Statement statement, QueryMode queryMode, Options options) {
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
@@ -577,7 +578,8 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, Query
return builder;
}

ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(Iterable<Statement> statements) {
ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
Iterable<Statement> statements, Options options) {
ExecuteBatchDmlRequest.Builder builder =
ExecuteBatchDmlRequest.newBuilder().setSession(session.getName());
int idx = 0;
@@ -609,7 +611,7 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(Iterable<Stateme
ResultSet executeQueryInternalWithOptions(
final Statement statement,
final com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
Options options,
final Options options,
final ByteString partitionToken) {
beforeReadOrQuery();
final int prefetchChunks =
@@ -620,7 +622,7 @@ ResultSet executeQueryInternalWithOptions(
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(statement, queryMode);
getExecuteSqlRequestBuilder(statement, queryMode, options);
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
@@ -707,7 +709,7 @@ ResultSet readInternalWithOptions(
@Nullable String index,
KeySet keys,
Iterable<String> columns,
Options readOptions,
final Options readOptions,
ByteString partitionToken) {
beforeReadOrQuery();
final ReadRequest.Builder builder =
@@ -22,6 +22,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
import com.google.cloud.spanner.TransactionManager.TransactionState;
@@ -40,14 +41,16 @@

private final SessionImpl session;
private Span span;
private final Options options;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span) {
AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) {
this.session = session;
this.span = span;
this.options = Options.fromTransactionOptions(options);
}

@Override
@@ -82,7 +85,7 @@ public TransactionContextFutureImpl beginAsync() {

private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction();
txn = session.newTransaction(options);
if (firstAttempt) {
session.setActive(this);
}
@@ -18,6 +18,7 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
@@ -308,7 +309,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* });
* </code></pre>
*/
TransactionRunner readWriteTransaction();
TransactionRunner readWriteTransaction(TransactionOption... options);

/**
* Returns a transaction manager which allows manual management of transaction lifecycle. This API
@@ -338,7 +339,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* }</pre>
*/
TransactionManager transactionManager();
TransactionManager transactionManager(TransactionOption... options);

/**
* Returns an asynchronous transaction runner for executing a single logical transaction with
@@ -371,7 +372,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* executor);
* </code></pre>
*/
AsyncRunner runAsync();
AsyncRunner runAsync(TransactionOption... options);

/**
* Returns an asynchronous transaction manager which allows manual management of transaction
@@ -459,7 +460,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* }</pre>
*/
AsyncTransactionManager transactionManagerAsync();
AsyncTransactionManager transactionManagerAsync(TransactionOption... options);

/**
* Returns the lower bound of rows modified by this DML statement.
@@ -508,5 +509,5 @@ CommitResponse writeAtLeastOnceWithOptions(
* <p>Given the above, Partitioned DML is good fit for large, database-wide, operations that are
* idempotent, such as deleting old rows from a very large table.
*/
long executePartitionedUpdate(Statement stmt);
long executePartitionedUpdate(Statement stmt, UpdateOption... options);
}
@@ -18,6 +18,7 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
@@ -54,13 +55,20 @@ PooledSessionFuture getSession() {

@Override
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
return writeWithOptions(mutations).getCommitTimestamp();
}

@Override
public CommitResponse writeWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
new Function<Session, Timestamp>() {
new Function<Session, CommitResponse>() {
@Override
public Timestamp apply(Session session) {
return session.write(mutations);
public CommitResponse apply(Session session) {
return session.writeWithOptions(mutations, options);
}
});
} catch (RuntimeException e) {
@@ -72,21 +80,21 @@ public Timestamp apply(Session session) {
}

@Override
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
}

@Override
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
new Function<Session, Timestamp>() {
new Function<Session, CommitResponse>() {
@Override
public Timestamp apply(Session session) {
return session.writeAtLeastOnce(mutations);
public CommitResponse apply(Session session) {
return session.writeAtLeastOnceWithOptions(mutations, options);
}
});
} catch (RuntimeException e) {
@@ -97,13 +105,6 @@ public Timestamp apply(Session session) {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
@@ -171,10 +172,10 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
}

@Override
public TransactionRunner readWriteTransaction() {
public TransactionRunner readWriteTransaction(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().readWriteTransaction();
return getSession().readWriteTransaction(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
@@ -184,47 +185,47 @@ public TransactionRunner readWriteTransaction() {
}

@Override
public TransactionManager transactionManager() {
public TransactionManager transactionManager(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().transactionManager();
return getSession().transactionManager(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public AsyncRunner runAsync() {
public AsyncRunner runAsync(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().runAsync();
return getSession().runAsync(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public AsyncTransactionManager transactionManagerAsync() {
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().transactionManagerAsync();
return getSession().transactionManagerAsync(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public long executePartitionedUpdate(final Statement stmt) {
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
new Function<Session, Long>() {
@Override
public Long apply(Session session) {
return session.executePartitionedUpdate(stmt);
return session.executePartitionedUpdate(stmt, options);
}
});
} catch (RuntimeException e) {
@@ -30,12 +30,19 @@ public interface ReadAndQueryOption extends ReadOption, QueryOption {}
/** Marker interface to mark options applicable to read operation */
public interface ReadOption {}

/** Marker interface to mark options applicable to Read, Query, Update and Write operations */
public interface ReadQueryUpdateTransactionOption
extends ReadOption, QueryOption, UpdateOption, TransactionOption {}

/** Marker interface to mark options applicable to query operation. */
public interface QueryOption {}

/** Marker interface to mark options applicable to write operations */
public interface TransactionOption {}

/** Marker interface to mark options applicable to update operation. */
public interface UpdateOption {}

/** Marker interface to mark options applicable to list operations in admin API. */
public interface ListOption {}

@@ -287,6 +294,26 @@ static Options fromQueryOptions(QueryOption... options) {
return readOptions;
}

static Options fromUpdateOptions(UpdateOption... options) {
Options updateOptions = new Options();
for (UpdateOption option : options) {
if (option instanceof InternalOption) {
((InternalOption) option).appendToOptions(updateOptions);

Check warning on line 301 in google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

Codecov / codecov/patch

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java#L301

Added line #L301 was not covered by tests
}
}
return updateOptions;
}

static Options fromTransactionOptions(TransactionOption... options) {
Options transactionOptions = new Options();
for (TransactionOption option : options) {
if (option instanceof InternalOption) {
((InternalOption) option).appendToOptions(transactionOptions);

Check warning on line 311 in google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

Codecov / codecov/patch

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java#L311

Added line #L311 was not covered by tests
}
}
return transactionOptions;
}

static Options fromListOptions(ListOption... options) {
Options listOptions = new Options();
for (ListOption option : options) {

0 comments on commit 5c96fab

Please sign in to comment.