Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for transaction-level exclusion from change streams #2959

Merged
merged 7 commits into from
Mar 28, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public interface ReadOption {}
public interface ReadQueryUpdateTransactionOption
extends ReadOption, QueryOption, UpdateOption, TransactionOption {}

/** Marker interface to mark options applicable to Update and Write operations */
public interface UpdateTransactionOption extends UpdateOption, TransactionOption {}

/**
* Marker interface to mark options applicable to Create, Update and Delete operations in admin
* API.
Expand Down Expand Up @@ -108,6 +111,17 @@ public static TransactionOption commitStats() {
public static TransactionOption optimisticLock() {
return OPTIMISTIC_LOCK_OPTION;
}

/**
* Specifying this instructs the transaction to be excluded from being recorded in change streams
* with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
* being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
* unset.
*/
public static UpdateTransactionOption excludeTxnFromChangeStreams() {
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
}

/**
* Specifying this will cause the read to yield at most this many rows. This should be greater
* than 0.
Expand Down Expand Up @@ -281,6 +295,18 @@ void appendToOptions(Options options) {

static final OptimisticLockOption OPTIMISTIC_LOCK_OPTION = new OptimisticLockOption();

/** Option to request the transaction to be excluded from change streams. */
static final class ExcludeTxnFromChangeStreamsOption extends InternalOption
implements UpdateTransactionOption {
@Override
void appendToOptions(Options options) {
options.withExcludeTxnFromChangeStreams = true;
}
}

static final ExcludeTxnFromChangeStreamsOption EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION =
new ExcludeTxnFromChangeStreamsOption();

/** Option pertaining to flow control. */
static final class FlowControlOption extends InternalOption implements ReadAndQueryOption {
final int prefetchChunks;
Expand Down Expand Up @@ -405,6 +431,7 @@ void appendToOptions(Options options) {
private String etag;
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean withExcludeTxnFromChangeStreams;
private Boolean dataBoostEnabled;
private DirectedReadOptions directedReadOptions;
private DecodeMode decodeMode;
Expand Down Expand Up @@ -508,6 +535,10 @@ Boolean withOptimisticLock() {
return withOptimisticLock;
}

Boolean withExcludeTxnFromChangeStreams() {
return withExcludeTxnFromChangeStreams;
}

boolean hasDataBoostEnabled() {
return dataBoostEnabled != null;
}
Expand Down Expand Up @@ -571,6 +602,11 @@ public String toString() {
if (withOptimisticLock != null) {
b.append("withOptimisticLock: ").append(withOptimisticLock).append(' ');
}
if (withExcludeTxnFromChangeStreams != null) {
b.append("withExcludeTxnFromChangeStreams: ")
.append(withExcludeTxnFromChangeStreams)
.append(' ');
}
if (dataBoostEnabled != null) {
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
}
Expand Down Expand Up @@ -616,6 +652,7 @@ public boolean equals(Object o) {
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
&& Objects.equals(withExcludeTxnFromChangeStreams(), that.withExcludeTxnFromChangeStreams())
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
}
Expand Down Expand Up @@ -662,6 +699,9 @@ public int hashCode() {
if (withOptimisticLock != null) {
result = 31 * result + withOptimisticLock.hashCode();
}
if (withExcludeTxnFromChangeStreams != null) {
result = 31 * result + withExcludeTxnFromChangeStreams.hashCode();
}
if (dataBoostEnabled != null) {
result = 31 * result + dataBoostEnabled.hashCode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private ExecuteSqlRequest resumeOrRestartRequest(

@VisibleForTesting
ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) {
ByteString transactionId = initTransaction();
ByteString transactionId = initTransaction(options);

final TransactionSelector transactionSelector =
TransactionSelector.newBuilder().setId(transactionId).build();
Expand Down Expand Up @@ -195,13 +195,15 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt
return builder.build();
}

private ByteString initTransaction() {
private ByteString initTransaction(final Options options) {
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
if (tx.getId().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,16 @@ static void throwIfTransactionsPending() {
}

static TransactionOptions createReadWriteTransactionOptions(Options options) {
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptions.setExcludeTxnFromChangeStreams(true);
}
TransactionOptions.ReadWrite.Builder readWrite = TransactionOptions.ReadWrite.newBuilder();
if (options.withOptimisticLock() == Boolean.TRUE) {
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
}
return TransactionOptions.newBuilder().setReadWrite(readWrite).build();
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}

/**
Expand Down Expand Up @@ -181,10 +186,16 @@ public CommitResponse writeAtLeastOnceWithOptions(
CommitRequest.newBuilder()
.setSession(name)
.setReturnCommitStats(options.withCommitStats())
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
.addAllMutations(mutationsProto);

TransactionOptions.Builder transactionOptionsBuilder =
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance());
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true);
}
requestBuilder.setSingleUseTransaction(transactionOptionsBuilder);

if (options.hasMaxCommitDelay()) {
requestBuilder.setMaxCommitDelay(
Duration.newBuilder()
Expand Down Expand Up @@ -238,6 +249,10 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
if (batchWriteRequestOptions != null) {
requestBuilder.setRequestOptions(batchWriteRequestOptions);
}
if (Options.fromTransactionOptions(transactionOptions).withExcludeTxnFromChangeStreams()
== Boolean.TRUE) {
requestBuilder.setExcludeTxnFromChangeStreams(true);
}
ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE);
try (IScope s = tracer.withSpan(span)) {
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
private static final String TRANSACTION_ALREADY_COMMITTED_MESSAGE =
"Transaction has already committed";

private static final String DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE =
"Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests. "
+ "This option should be set at the transaction level.";

@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {

Expand Down Expand Up @@ -371,7 +375,9 @@ public void run() {
if (transactionId == null && transactionIdFuture == null) {
requestBuilder.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE));
} else {
requestBuilder.setTransactionId(
transactionId == null
Expand Down Expand Up @@ -725,14 +731,16 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
}

private ResultSet internalExecuteUpdate(
Statement statement, QueryMode queryMode, UpdateOption... options) {
Statement statement, QueryMode queryMode, UpdateOption... updateOptions) {
beforeReadOrQuery();
final Options options = Options.fromUpdateOptions(updateOptions);
if (options.withExcludeTxnFromChangeStreams() != null) {
throw newSpannerException(
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
}
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(
statement,
queryMode,
Options.fromUpdateOptions(options),
/* withTransactionSelector = */ true);
statement, queryMode, options, /* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader());
Expand All @@ -753,14 +761,16 @@ private ResultSet internalExecuteUpdate(
}

@Override
public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... options) {
public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... updateOptions) {
beforeReadOrQuery();
final Options options = Options.fromUpdateOptions(updateOptions);
if (options.withExcludeTxnFromChangeStreams() != null) {
throw newSpannerException(
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
}
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(
statement,
QueryMode.NORMAL,
Options.fromUpdateOptions(options),
/* withTransactionSelector = */ true);
statement, QueryMode.NORMAL, options, /* withTransactionSelector = */ true);
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
try {
// Register the update as an async operation that must finish before the transaction may
Expand Down Expand Up @@ -832,10 +842,15 @@ private SpannerException createAbortedExceptionForBatchDml(ExecuteBatchDmlRespon
}

@Override
public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... options) {
public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... updateOptions) {
beforeReadOrQuery();
final Options options = Options.fromUpdateOptions(updateOptions);
if (options.withExcludeTxnFromChangeStreams() != null) {
throw newSpannerException(
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
}
final ExecuteBatchDmlRequest.Builder builder =
getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options));
getExecuteBatchDmlRequestBuilder(statements, options);
try {
com.google.spanner.v1.ExecuteBatchDmlResponse response =
rpc.executeBatchDml(builder.build(), session.getOptions());
Expand Down Expand Up @@ -869,10 +884,15 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option

@Override
public ApiFuture<long[]> batchUpdateAsync(
Iterable<Statement> statements, UpdateOption... options) {
Iterable<Statement> statements, UpdateOption... updateOptions) {
beforeReadOrQuery();
final Options options = Options.fromUpdateOptions(updateOptions);
if (options.withExcludeTxnFromChangeStreams() != null) {
throw newSpannerException(
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
}
final ExecuteBatchDmlRequest.Builder builder =
getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options));
getExecuteBatchDmlRequestBuilder(statements, options);
ApiFuture<com.google.spanner.v1.ExecuteBatchDmlResponse> response;
try {
// Register the update as an async operation that must finish before the transaction may
Expand Down