Permalink
Browse files

packager: fork local version of worker-farm

Summary:
I suggest we grab our own version of worker-farm, since there are a few changes we'd like to do. There are two reasons for forking:

* the original project does not seem maintained anymore, with a PR remaining unanswered (rvagg/node-worker-farm#42);
* we don't need to keep the level of genericity of the original project: for example, we don't need the option `maxConcurrentCallsPerWorker`, that we always keep to one.

Forking gives us opportunity to simplify the code for our use case. Later on we could reuse it for other projects such as `jest`.

A few things we'd like to do:

* remove special node options from the forks, such as `--inspect`, or even, allow adding special options (if you want to debug a worker specifically for example);
* allow us to pipe `stdout` and `stderr` instead of having transform spit stuff out to the parent process output;
* remove code managing `maxConcurrentCallsPerWorker` and clean up the code in general;
* add `flow` typing.

Reviewed By: davidaurelio

Differential Revision: D4993300

fbshipit-source-id: 10f0c2a18b010c2a8b2e2afebcb3aab3504d7923
  • Loading branch information...
jeanlauliac authored and facebook-github-bot committed May 3, 2017
1 parent 5f2edfc commit e5920e710c7c8a1bef6f7dc7a9c33bc3afa49a2e
View
@@ -3,3 +3,4 @@
**/staticBundle.js
**/main.js
Libraries/vendor/**/*
packager/src/worker-farm/**/*
View
@@ -160,6 +160,7 @@
"create-react-class": "^15.5.2",
"debug": "^2.2.0",
"denodeify": "^1.2.1",
"errno": ">=0.1.1 <0.2.0-0",
"event-target-shim": "^1.0.5",
"fbjs": "~0.8.9",
"fbjs-scripts": "^0.7.0",
@@ -206,12 +207,12 @@
"uglify-js": "2.7.5",
"whatwg-fetch": "^1.0.0",
"wordwrap": "^1.0.0",
"worker-farm": "^1.3.1",
"write-file-atomic": "^1.2.0",
"ws": "^1.1.0",
"xcode": "^0.9.1",
"xmldoc": "^0.4.0",
"xpipe": "^1.0.5",
"xtend": ">=4.0.0 <4.1.0-0",
"yargs": "^6.4.0"
},
"devDependencies": {
@@ -12,6 +12,7 @@ jest.disableAutomock();
jest
.setMock('worker-farm', () => () => undefined)
.setMock('../../worker-farm', () => () => undefined)
.setMock('uglify-js')
.mock('image-size')
.mock('fs')
@@ -19,6 +19,7 @@ const workerFarm = jest.fn();
jest.setMock('fs', fs);
jest.setMock('temp', temp);
jest.setMock('worker-farm', workerFarm);
jest.setMock('../../worker-farm', workerFarm);
var Transformer = require('../');
@@ -18,7 +18,7 @@ const denodeify = require('denodeify');
const invariant = require('fbjs/lib/invariant');
const path = require('path');
const util = require('util');
const workerFarm = require('worker-farm');
const workerFarm = require('../worker-farm');
import type {Data as TransformData, Options as TransformOptions} from './worker/worker';
import type {MappingsMap} from '../lib/SourceMap';
@@ -11,7 +11,8 @@
jest.disableAutomock();
jest.mock('worker-farm', () => () => () => {})
jest.mock('../../worker-farm', () => () => () => {})
.mock('worker-farm', () => () => () => {})
.mock('timers', () => ({setImmediate: fn => setTimeout(fn, 0)}))
.mock('uglify-js')
.mock('crypto')
@@ -0,0 +1 @@
node_modules
@@ -0,0 +1,9 @@
language: node_js
node_js:
- "0.10"
branches:
only:
- master
notifications:
email:
- rod@vagg.org
@@ -0,0 +1,13 @@
The MIT License (MIT)
=====================
Copyright (c) 2014 LevelUP contributors
---------------------------------------
*LevelUP contributors listed at <https://github.com/rvagg/node-levelup#contributors>*
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@@ -0,0 +1,143 @@
# Worker Farm
NOTE: this was forked from npm module `worker-farm`. Below is the original documentation, that may not be up-to-date.
---
Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options. *Available in npm as <strong>worker-farm</strong>*.
## Example
Given a file, *child.js*:
```js
module.exports = function (inp, callback) {
callback(null, inp + ' BAR (' + process.pid + ')')
}
```
And a main file:
```js
var workerFarm = require('worker-farm')
, workers = workerFarm(require.resolve('./child'))
, ret = 0
for (var i = 0; i < 10; i++) {
workers('#' + i + ' FOO', function (err, outp) {
console.log(outp)
if (++ret == 10)
workerFarm.end(workers)
})
}
```
We'll get an output something like the following:
```
#1 FOO BAR (8546)
#0 FOO BAR (8545)
#8 FOO BAR (8545)
#9 FOO BAR (8546)
#2 FOO BAR (8548)
#4 FOO BAR (8551)
#3 FOO BAR (8549)
#6 FOO BAR (8555)
#5 FOO BAR (8553)
#7 FOO BAR (8557)
```
This example is contained in the *[examples/basic](https://github.com/rvagg/node-worker-farm/tree/master/examples/basic/)* directory.
### Example #1: Estimating π using child workers
You will also find a more complex example in *[examples/pi](https://github.com/rvagg/node-worker-farm/tree/master/examples/pi/)* that estimates the value of **π** by using a Monte Carlo *area-under-the-curve* method and compares the speed of doing it all in-process vs using child workers to complete separate portions.
Running `node examples/pi` will give you something like:
```
Doing it the slow (single-process) way...
π ≈ 3.1416269360000006 (0.0000342824102075312 away from actual!)
took 8341 milliseconds
Doing it the fast (multi-process) way...
π ≈ 3.1416233600000036 (0.00003070641021052367 away from actual!)
took 1985 milliseconds
```
## Durability
An important feature of Worker Farm is **call durability**. If a child process dies for any reason during the execution of call(s), those calls will be re-queued and taken care of by other child processes. In this way, when you ask for something to be done, unless there is something *seriously* wrong with what you're doing, you should get a result on your callback function.
## My use-case
There are other libraries for managing worker processes available but my use-case was fairly specific: I need to make heavy use of the [node-java](https://github.com/nearinfinity/node-java) library to interact with JVM code. Unfortunately, because the JVM garbage collector is so difficult to interact with, it's prone to killing your Node process when the GC kicks under heavy load. For safety I needed a durable way to make calls so that (1) it wouldn't kill my main process and (2) any calls that weren't successful would be resubmitted for processing.
Worker Farm allows me to spin up multiple JVMs to be controlled by Node, and have a single, uncomplicated API that acts the same way as an in-process API and the calls will be taken care of by a child process even if an error kills a child process while it is working as the call will simply be passed to a new child process.
**But**, don't think that Worker Farm is specific to that use-case, it's designed to be very generic and simple to adapt to anything requiring the use of child Node processes.
## API
Worker Farm exports a main function an an `end()` method. The main function sets up a "farm" of coordinated child-process workers and it can be used to instantiate multiple farms, all operating independently.
### workerFarm([options, ]pathToModule[, exportedMethods])
In its most basic form, you call `workerFarm()` with the path to a module file to be invoked by the child process. You should use an **absolute path** to the module file, the best way to obtain the path is with `require.resolve('./path/to/module')`, this function can be used in exactly the same way as `require('./path/to/module')` but it returns an absolute path.
#### `exportedMethods`
If your module exports a single function on `module.exports` then you should omit the final parameter. However, if you are exporting multiple functions on `module.exports` then you should list them in an Array of Strings:
```js
var workers = workerFarm(require.resolve('./mod'), [ 'doSomething', 'doSomethingElse' ])
workers.doSomething(function () {})
workers.doSomethingElse(function () {})
```
Listing the available methods will instruct Worker Farm what API to provide you with on the returned object. If you don't list a `exportedMethods` Array then you'll get a single callable function to use; but if you list the available methods then you'll get an object with callable functions by those names.
**It is assumed that each function you call on your child module will take a `callback` function as the last argument.**
#### `options`
If you don't provide an `options` object then the following defaults will be used:
```js
{
maxCallsPerWorker : Infinity
, maxConcurrentWorkers : require('os').cpus().length
, maxConcurrentCallsPerWorker : 10
, maxConcurrentCalls : Infinity
, maxCallTime : Infinity
, maxRetries : Infinity
, autoStart : false
}
```
* **<code>maxCallsPerWorker</code>** allows you to control the lifespan of your child processes. A positive number will indicate that you only want each child to accept that many calls before it is terminated. This may be useful if you need to control memory leaks or similar in child processes.
* **<code>maxConcurrentWorkers</code>** will set the number of child processes to maintain concurrently. By default it is set to the number of CPUs available on the current system, but it can be any reasonable number, including `1`.
* **<code>maxConcurrentCallsPerWorker</code>** allows you to control the *concurrency* of individual child processes. Calls are placed into a queue and farmed out to child processes according to the number of calls they are allowed to handle concurrently. It is arbitrarily set to 10 by default so that calls are shared relatively evenly across workers, however if your calls predictably take a similar amount of time then you could set it to `Infinity` and Worker Farm won't queue any calls but spread them evenly across child processes and let them go at it. If your calls aren't I/O bound then it won't matter what value you use here as the individual workers won't be able to execute more than a single call at a time.
* **<code>maxConcurrentCalls</code>** allows you to control the maximum number of calls in the queue&mdash;either actively being processed or waiting for a worker to be processed. `Infinity` indicates no limit but if you have conditions that may endlessly queue jobs and you need to set a limit then provide a `>0` value and any calls that push the limit will return on their callback with a `MaxConcurrentCallsError` error (check `err.type == 'MaxConcurrentCallsError'`).
* **<code>maxCallTime</code>** *(use with caution, understand what this does before you use it!)* when `!== Infinity`, will cap a time, in milliseconds, that *any single call* can take to execute in a worker. If this time limit is exceeded by just a single call then the worker running that call will be killed and any calls running on that worker will have their callbacks returned with a `TimeoutError` (check `err.type == 'TimeoutError'`). If you are running with `maxConcurrentCallsPerWorker` value greater than `1` then **all calls currently executing** will fail and will be automatically resubmitted uless you've changed the `maxRetries` option. Use this if you have jobs that may potentially end in infinite loops that you can't programatically end with your child code. Preferably run this with a `maxConcurrentCallsPerWorker` so you don't interrupt other calls when you have a timeout. This timeout operates on a per-call basis but will interrupt a whole worker.
* **<code>maxRetries</code>** allows you to control the max number of call requeues after worker termination (unexpected or timeout). By default this option is set to `Infinity` which means that each call of each terminated worker will always be auto requeued. When the number of retries exceeds `maxRetries` value, the job callback will be executed with a `ProcessTerminatedError`. Note that if you are running with finite `maxCallTime` and `maxConcurrentCallsPerWorkers` greater than `1` then any `TimeoutError` will increase the retries counter *for each* concurrent call of the terminated worker.
* **<code>autoStart</code>** when set to `true` will start the workers as early as possible. Use this when your workers have to do expensive initialization. That way they'll be ready when the first request comes through.
### workerFarm.end(farm)
Child processes stay alive waiting for jobs indefinitely and your farm manager will stay alive managing its workers, so if you need it to stop then you have to do so explicitly. If you send your farm API to `workerFarm.end()` then it'll cleanly end your worker processes. Note though that it's a *soft* ending so it'll wait for child processes to finish what they are working on before asking them to die.
Any calls that are queued and not yet being handled by a child process will be discarded. `end()` only waits for those currently in progress.
Once you end a farm, it won't handle any more calls, so don't even try!
## License
Worker Farm is Copyright (c) 2014 Rod Vagg [@rvagg](https://twitter.com/rvagg) and licensed under the MIT license. All rights not explicitly granted in the MIT license are reserved. See the included LICENSE.md file for more details.
@@ -0,0 +1,13 @@
/**
* Copyright (c) 2013-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
/* eslint-disable */
module.exports = function (inp, callback) {
callback(null, inp + ' BAR (' + process.pid + ')')
}
@@ -0,0 +1,21 @@
/**
* Copyright (c) 2013-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
/* eslint-disable */
var workerFarm = require('../../')
, workers = workerFarm(require.resolve('./child'))
, ret = 0
for (var i = 0; i < 10; i++) {
workers('#' + i + ' FOO', function (err, outp) {
console.log(outp)
if (++ret == 10)
workerFarm.end(workers)
})
}
@@ -0,0 +1,30 @@
/**
* Copyright (c) 2013-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
/* eslint-disable */
/* A simple PI estimation function using a Monte Carlo method
* For 0 to `points`, take 2 random numbers < 1, square and add them to
* find the area under that point in a 1x1 square. If that area is <= 1
* then it's *within* a quarter-circle, otherwise it's outside.
* Take the number of points <= 1 and multiply it by 4 and you have an
* estimate!
* Do this across multiple processes and average the results to
* increase accuracy.
*/
module.exports = function (points, callback) {
var inside = 0
, i = points
while (i--)
if (Math.pow(Math.random(), 2) + Math.pow(Math.random(), 2) <= 1)
inside++
callback(null, (inside / points) * 4)
}
@@ -0,0 +1,49 @@
/**
* Copyright (c) 2013-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
/* eslint-disable */
const CHILDREN = 500
, POINTS_PER_CHILD = 1000000
, FARM_OPTIONS = {
maxConcurrentWorkers : require('os').cpus().length
, maxCallsPerWorker : Infinity
, maxConcurrentCallsPerWorker : 1
}
var workerFarm = require('../../')
, calcDirect = require('./calc')
, calcWorker = workerFarm(FARM_OPTIONS, require.resolve('./calc'))
, ret
, start
, tally = function (finish, err, avg) {
ret.push(avg)
if (ret.length == CHILDREN) {
var pi = ret.reduce(function (a, b) { return a + b }) / ret.length
, end = +new Date()
console.log('PI ~=', pi, '\t(' + Math.abs(pi - Math.PI), 'away from actual!)')
console.log('took', end - start, 'milliseconds')
if (finish)
finish()
}
}
, calc = function (method, callback) {
ret = []
start = +new Date()
for (var i = 0; i < CHILDREN; i++)
method(POINTS_PER_CHILD, tally.bind(null, callback))
}
console.log('Doing it the slow (single-process) way...')
calc(calcDirect, function () {
console.log('Doing it the fast (multi-process) way...')
calc(calcWorker, process.exit)
})
Oops, something went wrong.

0 comments on commit e5920e7

Please sign in to comment.