Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: BulkWriter: apply rate limiter before sending batch #1451

Merged
merged 4 commits into from Mar 16, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 33 additions & 31 deletions dev/src/bulk-writer.ts
Expand Up @@ -684,7 +684,7 @@ export class BulkWriter {
*/
flush(): Promise<void> {
this._verifyNotClosed();
this._sendCurrentBatch(/* flush= */ true);
this._scheduleCurrentBatch(/* flush= */ true);
return this._lastOp;
}

Expand Down Expand Up @@ -738,58 +738,60 @@ export class BulkWriter {
* `flush()` or `close()` call.
* @private
*/
private _sendCurrentBatch(flush = false): void {
private _scheduleCurrentBatch(flush = false): void {
if (this._bulkCommitBatch._opCount === 0) return;

const tag = requestTag();
const pendingBatch = this._bulkCommitBatch;
this._bulkCommitBatch = new BulkCommitBatch(this.firestore);

// Use the write with the longest backoff duration when determining backoff.
const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) =>
prev.backoffDuration > cur.backoffDuration ? prev : cur
).backoffDuration;
const backoffMsWithJitter = BulkWriter.applyJitter(highestBackoffDuration);
const delayMs = this._rateLimiter.getNextRequestDelayMs(
pendingBatch._opCount
);
const finalDelayMs = Math.max(backoffMsWithJitter, delayMs);

const backoffMsWithJitter = BulkWriter._applyJitter(highestBackoffDuration);
const delayedExecution = new Deferred<void>();
this._bulkCommitBatch = new BulkCommitBatch(this.firestore);

if (backoffMsWithJitter > 0) {
delayExecution(() => delayedExecution.resolve(), backoffMsWithJitter);
} else {
delayedExecution.resolve();
}

delayedExecution.promise.then(() => this._sendBatch(pendingBatch, flush));
}

/**
* Sends the provided batch once the rate limiter does not require any delay.
*/
private async _sendBatch(
batch: BulkCommitBatch,
flush = false
): Promise<void> {
const tag = requestTag();

// Send the batch if it is does not require any delay, or schedule another
// attempt after the appropriate timeout.
if (finalDelayMs === 0) {
const underRateLimit = this._rateLimiter.tryMakeRequest(
pendingBatch._opCount
);
assert(
underRateLimit,
'RateLimiter should allow request if delayMs === 0'
);
delayedExecution.resolve();
const underRateLimit = this._rateLimiter.tryMakeRequest(batch._opCount);
if (underRateLimit) {
await batch.bulkCommit({requestTag: tag});
if (flush) this._scheduleCurrentBatch(flush);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This codeflow still confused me, but I now know why. This sound like it is trying to re-send the same batch again. Should we call this _scheduleNextBatch()? FWIW, I think this function could also just be called _sendBatch() since it takes the batch as an argument now.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes more sense. Renamed.

} else {
const delayMs = this._rateLimiter.getNextRequestDelayMs(batch._opCount);
logger(
'BulkWriter._sendCurrentBatch',
'BulkWriter._sendBatch',
tag,
`Backing off for ${finalDelayMs} seconds`
`Backing off for ${delayMs} seconds`
);
delayExecution(() => delayedExecution.resolve(), finalDelayMs);
delayExecution(() => this._sendBatch(batch, flush), delayMs);
}

delayedExecution.promise.then(async () => {
// This should subtract rate limit, but it's not.
await pendingBatch.bulkCommit({requestTag: tag});
if (flush) this._sendCurrentBatch(flush);
});
}

/**
* Adds a 30% jitter to the provided backoff.
*
* @private
*/
private static applyJitter(backoffMs: number): number {
private static _applyJitter(backoffMs: number): number {
if (backoffMs === 0) return 0;
// Random value in [-0.3, 0.3].
const jitter = DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1);
Expand Down Expand Up @@ -832,7 +834,7 @@ export class BulkWriter {
if (this._bulkCommitBatch.has(op.ref)) {
// Create a new batch since the backend doesn't support batches with two
// writes to the same document.
this._sendCurrentBatch();
this._scheduleCurrentBatch();
}

// Run the operation on the current batch and advance the `_lastOp` pointer.
Expand All @@ -843,7 +845,7 @@ export class BulkWriter {
this._lastOp = this._lastOp.then(() => silencePromise(op.promise));

if (this._bulkCommitBatch._opCount === this._maxBatchSize) {
this._sendCurrentBatch();
this._scheduleCurrentBatch();
}
}
}
Expand Down