From d85b0e93bcb4a5806490ee66a4443afd4a0a519a Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Mon, 12 Sep 2022 14:49:39 -0600 Subject: [PATCH] feat: Use REST (#1698) To use REST transport when possible, pass `{preferRest: true}` to the constructor: ```ts const db = new firestore.Firestore({preferRest: true}); ``` --- dev/src/bulk-writer.ts | 9 +-- dev/src/collection-group.ts | 1 + dev/src/document-reader.ts | 1 + dev/src/index.ts | 118 ++++++++++++++++++++++-------- dev/src/pool.ts | 66 +++++++++++------ dev/src/recursive-delete.ts | 4 +- dev/src/reference.ts | 1 + dev/src/util.ts | 10 +-- dev/src/watch.ts | 7 +- dev/src/write-batch.ts | 4 +- dev/system-test/firestore.ts | 60 ++++++++-------- dev/test/index.ts | 4 +- dev/test/pool.ts | 136 ++++++++++++++++++++++++++--------- dev/test/util/helpers.ts | 11 +-- package.json | 6 +- types/firestore.d.ts | 10 +++ 16 files changed, 314 insertions(+), 134 deletions(-) diff --git a/dev/src/bulk-writer.ts b/dev/src/bulk-writer.ts index f16166b6c..1a7270bef 100644 --- a/dev/src/bulk-writer.ts +++ b/dev/src/bulk-writer.ts @@ -16,7 +16,7 @@ import * as firestore from '@google-cloud/firestore'; import * as assert from 'assert'; -import {GoogleError} from 'google-gax'; +import type {GoogleError} from 'google-gax'; import {google} from '../protos/firestore_v1_proto_api'; import {FieldPath, Firestore} from '.'; @@ -285,9 +285,10 @@ class BulkCommitBatch extends WriteBatch { ); this.pendingOps[i].onSuccess(new WriteResult(updateTime)); } else { - const error = new (require('google-gax').GoogleError)( - status.message || undefined - ); + const error = + new (require('google-gax/build/src/fallback').GoogleError)( + status.message || undefined + ); error.code = status.code as number; this.pendingOps[i].onError(wrapError(error, stack)); } diff --git a/dev/src/collection-group.ts b/dev/src/collection-group.ts index 2e9c10cd7..80df4420d 100644 --- a/dev/src/collection-group.ts +++ b/dev/src/collection-group.ts @@ -96,6 +96,7 @@ export class CollectionGroup const stream = await this.firestore.requestStream( 'partitionQueryStream', + /* bidirectional= */ false, request, tag ); diff --git a/dev/src/document-reader.ts b/dev/src/document-reader.ts index 5cdf87a3c..e8d54214c 100644 --- a/dev/src/document-reader.ts +++ b/dev/src/document-reader.ts @@ -112,6 +112,7 @@ export class DocumentReader { try { const stream = await this.firestore.requestStream( 'batchGetDocuments', + /* bidirectional= */ false, request, requestTag ); diff --git a/dev/src/index.ts b/dev/src/index.ts index a70de3a84..9ff8254e9 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -16,7 +16,9 @@ import * as firestore from '@google-cloud/firestore'; -import {CallOptions} from 'google-gax'; +import type {CallOptions} from 'google-gax'; +import type * as googleGax from 'google-gax'; +import type * as googleGaxFallback from 'google-gax/build/src/fallback'; import {Duplex, PassThrough, Transform} from 'stream'; import {URL} from 'url'; @@ -393,6 +395,16 @@ export class Firestore implements firestore.Firestore { */ private _clientPool: ClientPool; + /** + * Preloaded instance of google-gax (full module, with gRPC support). + */ + private _gax?: typeof googleGax; + + /** + * Preloaded instance of google-gax HTTP fallback implementation (no gRPC). + */ + private _gaxFallback?: typeof googleGaxFallback; + /** * The configuration options for the GAPIC client. * @private @@ -534,19 +546,48 @@ export class Firestore implements firestore.Firestore { this._clientPool = new ClientPool( MAX_CONCURRENT_REQUESTS_PER_CLIENT, maxIdleChannels, - /* clientFactory= */ () => { + /* clientFactory= */ (requiresGrpc: boolean) => { let client: GapicClient; + // Use the rest fallback if enabled and if the method does not require GRPC + const useFallback = + !this._settings.preferRest || requiresGrpc ? false : 'rest'; + + let gax: typeof googleGax | typeof googleGaxFallback; + if (useFallback) { + if (!this._gaxFallback) { + gax = this._gaxFallback = require('google-gax/build/src/fallback'); + } else { + gax = this._gaxFallback; + } + } else { + if (!this._gax) { + gax = this._gax = require('google-gax'); + } else { + gax = this._gax; + } + } + if (this._settings.ssl === false) { const grpcModule = this._settings.grpc ?? require('google-gax').grpc; const sslCreds = grpcModule.credentials.createInsecure(); - client = new module.exports.v1({ - sslCreds, - ...this._settings, - }); + client = new module.exports.v1( + { + sslCreds, + ...this._settings, + fallback: useFallback, + }, + gax + ); } else { - client = new module.exports.v1(this._settings); + client = new module.exports.v1( + { + ...this._settings, + fallback: useFallback, + }, + gax + ); } logger('Firestore', null, 'Initialized Firestore GAPIC Client'); @@ -1379,8 +1420,10 @@ export class Firestore implements firestore.Firestore { if (this._projectId === undefined) { try { - this._projectId = await this._clientPool.run(requestTag, gapicClient => - gapicClient.getProjectId() + this._projectId = await this._clientPool.run( + requestTag, + /* requiresGrpc= */ false, + gapicClient => gapicClient.getProjectId() ); logger( 'Firestore.initializeIfNeeded', @@ -1421,10 +1464,11 @@ export class Firestore implements firestore.Firestore { if (retryCodes) { const retryParams = getRetryParams(methodName); - callOptions.retry = new (require('google-gax').RetryOptions)( - retryCodes, - retryParams - ); + callOptions.retry = + new (require('google-gax/build/src/fallback').RetryOptions)( + retryCodes, + retryParams + ); } return callOptions; @@ -1627,24 +1671,33 @@ export class Firestore implements firestore.Firestore { ): Promise { const callOptions = this.createCallOptions(methodName, retryCodes); - return this._clientPool.run(requestTag, async gapicClient => { - try { - logger('Firestore.request', requestTag, 'Sending request: %j', request); - const [result] = await ( - gapicClient[methodName] as UnaryMethod - )(request, callOptions); - logger( - 'Firestore.request', - requestTag, - 'Received response: %j', - result - ); - return result; - } catch (err) { - logger('Firestore.request', requestTag, 'Received error:', err); - return Promise.reject(err); + return this._clientPool.run( + requestTag, + /* requiresGrpc= */ false, + async gapicClient => { + try { + logger( + 'Firestore.request', + requestTag, + 'Sending request: %j', + request + ); + const [result] = await ( + gapicClient[methodName] as UnaryMethod + )(request, callOptions); + logger( + 'Firestore.request', + requestTag, + 'Received response: %j', + result + ); + return result; + } catch (err) { + logger('Firestore.request', requestTag, 'Received error:', err); + return Promise.reject(err); + } } - }); + ); } /** @@ -1658,12 +1711,15 @@ export class Firestore implements firestore.Firestore { * @internal * @param methodName Name of the streaming Veneer API endpoint that * takes a request and GAX options. + * @param bidrectional Whether the request is bidirectional (true) or + * unidirectional (false_ * @param request The Protobuf request to send. * @param requestTag A unique client-assigned identifier for this request. * @returns A Promise with the resulting read-only stream. */ requestStream( methodName: FirestoreStreamingMethod, + bidrectional: boolean, request: {}, requestTag: string ): Promise { @@ -1674,7 +1730,7 @@ export class Firestore implements firestore.Firestore { return this._retry(methodName, requestTag, () => { const result = new Deferred(); - this._clientPool.run(requestTag, async gapicClient => { + this._clientPool.run(requestTag, bidrectional, async gapicClient => { logger( 'Firestore.requestStream', requestTag, diff --git a/dev/src/pool.ts b/dev/src/pool.ts index c9351556e..0110f2798 100644 --- a/dev/src/pool.ts +++ b/dev/src/pool.ts @@ -35,10 +35,15 @@ export const CLIENT_TERMINATED_ERROR_MSG = * @internal */ export class ClientPool { + private grpcEnabled = false; + /** * Stores each active clients and how many operations it has outstanding. */ - private activeClients = new Map(); + private activeClients = new Map< + T, + {activeRequestCount: number; grpcEnabled: boolean} + >(); /** * A set of clients that have seen RST_STREAM errors (see @@ -72,7 +77,7 @@ export class ClientPool { constructor( private readonly concurrentOperationLimit: number, private readonly maxIdleClients: number, - private readonly clientFactory: () => T, + private readonly clientFactory: (requiresGrpc: boolean) => T, private readonly clientDestructor: (client: T) => Promise = () => Promise.resolve() ) {} @@ -84,21 +89,22 @@ export class ClientPool { * @private * @internal */ - private acquire(requestTag: string): T { + private acquire(requestTag: string, requiresGrpc: boolean): T { let selectedClient: T | null = null; let selectedClientRequestCount = -1; - for (const [client, requestCount] of this.activeClients) { + for (const [client, metadata] of this.activeClients) { // Use the "most-full" client that can still accommodate the request // in order to maximize the number of idle clients as operations start to // complete. if ( !this.failedClients.has(client) && - requestCount > selectedClientRequestCount && - requestCount < this.concurrentOperationLimit + metadata.activeRequestCount > selectedClientRequestCount && + metadata.activeRequestCount < this.concurrentOperationLimit && + (!requiresGrpc || metadata.grpcEnabled) ) { selectedClient = client; - selectedClientRequestCount = requestCount; + selectedClientRequestCount = metadata.activeRequestCount; } } @@ -111,7 +117,7 @@ export class ClientPool { ); } else { logger('ClientPool.acquire', requestTag, 'Creating a new client'); - selectedClient = this.clientFactory(); + selectedClient = this.clientFactory(requiresGrpc); selectedClientRequestCount = 0; assert( !this.activeClients.has(selectedClient), @@ -119,7 +125,10 @@ export class ClientPool { ); } - this.activeClients.set(selectedClient, selectedClientRequestCount + 1); + this.activeClients.set(selectedClient, { + grpcEnabled: requiresGrpc, + activeRequestCount: selectedClientRequestCount + 1, + }); return selectedClient!; } @@ -131,9 +140,12 @@ export class ClientPool { * @internal */ private async release(requestTag: string, client: T): Promise { - const requestCount = this.activeClients.get(client) || 0; - assert(requestCount > 0, 'No active requests'); - this.activeClients.set(client, requestCount - 1); + const metadata = this.activeClients.get(client); + assert(metadata && metadata.activeRequestCount > 0, 'No active requests'); + this.activeClients.set(client, { + grpcEnabled: metadata.grpcEnabled, + activeRequestCount: metadata.activeRequestCount - 1, + }); if (this.terminated && this.opCount === 0) { this.terminateDeferred.resolve(); } @@ -153,11 +165,18 @@ export class ClientPool { * @internal */ private shouldGarbageCollectClient(client: T): boolean { - // Don't garbage collect clients that have active requests. - if (this.activeClients.get(client) !== 0) { + const clientMetadata = this.activeClients.get(client)!; + + if (clientMetadata.activeRequestCount !== 0) { + // Don't garbage collect clients that have active requests. return false; } + if (this.grpcEnabled !== clientMetadata.grpcEnabled) { + // We are transitioning to GRPC. Garbage collect REST clients. + return true; + } + // Idle clients that have received RST_STREAM errors are always garbage // collected. if (this.failedClients.has(client)) { @@ -165,10 +184,11 @@ export class ClientPool { } // Otherwise, only garbage collect if we have too much idle capacity (e.g. - // more than 100 idle capacity with default settings) . + // more than 100 idle capacity with default settings). let idleCapacityCount = 0; - for (const [, count] of this.activeClients) { - idleCapacityCount += this.concurrentOperationLimit - count; + for (const [, metadata] of this.activeClients) { + idleCapacityCount += + this.concurrentOperationLimit - metadata.activeRequestCount; } return ( idleCapacityCount > this.maxIdleClients * this.concurrentOperationLimit @@ -197,7 +217,9 @@ export class ClientPool { // Visible for testing. get opCount(): number { let activeOperationCount = 0; - this.activeClients.forEach(count => (activeOperationCount += count)); + this.activeClients.forEach( + metadata => (activeOperationCount += metadata.activeRequestCount) + ); return activeOperationCount; } @@ -213,11 +235,15 @@ export class ClientPool { * @private * @internal */ - run(requestTag: string, op: (client: T) => Promise): Promise { + run( + requestTag: string, + requiresGrpc: boolean, + op: (client: T) => Promise + ): Promise { if (this.terminated) { return Promise.reject(new Error(CLIENT_TERMINATED_ERROR_MSG)); } - const client = this.acquire(requestTag); + const client = this.acquire(requestTag, requiresGrpc); return op(client) .catch(async (err: GoogleError) => { diff --git a/dev/src/recursive-delete.ts b/dev/src/recursive-delete.ts index c4a8bbec0..addd8e9d8 100644 --- a/dev/src/recursive-delete.ts +++ b/dev/src/recursive-delete.ts @@ -26,7 +26,7 @@ import Firestore, { QueryDocumentSnapshot, } from '.'; import {Deferred, wrapError} from './util'; -import {GoogleError} from 'google-gax'; +import type {GoogleError} from 'google-gax'; import {BulkWriterError} from './bulk-writer'; import {QueryOptions} from './reference'; import {StatusCode} from './status-code'; @@ -291,7 +291,7 @@ export class RecursiveDelete { if (this.lastError === undefined) { this.completionDeferred.resolve(); } else { - let error = new (require('google-gax').GoogleError)( + let error = new (require('google-gax/build/src/fallback').GoogleError)( `${this.errorCount} ` + `${this.errorCount !== 1 ? 'deletes' : 'delete'} ` + 'failed. The last delete failed with: ' diff --git a/dev/src/reference.ts b/dev/src/reference.ts index 65e16e42d..97dfc7f60 100644 --- a/dev/src/reference.ts +++ b/dev/src/reference.ts @@ -2306,6 +2306,7 @@ export class Query implements firestore.Query { streamActive = new Deferred(); backendStream = await this._firestore.requestStream( 'runQuery', + /* bidirectional= */ false, request, tag ); diff --git a/dev/src/util.ts b/dev/src/util.ts index 0b15be22c..5a189269c 100644 --- a/dev/src/util.ts +++ b/dev/src/util.ts @@ -17,8 +17,8 @@ import {DocumentData} from '@google-cloud/firestore'; import {randomBytes} from 'crypto'; -import {CallSettings, ClientConfig, GoogleError} from 'google-gax'; -import {BackoffSettings} from 'google-gax/build/src/gax'; +import type {CallSettings, ClientConfig, GoogleError} from 'google-gax'; +import type {BackoffSettings} from 'google-gax/build/src/gax'; import * as gapicConfig from './v1/firestore_client_config.json'; /** @@ -157,11 +157,11 @@ let serviceConfig: Record | undefined; /** Lazy-loads the service config when first accessed. */ function getServiceConfig(methodName: string): CallSettings | undefined { if (!serviceConfig) { - serviceConfig = require('google-gax').constructSettings( + serviceConfig = require('google-gax/build/src/fallback').constructSettings( 'google.firestore.v1.Firestore', gapicConfig as ClientConfig, {}, - require('google-gax').Status + require('google-gax/build/src/status').Status ) as {[k: string]: CallSettings}; } return serviceConfig[methodName]; @@ -185,7 +185,7 @@ export function getRetryCodes(methodName: string): number[] { export function getRetryParams(methodName: string): BackoffSettings { return ( getServiceConfig(methodName)?.retry?.backoffSettings ?? - require('google-gax').createDefaultBackoffSettings() + require('google-gax/build/src/fallback').createDefaultBackoffSettings() ); } diff --git a/dev/src/watch.ts b/dev/src/watch.ts index 9e92e5d93..d1871ae23 100644 --- a/dev/src/watch.ts +++ b/dev/src/watch.ts @@ -452,7 +452,12 @@ abstract class Watch { // Note that we need to call the internal _listen API to pass additional // header values in readWriteStream. return this.firestore - .requestStream('listen', request, this.requestTag) + .requestStream( + 'listen', + /* bidirectional= */ true, + request, + this.requestTag + ) .then(backendStream => { if (!this.isActive) { logger( diff --git a/dev/src/write-batch.ts b/dev/src/write-batch.ts index e2d685d07..b77eede51 100644 --- a/dev/src/write-batch.ts +++ b/dev/src/write-batch.ts @@ -350,7 +350,9 @@ export class WriteBatch implements firestore.WriteBatch { let documentMask: DocumentMask; if (mergePaths) { - documentMask = DocumentMask.fromFieldMask(options!.mergeFields!); + documentMask = DocumentMask.fromFieldMask( + (options as {mergeFields: Array}).mergeFields + ); firestoreData = documentMask.applyTo(firestoreData); } diff --git a/dev/system-test/firestore.ts b/dev/system-test/firestore.ts index 8b67588b2..5d8f9ca1d 100644 --- a/dev/system-test/firestore.ts +++ b/dev/system-test/firestore.ts @@ -18,6 +18,7 @@ import { WithFieldValue, PartialWithFieldValue, SetOptions, + Settings, } from '@google-cloud/firestore'; import {describe, it, before, beforeEach, afterEach} from 'mocha'; @@ -50,7 +51,7 @@ import { verifyInstance, } from '../test/util/helpers'; import {BulkWriter} from '../src/bulk-writer'; -import {Status} from 'google-gax'; +import {Status} from 'google-gax/build/src/status'; import {QueryPartition} from '../src/query-partition'; import {CollectionGroup} from '../src/collection-group'; @@ -80,7 +81,11 @@ if (process.env.NODE_ENV === 'DEBUG') { setLogFunction(console.log); } -function getTestRoot(firestore: Firestore) { +function getTestRoot(settings: Settings = {}) { + const firestore = new Firestore({ + ...settings, + preferRest: !!process.env.USE_REST_FALLBACK, + }); return firestore.collection(`node_${version}_${autoId()}`); } @@ -89,8 +94,8 @@ describe('Firestore class', () => { let randomCol: CollectionReference; beforeEach(() => { - firestore = new Firestore(); - randomCol = getTestRoot(firestore); + randomCol = getTestRoot(); + firestore = randomCol.firestore; }); afterEach(() => verifyInstance(firestore)); @@ -213,8 +218,8 @@ describe('CollectionGroup class', () => { let collectionGroup: CollectionGroup; before(async () => { - firestore = new Firestore({}); - randomColl = getTestRoot(firestore); + randomColl = getTestRoot(); + firestore = randomColl.firestore; collectionGroup = firestore.collectionGroup(randomColl.id); const batch = firestore.batch(); @@ -327,8 +332,8 @@ describe('CollectionReference class', () => { let randomCol: CollectionReference; beforeEach(() => { - firestore = new Firestore({}); - randomCol = getTestRoot(firestore); + randomCol = getTestRoot(); + firestore = randomCol.firestore; }); afterEach(() => verifyInstance(firestore)); @@ -406,8 +411,8 @@ describe('DocumentReference class', () => { let randomCol: CollectionReference; beforeEach(() => { - firestore = new Firestore(); - randomCol = getTestRoot(firestore); + randomCol = getTestRoot(); + firestore = randomCol.firestore; }); afterEach(() => verifyInstance(firestore)); @@ -508,8 +513,7 @@ describe('DocumentReference class', () => { it('round-trips BigInts', () => { const bigIntValue = BigInt(Number.MAX_SAFE_INTEGER) + BigInt(1); - const firestore = new Firestore({useBigInt: true}); - const randomCol = getTestRoot(firestore); + const randomCol = getTestRoot({useBigInt: true}); const ref = randomCol.doc('doc'); return ref .set({bigIntValue}) @@ -1321,8 +1325,8 @@ describe('Query class', () => { } beforeEach(() => { - firestore = new Firestore({}); - randomCol = getTestRoot(firestore); + randomCol = getTestRoot(); + firestore = randomCol.firestore; }); afterEach(() => verifyInstance(firestore)); @@ -2141,8 +2145,8 @@ describe('Transaction class', () => { let randomCol: CollectionReference; beforeEach(() => { - firestore = new Firestore({}); - randomCol = getTestRoot(firestore); + randomCol = getTestRoot(); + firestore = randomCol.firestore; }); afterEach(() => verifyInstance(firestore)); @@ -2434,8 +2438,8 @@ describe('WriteBatch class', () => { let randomCol: CollectionReference; beforeEach(() => { - firestore = new Firestore({}); - randomCol = getTestRoot(firestore); + randomCol = getTestRoot(); + firestore = randomCol.firestore; }); afterEach(() => verifyInstance(firestore)); @@ -2578,9 +2582,9 @@ describe('QuerySnapshot class', () => { let querySnapshot: Promise; beforeEach(() => { - firestore = new Firestore({}); + const randomCol = getTestRoot(); + firestore = randomCol.firestore; - const randomCol = getTestRoot(firestore); const ref1 = randomCol.doc('doc1'); const ref2 = randomCol.doc('doc2'); @@ -2649,9 +2653,9 @@ describe('BulkWriter class', () => { let writer: BulkWriter; beforeEach(() => { - firestore = new Firestore({}); + randomCol = getTestRoot(); + firestore = randomCol.firestore; writer = firestore.bulkWriter(); - randomCol = getTestRoot(firestore); }); afterEach(() => verifyInstance(firestore)); @@ -2926,8 +2930,7 @@ describe('Client initialization', () => { for (const [description, op] of ops) { it(`succeeds for ${description}`, () => { - const firestore = new Firestore(); - const randomCol = getTestRoot(firestore); + const randomCol = getTestRoot(); return op(randomCol); }); } @@ -2938,8 +2941,9 @@ describe('Bundle building', () => { let testCol: CollectionReference; beforeEach(async () => { - firestore = new Firestore({}); - testCol = getTestRoot(firestore); + testCol = getTestRoot(); + firestore = testCol.firestore; + const ref1 = testCol.doc('doc1'); const ref2 = testCol.doc('doc2'); const ref3 = testCol.doc('doc3'); @@ -3133,8 +3137,8 @@ describe('Types test', () => { }; beforeEach(async () => { - firestore = new Firestore({}); - randomCol = getTestRoot(firestore); + randomCol = getTestRoot(); + firestore = randomCol.firestore; doc = randomCol.doc(); await doc.set(initialData); diff --git a/dev/test/index.ts b/dev/test/index.ts index a3cf866fe..a13240d30 100644 --- a/dev/test/index.ts +++ b/dev/test/index.ts @@ -601,7 +601,9 @@ describe('instantiation', () => { ssl: false, projectId: 'foo', }); - await firestore['_clientPool'].run('tag', () => Promise.resolve()); + await firestore['_clientPool'].run('tag', /* requiresGrpc= */ false, () => + Promise.resolve() + ); }); it('exports all types', () => { diff --git a/dev/test/pool.ts b/dev/test/pool.ts index 8a377de68..3c1466f6f 100644 --- a/dev/test/pool.ts +++ b/dev/test/pool.ts @@ -23,6 +23,8 @@ import {Deferred} from '../src/util'; use(chaiAsPromised); const REQUEST_TAG = 'tag'; +const USE_REST = false; +const USE_GRPC = true; function deferredPromises(count: number): Array> { const deferred: Array> = []; @@ -42,14 +44,14 @@ describe('Client pool', () => { const operationPromises = deferredPromises(4); - clientPool.run(REQUEST_TAG, () => operationPromises[0].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[0].promise); expect(clientPool.size).to.equal(1); - clientPool.run(REQUEST_TAG, () => operationPromises[1].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[1].promise); expect(clientPool.size).to.equal(1); - clientPool.run(REQUEST_TAG, () => operationPromises[2].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[2].promise); expect(clientPool.size).to.equal(1); - clientPool.run(REQUEST_TAG, () => operationPromises[3].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[3].promise); expect(clientPool.size).to.equal(2); }); @@ -64,20 +66,21 @@ describe('Client pool', () => { const completionPromise = clientPool.run( REQUEST_TAG, + USE_REST, () => operationPromises[0].promise ); expect(clientPool.size).to.equal(1); - clientPool.run(REQUEST_TAG, () => operationPromises[1].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[1].promise); expect(clientPool.size).to.equal(1); - clientPool.run(REQUEST_TAG, () => operationPromises[2].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[2].promise); expect(clientPool.size).to.equal(2); - clientPool.run(REQUEST_TAG, () => operationPromises[3].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[3].promise); expect(clientPool.size).to.equal(2); operationPromises[0].resolve(); return completionPromise.then(() => { - clientPool.run(REQUEST_TAG, () => operationPromises[4].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[4].promise); expect(clientPool.size).to.equal(2); }); }); @@ -93,6 +96,7 @@ describe('Client pool', () => { let completionPromise = clientPool.run( REQUEST_TAG, + USE_REST, () => operationPromises[0].promise ); expect(clientPool.size).to.equal(1); @@ -101,6 +105,7 @@ describe('Client pool', () => { completionPromise = clientPool.run( REQUEST_TAG, + USE_REST, () => operationPromises[1].promise ); expect(clientPool.size).to.equal(1); @@ -110,6 +115,52 @@ describe('Client pool', () => { expect(instanceCount).to.equal(1); }); + it('does not re-use rest instance for grpc call', async () => { + const clientPool = new ClientPool<{}>(10, 1, () => { + return {}; + }); + + const operationPromises = deferredPromises(2); + + void clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[0].promise + ); + void clientPool.run( + REQUEST_TAG, + USE_GRPC, + () => operationPromises[1].promise + ); + expect(clientPool.size).to.equal(2); + + operationPromises[0].resolve(); + operationPromises[1].resolve(); + }); + + it('re-uses grpc instance for rest calls', async () => { + const clientPool = new ClientPool<{}>(10, 1, () => { + return {}; + }); + + const operationPromises = deferredPromises(2); + + void clientPool.run( + REQUEST_TAG, + USE_GRPC, + () => operationPromises[0].promise + ); + void clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[1].promise + ); + expect(clientPool.size).to.equal(1); + + operationPromises[0].resolve(); + operationPromises[1].resolve(); + }); + it('bin packs operations', async () => { let clientCount = 0; const clientPool = new ClientPool(2, 0, () => { @@ -121,23 +172,23 @@ describe('Client pool', () => { // Create 5 operations, which should schedule 2 operations on the first // client, 2 on the second and 1 on the third. const operationPromises = deferredPromises(7); - clientPool.run(REQUEST_TAG, client => { + clientPool.run(REQUEST_TAG, USE_REST, client => { expect(client).to.be.equal(1); return operationPromises[0].promise; }); - clientPool.run(REQUEST_TAG, client => { + clientPool.run(REQUEST_TAG, USE_REST, client => { expect(client).to.be.equal(1); return operationPromises[1].promise; }); - const thirdOperation = clientPool.run(REQUEST_TAG, client => { + const thirdOperation = clientPool.run(REQUEST_TAG, USE_REST, client => { expect(client).to.be.equal(2); return operationPromises[2].promise; }); - clientPool.run(REQUEST_TAG, client => { + clientPool.run(REQUEST_TAG, USE_REST, client => { expect(client).to.be.equal(2); return operationPromises[3].promise; }); - clientPool.run(REQUEST_TAG, client => { + clientPool.run(REQUEST_TAG, USE_REST, client => { expect(client).to.be.equal(3); return operationPromises[4].promise; }); @@ -148,7 +199,7 @@ describe('Client pool', () => { // A newly scheduled operation should use the first client that has a free // slot. - clientPool.run(REQUEST_TAG, async client => { + clientPool.run(REQUEST_TAG, USE_REST, async client => { expect(client).to.be.equal(2); }); }); @@ -164,19 +215,19 @@ describe('Client pool', () => { const completionPromises: Array> = []; completionPromises.push( - clientPool.run(REQUEST_TAG, () => operationPromises[0].promise) + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[0].promise) ); expect(clientPool.size).to.equal(1); completionPromises.push( - clientPool.run(REQUEST_TAG, () => operationPromises[1].promise) + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[1].promise) ); expect(clientPool.size).to.equal(1); completionPromises.push( - clientPool.run(REQUEST_TAG, () => operationPromises[2].promise) + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[2].promise) ); expect(clientPool.size).to.equal(2); completionPromises.push( - clientPool.run(REQUEST_TAG, () => operationPromises[3].promise) + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[3].promise) ); expect(clientPool.size).to.equal(2); @@ -198,19 +249,19 @@ describe('Client pool', () => { const completionPromises: Array> = []; completionPromises.push( - clientPool.run(REQUEST_TAG, () => operationPromises[0].promise) + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[0].promise) ); expect(clientPool.size).to.equal(1); completionPromises.push( - clientPool.run(REQUEST_TAG, () => operationPromises[1].promise) + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[1].promise) ); expect(clientPool.size).to.equal(1); completionPromises.push( - clientPool.run(REQUEST_TAG, () => operationPromises[2].promise) + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[2].promise) ); expect(clientPool.size).to.equal(2); completionPromises.push( - clientPool.run(REQUEST_TAG, () => operationPromises[3].promise) + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[3].promise) ); expect(clientPool.size).to.equal(2); @@ -234,8 +285,8 @@ describe('Client pool', () => { const operationPromises = deferredPromises(2); // Create two pending operations that each spawn their own client - clientPool.run(REQUEST_TAG, () => operationPromises[0].promise); - clientPool.run(REQUEST_TAG, () => operationPromises[1].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[0].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[1].promise); operationPromises.forEach(deferred => deferred.resolve()); @@ -247,7 +298,9 @@ describe('Client pool', () => { return {}; }); - const op = clientPool.run(REQUEST_TAG, () => Promise.resolve('Success')); + const op = clientPool.run(REQUEST_TAG, USE_REST, () => + Promise.resolve('Success') + ); return expect(op).to.become('Success'); }); @@ -256,7 +309,7 @@ describe('Client pool', () => { return {}; }); - const op = clientPool.run(REQUEST_TAG, () => + const op = clientPool.run(REQUEST_TAG, USE_REST, () => Promise.reject('Generated error') ); return expect(op).to.eventually.be.rejectedWith('Generated error'); @@ -269,14 +322,14 @@ describe('Client pool', () => { return {}; }); - const op = clientPool.run(REQUEST_TAG, () => + const op = clientPool.run(REQUEST_TAG, USE_REST, () => Promise.reject( new GoogleError('13 INTERNAL: Received RST_STREAM with code 2') ) ); await op.catch(() => {}); - await clientPool.run(REQUEST_TAG, async () => {}); + await clientPool.run(REQUEST_TAG, USE_REST, async () => {}); expect(instanceCount).to.equal(2); }); @@ -286,7 +339,7 @@ describe('Client pool', () => { return {}; }); - const op = clientPool.run(REQUEST_TAG, () => + const op = clientPool.run(REQUEST_TAG, USE_REST, () => Promise.reject( new GoogleError('13 INTERNAL: Received RST_STREAM with code 2') ) @@ -296,6 +349,17 @@ describe('Client pool', () => { expect(clientPool.size).to.equal(0); }); + it('garbage collects rest clients after GRPC', async () => { + const clientPool = new ClientPool<{}>(10, 1, () => { + return {}; + }); + + await clientPool.run(REQUEST_TAG, USE_REST, () => Promise.resolve()); + await clientPool.run(REQUEST_TAG, USE_GRPC, () => Promise.resolve()); + + expect(clientPool.size).to.equal(1); + }); + it('keeps pool of idle clients', async () => { const clientPool = new ClientPool<{}>( /* concurrentOperationLimit= */ 1, @@ -306,11 +370,12 @@ describe('Client pool', () => { ); const operationPromises = deferredPromises(4); - clientPool.run(REQUEST_TAG, () => operationPromises[0].promise); - clientPool.run(REQUEST_TAG, () => operationPromises[1].promise); - clientPool.run(REQUEST_TAG, () => operationPromises[2].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[0].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[1].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[2].promise); const lastOp = clientPool.run( REQUEST_TAG, + USE_REST, () => operationPromises[3].promise ); expect(clientPool.size).to.equal(4); @@ -332,9 +397,10 @@ describe('Client pool', () => { ); const operationPromises = deferredPromises(2); - clientPool.run(REQUEST_TAG, () => operationPromises[0].promise); + clientPool.run(REQUEST_TAG, USE_REST, () => operationPromises[0].promise); const completionPromise = clientPool.run( REQUEST_TAG, + USE_REST, () => operationPromises[1].promise ); expect(clientPool.size).to.equal(2); @@ -353,7 +419,7 @@ describe('Client pool', () => { return clientPool .terminate() .then(() => { - return clientPool.run(REQUEST_TAG, () => + return clientPool.run(REQUEST_TAG, USE_REST, () => Promise.reject('Call to run() should have failed') ); }) @@ -370,7 +436,7 @@ describe('Client pool', () => { let terminated = false; // Run operation that completes after terminate() is called. - clientPool.run(REQUEST_TAG, () => { + clientPool.run(REQUEST_TAG, USE_REST, () => { return deferred.promise; }); const terminateOp = clientPool.terminate().then(() => { diff --git a/dev/test/util/helpers.ts b/dev/test/util/helpers.ts index 770732733..7d6b6e278 100644 --- a/dev/test/util/helpers.ts +++ b/dev/test/util/helpers.ts @@ -21,12 +21,11 @@ import { import {expect} from 'chai'; import * as extend from 'extend'; -import {grpc} from 'google-gax'; import {JSONStreamIterator} from 'length-prefixed-json-stream'; import {Duplex, PassThrough} from 'stream'; import * as through2 from 'through2'; import {firestore, google} from '../../protos/firestore_v1_proto_api'; - +import type {grpc} from 'google-gax'; import * as proto from '../../protos/firestore_v1_proto_api'; import * as v1 from '../../src/v1'; import {Firestore, QueryDocumentSnapshot} from '../../src'; @@ -35,7 +34,11 @@ import {GapicClient} from '../../src/types'; import api = proto.google.firestore.v1; -const SSL_CREDENTIALS = grpc.credentials.createInsecure(); +let SSL_CREDENTIALS: grpc.ChannelCredentials | null = null; +if (!process.env.USE_REST_FALLBACK) { + const grpc = require('google-gax').grpc; + SSL_CREDENTIALS = grpc.credentials.createInsecure(); +} export const PROJECT_ID = 'test-project'; export const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`; @@ -62,7 +65,7 @@ export function createInstance( firestoreSettings?: Settings ): Promise { const initializationOptions = { - ...{projectId: PROJECT_ID, sslCreds: SSL_CREDENTIALS}, + ...{projectId: PROJECT_ID, sslCreds: SSL_CREDENTIALS!}, ...firestoreSettings, }; diff --git a/package.json b/package.json index e5565d57c..aeab58cfc 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,9 @@ "scripts": { "predocs": "npm run compile", "docs": "jsdoc -c .jsdoc.js", - "system-test": "mocha build/system-test --timeout 600000", + "system-test:rest": "USE_REST_FALLBACK=YES mocha build/system-test --timeout 600000", + "system-test:grpc": "mocha build/system-test --timeout 600000", + "system-test": "npm run system-test:grpc && npm run system-test:rest", "presystem-test": "npm run compile", "samples-test": "npm link && cd samples/ && npm link ../ && npm test && cd ../", "conformance": "mocha build/conformance", @@ -52,7 +54,7 @@ "dependencies": { "fast-deep-equal": "^3.1.1", "functional-red-black-tree": "^1.0.1", - "google-gax": "^3.2.1", + "google-gax": "^3.4.0", "protobufjs": "^7.0.0" }, "devDependencies": { diff --git a/types/firestore.d.ts b/types/firestore.d.ts index 2e32bc40a..9bc0d6760 100644 --- a/types/firestore.d.ts +++ b/types/firestore.d.ts @@ -279,6 +279,16 @@ declare namespace FirebaseFirestore { */ ignoreUndefinedProperties?: boolean; + /** + * Use HTTP for requests that can be served over HTTP and JSON. This reduces + * the amount of networking code that is loaded to serve requests within + * Firestore. + * + * This setting does not apply to `onSnapshot` APIs as they cannot be served + * over native HTTP. + */ + preferRest?: boolean; + [key: string]: any; // Accept other properties, such as GRPC settings. }