Skip to content

Commit

Permalink
fix: bulkWriter: ensure buffered batches are sent after flush (#1535)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed Jun 16, 2021
1 parent 9f4c313 commit 115a134
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 6 deletions.
50 changes: 44 additions & 6 deletions dev/src/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ class BulkWriterOperation {
private lastStatus?: StatusCode;
private _backoffDuration = 0;

/** Whether flush() was called when this was the last enqueued operation. */
private _flushed = false;

/**
* @param ref The document reference being written to.
* @param type The type of operation that created this write.
Expand All @@ -134,6 +137,14 @@ class BulkWriterOperation {
return this._backoffDuration;
}

markFlushed(): void {
this._flushed = true;
}

get flushed(): boolean {
return this._flushed;
}

onError(error: GoogleError): void {
++this.failedAttempts;

Expand Down Expand Up @@ -271,6 +282,18 @@ class BulkCommitBatch extends WriteBatch {
}
}

/**
* Used to represent a buffered BulkWriterOperation.
*
* @private
*/
class BufferedOperation {
constructor(
readonly operation: BulkWriterOperation,
readonly sendFn: () => void
) {}
}

/**
* The error thrown when a BulkWriter operation fails.
*
Expand Down Expand Up @@ -354,7 +377,7 @@ export class BulkWriter {
* of pending operations has been enqueued.
* @private
*/
private _bufferedOperations: Array<() => void> = [];
private _bufferedOperations: Array<BufferedOperation> = [];

// Visible for testing.
_getBufferedOperationsCount(): number {
Expand Down Expand Up @@ -751,6 +774,15 @@ export class BulkWriter {
flush(): Promise<void> {
this._verifyNotClosed();
this._scheduleCurrentBatch(/* flush= */ true);

// Mark the most recent operation as flushed to ensure that the batch
// containing it will be sent once it's popped from the buffer.
if (this._bufferedOperations.length > 0) {
this._bufferedOperations[
this._bufferedOperations.length - 1
].operation.markFlushed();
}

return this._lastOp;
}

Expand Down Expand Up @@ -898,10 +930,12 @@ export class BulkWriter {
this._pendingOpsCount++;
this._sendFn(enqueueOnBatchCallback, bulkWriterOp);
} else {
this._bufferedOperations.push(() => {
this._pendingOpsCount++;
this._sendFn(enqueueOnBatchCallback, bulkWriterOp);
});
this._bufferedOperations.push(
new BufferedOperation(bulkWriterOp, () => {
this._pendingOpsCount++;
this._sendFn(enqueueOnBatchCallback, bulkWriterOp);
})
);
}

// Chain the BulkWriter operation promise with the buffer processing logic
Expand Down Expand Up @@ -931,7 +965,7 @@ export class BulkWriter {
this._bufferedOperations.length > 0
) {
const nextOp = this._bufferedOperations.shift()!;
nextOp();
nextOp.sendFn();
}
}

Expand All @@ -956,6 +990,10 @@ export class BulkWriter {

if (this._bulkCommitBatch._opCount === this._maxBatchSize) {
this._scheduleCurrentBatch();
} else if (op.flushed) {
// If flush() was called before this operation was enqueued into a batch,
// we still need to schedule it.
this._scheduleCurrentBatch(/* flush= */ true);
}
}
}
Expand Down
62 changes: 62 additions & 0 deletions dev/test/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,68 @@ describe('BulkWriter', () => {
});
});

it('buffered operations are flushed after being enqueued', async () => {
const bulkWriter = await instantiateInstance([
{
request: createRequest([
setOp('doc1', 'bar'),
setOp('doc2', 'bar'),
setOp('doc3', 'bar'),
]),
response: mergeResponses([
successResponse(1),
successResponse(2),
successResponse(3),
]),
},
{
request: createRequest([
setOp('doc4', 'bar'),
setOp('doc5', 'bar'),
setOp('doc6', 'bar'),
]),
response: mergeResponses([
successResponse(4),
successResponse(5),
successResponse(6),
]),
},
{
request: createRequest([setOp('doc7', 'bar')]),
response: successResponse(7),
},
]);
bulkWriter._setMaxPendingOpCount(6);
bulkWriter._maxBatchSize = 3;
bulkWriter
.set(firestore.doc('collectionId/doc1'), {foo: 'bar'})
.then(incrementOpCount);
bulkWriter
.set(firestore.doc('collectionId/doc2'), {foo: 'bar'})
.then(incrementOpCount);
bulkWriter
.set(firestore.doc('collectionId/doc3'), {foo: 'bar'})
.then(incrementOpCount);
bulkWriter
.set(firestore.doc('collectionId/doc4'), {foo: 'bar'})
.then(incrementOpCount);
bulkWriter
.set(firestore.doc('collectionId/doc5'), {foo: 'bar'})
.then(incrementOpCount);
bulkWriter
.set(firestore.doc('collectionId/doc6'), {foo: 'bar'})
.then(incrementOpCount);

// The 7th operation is buffered. We want to check that the operation is
// still sent even though it is not enqueued when close() is called.
bulkWriter
.set(firestore.doc('collectionId/doc7'), {foo: 'bar'})
.then(incrementOpCount);
return bulkWriter.close().then(async () => {
verifyOpCount(7);
});
});

it('runs the success handler', async () => {
const bulkWriter = await instantiateInstance([
{
Expand Down

0 comments on commit 115a134

Please sign in to comment.