Skip to content

Commit

Permalink
fix: retry on abort and limit retry count to 10 (#655)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen authored and schmidt-sebastian committed May 28, 2019
1 parent eaf5a4e commit 9e97656
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 46 deletions.
28 changes: 26 additions & 2 deletions dev/src/backoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ const DEFAULT_BACKOFF_FACTOR = 1.5;
*/
const DEFAULT_JITTER_FACTOR = 1.0;

/*!
* The maximum number of retries that will be attempted by backoff
* before stopping all retry attempts.
*/
export const MAX_RETRY_ATTEMPTS = 10;

/*!
* The timeout handler used by `ExponentialBackoff`.
*/
Expand Down Expand Up @@ -135,6 +141,13 @@ export class ExponentialBackoff {
*/
private readonly jitterFactor: number;

/**
* The number of retries that has been attempted.
*
* @private
*/
private _retryCount = 0;

/**
* The backoff delay of the current attempt.
*
Expand Down Expand Up @@ -162,7 +175,7 @@ export class ExponentialBackoff {
}

/**
* Resets the backoff delay.
* Resets the backoff delay and retry count.
*
* The very next backoffAndWait() will have no delay. If it is called again
* (i.e. due to an error), initialDelayMs (plus jitter) will be used, and
Expand All @@ -171,6 +184,7 @@ export class ExponentialBackoff {
* @private
*/
reset(): void {
  // Start the next retry sequence from scratch: clear the accumulated
  // delay (so the very next backoffAndWait() resolves immediately) and
  // re-arm the retry budget by zeroing the attempt counter.
  this.currentBaseMs = 0;
  this._retryCount = 0;
}

Expand All @@ -192,6 +206,11 @@ export class ExponentialBackoff {
* @private
*/
backoffAndWait(): Promise<void> {
if (this.retryCount > MAX_RETRY_ATTEMPTS) {
return Promise.reject(
new Error('Exceeded maximum number of retries allowed.')
);
}
// First schedule using the current base (which may be 0 and should be
// honored as such).
const delayWithJitterMs = this.currentBaseMs + this.jitterDelayMs();
Expand All @@ -209,12 +228,17 @@ export class ExponentialBackoff {
this.currentBaseMs *= this.backoffFactor;
this.currentBaseMs = Math.max(this.currentBaseMs, this.initialDelayMs);
this.currentBaseMs = Math.min(this.currentBaseMs, this.maxDelayMs);

this._retryCount += 1;
return new Promise(resolve => {
delayExecution(resolve, delayWithJitterMs);
});
}

/**
 * The number of retry attempts made since the last `reset()`.
 *
 * Incremented by `backoffAndWait()` and used to enforce
 * `MAX_RETRY_ATTEMPTS`.
 *
 * Visible for testing.
 */
get retryCount(): number {
return this._retryCount;
}

/**
* Returns a randomized "jitter" delay based on the current base and jitter
* factor.
Expand Down
87 changes: 45 additions & 42 deletions dev/src/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ abstract class Watch {
}

switch (error.code) {
case GRPC_STATUS_CODE.ABORTED:
case GRPC_STATUS_CODE.CANCELLED:
case GRPC_STATUS_CODE.UNKNOWN:
case GRPC_STATUS_CODE.DEADLINE_EXCEEDED:
Expand Down Expand Up @@ -426,50 +427,52 @@ abstract class Watch {
* Initializes a new stream to the backend with backoff.
*/
const initStream = () => {
this._backoff.backoffAndWait().then(async () => {
if (!isActive) {
logger(
'Watch.onSnapshot',
this._requestTag,
'Not initializing inactive stream'
);
return;
}
this._backoff
.backoffAndWait()
.then(async () => {
if (!isActive) {
logger(
'Watch.onSnapshot',
this._requestTag,
'Not initializing inactive stream'
);
return;
}

await this._firestore.initializeIfNeeded();

request.database = this._firestore.formattedName;
request.addTarget = this.getTarget(resumeToken);

// Note that we need to call the internal _listen API to pass additional
// header values in readWriteStream.
this._firestore
.readWriteStream('listen', request, this._requestTag, true)
.then(backendStream => {
if (!isActive) {
logger(
'Watch.onSnapshot',
this._requestTag,
'Closing inactive stream'
);
backendStream.end();
return;
}
logger('Watch.onSnapshot', this._requestTag, 'Opened new stream');
currentStream = backendStream;
currentStream!.on('error', err => {
maybeReopenStream(err);
});
currentStream!.on('end', () => {
const err = new GrpcError('Stream ended unexpectedly');
err.code = GRPC_STATUS_CODE.UNKNOWN;
maybeReopenStream(err);
await this._firestore.initializeIfNeeded();

request.database = this._firestore.formattedName;
request.addTarget = this.getTarget(resumeToken);

// Note that we need to call the internal _listen API to pass additional
// header values in readWriteStream.
return this._firestore
.readWriteStream('listen', request, this._requestTag, true)
.then(backendStream => {
if (!isActive) {
logger(
'Watch.onSnapshot',
this._requestTag,
'Closing inactive stream'
);
backendStream.end();
return;
}
logger('Watch.onSnapshot', this._requestTag, 'Opened new stream');
currentStream = backendStream;
currentStream!.on('error', err => {
maybeReopenStream(err);
});
currentStream!.on('end', () => {
const err = new GrpcError('Stream ended unexpectedly');
err.code = GRPC_STATUS_CODE.UNKNOWN;
maybeReopenStream(err);
});
currentStream!.pipe(stream);
currentStream!.resume();
});
currentStream!.pipe(stream);
currentStream!.resume();
})
.catch(closeStream);
});
})
.catch(closeStream);
};

/**
Expand Down
15 changes: 15 additions & 0 deletions dev/test/backoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,19 @@ describe('ExponentialBackoff', () => {
await backoff.backoffAndWait().then(nop);
assertDelayBetween(36, 44);
});

// Verifies the retryCount bookkeeping in ExponentialBackoff: the counter
// starts at zero, increments once per backoffAndWait() call, and reset()
// clears it back to zero.
it('tracks the number of retry attempts', async () => {
const backoff = new ExponentialBackoff({
initialDelayMs: 10,
backoffFactor: 2,
jitterFactor: 0.1,
});
expect(backoff.retryCount).to.equal(0);
await backoff.backoffAndWait().then(nop);
expect(backoff.retryCount).to.equal(1);
await backoff.backoffAndWait().then(nop);
expect(backoff.retryCount).to.equal(2);
// reset() re-arms the retry budget (and also clears the current delay).
backoff.reset();
expect(backoff.retryCount).to.equal(0);
});
});
24 changes: 22 additions & 2 deletions dev/test/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import {
QueryDocumentSnapshot,
QuerySnapshot,
} from '../src';
import {setTimeoutHandler} from '../src/backoff';
import {MAX_RETRY_ATTEMPTS, setTimeoutHandler} from '../src/backoff';
import {DocumentSnapshotBuilder} from '../src/document';
import {DocumentChangeType} from '../src/document-change';
import {Serializer} from '../src/serializer';
Expand Down Expand Up @@ -819,6 +819,26 @@ describe('Query watch', () => {
});
});

// Verifies that a watch stream stops reconnecting after MAX_RETRY_ATTEMPTS
// consecutive retryable failures instead of retrying forever.
it('stops attempts after maximum retry attempts', () => {
const err = new GrpcError('GRPC Error');
// gRPC status 10 = ABORTED, which this commit makes a retryable code.
err.code = Number(10 /* ABORTED */);
return watchHelper.runFailedTest(
collQueryJSON(),
async () => {
// Retry for the maximum of retry attempts.
for (let i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
streamHelper.destroyStream(err);
await streamHelper.awaitReopen();
}
// The next retry should fail with an error.
streamHelper.destroyStream(err);
await streamHelper.await('error');
await streamHelper.await('close');
},
// Error message raised by ExponentialBackoff.backoffAndWait() once the
// retry budget is exhausted.
'Exceeded maximum number of retries allowed.'
);
});

it("doesn't re-open inactive stream", () => {
// This test uses the normal timeout handler since it relies on the actual
// backoff window during the the stream recovery. We then use this window to
Expand Down Expand Up @@ -854,7 +874,7 @@ describe('Query watch', () => {
/* PermissionDenied */ 7: false,
/* ResourceExhausted */ 8: true,
/* FailedPrecondition */ 9: false,
/* Aborted */ 10: false,
/* Aborted */ 10: true,
/* OutOfRange */ 11: false,
/* Unimplemented */ 12: false,
/* Internal */ 13: true,
Expand Down

0 comments on commit 9e97656

Please sign in to comment.