diff --git a/src/bucket.ts b/src/bucket.ts index 10a18b05a..20aca29cf 100644 --- a/src/bucket.ts +++ b/src/bucket.ts @@ -2028,6 +2028,7 @@ class Bucket extends ServiceObject { } const MAX_PARALLEL_LIMIT = 10; + const MAX_QUEUE_SIZE = 1000; const errors = [] as Error[]; const deleteFile = (file: File) => { @@ -2039,15 +2040,32 @@ class Bucket extends ServiceObject { }); }; - this.getFiles(query) - .then(([files]) => { + (async () => { + try { + let promises = []; const limit = pLimit(MAX_PARALLEL_LIMIT); - const promises = files!.map(file => { - return limit(() => deleteFile(file)); - }); - return Promise.all(promises); - }) - .then(() => callback!(errors.length > 0 ? errors : null), callback!); + const filesStream = this.getFilesStream(query); + + for await (const curFile of filesStream) { + if (promises.length >= MAX_QUEUE_SIZE) { + await Promise.all(promises); + promises = []; + } + promises.push( + limit(() => deleteFile(curFile)).catch(e => { + filesStream.destroy(); + throw e; + }) + ); + } + + await Promise.all(promises); + callback!(errors.length > 0 ? errors : null); + } catch (e) { + callback!(e as Error); + return; + } + })(); } deleteLabels(labels?: string | string[]): Promise; diff --git a/test/bucket.ts b/test/bucket.ts index 72a6cbfe0..3951a1c52 100644 --- a/test/bucket.ts +++ b/test/bucket.ts @@ -60,6 +60,7 @@ class FakeFile { options: FileOptions; metadata: {}; createWriteStream: Function; + delete: Function; isSameFile = () => false; constructor(bucket: Bucket, name: string, options?: FileOptions) { // eslint-disable-next-line prefer-rest-params @@ -79,6 +80,10 @@ class FakeFile { }; return ws; }; + + this.delete = () => { + return Promise.resolve(); + }; } } @@ -1226,10 +1231,35 @@ describe('Bucket', () => { }); describe('deleteFiles', () => { + let readCount: number; + + beforeEach(() => { + readCount = 0; + }); + it('should accept only a callback', done => { - bucket.getFiles = (query: {}) => { + const files = [bucket.file('1'), bucket.file('2')].map(file => { + file.delete = () => { + return Promise.resolve(); + }; + return file; + }); + + const readable = new stream.Readable({ + objectMode: true, + read() { + if (readCount < 1) { + this.push(files[readCount]); + readCount++; + } else { + this.push(null); + } + }, + }); + + bucket.getFilesStream = (query: {}) => { assert.deepStrictEqual(query, {}); - return Promise.all([[]]); + return readable; }; bucket.deleteFiles(done); @@ -1238,9 +1268,28 @@ describe('Bucket', () => { it('should get files from the bucket', done => { const query = {a: 'b', c: 'd'}; - bucket.getFiles = (query_: {}) => { + const files = [bucket.file('1'), bucket.file('2')].map(file => { + file.delete = () => { + return Promise.resolve(); + }; + return file; + }); + + const readable = new stream.Readable({ + objectMode: true, + read() { + if (readCount < 1) { + this.push(files[readCount]); + readCount++; + } else { + this.push(null); + } + }, + }); + + bucket.getFilesStream = (query_: {}) => { assert.deepStrictEqual(query_, query); - return Promise.resolve([[]]); + return readable; }; bucket.deleteFiles(query, done); @@ -1253,7 +1302,26 @@ describe('Bucket', () => { return () => {}; }; - bucket.getFiles = () => Promise.resolve([[]]); + const files = [bucket.file('1'), bucket.file('2')].map(file => { + file.delete = () => { + return Promise.resolve(); + }; + return file; + }); + + const readable = new stream.Readable({ + objectMode: true, + read() { + if (readCount < 1) { + this.push(files[readCount]); + readCount++; + } else { + this.push(null); + } + }, + }); + + bucket.getFilesStream = () => readable; bucket.deleteFiles({}, assert.ifError); }); @@ -1270,9 +1338,21 @@ describe('Bucket', () => { return file; }); - bucket.getFiles = (query_: {}) => { + const readable = new stream.Readable({ + objectMode: true, + read() { + if (readCount < files.length) { + this.push(files[readCount]); + readCount++; + } else { + this.push(null); + } + }, + }); + + bucket.getFilesStream = (query_: {}) => { assert.strictEqual(query_, query); - return Promise.resolve([files]); + return readable; }; bucket.deleteFiles(query, (err: Error) => { @@ -1284,9 +1364,15 @@ describe('Bucket', () => { it('should execute callback with error from getting files', done => { const error = new Error('Error.'); + const readable = new stream.Readable({ + objectMode: true, + read() { + this.destroy(error); + }, + }); - bucket.getFiles = () => { - return Promise.reject(error); + bucket.getFilesStream = () => { + return readable; }; bucket.deleteFiles({}, (err: Error) => { @@ -1303,8 +1389,20 @@ describe('Bucket', () => { return file; }); - bucket.getFiles = () => { - return Promise.resolve([files]); + const readable = new stream.Readable({ + objectMode: true, + read() { + if (readCount < files.length) { + this.push(files[readCount]); + readCount++; + } else { + this.push(null); + } + }, + }); + + bucket.getFilesStream = () => { + return readable; }; bucket.deleteFiles({}, (err: Error) => { @@ -1321,8 +1419,20 @@ describe('Bucket', () => { return file; }); - bucket.getFiles = () => { - return Promise.resolve([files]); + const readable = new stream.Readable({ + objectMode: true, + read() { + if (readCount < files.length) { + this.push(files[readCount]); + readCount++; + } else { + this.push(null); + } + }, + }); + + bucket.getFilesStream = () => { + return readable; }; bucket.deleteFiles({force: true}, (errs: Array<{}>) => {