Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for transaction-level exclusion from change streams #2959

Merged
merged 7 commits into from
Mar 28, 2024
7 changes: 7 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -631,4 +631,11 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setDirectedRead(com.google.spanner.v1.DirectedReadOptions)</method>
</difference>

<difference>
dengwe1 marked this conversation as resolved.
Show resolved Hide resolved
<differenceType>7005</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$UpdateOption[])</method>
<to>long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$PartitionedUpdateOption[])</to>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
Expand Down Expand Up @@ -600,5 +601,5 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* <p>Given the above, Partitioned DML is good fit for large, database-wide, operations that are
* idempotent, such as deleting old rows from a very large table.
*/
long executePartitionedUpdate(Statement stmt, UpdateOption... options);
long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -240,7 +240,8 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
}

@Override
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
public long executePartitionedUpdate(
final Statement stmt, final PartitionedUpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,15 @@ public interface QueryOption {}
/** Marker interface to mark options applicable to write operations */
public interface TransactionOption {}

/** Marker interface to mark options applicable to partitioned update */
public interface PartitionedUpdateOption {}

/** Marker interface to mark options applicable to update operation. */
public interface UpdateOption {}
public interface UpdateOption extends PartitionedUpdateOption {}

/** Marker interface to mark options applicable to partitioned update and write operations */
public interface PartitionedUpdateTransactionOption
extends PartitionedUpdateOption, TransactionOption {}

/** Marker interface to mark options applicable to list operations in admin API. */
public interface ListOption {}
Expand All @@ -108,6 +115,17 @@ public static TransactionOption commitStats() {
public static TransactionOption optimisticLock() {
return OPTIMISTIC_LOCK_OPTION;
}

/**
* Specifying this instructs the transaction to be excluded from being recorded in change streams
* with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
* being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
* unset.
*/
public static PartitionedUpdateTransactionOption excludeTxnFromChangeStreams() {
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
}

/**
* Specifying this will cause the read to yield at most this many rows. This should be greater
* than 0.
Expand Down Expand Up @@ -281,6 +299,18 @@ void appendToOptions(Options options) {

static final OptimisticLockOption OPTIMISTIC_LOCK_OPTION = new OptimisticLockOption();

/** Option to request the transaction to be excluded from change streams. */
static final class ExcludeTxnFromChangeStreamsOption extends InternalOption
implements PartitionedUpdateTransactionOption {
@Override
void appendToOptions(Options options) {
options.withExcludeTxnFromChangeStreams = true;
}
}

static final ExcludeTxnFromChangeStreamsOption EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION =
new ExcludeTxnFromChangeStreamsOption();

/** Option pertaining to flow control. */
static final class FlowControlOption extends InternalOption implements ReadAndQueryOption {
final int prefetchChunks;
Expand Down Expand Up @@ -405,6 +435,7 @@ void appendToOptions(Options options) {
private String etag;
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean withExcludeTxnFromChangeStreams;
private Boolean dataBoostEnabled;
private DirectedReadOptions directedReadOptions;
private DecodeMode decodeMode;
Expand Down Expand Up @@ -508,6 +539,10 @@ Boolean withOptimisticLock() {
return withOptimisticLock;
}

Boolean withExcludeTxnFromChangeStreams() {
return withExcludeTxnFromChangeStreams;
}

boolean hasDataBoostEnabled() {
return dataBoostEnabled != null;
}
Expand Down Expand Up @@ -571,6 +606,11 @@ public String toString() {
if (withOptimisticLock != null) {
b.append("withOptimisticLock: ").append(withOptimisticLock).append(' ');
}
if (withExcludeTxnFromChangeStreams != null) {
b.append("withExcludeTxnFromChangeStreams: ")
.append(withExcludeTxnFromChangeStreams)
.append(' ');
}
if (dataBoostEnabled != null) {
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
}
Expand Down Expand Up @@ -616,6 +656,7 @@ public boolean equals(Object o) {
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
&& Objects.equals(withExcludeTxnFromChangeStreams(), that.withExcludeTxnFromChangeStreams())
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
}
Expand Down Expand Up @@ -662,6 +703,9 @@ public int hashCode() {
if (withOptimisticLock != null) {
result = 31 * result + withOptimisticLock.hashCode();
}
if (withExcludeTxnFromChangeStreams != null) {
result = 31 * result + withExcludeTxnFromChangeStreams.hashCode();
}
if (dataBoostEnabled != null) {
result = 31 * result + dataBoostEnabled.hashCode();
}
Expand Down Expand Up @@ -704,6 +748,16 @@ static Options fromUpdateOptions(UpdateOption... options) {
return updateOptions;
}

static Options fromPartitinoedUpdateOptions(PartitionedUpdateOption... options) {
Options partitionedUpdateOptions = new Options();
for (PartitionedUpdateOption option : options) {
if (option instanceof InternalOption) {
((InternalOption) option).appendToOptions(partitionedUpdateOptions);
}
}
return partitionedUpdateOptions;
}

static Options fromTransactionOptions(TransactionOption... options) {
Options transactionOptions = new Options();
for (TransactionOption option : options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
Expand Down Expand Up @@ -71,15 +71,17 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction
* last seen resume token if the server returns any.
*/
long executeStreamingPartitionedUpdate(
final Statement statement, final Duration timeout, final UpdateOption... updateOptions) {
final Statement statement,
final Duration timeout,
final PartitionedUpdateOption... partitionedUpdateOptions) {
checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session");
LOGGER.log(Level.FINER, "Starting PartitionedUpdate statement");

ByteString resumeToken = ByteString.EMPTY;
boolean foundStats = false;
long updateCount = 0L;
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
Options options = Options.fromUpdateOptions(updateOptions);
Options options = Options.fromPartitinoedUpdateOptions(partitionedUpdateOptions);

try {
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
Expand Down Expand Up @@ -167,7 +169,7 @@ private ExecuteSqlRequest resumeOrRestartRequest(

@VisibleForTesting
ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) {
ByteString transactionId = initTransaction();
ByteString transactionId = initTransaction(options);

final TransactionSelector transactionSelector =
TransactionSelector.newBuilder().setId(transactionId).build();
Expand Down Expand Up @@ -195,13 +197,15 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt
return builder.build();
}

private ByteString initTransaction() {
private ByteString initTransaction(final Options options) {
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
if (tx.getId().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionClient.SessionId;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
Expand Down Expand Up @@ -69,11 +69,16 @@ static void throwIfTransactionsPending() {
}

static TransactionOptions createReadWriteTransactionOptions(Options options) {
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptions.setExcludeTxnFromChangeStreams(true);
}
TransactionOptions.ReadWrite.Builder readWrite = TransactionOptions.ReadWrite.newBuilder();
if (options.withOptimisticLock() == Boolean.TRUE) {
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
}
return TransactionOptions.newBuilder().setReadWrite(readWrite).build();
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}

/**
Expand Down Expand Up @@ -135,7 +140,7 @@ void markUsed(Instant instant) {
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) {
setActive(null);
PartitionedDmlTransaction txn =
new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker());
Expand Down Expand Up @@ -181,10 +186,16 @@ public CommitResponse writeAtLeastOnceWithOptions(
CommitRequest.newBuilder()
.setSession(name)
.setReturnCommitStats(options.withCommitStats())
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
.addAllMutations(mutationsProto);

TransactionOptions.Builder transactionOptionsBuilder =
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance());
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true);
}
requestBuilder.setSingleUseTransaction(transactionOptionsBuilder);

if (options.hasMaxCommitDelay()) {
requestBuilder.setMaxCommitDelay(
Duration.newBuilder()
Expand Down Expand Up @@ -238,6 +249,10 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
if (batchWriteRequestOptions != null) {
requestBuilder.setRequestOptions(batchWriteRequestOptions);
}
if (Options.fromTransactionOptions(transactionOptions).withExcludeTxnFromChangeStreams()
== Boolean.TRUE) {
requestBuilder.setExcludeTxnFromChangeStreams(true);
}
ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE);
try (IScope s = tracer.withSpan(span)) {
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.google.cloud.Tuple;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.Options.PartitionedUpdateOption;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.Options.TransactionOption;
Expand Down Expand Up @@ -1270,7 +1271,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) {
try {
return get(true).executePartitionedUpdate(stmt, options);
} finally {
Expand Down Expand Up @@ -1470,7 +1471,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options)
public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options)
throws SpannerException {
try {
markUsed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,9 @@ public void run() {
if (transactionId == null && transactionIdFuture == null) {
requestBuilder.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE));
} else {
requestBuilder.setTransactionId(
transactionId == null
Expand Down