Permalink
Browse files

packager: BatchProcessor: use Promise for queue()

Reviewed By: cpojer

Differential Revision: D4572181

fbshipit-source-id: 34c3824f05efd93847df9ba5931b0735c6711c28
Loading branch information…
jeanlauliac authored and facebook-github-bot committed Feb 20, 2017
1 parent 1a5b56d commit bac576c433ac3b85dc50df567fe008589af99091
@@ -24,6 +24,12 @@ type BatchProcessorOptions = {
concurrency: number,
};
type QueueItem<TItem, TResult> = {
item: TItem,
reject: (error: mixed) => mixed,
resolve: (result: TResult) => mixed,
};
/**
* We batch items together trying to minimize their processing, for example as
* network queries. For that we wait a small moment before processing a batch.
@@ -33,14 +39,11 @@ type BatchProcessorOptions = {
*/
class BatchProcessor<TItem, TResult> {
_currentProcessCount: number;
_options: BatchProcessorOptions;
_processBatch: ProcessBatch<TItem, TResult>;
_queue: Array<{
item: TItem,
callback: (error?: Error, result?: TResult) => mixed,
}>;
_queue: Array<QueueItem<TItem, TResult>>;
_timeoutHandle: ?number;
_currentProcessCount: number;
constructor(
options: BatchProcessorOptions,
@@ -64,12 +67,16 @@ class BatchProcessor<TItem, TResult> {
const jobs = this._queue.splice(0, this._options.maximumItems);
const items = jobs.map(job => job.item);
this._processBatch(items, (error, results) => {
invariant(
results == null || results.length === items.length,
'Not enough results returned.',
);
for (let i = 0; i < items.length; ++i) {
jobs[i].callback(error, results && results[i]);
if (error != null) {
for (let i = 0; i < jobs.length; ++i) {
jobs[i].reject(error);
}
} else {
invariant(results != null, 'Neither results or error were returned.');
invariant(results.length === items.length, 'Not enough results returned.');
for (let i = 0; i < jobs.length; ++i) {
jobs[i].resolve(results[i]);
}
}
this._currentProcessCount--;
this._processQueueOnceReady();
@@ -91,12 +98,11 @@ class BatchProcessor<TItem, TResult> {
}
}
queue(
item: TItem,
callback: (error?: Error, result?: TResult) => mixed,
) {
this._queue.push({item, callback});
this._processQueueOnceReady();
queue(item: TItem): Promise<TResult> {
return new Promise((resolve, reject) => {
this._queue.push({item, resolve, reject});
this._processQueueOnceReady();
});
}
}
@@ -74,7 +74,10 @@ class KeyURIFetcher {
}
fetch(key: string, callback: FetchURICallback) {
this._batchProcessor.queue(key, callback);
this._batchProcessor.queue(key).then(
res => process.nextTick(callback.bind(undefined, undefined, res)),
err => process.nextTick(callback.bind(undefined, err)),
);
}
constructor(fetchResultURIs: FetchResultURIs, processError: (error: Error) => mixed) {
@@ -107,7 +110,7 @@ class KeyResultStore {
}
store(key: string, result: CachedResult) {
this._batchProcessor.queue({key, result}, () => {});
this._batchProcessor.queue({key, result});
}
constructor(storeResults: StoreResults) {

This file was deleted.

Oops, something went wrong.
@@ -38,12 +38,12 @@ describe('BatchProcessor', () => {
}, 0);
});
const results = [];
const callback = (error, res) => {
expect(error).toBe(null);
results.push(res);
};
input.forEach(e => bp.queue(e, callback));
input.forEach(e => bp.queue(e).then(
res => results.push(res),
error => process.nextTick(() => { throw error; }),
));
jest.runAllTimers();
jest.runAllTicks();
expect(batches).toEqual([
[1, 2, 3],
[4, 5, 6],
@@ -56,10 +56,12 @@ describe('BatchProcessor', () => {
it('report errors', () => {
const error = new Error('oh noes');
const bp = new BatchProcessor(options, (items, callback) => {
process.nextTick(callback.bind(null, error));
setTimeout(callback.bind(null, error), 0);
});
let receivedError;
bp.queue('foo', err => { receivedError = err; });
bp.queue('foo').catch(
err => { receivedError = err; },
);
jest.runAllTimers();
jest.runAllTicks();
expect(receivedError).toBe(error);

0 comments on commit bac576c

Please sign in to comment.