diff --git a/dev/src/backoff.ts b/dev/src/backoff.ts
index 961f6fe21..9c2c37eb3 100644
--- a/dev/src/backoff.ts
+++ b/dev/src/backoff.ts
@@ -48,6 +48,12 @@ const DEFAULT_BACKOFF_FACTOR = 1.5;
  */
 const DEFAULT_JITTER_FACTOR = 1.0;
 
+/*!
+ * The maximum number of retries that will be attempted by backoff
+ * before stopping all retry attempts.
+ */
+export const MAX_RETRY_ATTEMPTS = 10;
+
 /*!
  * The timeout handler used by `ExponentialBackoff`.
  */
@@ -135,6 +141,13 @@ export class ExponentialBackoff {
    */
   private readonly jitterFactor: number;
 
+  /**
+   * The number of retries that has been attempted.
+   *
+   * @private
+   */
+  private _retryCount = 0;
+
   /**
    * The backoff delay of the current attempt.
    *
@@ -162,7 +175,7 @@ export class ExponentialBackoff {
   }
 
   /**
-   * Resets the backoff delay.
+   * Resets the backoff delay and retry count.
    *
    * The very next backoffAndWait() will have no delay. If it is called again
    * (i.e. due to an error), initialDelayMs (plus jitter) will be used, and
@@ -171,6 +184,7 @@
    * @private
    */
   reset(): void {
+    this._retryCount = 0;
     this.currentBaseMs = 0;
   }
 
@@ -192,6 +206,11 @@
    * @private
    */
   backoffAndWait(): Promise<void> {
+    if (this.retryCount > MAX_RETRY_ATTEMPTS) {
+      return Promise.reject(
+        new Error('Exceeded maximum number of retries allowed.')
+      );
+    }
     // First schedule using the current base (which may be 0 and should be
     // honored as such).
     const delayWithJitterMs = this.currentBaseMs + this.jitterDelayMs();
@@ -209,12 +228,17 @@
     this.currentBaseMs *= this.backoffFactor;
     this.currentBaseMs = Math.max(this.currentBaseMs, this.initialDelayMs);
     this.currentBaseMs = Math.min(this.currentBaseMs, this.maxDelayMs);
-
+    this._retryCount += 1;
     return new Promise(resolve => {
       delayExecution(resolve, delayWithJitterMs);
     });
   }
 
+  // Visible for testing.
+  get retryCount(): number {
+    return this._retryCount;
+  }
+
   /**
    * Returns a randomized "jitter" delay based on the current base and jitter
    * factor.
diff --git a/dev/src/watch.ts b/dev/src/watch.ts
index 6e41bee2a..b20090786 100644
--- a/dev/src/watch.ts
+++ b/dev/src/watch.ts
@@ -271,6 +271,7 @@ abstract class Watch {
     }
 
     switch (error.code) {
+      case GRPC_STATUS_CODE.ABORTED:
      case GRPC_STATUS_CODE.CANCELLED:
      case GRPC_STATUS_CODE.UNKNOWN:
      case GRPC_STATUS_CODE.DEADLINE_EXCEEDED:
@@ -426,50 +427,52 @@
      * Initializes a new stream to the backend with backoff.
      */
     const initStream = () => {
-      this._backoff.backoffAndWait().then(async () => {
-        if (!isActive) {
-          logger(
-            'Watch.onSnapshot',
-            this._requestTag,
-            'Not initializing inactive stream'
-          );
-          return;
-        }
+      this._backoff
+        .backoffAndWait()
+        .then(async () => {
+          if (!isActive) {
+            logger(
+              'Watch.onSnapshot',
+              this._requestTag,
+              'Not initializing inactive stream'
+            );
+            return;
+          }
 
-        await this._firestore.initializeIfNeeded();
-
-        request.database = this._firestore.formattedName;
-        request.addTarget = this.getTarget(resumeToken);
-
-        // Note that we need to call the internal _listen API to pass additional
-        // header values in readWriteStream.
-        this._firestore
-          .readWriteStream('listen', request, this._requestTag, true)
-          .then(backendStream => {
-            if (!isActive) {
-              logger(
-                'Watch.onSnapshot',
-                this._requestTag,
-                'Closing inactive stream'
-              );
-              backendStream.end();
-              return;
-            }
-            logger('Watch.onSnapshot', this._requestTag, 'Opened new stream');
-            currentStream = backendStream;
-            currentStream!.on('error', err => {
-              maybeReopenStream(err);
-            });
-            currentStream!.on('end', () => {
-              const err = new GrpcError('Stream ended unexpectedly');
-              err.code = GRPC_STATUS_CODE.UNKNOWN;
-              maybeReopenStream(err);
+          await this._firestore.initializeIfNeeded();
+
+          request.database = this._firestore.formattedName;
+          request.addTarget = this.getTarget(resumeToken);
+
+          // Note that we need to call the internal _listen API to pass additional
+          // header values in readWriteStream.
+          return this._firestore
+            .readWriteStream('listen', request, this._requestTag, true)
+            .then(backendStream => {
+              if (!isActive) {
+                logger(
+                  'Watch.onSnapshot',
+                  this._requestTag,
+                  'Closing inactive stream'
+                );
+                backendStream.end();
+                return;
+              }
+              logger('Watch.onSnapshot', this._requestTag, 'Opened new stream');
+              currentStream = backendStream;
+              currentStream!.on('error', err => {
+                maybeReopenStream(err);
+              });
+              currentStream!.on('end', () => {
+                const err = new GrpcError('Stream ended unexpectedly');
+                err.code = GRPC_STATUS_CODE.UNKNOWN;
+                maybeReopenStream(err);
+              });
+              currentStream!.pipe(stream);
+              currentStream!.resume();
             });
-            currentStream!.pipe(stream);
-            currentStream!.resume();
-          })
-          .catch(closeStream);
-      });
+        })
+        .catch(closeStream);
     };
 
     /**
diff --git a/dev/test/backoff.ts b/dev/test/backoff.ts
index c10c559c9..b02f0327b 100644
--- a/dev/test/backoff.ts
+++ b/dev/test/backoff.ts
@@ -144,4 +144,19 @@ describe('ExponentialBackoff', () => {
     await backoff.backoffAndWait().then(nop);
     assertDelayBetween(36, 44);
   });
+
+  it('tracks the number of retry attempts', async () => {
+    const backoff = new ExponentialBackoff({
+      initialDelayMs: 10,
+      backoffFactor: 2,
+      jitterFactor: 0.1,
+    });
+    expect(backoff.retryCount).to.equal(0);
+    await backoff.backoffAndWait().then(nop);
+    expect(backoff.retryCount).to.equal(1);
+    await backoff.backoffAndWait().then(nop);
+    expect(backoff.retryCount).to.equal(2);
+    backoff.reset();
+    expect(backoff.retryCount).to.equal(0);
+  });
 });
diff --git a/dev/test/watch.ts b/dev/test/watch.ts
index 3b3207e51..8285fdb86 100644
--- a/dev/test/watch.ts
+++ b/dev/test/watch.ts
@@ -39,7 +39,7 @@ import {
   QueryDocumentSnapshot,
   QuerySnapshot,
 } from '../src';
-import {setTimeoutHandler} from '../src/backoff';
+import {MAX_RETRY_ATTEMPTS, setTimeoutHandler} from '../src/backoff';
 import {DocumentSnapshotBuilder} from '../src/document';
 import {DocumentChangeType} from '../src/document-change';
 import {Serializer} from '../src/serializer';
@@ -819,6 +819,26 @@ describe('Query watch', () => {
     });
   });
 
+  it('stops attempts after maximum retry attempts', () => {
+    const err = new GrpcError('GRPC Error');
+    err.code = Number(10 /* ABORTED */);
+    return watchHelper.runFailedTest(
+      collQueryJSON(),
+      async () => {
+        // Retry for the maximum of retry attempts.
+        for (let i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
+          streamHelper.destroyStream(err);
+          await streamHelper.awaitReopen();
+        }
+        // The next retry should fail with an error.
+        streamHelper.destroyStream(err);
+        await streamHelper.await('error');
+        await streamHelper.await('close');
+      },
+      'Exceeded maximum number of retries allowed.'
+    );
+  });
+
   it("doesn't re-open inactive stream", () => {
     // This test uses the normal timeout handler since it relies on the actual
     // backoff window during the the stream recovery. We then use this window to
@@ -854,7 +874,7 @@ describe('Query watch', () => {
       /* PermissionDenied */ 7: false,
       /* ResourceExhausted */ 8: true,
       /* FailedPrecondition */ 9: false,
-      /* Aborted */ 10: false,
+      /* Aborted */ 10: true,
      /* OutOfRange */ 11: false,
      /* Unimplemented */ 12: false,
      /* Internal */ 13: true,