Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add support for change streams transaction exclusion option #2049

Merged
merged 37 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
af733bc
add launch file
alkatrivedi May 13, 2024
97d43ff
feat: change stream option in transaction
alkatrivedi May 13, 2024
23496fd
feat: change stream option in transaction
alkatrivedi May 13, 2024
1f8c2d5
add test for runtransaction with excludeTxnFromChangeStreams option
alkatrivedi May 13, 2024
62cc8e0
fix: lint errors
alkatrivedi May 13, 2024
9bc85be
Merge branch 'googleapis:main' into change-stream
alkatrivedi May 15, 2024
fe0f1d1
Merge branch 'googleapis:main' into change-stream
alkatrivedi May 17, 2024
86bb1be
feat(spanner): add changes in the runPartitionedUpdate and insert met…
alkatrivedi May 20, 2024
cf3bf9f
fix: failed unit test in database file
alkatrivedi May 21, 2024
29a3729
fix: lint errors
alkatrivedi May 21, 2024
a0e1b2e
delete insertion sample
alkatrivedi May 21, 2024
09e2d3b
fix: lint errors
alkatrivedi May 21, 2024
85132f2
fix: lint changes in singer.js file
alkatrivedi May 21, 2024
55a23e7
refactor test file
alkatrivedi May 21, 2024
339f341
fix: lint errors
alkatrivedi May 21, 2024
7d77b34
fix: lint errors
alkatrivedi May 21, 2024
aaeed9d
refactor
alkatrivedi May 21, 2024
c60d029
refactor
alkatrivedi May 21, 2024
614be94
refactor
alkatrivedi May 21, 2024
aebb3a1
refactor
alkatrivedi May 21, 2024
dbed5f3
refactor
alkatrivedi May 21, 2024
ac842a9
refactor: remove extra lines
alkatrivedi May 22, 2024
5a96547
feat: add excludeTransactionOption for runPrtitionedUpdate
alkatrivedi May 22, 2024
03e5be0
docs: for excludeTxnFromChangeStreams function
alkatrivedi May 22, 2024
1567c32
Merge branch 'googleapis:main' into change-stream
alkatrivedi May 22, 2024
9490926
refactor test
alkatrivedi May 22, 2024
7cbef54
refactor test
alkatrivedi May 23, 2024
cd8f711
refactor imports
alkatrivedi May 23, 2024
b6c5001
refactor: test file and _mutate method
alkatrivedi May 23, 2024
4a0d234
refactor: test file and _mutate method
alkatrivedi May 23, 2024
dc7cfaf
refactor: test file and _mutate method
alkatrivedi May 23, 2024
4c38b9c
refactor test
alkatrivedi May 23, 2024
0d1e239
refactor test
alkatrivedi May 23, 2024
ece6d16
fix: test errors
alkatrivedi May 23, 2024
31e2763
add pdml unit tests
alkatrivedi May 23, 2024
5bf2102
add test for mutation
alkatrivedi May 23, 2024
3c45d98
add test for mutation
alkatrivedi May 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@
updateMask?: FieldMask | null;
}

export interface RunPartitionedUpdateOptions extends ExecuteSqlRequest {
excludeTxnFromChangeStreams?: boolean;
}

export type UpdateSchemaCallback = ResourceCallback<
GaxOperation,
databaseAdmin.longrunning.IOperation
Expand Down Expand Up @@ -1527,7 +1531,7 @@
): void;
async getDatabaseDialect(
optionsOrCallback?: CallOptions | GetDatabaseDialectCallback,
cb?: GetDatabaseDialectCallback

Check warning on line 1534 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

'cb' is defined but never used
): Promise<
| EnumKey<typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect>
| undefined
Expand Down Expand Up @@ -2092,6 +2096,9 @@
if (options.optimisticLock) {
transaction!.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction!.excludeTxnFromChangeStreams();
}
if (!err) {
this._releaseOnEnd(session!, transaction!);
}
Expand Down Expand Up @@ -2711,13 +2718,15 @@
* @param {RunUpdateCallback} [callback] Callback function.
* @returns {Promise<RunUpdateResponse>}
*/
runPartitionedUpdate(query: string | ExecuteSqlRequest): Promise<[number]>;
runPartitionedUpdate(
query: string | ExecuteSqlRequest,
query: string | RunPartitionedUpdateOptions
): Promise<[number]>;
runPartitionedUpdate(
query: string | RunPartitionedUpdateOptions,
callback?: RunUpdateCallback
): void;
runPartitionedUpdate(
query: string | ExecuteSqlRequest,
query: string | RunPartitionedUpdateOptions,
callback?: RunUpdateCallback
): void | Promise<[number]> {
this.pool_.getSession((err, session) => {
Expand All @@ -2732,11 +2741,14 @@

_runPartitionedUpdate(
session: Session,
query: string | ExecuteSqlRequest,
query: string | RunPartitionedUpdateOptions,
callback?: RunUpdateCallback
): void | Promise<number> {
const transaction = session.partitionedDml();

if (typeof query !== 'string' && query.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
transaction.begin(err => {
if (err) {
this.pool_.release(session!);
Expand Down Expand Up @@ -3059,6 +3071,9 @@
if (options.optimisticLock) {
transaction!.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction!.excludeTxnFromChangeStreams();
}

const release = this.pool_.release.bind(this.pool_, session!);
const runner = new TransactionRunner(
Expand Down Expand Up @@ -3173,6 +3188,9 @@
if (options.optimisticLock) {
transaction.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
Expand Down
29 changes: 21 additions & 8 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export type DropTableCallback = UpdateSchemaCallback;

interface MutateRowsOptions extends CommitOptions {
requestOptions?: Omit<IRequestOptions, 'requestTag'>;
excludeTxnFromChangeStreams?: boolean;
}

export type DeleteRowsCallback = CommitCallback;
Expand Down Expand Up @@ -1073,15 +1074,27 @@ class Table {
): void {
const requestOptions =
'requestOptions' in options ? options.requestOptions : {};
this.database.runTransaction({requestOptions}, (err, transaction) => {
if (err) {
callback(err);
return;
}

transaction![method](this.name, rows as Key[]);
transaction!.commit(options, callback);
});
const excludeTxnFromChangeStreams =
'excludeTxnFromChangeStreams' in options
? options.excludeTxnFromChangeStreams
: false;

this.database.runTransaction(
{
requestOptions: requestOptions,
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
},
(err, transaction) => {
if (err) {
callback(err);
return;
}

transaction![method](this.name, rows as Key[]);
transaction!.commit(options, callback);
}
);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/transaction-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface RunTransactionOptions {
timeout?: number;
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;
optimisticLock?: boolean;
excludeTxnFromChangeStreams?: boolean;
}

/**
Expand Down Expand Up @@ -204,6 +205,9 @@ export abstract class Runner<T> {
if (this.options.optimisticLock) {
transaction.useOptimisticLock();
}
if (this.options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
if (this.attempts > 0) {
await transaction.begin();
}
Expand Down
23 changes: 23 additions & 0 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2470,6 +2470,18 @@ export class Transaction extends Dml {
useOptimisticLock(): void {
this._options.readWrite!.readLockMode = ReadLockMode.OPTIMISTIC;
}

/**
* Use option excludeTxnFromChangeStreams to exclude read/write transactions
* from being tracked in change streams.
*
* Enabling this options to true will effectively disable change stream tracking
* for a specified transaction, allowing read/write transaction to operate without being
* included in change streams.
*/
excludeTxnFromChangeStreams(): void {
this._options.excludeTxnFromChangeStreams = true;
}
}

/*! Developer Documentation
Expand Down Expand Up @@ -2503,6 +2515,17 @@ export class PartitionedDml extends Dml {
super(session);
this._options = {partitionedDml: options};
}
/**
* Use option excludeTxnFromChangeStreams to exclude partitionedDml
* queries from being tracked in change streams.
*
* Enabling this options to true will effectively disable change stream tracking
* for a specified partitionedDml query, allowing write queries to operate
* without being included in change streams.
*/
excludeTxnFromChangeStreams(): void {
this._options.excludeTxnFromChangeStreams = true;
}

/**
* Execute a DML statement and get the affected row count. Unlike
Expand Down
21 changes: 20 additions & 1 deletion test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2544,7 +2544,8 @@ describe('Database', () => {

const [query] = runUpdateStub.lastCall.args;

assert.strictEqual(query, QUERY);
assert.strictEqual(query.sql, QUERY.sql);
assert.deepStrictEqual(query.params, QUERY.params);
assert.ok(fakeCallback.calledOnce);
});

Expand Down Expand Up @@ -2581,6 +2582,24 @@ describe('Database', () => {
assert.ok(fakeCallback.calledOnce);
});

it('should accept excludeTxnFromChangeStreams', () => {
const fakeCallback = sandbox.spy();

database.runPartitionedUpdate(
{
excludeTxnFromChangeStream: true,
},
fakeCallback
);

const [query] = runUpdateStub.lastCall.args;

assert.deepStrictEqual(query, {
excludeTxnFromChangeStream: true,
});
assert.ok(fakeCallback.calledOnce);
});

it('should ignore directedReadOptions set for client', () => {
const fakeCallback = sandbox.spy();

Expand Down
Loading
Loading