Skip to content

Commit

Permalink
feat: add support for transaction-level exclusion from change streams (
Browse files Browse the repository at this point in the history
…#2959)

* Add support for transaction-level exclusion from change streams

* cleanup

* refactor: introduce PartitionedUpdateOption

* Revert "refactor: introduce PartitionedUpdateOption"

This reverts commit 96b508b.

* Add error handling in DML update APIs where excludeTxnFromChangeStreams option is not applicable

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
dengwe1 and gcf-owl-bot[bot] committed Mar 28, 2024
1 parent 58ef1fe commit 7ae376a
Show file tree
Hide file tree
Showing 6 changed files with 534 additions and 23 deletions.
Expand Up @@ -61,6 +61,9 @@ public interface ReadOption {}
public interface ReadQueryUpdateTransactionOption
extends ReadOption, QueryOption, UpdateOption, TransactionOption {}

/** Marker interface to mark options applicable to Update and Write operations */
public interface UpdateTransactionOption extends UpdateOption, TransactionOption {}

/**
* Marker interface to mark options applicable to Create, Update and Delete operations in admin
* API.
Expand Down Expand Up @@ -108,6 +111,17 @@ public static TransactionOption commitStats() {
public static TransactionOption optimisticLock() {
return OPTIMISTIC_LOCK_OPTION;
}

/**
* Specifying this instructs the transaction to be excluded from being recorded in change streams
* with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
* being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
* unset.
*/
public static UpdateTransactionOption excludeTxnFromChangeStreams() {
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
}

/**
* Specifying this will cause the read to yield at most this many rows. This should be greater
* than 0.
Expand Down Expand Up @@ -281,6 +295,18 @@ void appendToOptions(Options options) {

static final OptimisticLockOption OPTIMISTIC_LOCK_OPTION = new OptimisticLockOption();

/** Option to request the transaction to be excluded from change streams. */
static final class ExcludeTxnFromChangeStreamsOption extends InternalOption
implements UpdateTransactionOption {
@Override
void appendToOptions(Options options) {
options.withExcludeTxnFromChangeStreams = true;
}
}

static final ExcludeTxnFromChangeStreamsOption EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION =
new ExcludeTxnFromChangeStreamsOption();

/** Option pertaining to flow control. */
static final class FlowControlOption extends InternalOption implements ReadAndQueryOption {
final int prefetchChunks;
Expand Down Expand Up @@ -405,6 +431,7 @@ void appendToOptions(Options options) {
private String etag;
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean withExcludeTxnFromChangeStreams;
private Boolean dataBoostEnabled;
private DirectedReadOptions directedReadOptions;
private DecodeMode decodeMode;
Expand Down Expand Up @@ -508,6 +535,10 @@ Boolean withOptimisticLock() {
return withOptimisticLock;
}

Boolean withExcludeTxnFromChangeStreams() {
return withExcludeTxnFromChangeStreams;
}

boolean hasDataBoostEnabled() {
return dataBoostEnabled != null;
}
Expand Down Expand Up @@ -571,6 +602,11 @@ public String toString() {
if (withOptimisticLock != null) {
b.append("withOptimisticLock: ").append(withOptimisticLock).append(' ');
}
if (withExcludeTxnFromChangeStreams != null) {
b.append("withExcludeTxnFromChangeStreams: ")
.append(withExcludeTxnFromChangeStreams)
.append(' ');
}
if (dataBoostEnabled != null) {
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
}
Expand Down Expand Up @@ -616,6 +652,7 @@ public boolean equals(Object o) {
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
&& Objects.equals(withExcludeTxnFromChangeStreams(), that.withExcludeTxnFromChangeStreams())
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
}
Expand Down Expand Up @@ -662,6 +699,9 @@ public int hashCode() {
if (withOptimisticLock != null) {
result = 31 * result + withOptimisticLock.hashCode();
}
if (withExcludeTxnFromChangeStreams != null) {
result = 31 * result + withExcludeTxnFromChangeStreams.hashCode();
}
if (dataBoostEnabled != null) {
result = 31 * result + dataBoostEnabled.hashCode();
}
Expand Down
Expand Up @@ -167,7 +167,7 @@ private ExecuteSqlRequest resumeOrRestartRequest(

@VisibleForTesting
ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) {
ByteString transactionId = initTransaction();
ByteString transactionId = initTransaction(options);

final TransactionSelector transactionSelector =
TransactionSelector.newBuilder().setId(transactionId).build();
Expand Down Expand Up @@ -195,13 +195,15 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt
return builder.build();
}

private ByteString initTransaction() {
private ByteString initTransaction(final Options options) {
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
if (tx.getId().isEmpty()) {
Expand Down
Expand Up @@ -69,11 +69,16 @@ static void throwIfTransactionsPending() {
}

static TransactionOptions createReadWriteTransactionOptions(Options options) {
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptions.setExcludeTxnFromChangeStreams(true);
}
TransactionOptions.ReadWrite.Builder readWrite = TransactionOptions.ReadWrite.newBuilder();
if (options.withOptimisticLock() == Boolean.TRUE) {
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
}
return TransactionOptions.newBuilder().setReadWrite(readWrite).build();
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}

/**
Expand Down Expand Up @@ -209,10 +214,16 @@ public CommitResponse writeAtLeastOnceWithOptions(
CommitRequest.newBuilder()
.setSession(name)
.setReturnCommitStats(options.withCommitStats())
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
.addAllMutations(mutationsProto);

TransactionOptions.Builder transactionOptionsBuilder =
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance());
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true);
}
requestBuilder.setSingleUseTransaction(transactionOptionsBuilder);

if (options.hasMaxCommitDelay()) {
requestBuilder.setMaxCommitDelay(
Duration.newBuilder()
Expand Down Expand Up @@ -266,6 +277,10 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
if (batchWriteRequestOptions != null) {
requestBuilder.setRequestOptions(batchWriteRequestOptions);
}
if (Options.fromTransactionOptions(transactionOptions).withExcludeTxnFromChangeStreams()
== Boolean.TRUE) {
requestBuilder.setExcludeTxnFromChangeStreams(true);
}
ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE);
try (IScope s = tracer.withSpan(span)) {
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);
Expand Down
Expand Up @@ -76,6 +76,10 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
private static final String TRANSACTION_ALREADY_COMMITTED_MESSAGE =
"Transaction has already committed";

private static final String DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE =
"Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests. "
+ "This option should be set at the transaction level.";

@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {

Expand Down Expand Up @@ -371,7 +375,9 @@ public void run() {
if (transactionId == null && transactionIdFuture == null) {
requestBuilder.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE));
} else {
requestBuilder.setTransactionId(
transactionId == null
Expand Down Expand Up @@ -725,14 +731,16 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
}

private ResultSet internalExecuteUpdate(
Statement statement, QueryMode queryMode, UpdateOption... options) {
Statement statement, QueryMode queryMode, UpdateOption... updateOptions) {
beforeReadOrQuery();
final Options options = Options.fromUpdateOptions(updateOptions);
if (options.withExcludeTxnFromChangeStreams() != null) {
throw newSpannerException(
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
}
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(
statement,
queryMode,
Options.fromUpdateOptions(options),
/* withTransactionSelector = */ true);
statement, queryMode, options, /* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader());
Expand All @@ -753,14 +761,16 @@ private ResultSet internalExecuteUpdate(
}

@Override
public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... options) {
public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... updateOptions) {
beforeReadOrQuery();
final Options options = Options.fromUpdateOptions(updateOptions);
if (options.withExcludeTxnFromChangeStreams() != null) {
throw newSpannerException(
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
}
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(
statement,
QueryMode.NORMAL,
Options.fromUpdateOptions(options),
/* withTransactionSelector = */ true);
statement, QueryMode.NORMAL, options, /* withTransactionSelector = */ true);
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
try {
// Register the update as an async operation that must finish before the transaction may
Expand Down Expand Up @@ -832,10 +842,15 @@ private SpannerException createAbortedExceptionForBatchDml(ExecuteBatchDmlRespon
}

@Override
public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... options) {
public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... updateOptions) {
beforeReadOrQuery();
final Options options = Options.fromUpdateOptions(updateOptions);
if (options.withExcludeTxnFromChangeStreams() != null) {
throw newSpannerException(
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
}
final ExecuteBatchDmlRequest.Builder builder =
getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options));
getExecuteBatchDmlRequestBuilder(statements, options);
try {
com.google.spanner.v1.ExecuteBatchDmlResponse response =
rpc.executeBatchDml(builder.build(), session.getOptions());
Expand Down Expand Up @@ -869,10 +884,15 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option

@Override
public ApiFuture<long[]> batchUpdateAsync(
Iterable<Statement> statements, UpdateOption... options) {
Iterable<Statement> statements, UpdateOption... updateOptions) {
beforeReadOrQuery();
final Options options = Options.fromUpdateOptions(updateOptions);
if (options.withExcludeTxnFromChangeStreams() != null) {
throw newSpannerException(
ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE);
}
final ExecuteBatchDmlRequest.Builder builder =
getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options));
getExecuteBatchDmlRequestBuilder(statements, options);
ApiFuture<com.google.spanner.v1.ExecuteBatchDmlResponse> response;
try {
// Register the update as an async operation that must finish before the transaction may
Expand Down

0 comments on commit 7ae376a

Please sign in to comment.