-
Notifications
You must be signed in to change notification settings - Fork 149
/
recursive-delete.ts
335 lines (303 loc) · 10.5 KB
/
recursive-delete.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
/*!
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as firestore from '@google-cloud/firestore';
import * as assert from 'assert';
import Firestore, {
BulkWriter,
CollectionReference,
DocumentReference,
FieldPath,
Query,
QueryDocumentSnapshot,
} from '.';
import {Deferred, wrapError} from './util';
import {GoogleError} from 'google-gax';
import {BulkWriterError} from './bulk-writer';
import {QueryOptions} from './reference';
import {StatusCode} from './status-code';
/**
 * Lowest numeric document ID, written in the string form Firestore uses.
 *
 * Datastore permitted numeric IDs whereas Firestore only permits string IDs;
 * numeric IDs therefore appear in Firestore as `__idNUM__`. Plugging in the
 * minimum signed 64-bit integer (-9223372036854775808, i.e. -2^63) for NUM
 * gives the lowest possible negative numeric value in that format.
 *
 * Used as the startAt/endAt bound when querying for every descendant of a
 * single collection.
 *
 * @private
 */
export const REFERENCE_NAME_MIN_ID = '__id-9223372036854775808__';
/**
 * Page size (query limit) used by recursive deletes when streaming every
 * descendant of the reference being deleted. Capping each query keeps the
 * stream from producing documents faster than Firestore can delete them.
 *
 * Exported so that tests can reference it.
 * @private
 */
export const RECURSIVE_DELETE_MAX_PENDING_OPS = 5000;
/**
 * Threshold of outstanding BulkWriter operations below which RecursiveDelete
 * kicks off the next limit query for descendants. Fetching the next page while
 * some deletes are still in flight lets Firestore keep BulkWriter busy instead
 * of leaving it idle while the following query runs.
 */
export const RECURSIVE_DELETE_MIN_PENDING_OPS = 1000;
/**
 * Class used to store state required for running a recursive delete operation.
 * Each recursive delete call should use a new instance of the class.
 *
 * Descendants are fetched with a name-only query limited to `maxPendingOps`
 * documents per page, and every returned document is deleted through the
 * provided BulkWriter. The next page is requested once the current stream has
 * ended and the number of in-flight deletes drops below `minPendingOps`.
 * @private
 */
export class RecursiveDelete {
  /**
   * The number of deletes that failed with a permanent error.
   * @private
   */
  private errorCount = 0;

  /**
   * The most recently thrown error. Used to populate the developer-facing
   * error message when the recursive delete operation completes.
   * @private
   */
  private lastError: GoogleError | BulkWriterError | undefined;

  /**
   * Whether there are still documents to delete that need to be fetched.
   * @private
   */
  private documentsPending = true;

  /**
   * Whether run() has been called.
   * @private
   */
  private started = false;

  /**
   * Query limit to use when fetching all descendants.
   * @private
   */
  private readonly maxPendingOps: number;

  /**
   * The number of pending BulkWriter operations at which RecursiveDelete
   * starts the next limit query to fetch descendants.
   * @private
   */
  private readonly minPendingOps: number;

  /**
   * A deferred promise that resolves when the recursive delete operation
   * is completed.
   * @private
   */
  private readonly completionDeferred = new Deferred<void>();

  /**
   * Whether a query stream is currently in progress. Only one stream can be
   * run at a time.
   * @private
   */
  private streamInProgress = false;

  /**
   * The last document snapshot returned by the stream. Used to set the
   * startAfter() field in the subsequent stream.
   * @private
   */
  private lastDocumentSnap: QueryDocumentSnapshot | undefined;

  /**
   * The number of pending BulkWriter operations. Used to determine when the
   * next query can be run.
   * @private
   */
  private pendingOpsCount = 0;

  /**
   * Stack trace captured synchronously in run(). It is used to wrap the final
   * error so the rejection points back to the caller across async boundaries.
   * @private
   */
  private errorStack = '';

  /**
   * @param firestore The Firestore instance to use.
   * @param writer The BulkWriter instance to use for delete operations.
   * @param ref The document or collection reference to recursively delete.
   * @param maxLimit The query limit to use when fetching descendants.
   * @param minLimit The number of pending BulkWriter operations at which
   * RecursiveDelete starts the next limit query to fetch descendants.
   */
  constructor(
    private readonly firestore: Firestore,
    private readonly writer: BulkWriter,
    private readonly ref:
      | firestore.CollectionReference<unknown>
      | firestore.DocumentReference<unknown>,
    private readonly maxLimit: number,
    private readonly minLimit: number
  ) {
    this.maxPendingOps = maxLimit;
    this.minPendingOps = minLimit;
  }

  /**
   * Recursively deletes the reference provided in the class constructor.
   * Returns a promise that resolves when all descendants have been deleted, or
   * rejects if any delete (or the descendant query) failed.
   *
   * Must be called at most once per instance.
   */
  run(): Promise<void> {
    assert(!this.started, 'RecursiveDelete.run() should only be called once.');

    // Capture the error stack to preserve stack tracing across async calls.
    this.errorStack = Error().stack!;

    this.writer._verifyNotClosed();

    // Without this assignment the assertion above could never fire, and a
    // second run() would start a concurrent stream over the same state.
    this.started = true;
    this.setupStream();
    return this.completionDeferred.promise;
  }

  /**
   * Creates a query stream and attaches event handlers to it.
   * @private
   */
  private setupStream(): void {
    const stream = this.getAllDescendants(
      this.ref instanceof CollectionReference
        ? (this.ref as CollectionReference<unknown>)
        : (this.ref as DocumentReference<unknown>)
    );
    this.streamInProgress = true;
    let streamedDocsCount = 0;
    stream
      .on('error', err => {
        // A failed descendant query is surfaced as UNAVAILABLE and ends the
        // operation; deletes that were already enqueued are still flushed by
        // onQueryEnd().
        err.code = StatusCode.UNAVAILABLE;
        err.stack = 'Failed to fetch children documents: ' + err.stack;
        this.lastError = err;
        this.onQueryEnd();
      })
      .on('data', (snap: QueryDocumentSnapshot) => {
        streamedDocsCount++;
        this.lastDocumentSnap = snap;
        this.deleteRef(snap.ref);
      })
      .on('end', () => {
        this.streamInProgress = false;
        // The query was issued with limit(maxPendingOps). If it returned fewer
        // documents than that limit, nothing is left to fetch. Comparing with
        // minPendingOps instead would either issue a redundant extra query or,
        // when minPendingOps > maxPendingOps, stop after the first page and
        // leave descendants undeleted.
        if (streamedDocsCount < this.maxPendingOps) {
          this.onQueryEnd();
        } else if (this.pendingOpsCount === 0) {
          // Every delete from this page has already settled, so no deleteRef()
          // callback remains to start the next page; start it here.
          this.setupStream();
        }
      });
  }

  /**
   * Retrieves all descendant documents nested under the provided reference.
   * @param ref The reference to fetch all descendants for.
   * @private
   * @return Stream of descendant documents.
   */
  private getAllDescendants(
    ref: CollectionReference<unknown> | DocumentReference<unknown>
  ): NodeJS.ReadableStream {
    // The parent is the closest ancestor document to the location we're
    // deleting. If we are deleting a document, the parent is the path of that
    // document. If we are deleting a collection, the parent is the path of the
    // document containing that collection (or the database root, if it is a
    // root collection).
    let parentPath = ref._resourcePath;
    if (ref instanceof CollectionReference) {
      parentPath = parentPath.popLast();
    }
    const collectionId =
      ref instanceof CollectionReference
        ? ref.id
        : (ref as DocumentReference<unknown>).parent.id;

    let query: Query = new Query(
      this.firestore,
      QueryOptions.forKindlessAllDescendants(
        parentPath,
        collectionId,
        /* requireConsistency= */ false
      )
    );

    // Query for names only to fetch empty snapshots.
    query = query.select(FieldPath.documentId()).limit(this.maxPendingOps);

    if (ref instanceof CollectionReference) {
      // To find all descendants of a collection reference, we need to use a
      // composite filter that captures all documents that start with the
      // collection prefix. The MIN_KEY constant represents the minimum key in
      // this collection, and a null byte + the MIN_KEY represents the minimum
      // key in the next possible collection.
      const nullChar = String.fromCharCode(0);
      const startAt = collectionId + '/' + REFERENCE_NAME_MIN_ID;
      const endAt = collectionId + nullChar + '/' + REFERENCE_NAME_MIN_ID;
      query = query
        .where(FieldPath.documentId(), '>=', startAt)
        .where(FieldPath.documentId(), '<', endAt);
    }

    // Resume after the last document seen by the previous page.
    if (this.lastDocumentSnap) {
      query = query.startAfter(this.lastDocumentSnap);
    }

    return query.stream();
  }

  /**
   * Called when all descendants of the provided reference have been streamed
   * or if a permanent error occurs during the stream. Deletes the developer
   * provided reference and wraps any errors that occurred.
   * @private
   */
  private onQueryEnd(): void {
    this.documentsPending = false;
    if (this.ref instanceof DocumentReference) {
      this.writer.delete(this.ref).catch(err => this.incrementErrorCount(err));
    }
    this.writer
      .flush()
      .then(() => {
        if (this.lastError === undefined) {
          this.completionDeferred.resolve();
          return;
        }

        // NOTE(review): the top-level `GoogleError` import is only used as a
        // type in this file, so the constructor is obtained via require();
        // presumably this keeps google-gax lazily loaded — confirm before
        // replacing it with `new GoogleError(...)`.
        let error = new (require('google-gax').GoogleError)(
          `${this.errorCount} ` +
            `${this.errorCount !== 1 ? 'deletes' : 'delete'} ` +
            'failed. The last delete failed with: '
        );
        if (this.lastError.code !== undefined) {
          error.code = this.lastError.code as number;
        }
        error = wrapError(error, this.errorStack);

        // Wrap the BulkWriter error last to provide the full stack trace.
        const lastErrorStack = this.lastError.stack;
        this.completionDeferred.reject(
          lastErrorStack ? wrapError(error, lastErrorStack) : error
        );
      })
      .catch(err => {
        // Never leave the caller's promise pending if flushing or building the
        // final error fails unexpectedly.
        this.completionDeferred.reject(err);
      });
  }

  /**
   * Deletes the provided reference and starts the next stream if conditions
   * are met.
   * @private
   */
  private deleteRef(docRef: DocumentReference): void {
    this.pendingOpsCount++;
    this.writer
      .delete(docRef)
      .catch(err => {
        this.incrementErrorCount(err);
      })
      .then(() => {
        this.pendingOpsCount--;

        // We wait until the previous stream has ended in order to ensure the
        // startAfter document is correct. Starting the next stream while
        // there are pending operations allows Firestore to maximize
        // BulkWriter throughput.
        if (
          this.documentsPending &&
          !this.streamInProgress &&
          this.pendingOpsCount < this.minPendingOps
        ) {
          this.setupStream();
        }
      });
  }

  /**
   * Records a failed delete: bumps the failure count and remembers the error
   * so onQueryEnd() can report it.
   * @private
   */
  private incrementErrorCount(err: Error): void {
    this.errorCount++;
    this.lastError = err;
  }
}