Skip to content

Commit

Permalink
fix: create new batch for writes to the same doc (#1352)
Browse files Browse the repository at this point in the history
* fix: create new batch for writes to the same doc

* fix test naming
  • Loading branch information
Brian Chen committed Nov 3, 2020
1 parent 6fb360c commit bd5adc3
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 34 deletions.
32 changes: 24 additions & 8 deletions dev/src/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ class BulkCommitBatch {
*/
state = BatchState.OPEN;

// The set of document reference paths present in the WriteBatch.
readonly docPaths = new Set<string>();

// An array of pending write operations. Only contains writes that have not
// been resolved.
private pendingOps: Array<Deferred<BatchWriteResult>> = [];
Expand Down Expand Up @@ -126,7 +129,7 @@ class BulkCommitBatch {
data: T
): Promise<WriteResult> {
this.writeBatch.create(documentRef, data);
return this.processLastOperation();
return this.processLastOperation(documentRef);
}

/**
Expand All @@ -138,7 +141,7 @@ class BulkCommitBatch {
precondition?: firestore.Precondition
): Promise<WriteResult> {
this.writeBatch.delete(documentRef, precondition);
return this.processLastOperation();
return this.processLastOperation(documentRef);
}

set<T>(
Expand All @@ -165,7 +168,7 @@ class BulkCommitBatch {
options?: firestore.SetOptions
): Promise<WriteResult> {
this.writeBatch.set(documentRef, data, options);
return this.processLastOperation();
return this.processLastOperation(documentRef);
}

/**
Expand All @@ -180,14 +183,21 @@ class BulkCommitBatch {
>
): Promise<WriteResult> {
this.writeBatch.update(documentRef, dataOrField, ...preconditionOrValues);
return this.processLastOperation();
return this.processLastOperation(documentRef);
}

/**
* Helper to update data structures associated with the operation and
* return the result.
*/
private processLastOperation<T>(): Promise<WriteResult> {
private processLastOperation<T>(
documentRef: firestore.DocumentReference
): Promise<WriteResult> {
assert(
!this.docPaths.has(documentRef.path),
'Batch should not contain writes to the same document'
);
this.docPaths.add(documentRef.path);
assert(
this.state === BatchState.OPEN,
'Batch should be OPEN when adding writes'
Expand Down Expand Up @@ -762,10 +772,16 @@ export class BulkWriter {
*
* @private
*/
private getEligibleBatch<T>(batchQueue: BulkCommitBatch[]): BulkCommitBatch {
private getEligibleBatch<T>(
documentRef: firestore.DocumentReference,
batchQueue: BulkCommitBatch[]
): BulkCommitBatch {
if (batchQueue.length > 0) {
const lastBatch = batchQueue[batchQueue.length - 1];
if (lastBatch.state === BatchState.OPEN) {
if (
lastBatch.state === BatchState.OPEN &&
!lastBatch.docPaths.has(documentRef.path)
) {
return lastBatch;
}
}
Expand Down Expand Up @@ -881,7 +897,7 @@ export class BulkWriter {
for (let failedAttempts = 0; ; ++failedAttempts) {
const batchQueue =
failedAttempts > 0 ? this._retryBatchQueue : this._batchQueue;
const bulkCommitBatch = this.getEligibleBatch(batchQueue);
const bulkCommitBatch = this.getEligibleBatch(documentRef, batchQueue);

// Send ready batches if this is the first attempt. Subsequent retry
// batches are scheduled after the initial batch returns.
Expand Down
13 changes: 13 additions & 0 deletions dev/system-test/firestore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2607,6 +2607,19 @@ describe('BulkWriter class', () => {
expect(deleteResult.writeTime).to.deep.equal(new Timestamp(0, 0));
});

// Two BulkWriter writes to the same document reference should both
// succeed, and the data from the later write should win.
it('can write to the same document twice', async () => {
  const docRef = randomCol.doc('doc1');
  const firstOp = writer.set(docRef, {foo: 'bar'});
  const secondOp = writer.set(docRef, {foo: 'bar2'});
  await writer.close();
  const snapshot = await docRef.get();
  // The second set() overwrites the first.
  expect(snapshot.data()).to.deep.equal({foo: 'bar2'});
  const firstWriteTime = (await firstOp).writeTime;
  const secondWriteTime = (await secondOp).writeTime;
  expect(firstWriteTime).to.not.be.null;
  expect(secondWriteTime).to.not.be.null;
});

it('can terminate once BulkWriter is closed', async () => {
const ref = randomCol.doc('doc1');
writer.set(ref, {foo: 'bar'});
Expand Down
53 changes: 27 additions & 26 deletions dev/test/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -455,14 +455,15 @@ describe('BulkWriter', () => {
expect(() => bulkWriter.close()).to.throw(expected);
});

it('can send writes to the same documents in the same batch', async () => {
it('send writes to the same documents in the different batches', async () => {
const bulkWriter = await instantiateInstance([
{
request: createRequest([
setOp('doc1', 'bar'),
updateOp('doc1', 'bar2'),
]),
response: mergeResponses([successResponse(1), successResponse(2)]),
request: createRequest([setOp('doc1', 'bar')]),
response: successResponse(1),
},
{
request: createRequest([updateOp('doc1', 'bar2')]),
response: successResponse(2),
},
]);

Expand Down Expand Up @@ -497,10 +498,10 @@ describe('BulkWriter', () => {
const bulkWriter = await instantiateInstance([
{
request: createRequest([
createOp('doc', 'bar'),
setOp('doc', 'bar'),
updateOp('doc', 'bar'),
deleteOp('doc'),
createOp('doc1', 'bar'),
setOp('doc2', 'bar'),
updateOp('doc3', 'bar'),
deleteOp('doc4'),
]),
response: mergeResponses([
successResponse(1),
Expand All @@ -514,10 +515,10 @@ describe('BulkWriter', () => {
bulkWriter.onWriteResult((documentRef, result) => {
writeResults.push(result.writeTime.seconds);
});
bulkWriter.create(firestore.doc('collectionId/doc'), {foo: 'bar'});
bulkWriter.set(firestore.doc('collectionId/doc'), {foo: 'bar'});
bulkWriter.update(firestore.doc('collectionId/doc'), {foo: 'bar'});
bulkWriter.delete(firestore.doc('collectionId/doc'));
bulkWriter.create(firestore.doc('collectionId/doc1'), {foo: 'bar'});
bulkWriter.set(firestore.doc('collectionId/doc2'), {foo: 'bar'});
bulkWriter.update(firestore.doc('collectionId/doc3'), {foo: 'bar'});
bulkWriter.delete(firestore.doc('collectionId/doc4'));
return bulkWriter.close().then(() => {
expect(writeResults).to.deep.equal([1, 2, 3, 4]);
});
Expand All @@ -528,9 +529,9 @@ describe('BulkWriter', () => {
{
request: createRequest([
createOp('doc', 'bar'),
setOp('doc', 'bar'),
updateOp('doc', 'bar'),
deleteOp('doc'),
setOp('doc1', 'bar'),
updateOp('doc2', 'bar'),
deleteOp('doc3'),
]),
response: mergeResponses([
successResponse(1),
Expand All @@ -541,9 +542,9 @@ describe('BulkWriter', () => {
},
{
request: createRequest([
setOp('doc', 'bar'),
updateOp('doc', 'bar'),
deleteOp('doc'),
setOp('doc1', 'bar'),
updateOp('doc2', 'bar'),
deleteOp('doc3'),
]),
response: mergeResponses([
successResponse(2),
Expand All @@ -563,9 +564,9 @@ describe('BulkWriter', () => {
writeResults.push(result.writeTime.seconds);
});
bulkWriter.create(firestore.doc('collectionId/doc'), {foo: 'bar'});
bulkWriter.set(firestore.doc('collectionId/doc'), {foo: 'bar'});
bulkWriter.update(firestore.doc('collectionId/doc'), {foo: 'bar'});
bulkWriter.delete(firestore.doc('collectionId/doc'));
bulkWriter.set(firestore.doc('collectionId/doc1'), {foo: 'bar'});
bulkWriter.update(firestore.doc('collectionId/doc2'), {foo: 'bar'});
bulkWriter.delete(firestore.doc('collectionId/doc3'));
return bulkWriter.close().then(() => {
expect(ops).to.deep.equal([
'success',
Expand Down Expand Up @@ -867,7 +868,7 @@ describe('BulkWriter', () => {
{
request: createRequest([
setOp('doc1', 'bar'),
setOp('doc1', 'bar2'),
setOp('doc2', 'bar2'),
setOp('doc3', 'bar'),
]),
response: mergeResponses([
Expand All @@ -877,7 +878,7 @@ describe('BulkWriter', () => {
]),
},
{
request: createRequest([setOp('doc1', 'bar2'), setOp('doc3', 'bar')]),
request: createRequest([setOp('doc2', 'bar2'), setOp('doc3', 'bar')]),
response: mergeResponses([
successResponse(2),
failedResponse(Status.ABORTED),
Expand All @@ -896,7 +897,7 @@ describe('BulkWriter', () => {
foo: 'bar',
})
.catch(incrementOpCount);
const set2 = bulkWriter.set(firestore.doc('collectionId/doc1'), {
const set2 = bulkWriter.set(firestore.doc('collectionId/doc2'), {
foo: 'bar2',
});
const set3 = bulkWriter.set(firestore.doc('collectionId/doc3'), {
Expand Down

0 comments on commit bd5adc3

Please sign in to comment.