Skip to content

Commit

Permalink
feature(putout) add concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
coderaiser committed May 11, 2020
1 parent 762cea5 commit 6374b05
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 31 deletions.
9 changes: 9 additions & 0 deletions .eslintrc.json
@@ -1,4 +1,13 @@
{
"rules": {
"putout/putout": ["error", {
"rules": {
"remove-unused-variables": "off",
"remove-console": "off",
"remove-debugger": "off"
}
}]
},
"extends": [
"plugin:putout/recommended",
"plugin:node/recommended"
Expand Down
1 change: 0 additions & 1 deletion .gitignore
@@ -1,6 +1,5 @@
node_modules
.nyc_output
legacy
*.swp
yarn-error.log
yarn.lock
Expand Down
89 changes: 89 additions & 0 deletions packages/putout/lib/cli/concurrency/index.js
@@ -0,0 +1,89 @@
'use strict';

const {readFile} = require('fs').promises;
const {promisify} = require('util');
const WorkerPool = require('./worker_pool.js');

const tryCatch = require('try-catch');
const once = require('once');

const getFormatter = once(_getFormatter);
const report = require('../../report')();
const stub = () => () => {};

const {exit, stdout} = process;
const write = stdout.write.bind(stdout);

module.exports = promisify(async (names, options, fn) => {
let finished = 0;
const pool = new WorkerPool('./task_processor.js');

const {
dir,
format,
formatter = 'dump',
} = options;

const currentFormat = getFormatter(format || formatter, exit);

const {length} = names;
for (const name of names) {
const source = await readFile(name, 'utf8');
const task = {
name,
source,
options,
length,
};
const mainResult = [];
pool.runTask(task, (error, result) => {
if (error)
return fn(error);

const {
source,
allPlaces,
resolvedName,
} = result;

mainResult.push(allPlaces);

const line = report(currentFormat, {
name: resolvedName,
places: result,
index: finished,
count: length,
source,
});

write(line || '');

if (++finished === names.length) {
pool.close();
fn(null, mainResult);
}
});
}
});

module.exports._getFormatter = _getFormatter;
function _getFormatter(name, exit) {
let e;
let reporter;

if (name === 'none')
return stub();

[e, reporter] = tryCatch(require, `@putout/formatter-${name}`);

if (!e)
return reporter;

[e, reporter] = tryCatch(require, `putout-formatter-${name}`);

if (e)
exit(e);

return reporter;
}

12 changes: 12 additions & 0 deletions packages/putout/lib/cli/concurrency/task_processor.js
@@ -0,0 +1,12 @@
'use strict';

const {parentPort} = require('worker_threads');
const processFile = require('../process-file');

let index = 0;

parentPort.on('message', async (args) => {
args[2] = index++;
const result = await processFile(...args);
parentPort.postMessage(result);
});
98 changes: 98 additions & 0 deletions packages/putout/lib/cli/concurrency/worker_pool.js
@@ -0,0 +1,98 @@
'use strict';

const {AsyncResource} = require('async_hooks');
const {EventEmitter} = require('events');
const path = require('path');
const {Worker} = require('worker_threads');
const {cpus} = require('os');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

const {THREAD_IT_COUNT} = process.env;
const numThreads = isNaN(THREAD_IT_COUNT) ? cpus().length : Number(THREAD_IT_COUNT);

class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}

done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
module.exports = class ThreadIt extends EventEmitter {
constructor(name) {
super();
this.workers = [];
this.freeWorkers = [];
this.name = name;

this.setMaxListeners(Infinity);

for (let i = 0; i < numThreads; i++)
this.addNewWorker();
}

addNewWorker() {
const worker = new Worker(path.resolve(__dirname, this.name));

worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}

runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
return;
}

const {
name,
options,
length,
source,
} = task;

const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage([
options,
name,
0,
{length},
source,
]);
}

close() {
for (const worker of this.workers)
worker.terminate();
}
};

26 changes: 9 additions & 17 deletions packages/putout/lib/cli/index.js
Expand Up @@ -4,9 +4,10 @@ const {red} = require('chalk');
const yargsParser = require('yargs-parser');

const merge = require('../merge');
const processFile = require('./process-file');
//const processFile = require('./process-file');
const getFiles = require('./get-files');
const cacheFiles = require('./cache-files');
const processFile = require('./concurrency');

const isString = (a) => typeof a === 'string';
const isStringAll = (...a) => a.filter(isString).length;
Expand Down Expand Up @@ -124,7 +125,7 @@ module.exports = async ({argv, halt, log, write, logError}) => {

const options = {
fix,
fileCache,
//fileCache,
updateCache,
removeCache,
rulesdir,
Expand All @@ -140,24 +141,15 @@ module.exports = async ({argv, halt, log, write, logError}) => {
enableAll,
},

exit,
log,
logError,
write,
//exit,
//log,
//logError,
//write,
noOptions: !args.options,
};

const rawPlaces = [];

const process = processFile(options);
const {length} = files;

for (let i = 0; i < length; i++) {
const file = files[i];
const place = await process(file, i, {length});
rawPlaces.push(place);
}

debugger;
const rawPlaces = await processFile(files, options);
const places = rawPlaces.filter(Boolean);

const mergedPlaces = merge(...places);
Expand Down

0 comments on commit 6374b05

Please sign in to comment.