Skip to content

Commit

Permalink
Merge e24c591 into 685e75c
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed Feb 23, 2021
2 parents 685e75c + e24c591 commit 800dfee
Show file tree
Hide file tree
Showing 7 changed files with 839 additions and 143 deletions.
2 changes: 1 addition & 1 deletion dev/src/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ export class BulkWriter {
* Throws an error if the BulkWriter instance has been closed.
* @private
*/
private _verifyNotClosed(): void {
_verifyNotClosed(): void {
if (this._closing) {
throw new Error('BulkWriter has already been closed.');
}
Expand Down
137 changes: 129 additions & 8 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

import * as firestore from '@google-cloud/firestore';

import {CallOptions, grpc, RetryOptions} from 'google-gax';
import {CallOptions, GoogleError, grpc, RetryOptions, Status} from 'google-gax';
import {Duplex, PassThrough, Transform} from 'stream';

import {URL} from 'url';

import {google} from '../protos/firestore_v1_proto_api';
import {ExponentialBackoff, ExponentialBackoffSetting} from './backoff';
import {BulkWriter} from './bulk-writer';
import {BulkWriter, BulkWriterError} from './bulk-writer';
import {BundleBuilder} from './bundle';
import {fieldsFromJson, timestampFromJson} from './convert';
import {
Expand Down Expand Up @@ -141,7 +141,7 @@ const CLOUD_RESOURCE_HEADER = 'google-cloud-resource-prefix';
/*!
* The maximum number of times to retry idempotent requests.
*/
const MAX_REQUEST_RETRIES = 5;
export const MAX_REQUEST_RETRIES = 5;

/*!
* The default number of idle GRPC channels to keep.
Expand All @@ -166,7 +166,7 @@ const MAX_CONCURRENT_REQUESTS_PER_CLIENT = 100;
*
* @private
*/
const REFERENCE_NAME_MIN_ID = '__id-9223372036854775808__';
export const REFERENCE_NAME_MIN_ID = '__id-9223372036854775808__';

/**
* Document data (e.g. for use with
Expand Down Expand Up @@ -399,6 +399,26 @@ export class Firestore implements firestore.Firestore {
*/
private registeredListenersCount = 0;

/**
* A lazy-loaded BulkWriter instance to be used with recursiveDelete() if no
* BulkWriter instance is provided.
*
* @private
*/
private _bulkWriter: BulkWriter | undefined;

/**
 * Returns this Firestore instance's default BulkWriter, creating it on
 * first use. Used by recursiveDelete() when the caller does not supply a
 * BulkWriter of their own.
 *
 * @private
 */
private getBulkWriter(): BulkWriter {
  this._bulkWriter = this._bulkWriter ?? this.bulkWriter();
  return this._bulkWriter;
}

/**
* Number of pending operations on the client.
*
Expand Down Expand Up @@ -1198,27 +1218,128 @@ export class Firestore implements firestore.Firestore {
this.bulkWritersCount -= 1;
}

/**
 * Recursively deletes all documents and subcollections at and under the
 * specified level.
 *
 * If any delete fails, the promise is rejected with an error message
 * containing the number of failed deletes and the stack trace of the last
 * failed delete. The provided reference is deleted regardless of whether
 * all deletes succeeded.
 *
 * `recursiveDelete()` uses a BulkWriter instance with default settings to
 * perform the deletes. To customize throttling rates or add success/error
 * callbacks, pass in a custom BulkWriter instance.
 *
 * @param ref The reference of a document or collection to delete.
 * @param bulkWriter A custom BulkWriter instance used to perform the
 * deletes.
 * @return A promise that resolves when all deletes have been performed.
 * The promise is rejected if any of the deletes fail.
 *
 * @example
 * // Recursively delete a reference and log the references of failures.
 * const bulkWriter = firestore.bulkWriter();
 * bulkWriter
 *   .onWriteError((error) => {
 *     if (
 *       error.failedAttempts < MAX_RETRY_ATTEMPTS
 *     ) {
 *       return true;
 *     } else {
 *       console.log('Failed write at document: ', error.documentRef.path);
 *       return false;
 *     }
 *   });
 * await firestore.recursiveDelete(docRef, bulkWriter);
 */
recursiveDelete(
  ref:
    | firestore.CollectionReference<unknown>
    | firestore.DocumentReference<unknown>,
  bulkWriter?: BulkWriter
): Promise<void> {
  // Capture the error stack to preserve stack tracing across async calls.
  const stack = Error().stack!;

  const writer = bulkWriter ?? this.getBulkWriter();
  writer._verifyNotClosed();
  const docStream = this._getAllDescendants(
    ref instanceof CollectionReference
      ? (ref as CollectionReference<unknown>)
      : (ref as DocumentReference<unknown>)
  );
  const resultDeferred = new Deferred<void>();
  let errorCount = 0;
  let lastError: Error | undefined;
  const incrementErrorCount = (err: Error): void => {
    errorCount++;
    lastError = err;
  };

  // Node streams may emit 'end' after 'error'; ensure the final flush (and
  // the deletion of `ref` itself) runs exactly once.
  let streamEnded = false;
  const onStreamEnd = (): void => {
    if (streamEnded) {
      return;
    }
    streamEnded = true;
    if (ref instanceof DocumentReference) {
      // Delete the provided document itself, in addition to its descendants.
      writer.delete(ref).catch(err => incrementErrorCount(err));
    }
    writer.flush().then(() => {
      if (lastError === undefined) {
        resultDeferred.resolve();
      } else {
        let error = new GoogleError(
          `${errorCount} ` +
            `${errorCount !== 1 ? 'deletes' : 'delete'} ` +
            'failed. The last delete failed with: '
        );
        if (lastError instanceof BulkWriterError) {
          error.code = (lastError.code as number) as Status;
        }
        error = wrapError(error, stack);

        // Wrap the BulkWriter error last to provide the full stack trace.
        resultDeferred.reject(
          lastError.stack ? wrapError(error, lastError.stack) : error
        );
      }
    });
  };

  docStream
    .on('error', err => {
      err.code = Status.UNAVAILABLE;
      err.message = 'Failed to fetch children documents.';
      // Count the fetch failure so the rejection message reports a non-zero
      // failure count (previously `lastError` was set without incrementing).
      incrementErrorCount(err);
      onStreamEnd();
    })
    .on('data', (snap: QueryDocumentSnapshot) => {
      writer.delete(snap.ref).catch(err => incrementErrorCount(err));
    })
    .on('end', () => onStreamEnd());

  return resultDeferred.promise;
}

/**
* Retrieves all descendant documents nested under the provided reference.
*
* @private
* @return {Stream<QueryDocumentSnapshot>} Stream of descendant documents.
*/
// TODO(chenbrian): Make this a private method after adding recursive delete.
_getAllDescendants(
ref: CollectionReference | DocumentReference
private _getAllDescendants(
ref: CollectionReference<unknown> | DocumentReference<unknown>
): NodeJS.ReadableStream {
// The parent is the closest ancestor document to the location we're
// deleting. If we are deleting a document, the parent is the path of that
// document. If we are deleting a collection, the parent is the path of the
// document containing that collection (or the database root, if it is a
// root collection).

let parentPath = ref._resourcePath;
if (ref instanceof CollectionReference) {
parentPath = parentPath.popLast();
}
const collectionId =
ref instanceof CollectionReference ? ref.id : ref.parent.id;
ref instanceof CollectionReference
? ref.id
: (ref as DocumentReference<unknown>).parent.id;

let query: Query = new Query(
this,
Expand Down
150 changes: 93 additions & 57 deletions dev/system-test/firestore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2629,69 +2629,105 @@ describe('BulkWriter class', () => {
return firestore.terminate();
});

// TODO(chenbrian): This is a temporary test used to validate that the
// StructuredQuery calls work properly. Remove these tests after adding
// recursive delete tests.
it('finds nested documents and collection', async () => {
// ROOT-DB
// └── randomCol
// ├── anna
// └── bob
// └── parentsCol
// ├── charlie
// └── daniel
// └── childCol
// ├── ernie
// └── francis
const batch = firestore.batch();
batch.set(randomCol.doc('anna'), {name: 'anna'});
batch.set(randomCol.doc('bob'), {name: 'bob'});
batch.set(randomCol.doc('bob/parentsCol/charlie'), {name: 'charlie'});
batch.set(randomCol.doc('bob/parentsCol/daniel'), {name: 'daniel'});
batch.set(randomCol.doc('bob/parentsCol/daniel/childCol/ernie'), {
name: 'ernie',
});
batch.set(randomCol.doc('bob/parentsCol/daniel/childCol/francis'), {
name: 'francis',
});
await batch.commit();
describe('recursiveDelete()', () => {
/**
 * Counts every document nested beneath `ref` by recursing through all of
 * its subcollections. The document itself is not included in the count.
 */
async function countDocumentChildren(
  ref: DocumentReference
): Promise<number> {
  const subcollections = await ref.listCollections();
  let total = 0;
  for (const subcollection of subcollections) {
    total += await countCollectionChildren(subcollection);
  }
  return total;
}

const numStreamItems = async (
stream: NodeJS.ReadableStream
): Promise<number> => {
async function countCollectionChildren(
ref: CollectionReference
): Promise<number> {
let count = 0;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of stream) {
++count;
const docs = await ref.listDocuments();
for (const doc of docs) {
count += (await countDocumentChildren(doc)) + 1;
}
return count;
};
}

// Query all descendants of collections.
let descendantsStream = await firestore._getAllDescendants(randomCol);
expect(await numStreamItems(descendantsStream)).to.equal(6);
descendantsStream = await firestore._getAllDescendants(
randomCol.doc('bob').collection('parentsCol')
);
expect(await numStreamItems(descendantsStream)).to.equal(4);
descendantsStream = await firestore._getAllDescendants(
randomCol.doc('bob').collection('parentsCol/daniel/childCol')
);
expect(await numStreamItems(descendantsStream)).to.equal(2);
beforeEach(async () => {
  // ROOT-DB
  // └── randomCol
  //     ├── anna
  //     └── bob
  //         └── parentsCol
  //             ├── charlie
  //             └── daniel
  //                 └── childCol
  //                     ├── ernie
  //                     └── francis
  // Seed the tree above with a single batched write (path → name pairs).
  const seedDocs: Array<[string, string]> = [
    ['anna', 'anna'],
    ['bob', 'bob'],
    ['bob/parentsCol/charlie', 'charlie'],
    ['bob/parentsCol/daniel', 'daniel'],
    ['bob/parentsCol/daniel/childCol/ernie', 'ernie'],
    ['bob/parentsCol/daniel/childCol/francis', 'francis'],
  ];
  const batch = firestore.batch();
  for (const [path, name] of seedDocs) {
    batch.set(randomCol.doc(path), {name});
  }
  await batch.commit();
});

// Query all descendants of documents.
descendantsStream = await firestore._getAllDescendants(
randomCol.doc('bob')
);
expect(await numStreamItems(descendantsStream)).to.equal(4);
descendantsStream = await firestore._getAllDescendants(
randomCol.doc('bob/parentsCol/daniel')
);
expect(await numStreamItems(descendantsStream)).to.equal(2);
descendantsStream = await firestore._getAllDescendants(
randomCol.doc('anna')
);
expect(await numStreamItems(descendantsStream)).to.equal(0);
// Deleting the root collection removes every document and nested
// subcollection beneath it.
it('on top-level collection', async () => {
  await firestore.recursiveDelete(randomCol);
  expect(await countCollectionChildren(randomCol)).to.equal(0);
});

// Deleting a nested collection leaves the documents outside it
// ('anna' and 'bob') untouched.
it('on nested collection', async () => {
  const coll = randomCol.doc('bob').collection('parentsCol');
  await firestore.recursiveDelete(coll);

  expect(await countCollectionChildren(coll)).to.equal(0);
  expect(await countCollectionChildren(randomCol)).to.equal(2);
});

// Deleting a document removes the document itself and its descendants
// ('ernie', 'francis'), leaving 'charlie' under 'bob'.
it('on nested document', async () => {
  const doc = randomCol.doc('bob/parentsCol/daniel');
  await firestore.recursiveDelete(doc);

  const docSnap = await doc.get();
  expect(docSnap.exists).to.be.false;
  expect(await countDocumentChildren(randomCol.doc('bob'))).to.equal(1);
  expect(await countCollectionChildren(randomCol)).to.equal(3);
});

// Deleting a document with no subcollections removes only that document
// (6 seeded docs minus 'ernie' = 5).
it('on leaf document', async () => {
  const doc = randomCol.doc('bob/parentsCol/daniel/childCol/ernie');
  await firestore.recursiveDelete(doc);

  const docSnap = await doc.get();
  expect(docSnap.exists).to.be.false;
  expect(await countCollectionChildren(randomCol)).to.equal(5);
});

// A recursive delete on one root collection must not touch documents in
// an unrelated collection.
it('does not affect other collections', async () => {
  // Add other nested collection that shouldn't be deleted.
  const collB = firestore.collection('doggos');
  await collB.doc('doggo').set({name: 'goodboi'});

  await firestore.recursiveDelete(collB);
  expect(await countCollectionChildren(randomCol)).to.equal(6);
  expect(await countCollectionChildren(collB)).to.equal(0);
});

// A caller-supplied BulkWriter is used for the deletes, so its success
// callback fires once per deleted document (6 seeded docs).
it('with custom BulkWriter instance', async () => {
  const bulkWriter = firestore.bulkWriter();
  let callbackCount = 0;
  bulkWriter.onWriteResult(() => {
    callbackCount++;
  });
  await firestore.recursiveDelete(randomCol, bulkWriter);
  expect(callbackCount).to.equal(6);
});
});

it('can retry failed writes with a provided callback', async () => {
Expand Down

0 comments on commit 800dfee

Please sign in to comment.