fix: BulkWriter: add backoff on retries (#1447)
Brian Chen committed Mar 15, 2021
1 parent a36bb09 commit f483083
Showing 3 changed files with 209 additions and 21 deletions.
6 changes: 3 additions & 3 deletions dev/src/backoff.ts
@@ -30,17 +30,17 @@ import {logger} from './logger';
* The default initial backoff time in milliseconds after an error.
* Set to 1s according to https://cloud.google.com/apis/design/errors.
*/
const DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000;
export const DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000;

/*!
* The default maximum backoff time in milliseconds.
*/
const DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;
export const DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;

/*!
* The default factor to increase the backoff by after each failed attempt.
*/
const DEFAULT_BACKOFF_FACTOR = 1.5;
export const DEFAULT_BACKOFF_FACTOR = 1.5;

/*!
* The default jitter to distribute the backoff attempts by (0 means no
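The three constants above are now exported so that bulk-writer.ts can grow its per-operation retry delay with the same parameters. As a rough illustration only (the helper below is not part of this commit and its name is made up), the uncapped sequence they produce is 1000 ms, 1500 ms, 2250 ms, 3375 ms, ..., clamped to 60 000 ms:

// Hypothetical helper, not in backoff.ts: shows how the exported constants
// combine into the delay sequence BulkWriter grows after each failed attempt.
const INITIAL_MS = 1000; // DEFAULT_BACKOFF_INITIAL_DELAY_MS
const MAX_MS = 60 * 1000; // DEFAULT_BACKOFF_MAX_DELAY_MS
const FACTOR = 1.5; // DEFAULT_BACKOFF_FACTOR

function expectedDelaysMs(attempts: number): number[] {
  const delays: number[] = [];
  let delay = INITIAL_MS;
  for (let i = 0; i < attempts; i++) {
    delays.push(Math.min(delay, MAX_MS));
    delay *= FACTOR;
  }
  return delays;
}

// expectedDelaysMs(5) -> [1000, 1500, 2250, 3375, 5062.5]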
84 changes: 70 additions & 14 deletions dev/src/bulk-writer.ts
@@ -19,7 +19,13 @@ import * as assert from 'assert';

import {google} from '../protos/firestore_v1_proto_api';
import {FieldPath, Firestore} from '.';
import {delayExecution, MAX_RETRY_ATTEMPTS} from './backoff';
import {
DEFAULT_BACKOFF_FACTOR,
DEFAULT_BACKOFF_INITIAL_DELAY_MS,
DEFAULT_BACKOFF_MAX_DELAY_MS,
delayExecution,
MAX_RETRY_ATTEMPTS,
} from './backoff';
import {RateLimiter} from './rate-limiter';
import {DocumentReference} from './reference';
import {Timestamp} from './timestamp';
@@ -65,6 +71,12 @@ export const DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT = 500;
*/
export const DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT = 10000;

/*!
* The default jitter to apply to the exponential backoff used in retries. For
* example, a factor of 0.3 means a 30% jitter is applied.
*/
export const DEFAULT_JITTER_FACTOR = 0.3;

/*!
* The rate by which to increase the capacity as specified by the 500/50/5 rule.
*/
@@ -84,6 +96,8 @@ const RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000;
class BulkWriterOperation {
private deferred = new Deferred<WriteResult>();
private failedAttempts = 0;
private lastStatus?: Status;
private _backoffDuration = 0;

/**
* @param ref The document reference being written to.
@@ -107,6 +121,10 @@ class BulkWriterOperation {
return this.deferred.promise;
}

get backoffDuration(): number {
return this._backoffDuration;
}

onError(error: GoogleError): void {
++this.failedAttempts;

@@ -131,6 +149,8 @@ );
);

if (shouldRetry) {
this.lastStatus = error.code;
this.updateBackoffDuration();
this.sendFn(this);
} else {
this.deferred.reject(bulkWriterError);
@@ -140,6 +160,16 @@ }
}
}

private updateBackoffDuration(): void {
if (this.lastStatus === Status.RESOURCE_EXHAUSTED) {
this._backoffDuration = DEFAULT_BACKOFF_MAX_DELAY_MS;
} else if (this._backoffDuration === 0) {
this._backoffDuration = DEFAULT_BACKOFF_INITIAL_DELAY_MS;
} else {
this._backoffDuration *= DEFAULT_BACKOFF_FACTOR;
}
}

onSuccess(result: WriteResult): void {
try {
this.successFn(this.ref, result);
@@ -161,7 +191,7 @@ class BulkCommitBatch extends WriteBatch {

// An array of pending write operations. Only contains writes that have not
// been resolved.
private pendingOps: Array<BulkWriterOperation> = [];
readonly pendingOps: Array<BulkWriterOperation> = [];

has(documentRef: firestore.DocumentReference<unknown>): boolean {
return this.docPaths.has(documentRef.path);
@@ -279,7 +309,7 @@ export class BulkWriter {
private _bulkCommitBatch = new BulkCommitBatch(this.firestore);

/**
* A pointer to the tail of all active BulkWriter applications. This pointer
* A pointer to the tail of all active BulkWriter operations. This pointer
* is advanced every time a new write is enqueued.
* @private
*/
@@ -714,35 +744,61 @@ export class BulkWriter {
const tag = requestTag();
const pendingBatch = this._bulkCommitBatch;

this._bulkCommitBatch = new BulkCommitBatch(this.firestore);

// Send the batch if it is under the rate limit, or schedule another
// attempt after the appropriate timeout.
const underRateLimit = this._rateLimiter.tryMakeRequest(
// Use the write with the longest backoff duration when determining backoff.
const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) =>
prev.backoffDuration > cur.backoffDuration ? prev : cur
).backoffDuration;
const backoffMsWithJitter = BulkWriter.applyJitter(highestBackoffDuration);
const delayMs = this._rateLimiter.getNextRequestDelayMs(
pendingBatch._opCount
);
const finalDelayMs = Math.max(backoffMsWithJitter, delayMs);

const delayedExecution = new Deferred<void>();
if (underRateLimit) {
delayedExecution.resolve();
} else {
const delayMs = this._rateLimiter.getNextRequestDelayMs(
this._bulkCommitBatch = new BulkCommitBatch(this.firestore);

// Send the batch if it does not require any delay, or schedule another
// attempt after the appropriate timeout.
if (finalDelayMs === 0) {
const underRateLimit = this._rateLimiter.tryMakeRequest(
pendingBatch._opCount
);
assert(
underRateLimit,
'RateLimiter should allow request if delayMs === 0'
);
delayedExecution.resolve();
} else {
logger(
'BulkWriter._sendCurrentBatch',
tag,
`Backing off for ${delayMs} seconds`
`Backing off for ${finalDelayMs} milliseconds`
);
delayExecution(() => delayedExecution.resolve(), delayMs);
delayExecution(() => delayedExecution.resolve(), finalDelayMs);
}

delayedExecution.promise.then(async () => {
// This should subtract from the rate limiter's capacity, but it currently does not.
await pendingBatch.bulkCommit({requestTag: tag});
if (flush) this._sendCurrentBatch(flush);
});
}

/**
* Adds a 30% jitter to the provided backoff.
*
* @private
*/
private static applyJitter(backoffMs: number): number {
if (backoffMs === 0) return 0;
// Random value in [-0.3, 0.3].
const jitter = DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1);
return Math.min(
DEFAULT_BACKOFF_MAX_DELAY_MS,
backoffMs + jitter * backoffMs
);
}

/**
* Schedules and runs the provided operation on the next available batch.
* @private
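A minimal sketch (not part of the diff; opBackoffsMs and rateLimiterDelayMs are placeholder names) of how _sendCurrentBatch above chooses the final delay for a batch: take the largest per-operation backoff, apply up to ±30% jitter, cap at the maximum backoff, and then use whichever is larger, that value or the rate limiter's own delay:

// Illustrative only: simplified from the _sendCurrentBatch/applyJitter logic.
const JITTER_FACTOR = 0.3; // DEFAULT_JITTER_FACTOR
const MAX_BACKOFF_MS = 60 * 1000; // DEFAULT_BACKOFF_MAX_DELAY_MS

function finalBatchDelayMs(
  opBackoffsMs: number[], // backoffDuration of each pending operation
  rateLimiterDelayMs: number // RateLimiter.getNextRequestDelayMs(opCount)
): number {
  const highest = Math.max(0, ...opBackoffsMs);
  // Random value in [-0.3, 0.3], matching applyJitter().
  const jitter = JITTER_FACTOR * (Math.random() * 2 - 1);
  const withJitter =
    highest === 0 ? 0 : Math.min(MAX_BACKOFF_MS, highest + jitter * highest);
  return Math.max(withJitter, rateLimiterDelayMs);
}

// e.g. finalBatchDelayMs([0, 1500], 0) falls roughly between 1050 and 1950.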
140 changes: 136 additions & 4 deletions dev/test/bulk-writer.ts
@@ -26,10 +26,17 @@ import {
Timestamp,
WriteResult,
} from '../src';
import {setTimeoutHandler} from '../src/backoff';
import {
DEFAULT_BACKOFF_FACTOR,
DEFAULT_BACKOFF_INITIAL_DELAY_MS,
DEFAULT_BACKOFF_MAX_DELAY_MS,
MAX_RETRY_ATTEMPTS,
setTimeoutHandler,
} from '../src/backoff';
import {
BulkWriterError,
DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT,
DEFAULT_JITTER_FACTOR,
DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT,
} from '../src/bulk-writer';
import {
@@ -45,7 +52,6 @@ import {
updateMask,
verifyInstance,
} from './util/helpers';

import api = proto.google.firestore.v1;

// Change the argument to 'console.log' to enable debug output.
@@ -180,7 +186,6 @@ describe('BulkWriter', () => {

afterEach(() => {
verifyInstance(firestore);
expect(timeoutHandlerCounter).to.equal(0);
setTimeoutHandler(setTimeout);
});

@@ -579,6 +584,7 @@ describe('BulkWriter', () => {
'success',
]);
expect(writeResults).to.deep.equal([1, 2, 3, 4]);
expect(timeoutHandlerCounter).to.equal(1);
});
});

@@ -707,6 +713,7 @@ describe('BulkWriter', () => {
});
await bulkWriter.close();
expect(writeResult).to.equal(1);
expect(timeoutHandlerCounter).to.equal(3);
});

it('retries maintain correct write resolution ordering', async () => {
@@ -1027,7 +1034,16 @@ describe('BulkWriter', () => {
});

it('fails writes after all retry attempts failed', async () => {
setTimeoutHandler(setImmediate);
setTimeoutHandler((fn, timeout) => {
const expected =
DEFAULT_BACKOFF_INITIAL_DELAY_MS * Math.pow(1.5, timeoutHandlerCounter);
expect(timeout).to.be.within(
(1 - DEFAULT_JITTER_FACTOR) * expected,
(1 + DEFAULT_JITTER_FACTOR) * expected
);
timeoutHandlerCounter++;
fn();
});
function instantiateInstance(): Promise<BulkWriter> {
const overrides: ApiOverride = {
batchWrite: () => {
@@ -1053,7 +1069,123 @@ describe('BulkWriter', () => {
});
return bulkWriter.close().then(() => {
verifyOpCount(1);
expect(timeoutHandlerCounter).to.equal(MAX_RETRY_ATTEMPTS - 1);
});
});

it('applies maximum backoff on retries for RESOURCE_EXHAUSTED', async () => {
setTimeoutHandler((fn, timeout) => {
timeoutHandlerCounter++;
expect(timeout).to.be.within(
(1 - DEFAULT_JITTER_FACTOR) * DEFAULT_BACKOFF_MAX_DELAY_MS,
(1 + DEFAULT_JITTER_FACTOR) * DEFAULT_BACKOFF_MAX_DELAY_MS
);
fn();
});
function instantiateInstance(): Promise<BulkWriter> {
const overrides: ApiOverride = {
batchWrite: () => {
const error = new GoogleError('Mock batchWrite failed in test');
error.code = Status.RESOURCE_EXHAUSTED;
throw error;
},
};
return createInstance(overrides).then(firestoreClient => {
firestore = firestoreClient;
return firestore.bulkWriter();
});
}
const bulkWriter = await instantiateInstance();
bulkWriter.onWriteError(err => err.failedAttempts < 5);
bulkWriter
.create(firestore.doc('collectionId/doc'), {
foo: 'bar',
})
.catch(err => {
expect(err instanceof BulkWriterError).to.be.true;
expect(err.code).to.equal(Status.RESOURCE_EXHAUSTED);
incrementOpCount();
});
return bulkWriter.close().then(() => {
verifyOpCount(1);
expect(timeoutHandlerCounter).to.equal(4);
});
});

it('uses the highest backoff found in the batch', async () => {
const expected = [
DEFAULT_BACKOFF_MAX_DELAY_MS,
DEFAULT_BACKOFF_INITIAL_DELAY_MS * DEFAULT_BACKOFF_FACTOR,
];
setTimeoutHandler((fn, timeout) => {
// 1st batch should have max backoff. 2nd batch should have 1 round
// of backoff applied.
expect(timeout).to.be.within(
(1 - DEFAULT_JITTER_FACTOR) * expected[timeoutHandlerCounter],
(1 + DEFAULT_JITTER_FACTOR) * expected[timeoutHandlerCounter]
);
timeoutHandlerCounter++;
fn();
});
const bulkWriter = await instantiateInstance([
{
request: createRequest([createOp('doc1', 'bar'), setOp('doc2', 'bar')]),
response: mergeResponses([
failedResponse(Status.RESOURCE_EXHAUSTED),
failedResponse(Status.UNAVAILABLE),
]),
},
{
request: createRequest([createOp('doc1', 'bar'), setOp('doc2', 'bar')]),
response: mergeResponses([
successResponse(1),
failedResponse(Status.UNAVAILABLE),
]),
},
{
request: createRequest([setOp('doc2', 'bar')]),
response: successResponse(2),
},
]);

bulkWriter.onWriteError(err => err.failedAttempts < 5);
bulkWriter.create(firestore.doc('collectionId/doc1'), {
foo: 'bar',
});
bulkWriter.set(firestore.doc('collectionId/doc2'), {
foo: 'bar',
});
return bulkWriter.close().then(() => {
expect(timeoutHandlerCounter).to.equal(2);
});
});

it('sends backoff batch after other enqueued batches', async () => {
setTimeoutHandler(setImmediate);
const bulkWriter = await instantiateInstance([
{
request: createRequest([createOp('doc1', 'bar')]),
response: failedResponse(Status.RESOURCE_EXHAUSTED),
},
{
request: createRequest([setOp('doc2', 'bar')]),
response: successResponse(1),
},
{
request: createRequest([createOp('doc1', 'bar')]),
response: successResponse(2),
},
]);

bulkWriter.onWriteError(err => err.failedAttempts < 5);
bulkWriter.create(firestore.doc('collectionId/doc1'), {
foo: 'bar',
});
bulkWriter.flush();
bulkWriter.set(firestore.doc('collectionId/doc2'), {
foo: 'bar',
});
return bulkWriter.close();
});

describe('if bulkCommit() fails', async () => {
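For context, a hedged usage sketch (the document path and retry policy below are made up, not taken from the tests) of the public surface the tests above exercise: onWriteError decides whether a failed write is retried, and each retry now waits out the backoff added in this commit:

// Illustrative usage only, not part of the commit.
import {Firestore} from '@google-cloud/firestore';

async function run(): Promise<void> {
  const firestore = new Firestore();
  const bulkWriter = firestore.bulkWriter();

  // Retry failed writes up to five times; retries back off exponentially.
  bulkWriter.onWriteError(err => err.failedAttempts < 5);

  bulkWriter
    .create(firestore.doc('collectionId/doc'), {foo: 'bar'})
    .catch(err => console.error('write gave up with code', err.code));

  // close() flushes all pending writes before resolving.
  await bulkWriter.close();
}

run().catch(console.error);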
