Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RequestOptions;
Expand Down Expand Up @@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) {
this.session.onReadDone();
}

/**
* For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
* present in the RPC response. In such cases, this method will be a no-op.
*/
@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}

private ResultSet readInternal(
String table,
@Nullable String index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.protobuf.ListValue;
import com.google.protobuf.ProtocolMessageEnum;
import com.google.protobuf.Value.KindCase;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.Transaction;
import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)

/** Called when the read finishes normally. */
void onDone(boolean withBeginTransaction);

/**
* Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token
* will be included if the read-write transaction is executed on a multiplexed session.
*/
void onPrecommitToken(MultiplexedSessionPrecommitToken token);
}

static final class LazyByteArray implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR

GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
this.iterator = new GrpcValueIterator(iterator);
this.iterator = new GrpcValueIterator(iterator, listener);
this.listener = listener;
this.decodeMode = decodeMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AbstractResultSet.Listener;
import com.google.common.collect.AbstractIterator;
import com.google.protobuf.ListValue;
import com.google.protobuf.Value.KindCase;
Expand All @@ -44,9 +45,11 @@ private enum StreamValue {
private PartialResultSet current;
private int pos;
private ResultSetStats statistics;
private final Listener listener;

GrpcValueIterator(CloseableIterator<PartialResultSet> stream) {
GrpcValueIterator(CloseableIterator<PartialResultSet> stream, Listener listener) {
this.stream = stream;
this.listener = listener;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
}
}
// collect the precommit token from each PartialResultSet
if (current.hasPrecommitToken()) {
listener.onPrecommitToken(current.getPrecommitToken());
}
if (current.hasStats()) {
statistics = current.getStats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetStats;
Expand Down Expand Up @@ -179,6 +180,11 @@ public void removeListener(Runnable listener) {
@GuardedBy("committingLock")
private volatile boolean committing;

private final Object precommitTokenLock = new Object();

@GuardedBy("precommitTokenLock")
private MultiplexedSessionPrecommitToken latestPrecommitToken;

@GuardedBy("lock")
private volatile SettableApiFuture<Void> finishedAsyncOperations = SettableApiFuture.create();

Expand Down Expand Up @@ -439,6 +445,10 @@ public void run() {
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
if (session.getIsMultiplexed() && getLatestPrecommitToken() != null) {
// Set the precommit token in the CommitRequest for multiplexed sessions.
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
Expand Down Expand Up @@ -643,6 +653,25 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}
}

/**
* In read-write transactions, the precommit token with the highest sequence number from this
* transaction attempt will be tracked and included in the
* [Commit][google.spanner.v1.Spanner.Commit] request for the transaction.
*/
@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {
if (token == null) {
return;
}
synchronized (precommitTokenLock) {
if (this.latestPrecommitToken == null
|| token.getSeqNum() > this.latestPrecommitToken.getSeqNum()) {
this.latestPrecommitToken = token;
txnLogger.log(Level.FINE, "Updating precommit token to " + this.latestPrecommitToken);
}
}
}

@Nullable
String getTransactionTag() {
if (this.options.hasTag()) {
Expand All @@ -651,6 +680,13 @@ String getTransactionTag() {
return null;
}

@Nullable
MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
synchronized (precommitTokenLock) {
return this.latestPrecommitToken;
}
}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
e = super.onError(e, withBeginTransaction);
Expand Down Expand Up @@ -829,6 +865,9 @@ private ResultSet internalExecuteUpdate(
throw new IllegalArgumentException(
"DML response missing stats possibly due to non-DML statement as input");
}
if (resultSet.hasPrecommitToken()) {
onPrecommitToken(resultSet.getPrecommitToken());
}
return resultSet;
} catch (Throwable t) {
throw onError(
Expand Down Expand Up @@ -903,6 +942,9 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
resultSet.get().getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
if (resultSet.get().hasPrecommitToken()) {
onPrecommitToken(resultSet.get().getPrecommitToken());
}
} catch (Throwable e) {
// Ignore this error here as it is handled by the future that is returned by the
// executeUpdateAsync method.
Expand Down Expand Up @@ -958,6 +1000,10 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
}
}

if (response.hasPrecommitToken()) {
onPrecommitToken(response.getPrecommitToken());
}

// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (response.getStatus().getCode() == Code.ABORTED_VALUE) {
Expand Down Expand Up @@ -1022,6 +1068,9 @@ public ApiFuture<long[]> batchUpdateAsync(
builder.getTransaction().hasBegin());
}
}
if (batchDmlResponse.hasPrecommitToken()) {
onPrecommitToken(batchDmlResponse.getPrecommitToken());
}
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.QueryPlan;
import com.google.spanner.v1.ResultSetMetadata;
Expand Down Expand Up @@ -77,6 +78,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction

@Override
public void onDone(boolean withBeginTransaction) {}

@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
}

@Before
Expand Down
Loading
Loading