From 0703f4160c4a0b4c9f9f716174daca110ab8e50f Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Wed, 26 Apr 2023 07:45:59 +0000 Subject: [PATCH] feat: Leader aware routing (#1783) * feat: leader aware routing * routetoleaderenabled property and test cases * feat: Added unit test cases * test: for LAR disabled * fix: review comments * fix: review comments * fix: review comments * fix : test cases * fix: lint * feat: Default disable LAR feature --------- Co-authored-by: Astha Mohta <35952883+asthamohta@users.noreply.github.com> --- src/batch-transaction.ts | 19 ++++++- src/common.ts | 13 +++++ src/database.ts | 28 ++++++++-- src/index.ts | 12 +++++ src/session.ts | 25 ++++++++- src/transaction.ts | 73 +++++++++++++++++++++++--- test/batch-transaction.ts | 33 ++++++++++-- test/database.ts | 28 ++++++++-- test/index.ts | 5 ++ test/session.ts | 43 ++++++++++++++- test/spanner.ts | 71 ++++++++++++++++++++++++- test/transaction.ts | 108 ++++++++++++++++++++++++++++++++++---- 12 files changed, 423 insertions(+), 35 deletions(-) diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index c6c7b6702..c74432088 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -20,8 +20,11 @@ import * as extend from 'extend'; import * as is from 'is'; import {Snapshot} from './transaction'; import {google} from '../protos/protos'; -import {Session, Database} from '.'; -import {CLOUD_RESOURCE_HEADER} from '../src/common'; +import {Session, Database, Spanner} from '.'; +import { + CLOUD_RESOURCE_HEADER, + addLeaderAwareRoutingHeader, +} from '../src/common'; export interface TransactionIdentifier { session: string | Session; @@ -133,12 +136,18 @@ class BatchTransaction extends Snapshot { delete reqOpts.gaxOptions; delete reqOpts.types; + const headers: {[k: string]: string} = {}; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + this.createPartitions_( { client: 'SpannerClient', method: 'partitionQuery', reqOpts, gaxOpts: query.gaxOptions, + headers: headers, }, callback ); @@ -225,12 +234,18 @@ class BatchTransaction extends Snapshot { delete reqOpts.keys; delete reqOpts.ranges; + const headers: {[k: string]: string} = {}; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + this.createPartitions_( { client: 'SpannerClient', method: 'partitionRead', reqOpts, gaxOpts: options.gaxOptions, + headers: headers, }, callback ); diff --git a/src/common.ts b/src/common.ts index ddc79d4fd..634df1210 100644 --- a/src/common.ts +++ b/src/common.ts @@ -75,3 +75,16 @@ export interface PagedOptionsWithFilter extends PagedOptions { * by the backend. */ export const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix'; + +/*! + * HTTP header to route the requests at Leader + */ +export const LEADER_AWARE_ROUTING_HEADER = 'x-goog-spanner-route-to-leader'; + +/** + * Add Leader aware routing header to existing header list. + * @param headers Existing header list. + */ +export function addLeaderAwareRoutingHeader(headers: {[k: string]: string}) { + headers[LEADER_AWARE_ROUTING_HEADER] = 'true'; +} diff --git a/src/database.ts b/src/database.ts index 97a3e79dc..4beba3792 100644 --- a/src/database.ts +++ b/src/database.ts @@ -85,10 +85,11 @@ import { RequestCallback, ResourceCallback, Schema, + addLeaderAwareRoutingHeader, } from './common'; import {Duplex, Readable, Transform} from 'stream'; import {PreciseDate} from '@google-cloud/precise-date'; -import {EnumKey, RequestConfig, TranslateEnumKeys} from '.'; +import {EnumKey, RequestConfig, TranslateEnumKeys, Spanner} from '.'; import arrify = require('arrify'); import {ServiceError} from 'google-gax'; import IPolicy = google.iam.v1.IPolicy; @@ -523,13 +524,18 @@ class Database extends common.GrpcServiceObject { sessionCount: count, }; + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + this.request( { client: 'SpannerClient', method: 'batchCreateSessions', reqOpts, gaxOpts: options.gaxOptions, - headers: this.resourceHeader_, + headers: headers, }, (err, resp) => { if (err) { @@ -791,13 +797,18 @@ class Database extends common.GrpcServiceObject { reqOpts.session.creatorRole = options.databaseRole || this.databaseRole || null; + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + this.request( { client: 'SpannerClient', method: 'createSession', reqOpts, gaxOpts: options.gaxOptions, - headers: this.resourceHeader_, + headers: headers, }, (err, resp) => { if (err) { @@ -3261,6 +3272,17 @@ class Database extends common.GrpcServiceObject { const databaseName = name.split('/').pop(); return instanceName + '/databases/' + databaseName; } + + /** + * Gets the Spanner object + * + * @private + * + * @returns {Spanner} + */ + private _getSpanner(): Spanner { + return this.instance.parent as Spanner; + } } /*! Developer Documentation diff --git a/src/index.ts b/src/index.ts index e6501d933..fef4c7642 100644 --- a/src/index.ts +++ b/src/index.ts @@ -106,11 +106,17 @@ export type GetInstanceConfigOperationsCallback = PagedCallback< instanceAdmin.spanner.admin.instance.v1.IListInstanceConfigOperationsResponse >; +/** + * Session pool configuration options. + * @property {boolean} [routeToLeaderEnabled=True] If set to false leader aware routing will be disabled. + * Disabling leader aware routing would route all requests in RW/PDML transactions to any region. + */ export interface SpannerOptions extends GrpcClientOptions { apiEndpoint?: string; servicePath?: string; port?: number; sslCreds?: grpc.ChannelCredentials; + routeToLeaderEnabled?: boolean; } export interface RequestConfig { client: string; @@ -210,6 +216,7 @@ class Spanner extends GrpcService { projectIdReplaced_: boolean; projectFormattedName_: string; resourceHeader_: {[k: string]: string}; + routeToLeaderEnabled = false; /** * Placeholder used to auto populate a column with the commit timestamp. @@ -310,6 +317,11 @@ class Spanner extends GrpcService { packageJson: require('../../package.json'), } as {} as GrpcServiceConfig; super(config, options); + + if (options.routeToLeaderEnabled === true) { + this.routeToLeaderEnabled = true; + } + this.options = options; this.auth = new GoogleAuth(this.options); this.clients_ = new Map(); diff --git a/src/session.ts b/src/session.ts index 849e93914..256d8af14 100644 --- a/src/session.ts +++ b/src/session.ts @@ -36,9 +36,14 @@ import { CreateSessionOptions, } from './database'; import {ServiceObjectConfig} from '@google-cloud/common'; -import {NormalCallback, CLOUD_RESOURCE_HEADER} from './common'; +import { + NormalCallback, + CLOUD_RESOURCE_HEADER, + addLeaderAwareRoutingHeader, +} from './common'; import {grpc, CallOptions} from 'google-gax'; import IRequestOptions = google.spanner.v1.IRequestOptions; +import {Spanner} from '.'; export type GetSessionResponse = [Session, r.Response]; @@ -378,13 +383,18 @@ export class Session extends common.GrpcServiceObject { const reqOpts = { name: this.formattedName_, }; + + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } return this.request( { client: 'SpannerClient', method: 'getSession', reqOpts, gaxOpts, - headers: this.resourceHeader_, + headers: headers, }, (err, resp) => { if (resp) { @@ -512,6 +522,17 @@ export class Session extends common.GrpcServiceObject { const sessionName = name.split('/').pop(); return databaseName + '/sessions/' + sessionName; } + + /** + * Gets the Spanner object + * + * @private + * + * @returns {Spanner} + */ + private _getSpanner(): Spanner { + return this.parent.parent.parent as Spanner; + } } /*! Developer Documentation diff --git a/src/transaction.ts b/src/transaction.ts index 2d771b38c..d811161c2 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -33,12 +33,16 @@ import { import {Session} from './session'; import {Key} from './table'; import {google as spannerClient} from '../protos/protos'; -import {NormalCallback, CLOUD_RESOURCE_HEADER} from './common'; +import { + NormalCallback, + CLOUD_RESOURCE_HEADER, + addLeaderAwareRoutingHeader, +} from './common'; import {google} from '../protos/protos'; import IAny = google.protobuf.IAny; import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; import IRequestOptions = google.spanner.v1.IRequestOptions; -import {Database} from '.'; +import {Database, Spanner} from '.'; import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; export type Rows = Array; @@ -370,13 +374,22 @@ export class Snapshot extends EventEmitter { reqOpts.requestOptions = this.requestOptions; } + const headers = this.resourceHeader_; + if ( + this._getSpanner().routeToLeaderEnabled && + (this._options.readWrite !== undefined || + this._options.partitionedDml !== undefined) + ) { + addLeaderAwareRoutingHeader(headers); + } + this.request( { client: 'SpannerClient', method: 'beginTransaction', reqOpts, gaxOpts, - headers: this.resourceHeader_, + headers: headers, }, ( err: null | grpc.ServiceError, @@ -602,6 +615,15 @@ export class Snapshot extends EventEmitter { } ); + const headers = this.resourceHeader_; + if ( + this._getSpanner().routeToLeaderEnabled && + (this._options.readWrite !== undefined || + this._options.partitionedDml !== undefined) + ) { + addLeaderAwareRoutingHeader(headers); + } + const makeRequest = (resumeToken?: ResumeToken): Readable => { if (this.id && transaction.begin) { delete transaction.begin; @@ -612,7 +634,7 @@ export class Snapshot extends EventEmitter { method: 'streamingRead', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, - headers: this.resourceHeader_, + headers: headers, }); }; @@ -1077,6 +1099,15 @@ export class Snapshot extends EventEmitter { }); }; + const headers = this.resourceHeader_; + if ( + this._getSpanner().routeToLeaderEnabled && + (this._options.readWrite !== undefined || + this._options.partitionedDml !== undefined) + ) { + addLeaderAwareRoutingHeader(headers); + } + const makeRequest = (resumeToken?: ResumeToken): Readable => { if (!reqOpts || (this.id && !reqOpts.transaction.id)) { try { @@ -1093,7 +1124,7 @@ export class Snapshot extends EventEmitter { method: 'executeStreamingSql', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, - headers: this.resourceHeader_, + headers: headers, }); }; @@ -1300,6 +1331,17 @@ export class Snapshot extends EventEmitter { .once('end', () => this._idWaiter.emit('end')) ); } + + /** + * Gets the Spanner object + * + * @private + * + * @returns {Spanner} + */ + protected _getSpanner(): Spanner { + return this.session.parent.parent.parent as Spanner; + } } /*! Developer Documentation @@ -1620,13 +1662,18 @@ export class Transaction extends Dml { statements, } as spannerClient.spanner.v1.ExecuteBatchDmlRequest; + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + this.request( { client: 'SpannerClient', method: 'executeBatchDml', reqOpts, gaxOpts, - headers: this.resourceHeader_, + headers: headers, }, ( err: null | grpc.ServiceError, @@ -1789,13 +1836,18 @@ export class Transaction extends Dml { this.requestOptions ); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + this.request( { client: 'SpannerClient', method: 'commit', reqOpts, gaxOpts: gaxOpts, - headers: this.resourceHeader_, + headers: headers, }, (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { this.end(); @@ -2124,13 +2176,18 @@ export class Transaction extends Dml { transactionId, }; + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + this.request( { client: 'SpannerClient', method: 'rollback', reqOpts, gaxOpts, - headers: this.resourceHeader_, + headers: headers, }, (err: null | ServiceError) => { this.end(); diff --git a/test/batch-transaction.ts b/test/batch-transaction.ts index f9d02272d..33e3dd403 100644 --- a/test/batch-transaction.ts +++ b/test/batch-transaction.ts @@ -24,10 +24,13 @@ import * as extend from 'extend'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; -import {Session, Database} from '../src'; +import {Session, Database, Spanner} from '../src'; import * as bt from '../src/batch-transaction'; import {PartialResultStream} from '../src/partial-result-stream'; -import {CLOUD_RESOURCE_HEADER} from '../src/common'; +import { + CLOUD_RESOURCE_HEADER, + LEADER_AWARE_ROUTING_HEADER, +} from '../src/common'; let promisified = false; const fakePfy = extend({}, pfy, { @@ -57,8 +60,17 @@ const fakeCodec: any = { convertProtoTimestampToDate() {}, }; +const SPANNER = { + routeToLeaderEnabled: true, +}; + +const INSTANCE = { + parent: SPANNER, +}; + const DATABASE = { formattedName_: 'database', + parent: INSTANCE, }; class FakeTransaction { @@ -74,6 +86,11 @@ class FakeTransaction { static encodeParams(): object { return {}; } + + _getSpanner(): Spanner { + return SPANNER as Spanner; + } + run() {} read() {} } @@ -149,11 +166,15 @@ describe('BatchTransaction', () => { batchTransaction.createQueryPartitions(QUERY, assert.ifError); - const {client, method, reqOpts, gaxOpts} = stub.lastCall.args[0]; + const {client, method, reqOpts, gaxOpts, headers} = stub.lastCall.args[0]; assert.strictEqual(client, 'SpannerClient'); assert.strictEqual(method, 'partitionQuery'); assert.deepStrictEqual(reqOpts, expectedQuery); assert.strictEqual(gaxOpts, GAX_OPTS); + assert.deepStrictEqual( + headers, + Object.assign({[LEADER_AWARE_ROUTING_HEADER]: 'true'}) + ); }); it('should accept query as string', () => { @@ -300,11 +321,15 @@ describe('BatchTransaction', () => { batchTransaction.createReadPartitions(QUERY, assert.ifError); - const {client, method, reqOpts, gaxOpts} = stub.lastCall.args[0]; + const {client, method, reqOpts, gaxOpts, headers} = stub.lastCall.args[0]; assert.strictEqual(client, 'SpannerClient'); assert.strictEqual(method, 'partitionRead'); assert.deepStrictEqual(reqOpts, expectedQuery); assert.strictEqual(gaxOpts, GAX_OPTS); + assert.deepStrictEqual( + headers, + Object.assign({[LEADER_AWARE_ROUTING_HEADER]: 'true'}) + ); }); }); diff --git a/test/database.ts b/test/database.ts index 41ed7e12c..2065e2f68 100644 --- a/test/database.ts +++ b/test/database.ts @@ -28,10 +28,13 @@ import * as through from 'through2'; import * as pfy from '@google-cloud/promisify'; import {grpc} from 'google-gax'; import * as db from '../src/database'; -import {Instance} from '../src'; +import {Spanner, Instance} from '../src'; import {MockError} from './mockserver/mockspanner'; import {IOperation} from '../src/instance'; -import {CLOUD_RESOURCE_HEADER} from '../src/common'; +import { + CLOUD_RESOURCE_HEADER, + LEADER_AWARE_ROUTING_HEADER, +} from '../src/common'; import {google} from '../protos/protos'; import RequestOptions = google.spanner.v1.RequestOptions; import EncryptionType = google.spanner.admin.database.v1.RestoreDatabaseEncryptionConfig.EncryptionType; @@ -176,11 +179,16 @@ describe('Database', () => { // tslint:disable-next-line variable-name let DatabaseCached: typeof db.Database; + const SPANNER = { + routeToLeaderEnabled: true, + } as {} as Spanner; + const INSTANCE = { request: util.noop, requestStream: util.noop, formattedName_: 'instance-name', databases_: new Map(), + parent: SPANNER, } as {} as Instance; const NAME = 'table-name'; @@ -355,7 +363,13 @@ describe('Database', () => { assert.strictEqual(reqOpts.database, DATABASE_FORMATTED_NAME); assert.strictEqual(reqOpts.sessionCount, count); assert.strictEqual(gaxOpts, undefined); - assert.deepStrictEqual(headers, database.resourceHeader_); + assert.deepStrictEqual( + headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + database.resourceHeader_ + ) + ); }); it('should accept just a count number', () => { @@ -1721,7 +1735,13 @@ describe('Database', () => { }, }); assert.strictEqual(config.gaxOpts, gaxOptions); - assert.deepStrictEqual(config.headers, database.resourceHeader_); + assert.deepStrictEqual( + config.headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + database.resourceHeader_ + ) + ); done(); }; diff --git a/test/index.ts b/test/index.ts index 6910454ad..a37b92404 100644 --- a/test/index.ts +++ b/test/index.ts @@ -276,6 +276,11 @@ describe('Spanner', () => { assert.strictEqual(config.baseUrl, SERVICE_PATH); }); + it('should optionally accept routeToLeaderEnabled', () => { + const spanner = new Spanner({routeToLeaderEnabled: false}); + assert.strictEqual(spanner.routeToLeaderEnabled, false); + }); + it('should set projectFormattedName_', () => { assert.strictEqual( spanner.projectFormattedName_, diff --git a/test/session.ts b/test/session.ts index 03c9e45eb..9570bcc27 100644 --- a/test/session.ts +++ b/test/session.ts @@ -21,8 +21,11 @@ import * as assert from 'assert'; import {before, beforeEach, describe, it} from 'mocha'; import * as extend from 'extend'; import * as proxyquire from 'proxyquire'; -import {CLOUD_RESOURCE_HEADER} from '../src/common'; -import {Database} from '../src'; +import { + CLOUD_RESOURCE_HEADER, + LEADER_AWARE_ROUTING_HEADER, +} from '../src/common'; +import {Database, Instance, Spanner} from '../src'; let promisified = false; const fakePfy = extend({}, pfy, { @@ -67,10 +70,19 @@ describe('Session', () => { // eslint-disable-next-line @typescript-eslint/no-explicit-any let session: any; + const SPANNER = { + routeToLeaderEnabled: true, + } as {} as Spanner; + + const INSTANCE = { + parent: SPANNER, + } as {} as Instance; + // eslint-disable-next-line @typescript-eslint/no-explicit-any const DATABASE: any = { request: () => {}, formattedName_: 'formatted-database-name', + parent: INSTANCE, }; const NAME = 'session-name'; @@ -278,6 +290,33 @@ describe('Session', () => { function callback() {} + session.request = config => { + assert.strictEqual(config.client, 'SpannerClient'); + assert.strictEqual(config.method, 'getSession'); + assert.deepStrictEqual(config.reqOpts, { + name: session.formattedName_, + }); + assert.deepStrictEqual(config.gaxOpts, {}); + assert.deepStrictEqual( + config.headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + session.resourceHeader_ + ) + ); + return requestReturnValue; + }; + + const returnValue = session.getMetadata(callback); + assert.strictEqual(returnValue, requestReturnValue); + }); + + it('should correctly call and return the request with Leader Aware Routing disabled.', () => { + const requestReturnValue = {}; + + function callback() {} + + session.parent.parent.parent.routeToLeaderEnabled = false; session.request = config => { assert.strictEqual(config.client, 'SpannerClient'); assert.strictEqual(config.method, 'getSession'); diff --git a/test/spanner.ts b/test/spanner.ts index 8b24a2468..322566389 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -50,7 +50,10 @@ import {Float, Int, Json, Numeric, SpannerDate} from '../src/codec'; import * as stream from 'stream'; import * as util from 'util'; import {PreciseDate} from '@google-cloud/precise-date'; -import {CLOUD_RESOURCE_HEADER} from '../src/common'; +import { + CLOUD_RESOURCE_HEADER, + LEADER_AWARE_ROUTING_HEADER, +} from '../src/common'; import CreateInstanceMetadata = google.spanner.admin.instance.v1.CreateInstanceMetadata; import QueryOptions = google.spanner.v1.ExecuteSqlRequest.QueryOptions; import v1 = google.spanner.v1; @@ -1486,6 +1489,72 @@ describe('Spanner with mock server', () => { }); }); }); + + describe('LeaderAwareRouting', () => { + let spannerWithLAREnabled: Spanner; + let instanceWithLAREnabled: Instance; + + function newTestDatabaseWithLAREnabled( + options?: SessionPoolOptions, + queryOptions?: IQueryOptions + ): Database { + return instanceWithLAREnabled.database( + `database-${dbCounter++}`, + options, + queryOptions + ); + } + + before(() => { + spannerWithLAREnabled = new Spanner({ + servicePath: 'localhost', + port, + sslCreds: grpc.credentials.createInsecure(), + routeToLeaderEnabled: true, + }); + // Gets a reference to a Cloud Spanner instance and database + instanceWithLAREnabled = spannerWithLAREnabled.instance('instance'); + }); + + it('should execute with leader aware routing enabled in a read/write transaction', async () => { + const database = newTestDatabaseWithLAREnabled(); + await database.runTransactionAsync(async tx => { + await tx!.runUpdate({ + sql: insertSql, + }); + return await tx.commit(); + }); + await database.close(); + let metadataCountWithLAREnabled = 0; + spannerMock.getMetadata().forEach(metadata => { + if (metadata.get(LEADER_AWARE_ROUTING_HEADER)[0] !== undefined) { + metadataCountWithLAREnabled++; + assert.strictEqual( + metadata.get(LEADER_AWARE_ROUTING_HEADER)[0], + 'true' + ); + } + }); + assert.notStrictEqual(metadataCountWithLAREnabled, 0); + }); + + it('should execute with leader aware routing disabled in a read/write transaction', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + await tx!.runUpdate({ + sql: insertSql, + }); + return await tx.commit(); + }); + await database.close(); + spannerMock.getMetadata().forEach(metadata => { + assert.strictEqual( + metadata.get(LEADER_AWARE_ROUTING_HEADER)[0], + undefined + ); + }); + }); + }); }); describe('queryOptions', () => { diff --git a/test/transaction.ts b/test/transaction.ts index 0dbefaf46..45cd9dbf6 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -24,7 +24,10 @@ import * as sinon from 'sinon'; import {codec} from '../src/codec'; import {google} from '../protos/protos'; -import {CLOUD_RESOURCE_HEADER} from '../src/common'; +import { + CLOUD_RESOURCE_HEADER, + LEADER_AWARE_ROUTING_HEADER, +} from '../src/common'; import RequestOptions = google.spanner.v1.RequestOptions; import { BatchUpdateOptions, @@ -36,13 +39,25 @@ import {grpc} from 'google-gax'; describe('Transaction', () => { const sandbox = sinon.createSandbox(); - const PARENT = {formattedName_: 'formatted-database-name'}; const REQUEST = sandbox.stub(); const REQUEST_STREAM = sandbox.stub(); const SESSION_NAME = 'session-123'; + const SPANNER = { + routeToLeaderEnabled: true, + }; + + const INSTANCE = { + parent: SPANNER, + }; + + const DATABASE = { + formattedName_: 'formatted-database-name', + parent: INSTANCE, + }; + const SESSION = { - parent: PARENT, + parent: DATABASE, formattedName_: SESSION_NAME, request: REQUEST, requestStream: REQUEST_STREAM, @@ -1209,7 +1224,13 @@ describe('Transaction', () => { assert.strictEqual(reqOpts.session, SESSION_NAME); assert.deepStrictEqual(reqOpts.transaction, {id: fakeId}); assert.strictEqual(reqOpts.seqno, 1); - assert.deepStrictEqual(headers, transaction.resourceHeader_); + assert.deepStrictEqual( + headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + transaction.resourceHeader_ + ) + ); }); it('should encode sql string statements', () => { @@ -1339,7 +1360,13 @@ describe('Transaction', () => { assert.strictEqual(client, 'SpannerClient'); assert.strictEqual(method, 'beginTransaction'); assert.deepStrictEqual(reqOpts.options, expectedOptions); - assert.deepStrictEqual(headers, transaction.resourceHeader_); + assert.deepStrictEqual( + headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + transaction.resourceHeader_ + ) + ); }); it('should accept gaxOptions', done => { @@ -1380,7 +1407,13 @@ describe('Transaction', () => { assert.strictEqual(client, 'SpannerClient'); assert.strictEqual(method, 'beginTransaction'); assert.deepStrictEqual(reqOpts.options, expectedOptions); - assert.deepStrictEqual(headers, transaction.resourceHeader_); + assert.deepStrictEqual( + headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + transaction.resourceHeader_ + ) + ); }); }); @@ -1398,7 +1431,13 @@ describe('Transaction', () => { assert.strictEqual(method, 'commit'); assert.strictEqual(reqOpts.session, SESSION_NAME); assert.deepStrictEqual(reqOpts.mutations, []); - assert.deepStrictEqual(headers, transaction.resourceHeader_); + assert.deepStrictEqual( + headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + transaction.resourceHeader_ + ) + ); }); it('should accept gaxOptions as CallOptions', done => { @@ -1767,7 +1806,13 @@ describe('Transaction', () => { assert.strictEqual(client, 'SpannerClient'); assert.strictEqual(method, 'rollback'); assert.deepStrictEqual(reqOpts, expectedReqOpts); - assert.deepStrictEqual(headers, transaction.resourceHeader_); + assert.deepStrictEqual( + headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + transaction.resourceHeader_ + ) + ); }); it('should accept gaxOptions', done => { @@ -1927,6 +1972,27 @@ describe('Transaction', () => { PARTIAL_RESULT_STREAM.callsFake(makeRequest => makeRequest()); }); + it('should send the correct options', done => { + const QUERY: ExecuteSqlRequest = { + sql: 'SELET * FROM `MyTable`', + }; + + transaction.requestStream = config => { + assert.strictEqual(config.client, 'SpannerClient'); + assert.strictEqual(config.method, 'executeStreamingSql'); + assert.deepStrictEqual( + config.headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + transaction.resourceHeader_ + ) + ); + done(); + }; + + transaction.runStream(QUERY); + }); + it('should set transaction tag when not `singleUse`', done => { const QUERY: ExecuteSqlRequest = { sql: 'SELET * FROM `MyTable`', @@ -1954,6 +2020,23 @@ describe('Transaction', () => { PARTIAL_RESULT_STREAM.callsFake(makeRequest => makeRequest()); }); + it('should send the correct options', () => { + const TABLE = 'my-table-123'; + transaction.createReadStream(TABLE); + + const {client, method, headers} = REQUEST_STREAM.lastCall.args[0]; + + assert.strictEqual(client, 'SpannerClient'); + assert.strictEqual(method, 'streamingRead'); + assert.deepStrictEqual( + headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + transaction.resourceHeader_ + ) + ); + }); + it('should set transaction tag if not `singleUse`', () => { const TABLE = 'my-table-123'; const transactionTag = 'bar'; @@ -2011,9 +2094,16 @@ describe('Transaction', () => { pdml.begin(); const expectedOptions = {partitionedDml: {}}; - const {reqOpts} = stub.lastCall.args[0]; + const {reqOpts, headers} = stub.lastCall.args[0]; assert.deepStrictEqual(reqOpts.options, expectedOptions); + assert.deepStrictEqual( + headers, + Object.assign( + {[LEADER_AWARE_ROUTING_HEADER]: true}, + pdml.resourceHeader_ + ) + ); }); });