Permalink
Browse files

child_process: new stdio API for .spawn() method

  • Loading branch information...
1 parent 30a0e58 commit af98fc9d5f20a8c5dc0db95121f18355337762f1 @indutny committed May 16, 2012
Showing with 472 additions and 246 deletions.
  1. +158 −78 lib/child_process.js
  2. +3 −0 node.gyp
  3. +3 −2 src/node.js
  4. +59 −41 src/process_wrap.cc
  5. +12 −0 src/tcp_wrap.cc
  6. +2 −0 src/tcp_wrap.h
  7. +99 −85 src/tty_wrap.cc
  8. +58 −0 src/tty_wrap.h
  9. +13 −39 src/udp_wrap.cc
  10. +60 −0 src/udp_wrap.h
  11. +5 −1 test/simple/test-process-wrap.js
View
@@ -25,17 +25,38 @@ var Process = process.binding('process_wrap').Process;
var util = require('util');
var constants; // if (!constants) constants = process.binding('constants');
-var Pipe;
+var handleWraps = {};
+function handleWrapGetter(name, callback) {
+ var cons;
+
+ Object.defineProperty(handleWraps, name, {
+ get: function() {
+ if (cons !== undefined) return cons;
+ return cons = callback();
+ }
+ });
+}
+
+handleWrapGetter('Pipe', function() {
+ return process.binding('pipe_wrap').Pipe;
+});
+
+handleWrapGetter('TTY', function() {
+ return process.binding('tty_wrap').TTY;
+});
+
+handleWrapGetter('TCP', function() {
+ return process.binding('tcp_wrap').TCP;
+});
+
+handleWrapGetter('UDP', function() {
+ return process.binding('udp_wrap').UDP;
+});
// constructors for lazy loading
function createPipe(ipc) {
- // Lazy load
- if (!Pipe) {
- Pipe = process.binding('pipe_wrap').Pipe;
- }
-
- return new Pipe(ipc);
+ return new handleWraps.Pipe(ipc);
}
function createSocket(pipe, readable) {
@@ -414,39 +435,18 @@ exports.fork = function(modulePath /*, args, options*/) {
execArgv = options.execArgv || process.execArgv;
args = execArgv.concat([modulePath], args);
- // Don't allow stdinStream and customFds since a stdin channel will be used
- if (options.stdinStream) {
- throw new Error('stdinStream not allowed for fork()');
- }
-
- if (options.customFds) {
- throw new Error('customFds not allowed for fork()');
- }
-
// Leave stdin open for the IPC channel. stdout and stderr should be the
// same as the parent's if silent isn't set.
- options.customFds = (options.silent ? [-1, -1, -1] : [-1, 1, 2]);
-
- // Just need to set this - child process won't actually use the fd.
- // For backwards compat - this can be changed to 'NODE_CHANNEL' before v0.6.
- options.env = util._extend({}, options.env || process.env);
- options.env.NODE_CHANNEL_FD = 42;
+ options.stdio = options.silent ? ['ipc', 'pipe', 'pipe'] : ['ipc', 1, 2];
- // stdin is the IPC channel.
- options.stdinStream = createPipe(true);
-
- var child = spawn(process.execPath, args, options);
-
- setupChannel(child, options.stdinStream);
-
- return child;
+ return spawn(process.execPath, args, options);
};
-exports._forkChild = function() {
+exports._forkChild = function(fd) {
// set process.send()
var p = createPipe(true);
- p.open(0);
+ p.open(fd);
setupChannel(process, p);
};
@@ -591,15 +591,19 @@ var spawn = exports.spawn = function(file, args, options) {
}
var child = new ChildProcess();
+ if (options && options.customFds && !options.stdio) {
+ options.stdio = options.customFds.map(function(fd) {
+ return fd === -1 ? 'pipe' : fd;
+ });
+ }
child.spawn({
file: file,
args: args,
cwd: options ? options.cwd : null,
windowsVerbatimArguments: !!(options && options.windowsVerbatimArguments),
envPairs: envPairs,
- customFds: options ? options.customFds : null,
- stdinStream: options ? options.stdinStream : null,
+ stdio: options ? options.stdio : null,
uid: options ? options.uid : null,
gid: options ? options.gid : null
});
@@ -658,72 +662,148 @@ function ChildProcess() {
util.inherits(ChildProcess, EventEmitter);
-function setStreamOption(name, index, options) {
- // Skip if we already have options.stdinStream
- if (options[name]) return;
+function getHandleWrapType(stream) {
+ if (stream instanceof handleWraps.Pipe) return 'pipe';
+ if (stream instanceof handleWraps.TTY) return 'tty';
+ if (stream instanceof handleWraps.TCP) return 'tcp';
+ if (stream instanceof handleWraps.UDP) return 'udp';
- if (options.customFds &&
- typeof options.customFds[index] == 'number' &&
- options.customFds[index] !== -1) {
- if (options.customFds[index] === index) {
- options[name] = null;
- } else {
- throw new Error('customFds not yet supported');
- }
- } else {
- options[name] = createPipe();
- }
+ return false;
}
ChildProcess.prototype.spawn = function(options) {
- var self = this;
-
- setStreamOption('stdinStream', 0, options);
- setStreamOption('stdoutStream', 1, options);
- setStreamOption('stderrStream', 2, options);
+ var self = this,
+ ipc,
+ ipcFd,
+ // If no `stdio` option was given - use default
+ stdio = options.stdio || 'pipe';
+
+ // Replace shortcut with an array
+ if (typeof stdio === 'string') {
+ switch (stdio) {
+ case 'ignore': stdio = ['ignore', 'ignore', 'ignore']; break;
+ case 'pipe': stdio = ['pipe', 'pipe', 'pipe']; break;
+ case 'inherit': stdio = [0, 1, 2]; break;
+ default: throw new TypeError('Incorrect value of stdio option: ' + stdio);
+ }
+ } else if (!Array.isArray(stdio)) {
+ throw new TypeError('Incorrect value of stdio option: ' + stdio);
+ }
- var r = this._handle.spawn(options);
+ // At least 3 stdio will be created
+ if (stdio.length < 3) {
+ stdio = stdio.concat(new Array(3 - stdio.length));
+ }
- if (r) {
- if (options.stdinStream) {
- options.stdinStream.close();
+ // Translate stdio into C++-readable form
+ // (i.e. PipeWraps or fds)
+ stdio = stdio.reduce(function(acc, stdio, i) {
+ function cleanup() {
+ acc.filter(function(stdio) {
+ return stdio.type === 'pipe' || stdio.type === 'ipc';
+ }).forEach(function(stdio) {
+ stdio.handle.close();
+ });
}
- if (options.stdoutStream) {
- options.stdoutStream.close();
+ // Defaults
+ if (stdio === undefined || stdio === null) {
+ stdio = i < 3 ? 'pipe' : 'ignore';
}
- if (options.stderrStream) {
- options.stderrStream.close();
+ if (stdio === 'ignore') {
+ acc.push({type: 'ignore'});
+ } else if (stdio === 'pipe' || typeof stdio === 'number' && stdio < 0) {
+ acc.push({type: 'pipe', handle: createPipe()});
+ } else if (stdio === 'ipc') {
+ if (ipc !== undefined) {
+ // Cleanup previously created pipes
+ cleanup();
+ throw Error('Child process can have only one IPC pipe');
+ }
+
+ ipc = createPipe(true);
+ ipcFd = i;
+
+ acc.push({ type: 'pipe', handle: ipc });
+ } else if (typeof stdio === 'number' || typeof stdio.fd === 'number') {
+ acc.push({ type: 'fd', fd: stdio.fd || stdio });
+ } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) ||
+ getHandleWrapType(stdio._handle)) {
+ var handle = getHandleWrapType(stdio) ?
+ stdio :
+ getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle;
+
+ acc.push({
+ type: 'wrap',
+ wrapType: getHandleWrapType(handle),
+ handle: handle
+ });
+ } else {
+ // Cleanup
+ cleanup();
+ throw new TypeError('Incorrect value for stdio stream: ' + stdio);
}
+ return acc;
+ }, []);
+
+ options.stdio = stdio;
+
+ if (ipc !== undefined) {
+ // Let child process know about opened IPC channel
+ options.envPairs = options.envPairs || [];
+ options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd);
+ }
+
+ var r = this._handle.spawn(options);
+
+ if (r) {
+ // Close all opened fds on error
+ stdio.forEach(function(stdio) {
+ if (stdio.type === 'pipe') {
+ stdio.handle.close();
+ }
+ });
+
this._handle.close();
this._handle = null;
throw errnoException(errno, 'spawn');
}
this.pid = this._handle.pid;
- if (options.stdinStream) {
- this.stdin = createSocket(options.stdinStream, false);
- }
+ stdio.forEach(function(stdio, i) {
+ if (stdio.type === 'ignore') return;
- if (options.stdoutStream) {
- this.stdout = createSocket(options.stdoutStream, true);
- this._closesNeeded++;
- this.stdout.on('close', function() {
- maybeClose(self);
- });
- }
+ if (stdio.handle) {
+ // when i === 0 - we're dealing with stdin
+ // (which is the only one writable pipe)
+ stdio.socket = createSocket(stdio.handle, i > 0);
- if (options.stderrStream) {
- this.stderr = createSocket(options.stderrStream, true);
- this._closesNeeded++;
- this.stderr.on('close', function() {
- maybeClose(self);
- });
- }
+ if (i > 0) {
+ self._closesNeeded++;
+ stdio.socket.on('close', function() {
+ maybeClose(self);
+ });
+ }
+ }
+ });
+
+ this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ?
+ stdio[0].socket : null;
+ this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ?
+ stdio[1].socket : null;
+ this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ?
+ stdio[2].socket : null;
+
+ this.stdio = stdio.map(function(stdio) {
+ return stdio.socket === undefined ? null : stdio.socket;
+ });
+
+ // Add .send() method and start listening for IPC data
+ if (ipc !== undefined) setupChannel(this, ipc);
return r;
};
View
@@ -109,6 +109,9 @@
'src/node_version.h',
'src/ngx-queue.h',
'src/pipe_wrap.h',
+ 'src/tty_wrap.h',
+ 'src/tcp_wrap.h',
+ 'src/udp_wrap.h',
'src/req_wrap.h',
'src/slab_allocator.h',
'src/stream_wrap.h',
View
@@ -482,7 +482,8 @@
// If we were spawned with env NODE_CHANNEL_FD then load that up and
// start parsing data from that stream.
if (process.env.NODE_CHANNEL_FD) {
- assert(parseInt(process.env.NODE_CHANNEL_FD) >= 0);
+ var fd = parseInt(process.env.NODE_CHANNEL_FD, 10);
+ assert(fd >= 0);
// Make sure it's not accidentally inherited by child processes.
delete process.env.NODE_CHANNEL_FD;
@@ -494,7 +495,7 @@
// FIXME is this really necessary?
process.binding('tcp_wrap');
- cp._forkChild();
+ cp._forkChild(fd);
assert(process.send);
}
}
Oops, something went wrong.

0 comments on commit af98fc9

Please sign in to comment.