Skip to content

Commit

Permalink
fix: BulkWriter: apply rate limiter before sending batch (#1451)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed Mar 16, 2021
1 parent fee8a04 commit 3a50f8b
Showing 1 changed file with 33 additions and 31 deletions.
64 changes: 33 additions & 31 deletions dev/src/bulk-writer.ts
Expand Up @@ -684,7 +684,7 @@ export class BulkWriter {
*/
flush(): Promise<void> {
this._verifyNotClosed();
this._sendCurrentBatch(/* flush= */ true);
this._scheduleCurrentBatch(/* flush= */ true);
return this._lastOp;
}

Expand Down Expand Up @@ -738,58 +738,60 @@ export class BulkWriter {
* `flush()` or `close()` call.
* @private
*/
private _sendCurrentBatch(flush = false): void {
private _scheduleCurrentBatch(flush = false): void {
if (this._bulkCommitBatch._opCount === 0) return;

const tag = requestTag();
const pendingBatch = this._bulkCommitBatch;
this._bulkCommitBatch = new BulkCommitBatch(this.firestore);

// Use the write with the longest backoff duration when determining backoff.
const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) =>
prev.backoffDuration > cur.backoffDuration ? prev : cur
).backoffDuration;
const backoffMsWithJitter = BulkWriter.applyJitter(highestBackoffDuration);
const delayMs = this._rateLimiter.getNextRequestDelayMs(
pendingBatch._opCount
);
const finalDelayMs = Math.max(backoffMsWithJitter, delayMs);

const backoffMsWithJitter = BulkWriter._applyJitter(highestBackoffDuration);
const delayedExecution = new Deferred<void>();
this._bulkCommitBatch = new BulkCommitBatch(this.firestore);

if (backoffMsWithJitter > 0) {
delayExecution(() => delayedExecution.resolve(), backoffMsWithJitter);
} else {
delayedExecution.resolve();
}

delayedExecution.promise.then(() => this._sendBatch(pendingBatch, flush));
}

/**
* Sends the provided batch once the rate limiter does not require any delay.
*/
private async _sendBatch(
batch: BulkCommitBatch,
flush = false
): Promise<void> {
const tag = requestTag();

// Send the batch if it is does not require any delay, or schedule another
// attempt after the appropriate timeout.
if (finalDelayMs === 0) {
const underRateLimit = this._rateLimiter.tryMakeRequest(
pendingBatch._opCount
);
assert(
underRateLimit,
'RateLimiter should allow request if delayMs === 0'
);
delayedExecution.resolve();
const underRateLimit = this._rateLimiter.tryMakeRequest(batch._opCount);
if (underRateLimit) {
await batch.bulkCommit({requestTag: tag});
if (flush) this._scheduleCurrentBatch(flush);
} else {
const delayMs = this._rateLimiter.getNextRequestDelayMs(batch._opCount);
logger(
'BulkWriter._sendCurrentBatch',
'BulkWriter._sendBatch',
tag,
`Backing off for ${finalDelayMs} seconds`
`Backing off for ${delayMs} seconds`
);
delayExecution(() => delayedExecution.resolve(), finalDelayMs);
delayExecution(() => this._sendBatch(batch, flush), delayMs);
}

delayedExecution.promise.then(async () => {
// This should subtract rate limit, but it's not.
await pendingBatch.bulkCommit({requestTag: tag});
if (flush) this._sendCurrentBatch(flush);
});
}

/**
* Adds a 30% jitter to the provided backoff.
*
* @private
*/
private static applyJitter(backoffMs: number): number {
private static _applyJitter(backoffMs: number): number {
if (backoffMs === 0) return 0;
// Random value in [-0.3, 0.3].
const jitter = DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1);
Expand Down Expand Up @@ -832,7 +834,7 @@ export class BulkWriter {
if (this._bulkCommitBatch.has(op.ref)) {
// Create a new batch since the backend doesn't support batches with two
// writes to the same document.
this._sendCurrentBatch();
this._scheduleCurrentBatch();
}

// Run the operation on the current batch and advance the `_lastOp` pointer.
Expand All @@ -843,7 +845,7 @@ export class BulkWriter {
this._lastOp = this._lastOp.then(() => silencePromise(op.promise));

if (this._bulkCommitBatch._opCount === this._maxBatchSize) {
this._sendCurrentBatch();
this._scheduleCurrentBatch();
}
}
}
Expand Down

0 comments on commit 3a50f8b

Please sign in to comment.