diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index d5c95d0a5a..3dbd0c1cda 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -61,6 +61,9 @@ public interface ReadOption {} public interface ReadQueryUpdateTransactionOption extends ReadOption, QueryOption, UpdateOption, TransactionOption {} + /** Marker interface to mark options applicable to Update and Write operations */ + public interface UpdateTransactionOption extends UpdateOption, TransactionOption {} + /** * Marker interface to mark options applicable to Create, Update and Delete operations in admin * API. @@ -108,6 +111,17 @@ public static TransactionOption commitStats() { public static TransactionOption optimisticLock() { return OPTIMISTIC_LOCK_OPTION; } + + /** + * Specifying this instructs the transaction to be excluded from being recorded in change streams + * with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from + * being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or + * unset. + */ + public static UpdateTransactionOption excludeTxnFromChangeStreams() { + return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION; + } + /** * Specifying this will cause the read to yield at most this many rows. This should be greater * than 0. @@ -281,6 +295,18 @@ void appendToOptions(Options options) { static final OptimisticLockOption OPTIMISTIC_LOCK_OPTION = new OptimisticLockOption(); + /** Option to request the transaction to be excluded from change streams. */ + static final class ExcludeTxnFromChangeStreamsOption extends InternalOption + implements UpdateTransactionOption { + @Override + void appendToOptions(Options options) { + options.withExcludeTxnFromChangeStreams = true; + } + } + + static final ExcludeTxnFromChangeStreamsOption EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION = + new ExcludeTxnFromChangeStreamsOption(); + /** Option pertaining to flow control. */ static final class FlowControlOption extends InternalOption implements ReadAndQueryOption { final int prefetchChunks; @@ -405,6 +431,7 @@ void appendToOptions(Options options) { private String etag; private Boolean validateOnly; private Boolean withOptimisticLock; + private Boolean withExcludeTxnFromChangeStreams; private Boolean dataBoostEnabled; private DirectedReadOptions directedReadOptions; private DecodeMode decodeMode; @@ -508,6 +535,10 @@ Boolean withOptimisticLock() { return withOptimisticLock; } + Boolean withExcludeTxnFromChangeStreams() { + return withExcludeTxnFromChangeStreams; + } + boolean hasDataBoostEnabled() { return dataBoostEnabled != null; } @@ -571,6 +602,11 @@ public String toString() { if (withOptimisticLock != null) { b.append("withOptimisticLock: ").append(withOptimisticLock).append(' '); } + if (withExcludeTxnFromChangeStreams != null) { + b.append("withExcludeTxnFromChangeStreams: ") + .append(withExcludeTxnFromChangeStreams) + .append(' '); + } if (dataBoostEnabled != null) { b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' '); } @@ -616,6 +652,7 @@ public boolean equals(Object o) { && Objects.equals(etag(), that.etag()) && Objects.equals(validateOnly(), that.validateOnly()) && Objects.equals(withOptimisticLock(), that.withOptimisticLock()) + && Objects.equals(withExcludeTxnFromChangeStreams(), that.withExcludeTxnFromChangeStreams()) && Objects.equals(dataBoostEnabled(), that.dataBoostEnabled()) && Objects.equals(directedReadOptions(), that.directedReadOptions()); } @@ -662,6 +699,9 @@ public int hashCode() { if (withOptimisticLock != null) { result = 31 * result + withOptimisticLock.hashCode(); } + if (withExcludeTxnFromChangeStreams != null) { + result = 31 * result + withExcludeTxnFromChangeStreams.hashCode(); + } if (dataBoostEnabled != null) { result = 31 * result + dataBoostEnabled.hashCode(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index cabc270566..d498bb232a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -167,7 +167,7 @@ private ExecuteSqlRequest resumeOrRestartRequest( @VisibleForTesting ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) { - ByteString transactionId = initTransaction(); + ByteString transactionId = initTransaction(options); final TransactionSelector transactionSelector = TransactionSelector.newBuilder().setId(transactionId).build(); @@ -195,13 +195,15 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt return builder.build(); } - private ByteString initTransaction() { + private ByteString initTransaction(final Options options) { final BeginTransactionRequest request = BeginTransactionRequest.newBuilder() .setSession(session.getName()) .setOptions( TransactionOptions.newBuilder() - .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) + .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()) + .setExcludeTxnFromChangeStreams( + options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)) .build(); Transaction tx = rpc.beginTransaction(request, session.getOptions(), true); if (tx.getId().isEmpty()) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index bea44abab3..8c4a006859 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -69,11 +69,16 @@ static void throwIfTransactionsPending() { } static TransactionOptions createReadWriteTransactionOptions(Options options) { + TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder(); + if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) { + transactionOptions.setExcludeTxnFromChangeStreams(true); + } TransactionOptions.ReadWrite.Builder readWrite = TransactionOptions.ReadWrite.newBuilder(); if (options.withOptimisticLock() == Boolean.TRUE) { readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC); } - return TransactionOptions.newBuilder().setReadWrite(readWrite).build(); + transactionOptions.setReadWrite(readWrite); + return transactionOptions.build(); } /** @@ -209,10 +214,16 @@ public CommitResponse writeAtLeastOnceWithOptions( CommitRequest.newBuilder() .setSession(name) .setReturnCommitStats(options.withCommitStats()) - .addAllMutations(mutationsProto) - .setSingleUseTransaction( - TransactionOptions.newBuilder() - .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())); + .addAllMutations(mutationsProto); + + TransactionOptions.Builder transactionOptionsBuilder = + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()); + if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) { + transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true); + } + requestBuilder.setSingleUseTransaction(transactionOptionsBuilder); + if (options.hasMaxCommitDelay()) { requestBuilder.setMaxCommitDelay( Duration.newBuilder() @@ -266,6 +277,10 @@ public ServerStream batchWriteAtLeastOnce( if (batchWriteRequestOptions != null) { requestBuilder.setRequestOptions(batchWriteRequestOptions); } + if (Options.fromTransactionOptions(transactionOptions).withExcludeTxnFromChangeStreams() + == Boolean.TRUE) { + requestBuilder.setExcludeTxnFromChangeStreams(true); + } ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE); try (IScope s = tracer.withSpan(span)) { return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 3249be1bdb..370d2f662f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -76,6 +76,10 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { private static final String TRANSACTION_ALREADY_COMMITTED_MESSAGE = "Transaction has already committed"; + private static final String DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE = + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests. " + + "This option should be set at the transaction level."; + @VisibleForTesting static class TransactionContextImpl extends AbstractReadContext implements TransactionContext { @@ -371,7 +375,9 @@ public void run() { if (transactionId == null && transactionIdFuture == null) { requestBuilder.setSingleUseTransaction( TransactionOptions.newBuilder() - .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())); + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()) + .setExcludeTxnFromChangeStreams( + options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)); } else { requestBuilder.setTransactionId( transactionId == null @@ -725,14 +731,16 @@ public long executeUpdate(Statement statement, UpdateOption... options) { } private ResultSet internalExecuteUpdate( - Statement statement, QueryMode queryMode, UpdateOption... options) { + Statement statement, QueryMode queryMode, UpdateOption... updateOptions) { beforeReadOrQuery(); + final Options options = Options.fromUpdateOptions(updateOptions); + if (options.withExcludeTxnFromChangeStreams() != null) { + throw newSpannerException( + ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE); + } final ExecuteSqlRequest.Builder builder = getExecuteSqlRequestBuilder( - statement, - queryMode, - Options.fromUpdateOptions(options), - /* withTransactionSelector = */ true); + statement, queryMode, options, /* withTransactionSelector = */ true); try { com.google.spanner.v1.ResultSet resultSet = rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader()); @@ -753,14 +761,16 @@ private ResultSet internalExecuteUpdate( } @Override - public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... options) { + public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... updateOptions) { beforeReadOrQuery(); + final Options options = Options.fromUpdateOptions(updateOptions); + if (options.withExcludeTxnFromChangeStreams() != null) { + throw newSpannerException( + ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE); + } final ExecuteSqlRequest.Builder builder = getExecuteSqlRequestBuilder( - statement, - QueryMode.NORMAL, - Options.fromUpdateOptions(options), - /* withTransactionSelector = */ true); + statement, QueryMode.NORMAL, options, /* withTransactionSelector = */ true); final ApiFuture resultSet; try { // Register the update as an async operation that must finish before the transaction may @@ -832,10 +842,15 @@ private SpannerException createAbortedExceptionForBatchDml(ExecuteBatchDmlRespon } @Override - public long[] batchUpdate(Iterable statements, UpdateOption... options) { + public long[] batchUpdate(Iterable statements, UpdateOption... updateOptions) { beforeReadOrQuery(); + final Options options = Options.fromUpdateOptions(updateOptions); + if (options.withExcludeTxnFromChangeStreams() != null) { + throw newSpannerException( + ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE); + } final ExecuteBatchDmlRequest.Builder builder = - getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options)); + getExecuteBatchDmlRequestBuilder(statements, options); try { com.google.spanner.v1.ExecuteBatchDmlResponse response = rpc.executeBatchDml(builder.build(), session.getOptions()); @@ -869,10 +884,15 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option @Override public ApiFuture batchUpdateAsync( - Iterable statements, UpdateOption... options) { + Iterable statements, UpdateOption... updateOptions) { beforeReadOrQuery(); + final Options options = Options.fromUpdateOptions(updateOptions); + if (options.withExcludeTxnFromChangeStreams() != null) { + throw newSpannerException( + ErrorCode.INVALID_ARGUMENT, DML_INVALID_EXCLUDE_CHANGE_STREAMS_OPTION_MESSAGE); + } final ExecuteBatchDmlRequest.Builder builder = - getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options)); + getExecuteBatchDmlRequestBuilder(statements, options); ApiFuture response; try { // Register the update as an async operation that must finish before the transaction may diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 7cba80edd8..4ab6829a32 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -75,6 +75,7 @@ import com.google.rpc.RetryInfo; import com.google.spanner.v1.BatchWriteRequest; import com.google.spanner.v1.BatchWriteResponse; +import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.DeleteSessionRequest; import com.google.spanner.v1.DirectedReadOptions; @@ -1334,6 +1335,14 @@ public void testWrite() { Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); assertNotNull(timestamp); + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commitRequests).hasSize(1); CommitRequest commit = commitRequests.get(0); @@ -1388,6 +1397,14 @@ public void testWriteWithOptions() { Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), Options.priority(RpcPriority.HIGH)); + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List commits = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(commits).hasSize(1); CommitRequest commit = commits.get(0); @@ -1409,6 +1426,24 @@ public void testWriteWithCommitStats() { assertNotNull(response.getCommitStats()); } + @Test + public void testWriteWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + client.writeWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.excludeTxnFromChangeStreams()); + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + @Test public void testWriteAtLeastOnce() { DatabaseClient client = @@ -1418,6 +1453,15 @@ public void testWriteAtLeastOnce() { Collections.singletonList( Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); assertNotNull(timestamp); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + assertNotNull(commit.getRequestOptions()); + assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); } @Test @@ -1438,6 +1482,7 @@ public void testWriteAtLeastOnceWithCommitStats() { CommitRequest commit = commitRequests.get(0); assertNotNull(commit.getSingleUseTransaction()); assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority()); } @@ -1456,6 +1501,7 @@ public void testWriteAtLeastOnceWithOptions() { CommitRequest commit = commitRequests.get(0); assertNotNull(commit.getSingleUseTransaction()); assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertEquals(Priority.PRIORITY_LOW, commit.getRequestOptions().getPriority()); } @@ -1474,11 +1520,29 @@ public void testWriteAtLeastOnceWithTagOptions() { CommitRequest commit = commitRequests.get(0); assertNotNull(commit.getSingleUseTransaction()); assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); assertNotNull(commit.getRequestOptions()); assertThat(commit.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test"); assertThat(commit.getRequestOptions().getRequestTag()).isEmpty(); } + @Test + public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + client.writeAtLeastOnceWithOptions( + Collections.singletonList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.excludeTxnFromChangeStreams()); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(commitRequests).hasSize(1); + CommitRequest commit = commitRequests.get(0); + assertNotNull(commit.getSingleUseTransaction()); + assertTrue(commit.getSingleUseTransaction().hasReadWrite()); + assertTrue(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams()); + } + @Test public void testBatchWriteAtLeastOnceWithoutOptions() { DatabaseClient client = @@ -1500,6 +1564,7 @@ public void testBatchWriteAtLeastOnceWithoutOptions() { BatchWriteRequest request = requests.get(0); assertEquals(request.getMutationGroupsCount(), 4); assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_UNSPECIFIED); + assertFalse(request.getExcludeTxnFromChangeStreams()); } @Test @@ -1514,6 +1579,7 @@ public void testBatchWriteAtLeastOnceWithOptions() { BatchWriteRequest request = requests.get(0); assertEquals(request.getMutationGroupsCount(), 4); assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_LOW); + assertFalse(request.getExcludeTxnFromChangeStreams()); } @Test @@ -1529,6 +1595,21 @@ public void testBatchWriteAtLeastOnceWithTagOptions() { assertEquals(request.getMutationGroupsCount(), 4); assertEquals(request.getRequestOptions().getTransactionTag(), "app=spanner,env=test"); assertThat(request.getRequestOptions().getRequestTag()).isEmpty(); + assertFalse(request.getExcludeTxnFromChangeStreams()); + } + + @Test + public void testBatchWriteAtLeastOnceWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + consumeBatchWriteStream( + client.batchWriteAtLeastOnce(MUTATION_GROUPS, Options.excludeTxnFromChangeStreams())); + + List requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class); + assertEquals(requests.size(), 1); + BatchWriteRequest request = requests.get(0); + assertEquals(request.getMutationGroupsCount(), 4); + assertTrue(request.getExcludeTxnFromChangeStreams()); } @Test @@ -1782,6 +1863,9 @@ public void testExecuteUpdateWithTag() { assertThat(request.getRequestOptions().getRequestTag()) .isEqualTo("app=spanner,env=test,action=update"); assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); + assertNotNull(request.getTransaction().getBegin()); + assertTrue(request.getTransaction().getBegin().hasReadWrite()); + assertFalse(request.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); } @Test @@ -1805,6 +1889,9 @@ public void testBatchUpdateWithTag() { .isEqualTo("app=spanner,env=test,action=batch"); assertThat(request.getRequestOptions().getTransactionTag()) .isEqualTo("app=spanner,env=test,action=txn"); + assertNotNull(request.getTransaction().getBegin()); + assertTrue(request.getTransaction().getBegin().hasReadWrite()); + assertFalse(request.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); } @Test @@ -1814,6 +1901,14 @@ public void testPartitionedDMLWithTag() { client.executePartitionedUpdate( UPDATE_STATEMENT, Options.tag("app=spanner,env=test,action=dml")); + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasPartitionedDml()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); assertThat(requests).hasSize(1); ExecuteSqlRequest request = requests.get(0); @@ -1835,6 +1930,14 @@ public void testCommitWithTag() { return null; }); + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); CommitRequest request = requests.get(0); @@ -1855,6 +1958,14 @@ public void testTransactionManagerCommitWithTag() { manager.commit(); } + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); CommitRequest request = requests.get(0); @@ -1877,6 +1988,14 @@ public void testAsyncRunnerCommitWithTag() { }, executor)); + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); CommitRequest request = requests.get(0); @@ -1904,6 +2023,14 @@ public void testAsyncTransactionManagerCommitWithTag() { .commitAsync()); } + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertFalse(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); assertThat(requests).hasSize(1); CommitRequest request = requests.get(0); @@ -1913,6 +2040,275 @@ public void testAsyncTransactionManagerCommitWithTag() { .isEqualTo("app=spanner,env=test,action=manager"); } + @Test + public void testReadWriteTxnWithExcludeTxnFromChangeStreams_executeUpdate() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(Options.excludeTxnFromChangeStreams()); + runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertThat(requests).hasSize(1); + ExecuteSqlRequest request = requests.get(0); + assertNotNull(request.getTransaction().getBegin()); + assertTrue(request.getTransaction().getBegin().hasReadWrite()); + assertTrue(request.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testReadWriteTxnWithExcludeTxnFromChangeStreams_batchUpdate() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(Options.excludeTxnFromChangeStreams()); + runner.run(transaction -> transaction.batchUpdate(Collections.singletonList(UPDATE_STATEMENT))); + + List requests = + mockSpanner.getRequestsOfType(ExecuteBatchDmlRequest.class); + assertThat(requests).hasSize(1); + ExecuteBatchDmlRequest request = requests.get(0); + assertNotNull(request.getTransaction().getBegin()); + assertTrue(request.getTransaction().getBegin().hasReadWrite()); + assertTrue(request.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testPartitionedDMLWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + client.executePartitionedUpdate(UPDATE_STATEMENT, Options.excludeTxnFromChangeStreams()); + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasPartitionedDml()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testCommitWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(Options.excludeTxnFromChangeStreams()); + runner.run( + transaction -> { + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + return null; + }); + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testTransactionManagerCommitWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (TransactionManager manager = + client.transactionManager(Options.excludeTxnFromChangeStreams())) { + TransactionContext transaction = manager.begin(); + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + manager.commit(); + } + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testAsyncRunnerCommitWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + AsyncRunner runner = client.runAsync(Options.excludeTxnFromChangeStreams()); + get( + runner.runAsync( + txn -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + }, + executor)); + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testAsyncTransactionManagerCommitWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (AsyncTransactionManager manager = + client.transactionManagerAsync(Options.excludeTxnFromChangeStreams())) { + TransactionContextFuture transaction = manager.beginAsync(); + get( + transaction + .then( + (txn, input) -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + }, + executor) + .commitAsync()); + } + + List beginTransactions = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + assertThat(beginTransactions).hasSize(1); + BeginTransactionRequest beginTransaction = beginTransactions.get(0); + assertNotNull(beginTransaction.getOptions()); + assertTrue(beginTransaction.getOptions().hasReadWrite()); + assertTrue(beginTransaction.getOptions().getExcludeTxnFromChangeStreams()); + } + + @Test + public void testExecuteUpdateWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + runner.run( + transaction -> + transaction.executeUpdate( + UPDATE_STATEMENT, Options.excludeTxnFromChangeStreams()))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + + @Test + public void testExecuteUpdateAsyncWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + AsyncRunner runner = client.runAsync(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + get( + runner.runAsync( + txn -> { + txn.executeUpdateAsync( + UPDATE_STATEMENT, Options.excludeTxnFromChangeStreams()); + return ApiFutures.immediateFuture(null); + }, + executor))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + + @Test + public void testAnalyzeUpdateWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + runner.run( + transaction -> + transaction.analyzeUpdate( + UPDATE_STATEMENT, + QueryAnalyzeMode.PROFILE, + Options.excludeTxnFromChangeStreams()))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + + @Test + public void testAnalyzeUpdateStatementWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + runner.run( + transaction -> + transaction.analyzeUpdateStatement( + UPDATE_STATEMENT, + QueryAnalyzeMode.PROFILE, + Options.excludeTxnFromChangeStreams()))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + + @Test + public void testBatchUpdateWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + runner.run( + transaction -> + transaction.batchUpdate( + Collections.singletonList(UPDATE_STATEMENT), + Options.excludeTxnFromChangeStreams()))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + + @Test + public void testBatchUpdateAsyncWithExcludeTxnFromChangeStreams() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + AsyncRunner runner = client.runAsync(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + get( + runner.runAsync( + txn -> { + txn.batchUpdateAsync( + Collections.singletonList(UPDATE_STATEMENT), + Options.excludeTxnFromChangeStreams()); + return ApiFutures.immediateFuture(null); + }, + executor))); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()) + .contains( + "Options.excludeTxnFromChangeStreams() cannot be specified for individual DML requests." + + " This option should be set at the transaction level."); + } + @Test public void singleUse() { DatabaseClientImpl client = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index e0bbf81f29..8c9a5d957e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -100,6 +101,7 @@ public void allOptionsAbsent() { assertThat(options.hasTag()).isFalse(); assertThat(options.hasDataBoostEnabled()).isFalse(); assertThat(options.hasDirectedReadOptions()).isFalse(); + assertNull(options.withExcludeTxnFromChangeStreams()); assertThat(options.toString()).isEqualTo(""); assertThat(options.equals(options)).isTrue(); assertThat(options.equals(null)).isFalse(); @@ -691,4 +693,40 @@ public void directedReadHashCode() { public void directedReadsNullNotAllowed() { assertThrows(NullPointerException.class, () -> Options.directedRead(null)); } + + @Test + public void transactionOptionsExcludeTxnFromChangeStreams() { + Options option1 = Options.fromTransactionOptions(Options.excludeTxnFromChangeStreams()); + Options option2 = Options.fromTransactionOptions(Options.excludeTxnFromChangeStreams()); + Options option3 = Options.fromTransactionOptions(); + + assertEquals(option1, option2); + assertEquals(option1.hashCode(), option2.hashCode()); + assertNotEquals(option1, option3); + assertNotEquals(option1.hashCode(), option3.hashCode()); + + assertTrue(option1.withExcludeTxnFromChangeStreams()); + assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true"); + + assertNull(option3.withExcludeTxnFromChangeStreams()); + assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); + } + + @Test + public void updateOptionsExcludeTxnFromChangeStreams() { + Options option1 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); + Options option2 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); + Options option3 = Options.fromUpdateOptions(); + + assertEquals(option1, option2); + assertEquals(option1.hashCode(), option2.hashCode()); + assertNotEquals(option1, option3); + assertNotEquals(option1.hashCode(), option3.hashCode()); + + assertTrue(option1.withExcludeTxnFromChangeStreams()); + assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true"); + + assertNull(option3.withExcludeTxnFromChangeStreams()); + assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); + } }