Skip to content
Permalink
Browse files
feat: inline begin transaction (#325)
* feat: inline begin tx with first statement

* feat: support inlining BeginTransaction

* fix: invalid dml statement can still return tx id

* bench: add benchmarks for inline begin

* feat: add inline begin for async runner

* test: add additional tests and ITs

* test: add tests for error during tx

* test: use statement with same error code on emulator

* test: skip test on emulator

* test: constraint error causes transaction to be invalidated

* fix: retry transaction if first statements fails and had BeginTransaction option

* fix: handle aborted exceptions

* test: add additional tests for corner cases

* feat: use single-use tx for idem-potent mutations

* fix: remove check for idempotent mutations

* chore: remove commented code

* feat!: remove session pool preparing (#515)

* feat: remove session pool preparing

* fix: fix integration tests

* test: fix malformed retry loop in test case

* fix: review comments

* chore: run formatter

* test: fix integration test that relied on data from other test case
  • Loading branch information
olavloite committed Oct 23, 2020
1 parent 659719d commit d08d3debb6457548bb6b04335b7a2d2227369211
Showing with 3,105 additions and 1,953 deletions.
  1. +11 −3 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java
  2. +8 −5 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
  3. +19 −4 google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java
  4. +16 −35 google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java
  5. +13 −0 google-cloud-spanner/src/main/java/com/google/cloud/spanner/MetricRegistryConstants.java
  6. +65 −497 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
  7. +24 −11 google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java
  8. +4 −2 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java
  9. +257 −146 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java
  10. +15 −16 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java
  11. +26 −27 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java
  12. +0 −70 google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsTest.java
  13. +1 −144 google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
  14. +4 −4 google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java
  15. +11 −11 google-cloud-spanner/src/test/java/com/google/cloud/spanner/ITSessionPoolIntegrationTest.java
  16. +264 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginBenchmark.java
  17. +1,155 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java
  18. +2 −16 ...le-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java
  19. +17 −12 google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
  20. +2 −2 google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java
  21. +540 −0 ...cloud-spanner/src/test/java/com/google/cloud/spanner/ReadWriteTransactionWithInlineBeginTest.java
  22. +24 −52 google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java
  23. +5 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
  24. +11 −16 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java
  25. +20 −20 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java
  26. +2 −19 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java
  27. +230 −783 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java
  28. +4 −26 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java
  29. +2 −2 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java
  30. +8 −8 google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerAbortedTest.java
  31. +113 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java
  32. +70 −4 google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java
  33. +11 −11 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java
  34. +2 −1 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDMLTest.java
  35. +14 −4 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerAsyncTest.java
  36. +9 −0 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java
  37. +126 −2 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java
@@ -633,7 +633,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
return new GrpcResultSet(stream, this);
return new GrpcResultSet(
stream, this, request.hasTransaction() && request.getTransaction().hasBegin());
}

/**
@@ -672,14 +673,20 @@ public void close() {
}
}

/**
* Returns the {@link TransactionSelector} that should be used for a statement that is executed on
* this read context. This could be a reference to an existing transaction ID, or it could be a
* BeginTransaction option that should be included with the statement.
*/
@Nullable
abstract TransactionSelector getTransactionSelector();

/** This method is called when a statement returned a new transaction as part of its results. */
@Override
public void onTransactionMetadata(Transaction transaction) {}

@Override
public void onError(SpannerException e) {}
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
@@ -740,7 +747,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
GrpcResultSet resultSet = new GrpcResultSet(stream, this);
GrpcResultSet resultSet =
new GrpcResultSet(stream, this, selector != null && selector.hasBegin());
return resultSet;
}

@@ -81,7 +81,7 @@ interface Listener {
void onTransactionMetadata(Transaction transaction) throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e);
void onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone();
@@ -91,14 +91,17 @@ interface Listener {
static class GrpcResultSet extends AbstractResultSet<List<Object>> {
private final GrpcValueIterator iterator;
private final Listener listener;
private final boolean beginTransaction;
private GrpcStruct currRow;
private SpannerException error;
private ResultSetStats statistics;
private boolean closed;

GrpcResultSet(CloseableIterator<PartialResultSet> iterator, Listener listener) {
GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, boolean beginTransaction) {
this.iterator = new GrpcValueIterator(iterator);
this.listener = listener;
this.beginTransaction = beginTransaction;
}

@Override
@@ -127,7 +130,7 @@ public boolean next() throws SpannerException {
}
return hasNext;
} catch (SpannerException e) {
throw yieldError(e);
throw yieldError(e, beginTransaction && currRow == null);
}
}

@@ -149,9 +152,9 @@ public Type getType() {
return currRow.getType();
}

private SpannerException yieldError(SpannerException e) {
private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
close();
listener.onError(e);
listener.onError(e, beginTransaction);
throw e;
}
}
@@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
@@ -27,6 +28,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
@@ -76,14 +78,19 @@ public TransactionContextFutureImpl beginAsync() {
return begin;
}

private ApiFuture<TransactionContext> internalBeginAsync(boolean setActive) {
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction();
if (setActive) {
if (firstAttempt) {
session.setActive(this);
}
final SettableApiFuture<TransactionContext> res = SettableApiFuture.create();
final ApiFuture<Void> fut = txn.ensureTxnAsync();
final ApiFuture<Void> fut;
if (firstAttempt) {
fut = ApiFutures.immediateFuture(null);
} else {
fut = txn.ensureTxnAsync();
}
ApiFutures.addCallback(
fut,
new ApiFutureCallback<Void>() {
@@ -149,7 +156,15 @@ public ApiFuture<Void> rollbackAsync() {
txnState == TransactionState.STARTED,
"rollback can only be called if the transaction is in progress");
try {
return txn.rollbackAsync();
return ApiFutures.transformAsync(
txn.rollbackAsync(),
new ApiAsyncFunction<Empty, Void>() {
@Override
public ApiFuture<Void> apply(Empty input) throws Exception {
return ApiFutures.immediateFuture(null);
}
},
MoreExecutors.directExecutor());
} finally {
txnState = TransactionState.ROLLED_BACK;
}
@@ -34,11 +34,6 @@ class DatabaseClientImpl implements DatabaseClient {
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
private static final Tracer tracer = Tracing.getTracer();

private enum SessionMode {
READ,
READ_WRITE
}

@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;

@@ -53,21 +48,15 @@ private enum SessionMode {
}

@VisibleForTesting
PooledSessionFuture getReadSession() {
return pool.getReadSession();
}

@VisibleForTesting
PooledSessionFuture getReadWriteSession() {
return pool.getReadWriteSession();
PooledSessionFuture getSession() {
return pool.getSession();
}

@Override
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
SessionMode.READ_WRITE,
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
@@ -94,7 +83,6 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
SessionMode.READ_WRITE,
new Function<Session, Timestamp>() {
@Override
public Timestamp apply(Session session) {
@@ -120,7 +108,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUse();
return getSession().singleUse();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
@@ -131,7 +119,7 @@ public ReadContext singleUse() {
public ReadContext singleUse(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUse(bound);
return getSession().singleUse(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
@@ -142,7 +130,7 @@ public ReadContext singleUse(TimestampBound bound) {
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUseReadOnlyTransaction();
return getSession().singleUseReadOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
@@ -153,7 +141,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().singleUseReadOnlyTransaction(bound);
return getSession().singleUseReadOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
@@ -164,7 +152,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
public ReadOnlyTransaction readOnlyTransaction() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().readOnlyTransaction();
return getSession().readOnlyTransaction();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
@@ -175,7 +163,7 @@ public ReadOnlyTransaction readOnlyTransaction() {
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().readOnlyTransaction(bound);
return getSession().readOnlyTransaction(bound);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
@@ -186,9 +174,9 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
public TransactionRunner readWriteTransaction() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().readWriteTransaction();
return getSession().readWriteTransaction();
} catch (RuntimeException e) {
TraceUtil.setWithFailure(span, e);
TraceUtil.endSpanWithFailure(span, e);
throw e;
} finally {
span.end(TraceUtil.END_SPAN_OPTIONS);
@@ -199,7 +187,7 @@ public TransactionRunner readWriteTransaction() {
public TransactionManager transactionManager() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManager();
return getSession().transactionManager();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
@@ -210,7 +198,7 @@ public TransactionManager transactionManager() {
public AsyncRunner runAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().runAsync();
return getSession().runAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
@@ -221,7 +209,7 @@ public AsyncRunner runAsync() {
public AsyncTransactionManager transactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManagerAsync();
return getSession().transactionManagerAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
@@ -232,10 +220,7 @@ public AsyncTransactionManager transactionManagerAsync() {
public long executePartitionedUpdate(final Statement stmt) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
// A partitioned update transaction does not need a prepared write session, as the transaction
// object will start a new transaction with specific options anyway.
return runWithSessionRetry(
SessionMode.READ,
new Function<Session, Long>() {
@Override
public Long apply(Session session) {
@@ -248,17 +233,13 @@ public Long apply(Session session) {
}
}

private <T> T runWithSessionRetry(SessionMode mode, Function<Session, T> callable) {
PooledSessionFuture session =
mode == SessionMode.READ_WRITE ? getReadWriteSession() : getReadSession();
private <T> T runWithSessionRetry(Function<Session, T> callable) {
PooledSessionFuture session = getSession();
while (true) {
try {
return callable.apply(session);
} catch (SessionNotFoundException e) {
session =
mode == SessionMode.READ_WRITE
? pool.replaceReadWriteSession(e, session)
: pool.replaceReadSession(e, session);
session = pool.replaceSession(e, session);
}
}
}
@@ -36,9 +36,22 @@ class MetricRegistryConstants {
private static final LabelValue UNSET_LABEL = LabelValue.create(null);

static final LabelValue NUM_IN_USE_SESSIONS = LabelValue.create("num_in_use_sessions");

/**
* The session pool no longer prepares a fraction of the sessions with a read/write transaction.
* This metric will therefore always be zero and may be removed in the future.
*/
@Deprecated
static final LabelValue NUM_SESSIONS_BEING_PREPARED =
LabelValue.create("num_sessions_being_prepared");

static final LabelValue NUM_READ_SESSIONS = LabelValue.create("num_read_sessions");

/**
* The session pool no longer prepares a fraction of the sessions with a read/write transaction.
* This metric will therefore always be zero and may be removed in the future.
*/
@Deprecated
static final LabelValue NUM_WRITE_SESSIONS = LabelValue.create("num_write_prepared_sessions");

static final ImmutableList<LabelKey> SPANNER_LABEL_KEYS =

0 comments on commit d08d3de

Please sign in to comment.