From 8ffcfc1caa7b26fb90c47768d214c3825d2a7c9b Mon Sep 17 00:00:00 2001 From: Shuran Zhang Date: Wed, 8 May 2024 20:19:53 +0000 Subject: [PATCH] add sample for apply at leaast once --- .../spanner/integration_test.go | 6 ++++-- .../spanner_change_streams_txn_exclusion.go | 21 +++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/spanner/spanner_snippets/spanner/integration_test.go b/spanner/spanner_snippets/spanner/integration_test.go index 6d5e85ad3f..46cc160c9b 100644 --- a/spanner/spanner_snippets/spanner/integration_test.go +++ b/spanner/spanner_snippets/spanner/integration_test.go @@ -414,11 +414,13 @@ func TestSample(t *testing.T) { out = runSample(t, directedReadOptions, dbName, "failed to read using directed read options") assertContains(t, out, "1 1 Total Junk") + out = runSample(t, rwTxnExcludedFromChangeStreams, dbName, "failed to commit rw txn excluded from change streams") + assertContains(t, out, "New singer inserted.") + assertContains(t, out, "Singer first name updated.") out = runSample(t, applyExcludedFromChangeStreams, dbName, "failed to apply mutations excluded from change streams") assertContains(t, out, "New singer inserted.") - out = runSample(t, rwTxnExcludedFromChangeStreams, dbName, "failed to commit rw txn excluded from change streams") + out = runSample(t, applyAtLeastOnceExcludedFromChangeStreams, dbName, "failed to apply at least once mutations excluded from change streams") assertContains(t, out, "New singer inserted.") - assertContains(t, out, "Singer first name updated.") out = runSample(t, batchWriteExcludedFromChangeStreams, dbName, "failed to write data using BatchWrite excluded from change streams") assertNotContains(t, out, "could not be applied with error") out = runSample(t, pdmlExcludedFromChangeStreams, dbName, "failed to update using partitioned DML excluded from change streams") diff --git a/spanner/spanner_snippets/spanner/spanner_change_streams_txn_exclusion.go b/spanner/spanner_snippets/spanner/spanner_change_streams_txn_exclusion.go index 5ae5b40bbc..3f72ccfcf6 100644 --- a/spanner/spanner_snippets/spanner/spanner_change_streams_txn_exclusion.go +++ b/spanner/spanner_snippets/spanner/spanner_change_streams_txn_exclusion.go @@ -83,6 +83,27 @@ func applyExcludedFromChangeStreams(w io.Writer, db string) error { return err } +// applyExcludedFromChangeStreams apply the insert mutations on Singers table excluded from allowed tracking change streams +func applyAtLeastOnceExcludedFromChangeStreams(w io.Writer, db string) error { + // db = `projects//instances//database/` + ctx := context.Background() + client, err := spanner.NewClient(ctx, db) + if err != nil { + return fmt.Errorf("applyExcludedFromChangeStreams.NewClient: %w", err) + } + defer client.Close() + m := spanner.Insert("Singers", + []string{"SingerId", "FirstName", "LastName"}, + []interface{}{989, "Hellen", "Lee"}) + _, err = client.Apply(ctx, []*spanner.Mutation{m}, []spanner.ApplyOption{spanner.ExcludeTxnFromChangeStreams(), spanner.ApplyAtLeastOnce()}...) + + if err != nil { + return err + } + fmt.Fprint(w, "applyExcludedFromChangeStreams.ApplyAtLeastOnce: New singer inserted.") + return err +} + // batchWriteExcludedFromChangeStreams executes the insert mutation on Singers table excluded from allowed tracking change streams func batchWriteExcludedFromChangeStreams(w io.Writer, db string) error { // db := "projects/my-project/instances/my-instance/databases/my-database"