Skip to content

Commit

Permalink
fix: idwaiter with multiple requests (#1910)
Browse files Browse the repository at this point in the history
Fixes #1908 🦕
  • Loading branch information
surbhigarg92 committed Aug 30, 2023
1 parent 19e0765 commit 83dd1f8
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 11 deletions.
37 changes: 26 additions & 11 deletions src/transaction.ts
Expand Up @@ -17,6 +17,7 @@
import {DateStruct, PreciseDate} from '@google-cloud/precise-date';
import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import Long = require('long');
import {EventEmitter} from 'events';
import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax';
import * as is from 'is';
Expand Down Expand Up @@ -101,7 +102,7 @@ export interface ReadRequest extends RequestOptions {
keys?: string[] | string[][];
ranges?: KeyRange[];
keySet?: spannerClient.spanner.v1.IKeySet | null;
limit?: number | Long | null;
limit?: number | Long | string | null;
resumeToken?: Uint8Array | null;
partitionToken?: Uint8Array | null;
requestOptions?: Omit<IRequestOptions, 'transactionTag'>;
Expand Down Expand Up @@ -223,7 +224,7 @@ export type CommitCallback =
export class Snapshot extends EventEmitter {
protected _options!: spannerClient.spanner.v1.ITransactionOptions;
protected _seqno = 1;
protected _idWaiter: Readable;
protected _waitingRequests: Array<() => void>;
protected _inlineBeginStarted;
protected _useInRunner = false;
id?: Uint8Array | string;
Expand Down Expand Up @@ -299,9 +300,7 @@ export class Snapshot extends EventEmitter {
this.resourceHeader_ = {
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_,
};
this._idWaiter = new Readable({
read() {},
});
this._waitingRequests = [];
this._inlineBeginStarted = false;
}

Expand Down Expand Up @@ -1306,7 +1305,7 @@ export class Snapshot extends EventEmitter {
this.readTimestampProto = readTimestamp;
this.readTimestamp = new PreciseDate(readTimestamp as DateStruct);
}
this._idWaiter.emit('notify');
this._releaseWaitingRequests();
}

/**
Expand All @@ -1326,12 +1325,28 @@ export class Snapshot extends EventEmitter {
this._inlineBeginStarted = true;
return makeRequest;
}
return (resumeToken?: ResumeToken): Readable =>
this._idWaiter.once('notify', () =>

// Queue subsequent requests.
return (resumeToken?: ResumeToken): Readable => {
const streamProxy = new Readable({
read() {},
});

this._waitingRequests.push(() => {
makeRequest(resumeToken)
.on('data', chunk => this._idWaiter.emit('data', chunk))
.once('end', () => this._idWaiter.emit('end'))
);
.on('data', chunk => streamProxy.emit('data', chunk))
.on('end', () => streamProxy.emit('end'));
});

return streamProxy;
};
}

_releaseWaitingRequests() {
while (this._waitingRequests.length > 0) {
const request = this._waitingRequests.shift();
request?.();
}
}

/**
Expand Down
34 changes: 34 additions & 0 deletions test/spanner.ts
Expand Up @@ -3296,6 +3296,40 @@ describe('Spanner with mock server', () => {
assert.ok(!beginTxnRequest, 'beginTransaction was called');
});

it('should handle parallel request with inline begin transaction', async () => {
const database = newTestDatabase();
await database.runTransactionAsync(async tx => {
const rowCount1 = getRowCountFromStreamingSql(tx!, {sql: selectSql});
const rowCount2 = getRowCountFromStreamingSql(tx!, {sql: selectSql});
const rowCount3 = getRowCountFromStreamingSql(tx!, {sql: selectSql});
await Promise.all([rowCount1, rowCount2, rowCount3]);
await tx.commit();
});
await database.close();

let request = spannerMock.getRequests().find(val => {
return (val as v1.ExecuteSqlRequest).sql;
}) as v1.ExecuteSqlRequest;
assert.ok(request, 'no ExecuteSqlRequest found');
assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set');
assert.strictEqual(request.sql, selectSql);

request = spannerMock
.getRequests()
.slice()
.reverse()
.find(val => {
return (val as v1.ExecuteSqlRequest).sql;
}) as v1.ExecuteSqlRequest;
assert.ok(request, 'no ExecuteSqlRequest found');
assert.strictEqual(request.sql, selectSql);
assert.ok(request.transaction!.id, 'TransactionID is not set.');
const beginTxnRequest = spannerMock.getRequests().find(val => {
return (val as v1.BeginTransactionRequest).options?.readWrite;
}) as v1.BeginTransactionRequest;
assert.ok(!beginTxnRequest, 'beginTransaction was called');
});

it('should use beginTransaction on retry', async () => {
const database = newTestDatabase();
let attempts = 0;
Expand Down

0 comments on commit 83dd1f8

Please sign in to comment.