Skip to content

Commit

Permalink
fix: reduce memory footprint of deleteFiles by utilizing getFilesStre…
Browse files Browse the repository at this point in the history
…am and using smaller queue of promises (#2147)
  • Loading branch information
ddelgrosso1 committed Feb 15, 2023
1 parent 6851cd2 commit f792f25
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 21 deletions.
34 changes: 26 additions & 8 deletions src/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2028,6 +2028,7 @@ class Bucket extends ServiceObject {
}

const MAX_PARALLEL_LIMIT = 10;
const MAX_QUEUE_SIZE = 1000;
const errors = [] as Error[];

const deleteFile = (file: File) => {
Expand All @@ -2039,15 +2040,32 @@ class Bucket extends ServiceObject {
});
};

this.getFiles(query)
.then(([files]) => {
(async () => {
try {
let promises = [];
const limit = pLimit(MAX_PARALLEL_LIMIT);
const promises = files!.map(file => {
return limit(() => deleteFile(file));
});
return Promise.all(promises);
})
.then(() => callback!(errors.length > 0 ? errors : null), callback!);
const filesStream = this.getFilesStream(query);

for await (const curFile of filesStream) {
if (promises.length >= MAX_QUEUE_SIZE) {
await Promise.all(promises);
promises = [];
}
promises.push(
limit(() => deleteFile(curFile)).catch(e => {
filesStream.destroy();
throw e;
})
);
}

await Promise.all(promises);
callback!(errors.length > 0 ? errors : null);
} catch (e) {
callback!(e as Error);
return;
}
})();
}

deleteLabels(labels?: string | string[]): Promise<DeleteLabelsResponse>;
Expand Down
136 changes: 123 additions & 13 deletions test/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class FakeFile {
options: FileOptions;
metadata: {};
createWriteStream: Function;
delete: Function;
isSameFile = () => false;
constructor(bucket: Bucket, name: string, options?: FileOptions) {
// eslint-disable-next-line prefer-rest-params
Expand All @@ -79,6 +80,10 @@ class FakeFile {
};
return ws;
};

this.delete = () => {
return Promise.resolve();
};
}
}

Expand Down Expand Up @@ -1226,10 +1231,35 @@ describe('Bucket', () => {
});

describe('deleteFiles', () => {
let readCount: number;

beforeEach(() => {
readCount = 0;
});

it('should accept only a callback', done => {
bucket.getFiles = (query: {}) => {
const files = [bucket.file('1'), bucket.file('2')].map(file => {
file.delete = () => {
return Promise.resolve();
};
return file;
});

const readable = new stream.Readable({
objectMode: true,
read() {
if (readCount < 1) {
this.push(files[readCount]);
readCount++;
} else {
this.push(null);
}
},
});

bucket.getFilesStream = (query: {}) => {
assert.deepStrictEqual(query, {});
return Promise.all([[]]);
return readable;
};

bucket.deleteFiles(done);
Expand All @@ -1238,9 +1268,28 @@ describe('Bucket', () => {
it('should get files from the bucket', done => {
const query = {a: 'b', c: 'd'};

bucket.getFiles = (query_: {}) => {
const files = [bucket.file('1'), bucket.file('2')].map(file => {
file.delete = () => {
return Promise.resolve();
};
return file;
});

const readable = new stream.Readable({
objectMode: true,
read() {
if (readCount < 1) {
this.push(files[readCount]);
readCount++;
} else {
this.push(null);
}
},
});

bucket.getFilesStream = (query_: {}) => {
assert.deepStrictEqual(query_, query);
return Promise.resolve([[]]);
return readable;
};

bucket.deleteFiles(query, done);
Expand All @@ -1253,7 +1302,26 @@ describe('Bucket', () => {
return () => {};
};

bucket.getFiles = () => Promise.resolve([[]]);
const files = [bucket.file('1'), bucket.file('2')].map(file => {
file.delete = () => {
return Promise.resolve();
};
return file;
});

const readable = new stream.Readable({
objectMode: true,
read() {
if (readCount < 1) {
this.push(files[readCount]);
readCount++;
} else {
this.push(null);
}
},
});

bucket.getFilesStream = () => readable;
bucket.deleteFiles({}, assert.ifError);
});

Expand All @@ -1270,9 +1338,21 @@ describe('Bucket', () => {
return file;
});

bucket.getFiles = (query_: {}) => {
const readable = new stream.Readable({
objectMode: true,
read() {
if (readCount < files.length) {
this.push(files[readCount]);
readCount++;
} else {
this.push(null);
}
},
});

bucket.getFilesStream = (query_: {}) => {
assert.strictEqual(query_, query);
return Promise.resolve([files]);
return readable;
};

bucket.deleteFiles(query, (err: Error) => {
Expand All @@ -1284,9 +1364,15 @@ describe('Bucket', () => {

it('should execute callback with error from getting files', done => {
const error = new Error('Error.');
const readable = new stream.Readable({
objectMode: true,
read() {
this.destroy(error);
},
});

bucket.getFiles = () => {
return Promise.reject(error);
bucket.getFilesStream = () => {
return readable;
};

bucket.deleteFiles({}, (err: Error) => {
Expand All @@ -1303,8 +1389,20 @@ describe('Bucket', () => {
return file;
});

bucket.getFiles = () => {
return Promise.resolve([files]);
const readable = new stream.Readable({
objectMode: true,
read() {
if (readCount < files.length) {
this.push(files[readCount]);
readCount++;
} else {
this.push(null);
}
},
});

bucket.getFilesStream = () => {
return readable;
};

bucket.deleteFiles({}, (err: Error) => {
Expand All @@ -1321,8 +1419,20 @@ describe('Bucket', () => {
return file;
});

bucket.getFiles = () => {
return Promise.resolve([files]);
const readable = new stream.Readable({
objectMode: true,
read() {
if (readCount < files.length) {
this.push(files[readCount]);
readCount++;
} else {
this.push(null);
}
},
});

bucket.getFilesStream = () => {
return readable;
};

bucket.deleteFiles({force: true}, (errs: Array<{}>) => {
Expand Down

0 comments on commit f792f25

Please sign in to comment.