Skip to content
Permalink
Browse files
fix: mark transaction as invalid if no tx is returned before RS is cl…
…osed (#791)

If a query requests the begin of a new transaction, the transaction id is returned by the first call to ResultSet#next(). If the ResultSet is closed by another thread before the first result has been returned, or before that result has been consumed internally to set the transaction id, no transaction id will be set. This will cause any subsequent statement in the same transaction to timeout while waiting for a transaction to be returned.
  • Loading branch information
olavloite committed Jan 12, 2021
1 parent 3f28c46 commit e02e5a7d95c0e92d9f13640dd2afe5b899f4e56d
@@ -691,7 +691,7 @@ public void onTransactionMetadata(Transaction transaction) {}
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
public void onDone(boolean withBeginTransaction) {}

private ResultSet readInternal(
String table,
@@ -84,7 +84,7 @@ interface Listener {
void onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone();
void onDone(boolean withBeginTransaction);
}

@VisibleForTesting
@@ -118,6 +118,11 @@ public boolean next() throws SpannerException {
ResultSetMetadata metadata = iterator.getMetadata();
if (metadata.hasTransaction()) {
listener.onTransactionMetadata(metadata.getTransaction());
} else if (iterator.isWithBeginTransaction()) {
// The query should have returned a transaction.
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"Query requested a transaction to be started, but no transaction was returned");
}
currRow = new GrpcStruct(iterator.type(), new ArrayList<>());
}
@@ -126,8 +131,10 @@ public boolean next() throws SpannerException {
statistics = iterator.getStats();
}
return hasNext;
} catch (SpannerException e) {
throw yieldError(e, iterator.isWithBeginTransaction() && currRow == null);
} catch (Throwable t) {
throw yieldError(
SpannerExceptionFactory.asSpannerException(t),
iterator.isWithBeginTransaction() && currRow == null);
}
}

@@ -139,6 +146,7 @@ public ResultSetStats getStats() {

@Override
public void close() {
listener.onDone(iterator.isWithBeginTransaction());
iterator.close("ResultSet closed");
closed = true;
}
@@ -150,8 +158,8 @@ public Type getType() {
}

private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
close();
listener.onError(e, beginTransaction);
close();
throw e;
}
}
@@ -539,6 +539,19 @@ public void onError(SpannerException e, boolean withBeginTransaction) {
}
}

@Override
public void onDone(boolean withBeginTransaction) {
if (withBeginTransaction
&& transactionIdFuture != null
&& !this.transactionIdFuture.isDone()) {
// Context was done (closed) before a transaction id was returned.
this.transactionIdFuture.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"ResultSet was closed before a transaction id was returned"));
}
}

@Override
public void buffer(Mutation mutation) {
synchronized (lock) {
@@ -62,7 +62,7 @@ public void onTransactionMetadata(Transaction transaction) throws SpannerExcepti
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
public void onDone(boolean withBeginTransaction) {}
}

@Before
@@ -16,7 +16,6 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;

@@ -37,6 +36,7 @@
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
@@ -204,6 +204,7 @@ public void setUp() throws IOException {
public void tearDown() throws Exception {
spanner.close();
mockSpanner.reset();
mockSpanner.clearRequests();
}

@Test
@@ -1348,6 +1349,69 @@ public Void run(TransactionContext transaction) throws Exception {
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
}

@Test
public void testCloseResultSetWhileRequestInFlight() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
final ExecutorService service = Executors.newSingleThreadExecutor();
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
final ResultSet rs = transaction.executeQuery(SELECT1);
// Prevent the server from executing the query.
mockSpanner.freeze();
service.submit(
new Runnable() {
@Override
public void run() {
// This call will be stuck on the server until the mock server is
// unfrozen.
rs.next();
}
});

// Close the result set while the request is in flight.
mockSpanner.waitForRequestsToContain(
new Predicate<AbstractMessage>() {
@Override
public boolean apply(AbstractMessage input) {
return input instanceof ExecuteSqlRequest
&& ((ExecuteSqlRequest) input).getTransaction().hasBegin();
}
},
100L);
rs.close();
// The next statement should now fail before it is sent to the server because the
// first statement failed to return a transaction while the result set was still
// open.
mockSpanner.unfreeze();
try {
transaction.executeUpdate(UPDATE_STATEMENT);
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION);
assertThat(e.getMessage())
.contains("ResultSet was closed before a transaction id was returned");
}
return null;
}
});
fail("missing expected exception");
} catch (SpannerException e) {
// The commit request will also fail, which means that the entire transaction will fail.
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION);
assertThat(e.getMessage())
.contains("ResultSet was closed before a transaction id was returned");
}
service.shutdown();
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1);
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
}

private int countRequests(Class<? extends AbstractMessage> requestType) {
int count = 0;
for (AbstractMessage msg : mockSpanner.getRequests()) {
@@ -50,7 +50,7 @@ public void onTransactionMetadata(Transaction transaction) throws SpannerExcepti
public void onError(SpannerException e, boolean withBeginTransaction) {}

@Override
public void onDone() {}
public void onDone(boolean withBeginTransaction) {}
}

public ReadFormatTestRunner(Class<?> clazz) throws InitializationError {

0 comments on commit e02e5a7

Please sign in to comment.