Permalink
Browse files

Send options to worker process over IPC

In large projects, the options may be too big to be passed through the
process arguments. Fixes #2032.
  • Loading branch information...
novemberborn committed Feb 3, 2019
1 parent 010914b commit 3078892236c4f78409cc8475ed88f07d1141c0f0
Showing with 122 additions and 105 deletions.
  1. +6 βˆ’1 lib/fork.js
  2. +1 βˆ’3 lib/worker/consume-argv.js
  3. +4 βˆ’0 lib/worker/ipc.js
  4. +108 βˆ’96 lib/worker/subprocess.js
  5. +3 βˆ’5 profile.js
@@ -50,7 +50,7 @@ module.exports = (file, opts, execArgv) => {
}
}, opts);

const args = [JSON.stringify(opts), opts.color ? '--color' : '--no-color'].concat(opts.workerArgv);
const args = [opts.color ? '--color' : '--no-color'].concat(opts.workerArgv);

const subprocess = childProcess.fork(workerPath, args, {
cwd: opts.projectDir,
@@ -85,6 +85,11 @@ module.exports = (file, opts, execArgv) => {
return;
}

if (message.ava.type === 'ready-for-options') {
send({type: 'options', options: opts});
return;
}

if (message.ava.type === 'ping') {
send({type: 'pong'});
} else {
@@ -1,5 +1,3 @@
'use strict';
require('./options').set(JSON.parse(process.argv[2]));

// Remove arguments received from fork.js and leave those specified by the user.
process.argv.splice(2, 2);
process.argv.splice(2, 1);
@@ -12,6 +12,9 @@ process.on('message', message => {
}

switch (message.ava.type) {
case 'options':
emitter.emit('options', message.ava.options);
break;
case 'peer-failed':
emitter.emit('peerFailed');
break;
@@ -23,6 +26,7 @@ process.on('message', message => {
}
});

exports.options = emitter.once('options');
exports.peerFailed = emitter.once('peerFailed');

function send(evt) {
@@ -5,126 +5,138 @@ const currentlyUnhandled = require('currently-unhandled')();
require('./ensure-forked');
require('./load-chalk');
require('./consume-argv');
require('./fake-tty');
/* eslint-enable import/no-unassigned-import */

const nowAndTimers = require('../now-and-timers');
const Runner = require('../runner');
const serializeError = require('../serialize-error');
const dependencyTracking = require('./dependency-tracker');
const ipc = require('./ipc');
const options = require('./options').get();
const precompilerHook = require('./precompiler-hook');

function exit(code) {
if (!process.exitCode) {
process.exitCode = code;
}
ipc.send({type: 'ready-for-options'});
ipc.options.then(options => {
require('./options').set(options);
require('./fake-tty'); // eslint-disable-line import/no-unassigned-import

dependencyTracking.flush();
return ipc.flush().then(() => process.exit()); // eslint-disable-line unicorn/no-process-exit
}

const runner = new Runner({
failFast: options.failFast,
failWithoutAssertions: options.failWithoutAssertions,
file: options.file,
match: options.match,
projectDir: options.projectDir,
runOnlyExclusive: options.runOnlyExclusive,
serial: options.serial,
snapshotDir: options.snapshotDir,
updateSnapshots: options.updateSnapshots
});
const nowAndTimers = require('../now-and-timers');
const Runner = require('../runner');
const serializeError = require('../serialize-error');
const dependencyTracking = require('./dependency-tracker');
const precompilerHook = require('./precompiler-hook');

ipc.peerFailed.then(() => {
runner.interrupt();
});
function exit(code) {
if (!process.exitCode) {
process.exitCode = code;
}

const attributedRejections = new Set();
process.on('unhandledRejection', (reason, promise) => {
if (runner.attributeLeakedError(reason)) {
attributedRejections.add(promise);
dependencyTracking.flush();
return ipc.flush().then(() => process.exit()); // eslint-disable-line unicorn/no-process-exit
}
});

runner.on('dependency', dependencyTracking.track);
runner.on('stateChange', state => ipc.send(state));
const runner = new Runner({
failFast: options.failFast,
failWithoutAssertions: options.failWithoutAssertions,
file: options.file,
match: options.match,
projectDir: options.projectDir,
runOnlyExclusive: options.runOnlyExclusive,
serial: options.serial,
snapshotDir: options.snapshotDir,
updateSnapshots: options.updateSnapshots
});

runner.on('error', error => {
ipc.send({type: 'internal-error', err: serializeError('Internal runner error', false, error)});
exit(1);
});
ipc.peerFailed.then(() => {
runner.interrupt();
});

runner.on('finish', () => {
try {
const touchedFiles = runner.saveSnapshotState();
if (touchedFiles) {
ipc.send({type: 'touched-files', files: touchedFiles});
const attributedRejections = new Set();
process.on('unhandledRejection', (reason, promise) => {
if (runner.attributeLeakedError(reason)) {
attributedRejections.add(promise);
}
} catch (error) {
});

runner.on('dependency', dependencyTracking.track);
runner.on('stateChange', state => ipc.send(state));

runner.on('error', error => {
ipc.send({type: 'internal-error', err: serializeError('Internal runner error', false, error)});
exit(1);
return;
}
});

nowAndTimers.setImmediate(() => {
currentlyUnhandled()
.filter(rejection => !attributedRejections.has(rejection.promise))
.forEach(rejection => {
ipc.send({type: 'unhandled-rejection', err: serializeError('Unhandled rejection', true, rejection.reason)});
});
runner.on('finish', () => {
try {
const touchedFiles = runner.saveSnapshotState();
if (touchedFiles) {
ipc.send({type: 'touched-files', files: touchedFiles});
}
} catch (error) {
ipc.send({type: 'internal-error', err: serializeError('Internal runner error', false, error)});
exit(1);
return;
}

exit(0);
});
});
nowAndTimers.setImmediate(() => {
currentlyUnhandled()
.filter(rejection => !attributedRejections.has(rejection.promise))
.forEach(rejection => {
ipc.send({type: 'unhandled-rejection', err: serializeError('Unhandled rejection', true, rejection.reason)});
});

process.on('uncaughtException', error => {
if (runner.attributeLeakedError(error)) {
return;
}
exit(0);
});
});

ipc.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error)});
exit(1);
});
process.on('uncaughtException', error => {
if (runner.attributeLeakedError(error)) {
return;
}

let accessedRunner = false;
exports.getRunner = () => {
accessedRunner = true;
return runner;
};
ipc.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error)});
exit(1);
});

// Store value in case to prevent required modules from modifying it.
const testPath = options.file;
let accessedRunner = false;
exports.getRunner = () => {
accessedRunner = true;
return runner;
};

// Install before processing options.require, so if helpers are added to the
// require configuration the *compiled* helper will be loaded.
dependencyTracking.install(testPath);
precompilerHook.install();
// Store value in case to prevent required modules from modifying it.
const testPath = options.file;

try {
for (const mod of (options.require || [])) {
const required = require(mod);
// Install before processing options.require, so if helpers are added to the
// require configuration the *compiled* helper will be loaded.
dependencyTracking.install(testPath);
precompilerHook.install();

try {
if (required[Symbol.for('esm\u200D:package')]) {
require = required(module); // eslint-disable-line no-global-assign
}
} catch (_) {}
}
try {
for (const mod of (options.require || [])) {
const required = require(mod);

try {
if (required[Symbol.for('esm\u200D:package')]) {
require = required(module); // eslint-disable-line no-global-assign
}
} catch (_) {}
}

require(testPath);
require(testPath);

if (accessedRunner) {
// Unreference the IPC channel if the test file required AVA. This stops it
// from keeping the event loop busy, which means the `beforeExit` event can be
// used to detect when tests stall.
ipc.unref();
} else {
ipc.send({type: 'missing-ava-import'});
if (accessedRunner) {
// Unreference the IPC channel if the test file required AVA. This stops it
// from keeping the event loop busy, which means the `beforeExit` event can be
// used to detect when tests stall.
ipc.unref();
} else {
ipc.send({type: 'missing-ava-import'});
exit(1);
}
} catch (error) {
ipc.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error)});
exit(1);
}
} catch (error) {
ipc.send({type: 'uncaught-exception', err: serializeError('Uncaught exception', true, error)});
exit(1);
}
}).catch(error => {
// There shouldn't be any errors, but if there are we may not have managed
// to bootstrap enough code to serialize them. Re-throw and let the process
// crash.
setImmediate(() => {
throw error;
});
});
@@ -116,7 +116,9 @@ runStatus.observeWorker({
process.send = data => {
if (data && data.ava) {
const evt = data.ava;
if (evt.type === 'ping') {
if (evt.type === 'ready-for-options') {
process.emit('message', {ava: {type: 'options', options: opts}});
} else if (evt.type === 'ping') {
if (console.profileEnd) {
console.profileEnd();
}
@@ -152,10 +154,6 @@ process.on('beforeExit', () => {
process.exitCode = process.exitCode || runStatus.suggestExitCode({matching: false});
});

// The "subprocess" will read process.argv[2] for options
process.argv[2] = JSON.stringify(opts);
process.argv.length = 3;

if (console.profile) {
console.profile('AVA test-worker process');
}

0 comments on commit 3078892

Please sign in to comment.