Skip to content

Commit

Permalink
child_process: do not keep list of sent sockets
Browse files Browse the repository at this point in the history
Keeping list of all sockets that were sent to child process causes memory
leak and thus unacceptable (see nodejs#4587). However `server.close()` should
still work properly.

This commit introduces two options:

* child.send(socket, { track: true }) - will send socket and track its status.
  You should use it when you want to receive `close` event on sent sockets.
* child.send(socket) - will send socket without tracking it status. This
  performs much better, because of smaller number of RTT between master and
  child.

With both of these options `server.close()` will wait for all sent
sockets to get closed.
  • Loading branch information
indutny committed Jan 17, 2013
1 parent 47f3fc9 commit bc0db5e
Show file tree
Hide file tree
Showing 7 changed files with 461 additions and 99 deletions.
8 changes: 7 additions & 1 deletion doc/api/child_process.markdown
Expand Up @@ -124,10 +124,11 @@ process may not actually kill it. `kill` really just sends a signal to a proces


See `kill(2)` See `kill(2)`


### child.send(message, [sendHandle]) ### child.send(message, [sendHandle], [options])


* `message` {Object} * `message` {Object}
* `sendHandle` {Handle object} * `sendHandle` {Handle object}
* `options` {Object}


When using `child_process.fork()` you can write to the child using When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by `child.send(message, [sendHandle])` and messages are received by
Expand Down Expand Up @@ -166,6 +167,11 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
socket object to another process. The child will receive the object as its socket object to another process. The child will receive the object as its
second argument to the `message` event. second argument to the `message` event.


The `options` object may have the following properties:

* `track` - Notify master process when `sendHandle` will be closed in child
process. (`false` by default)

**send server object** **send server object**


Here is an example of sending a server: Here is an example of sending a server:
Expand Down
12 changes: 11 additions & 1 deletion doc/api/net.markdown
Expand Up @@ -229,12 +229,22 @@ with `child_process.fork()`.


### server.connections ### server.connections


This function is **deprecated**; please use [server.getConnections()][] instead.
The number of concurrent connections on the server. The number of concurrent connections on the server.


This becomes `null` when sending a socket to a child with `child_process.fork()`. This becomes `null` when sending a socket to a child with
`child_process.fork()`. To poll forks and get current number of active
connections use asynchronous `server.getConnections` instead.


`net.Server` is an [EventEmitter][] with the following events: `net.Server` is an [EventEmitter][] with the following events:


### server.getConnections(callback)

Asynchronously get the number of concurrent connections on the server. Works
when sockets were sent to forks.

Callback should take two arguments `err` and `count`.

### Event: 'listening' ### Event: 'listening'


Emitted when the server has been bound after calling `server.listen`. Emitted when the server has been bound after calling `server.listen`.
Expand Down
191 changes: 137 additions & 54 deletions lib/child_process.js
Expand Up @@ -107,36 +107,43 @@ var handleConversion = {
}, },


'net.Socket': { 'net.Socket': {
send: function(message, socket) { send: function(message, socket, options) {
// pause socket so no data is lost, will be resumed later // if the socket was created by net.Server

// if the socket wsa created by net.Server
if (socket.server) { if (socket.server) {
// the slave should keep track of the socket // the slave should keep track of the socket
message.key = socket.server._connectionKey; message.key = socket.server._connectionKey;


var firstTime = !this._channel.sockets.send[message.key]; var firstTime = !this._channel.sockets.send[message.key];

// add socket to connections list
var socketList = getSocketList('send', this, message.key); var socketList = getSocketList('send', this, message.key);
socketList.add(socket);


// the server should no longer expose a .connection property if (options && options.track) {
// and when asked to close it should query the socket status from slaves // Keep track of socket's status
if (firstTime) { message.id = socketList.add(socket);
socket.server._setupSlave(socketList); } else {
// the server should no longer expose a .connection property
// and when asked to close it should query the socket status from
// the slaves
if (firstTime) socket.server._setupSlave(socketList);

// Act like socket is detached
socket.server._connections--;
} }
} }


// remove handle from socket object, it will be closed when the socket // remove handle from socket object, it will be closed when the socket
// has been send // will be sent
var handle = socket._handle; var handle = socket._handle;
handle.onread = function() {}; handle.onread = function() {};
socket._handle = null; socket._handle = null;


return handle; return handle;
}, },


postSend: function(handle) {
// Close the Socket handle after sending it
handle.close();
},

got: function(message, handle, emit) { got: function(message, handle, emit) {
var socket = new net.Socket({handle: handle}); var socket = new net.Socket({handle: handle});
socket.readable = socket.writable = true; socket.readable = socket.writable = true;
Expand All @@ -146,7 +153,10 @@ var handleConversion = {


// add socket to connections list // add socket to connections list
var socketList = getSocketList('got', this, message.key); var socketList = getSocketList('got', this, message.key);
socketList.add(socket); socketList.add({
id: message.id,
socket: socket
});
} }


emit(socket); emit(socket);
Expand All @@ -161,39 +171,98 @@ function SocketListSend(slave, key) {
var self = this; var self = this;


this.key = key; this.key = key;
this.list = [];
this.slave = slave; this.slave = slave;


// These two arrays are used to store the list of sockets and the freelist of
// indexes in this list. After insertion, item will have persistent index
// until it's removed. This way we can use this index as an identifier for
// sockets.
this.list = [];
this.freelist = [];

slave.once('disconnect', function() { slave.once('disconnect', function() {
self.flush(); self.flush();
}); });


this.slave.on('internalMessage', function(msg) { this.slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return; if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return;
self.flush(); self.remove(msg.id);
}); });
} }
util.inherits(SocketListSend, EventEmitter); util.inherits(SocketListSend, EventEmitter);


SocketListSend.prototype.add = function(socket) { SocketListSend.prototype.add = function(socket) {
this.list.push(socket); var index;

// Pick one of free indexes, or insert in the end of the list
if (this.freelist.length > 0) {
index = this.freelist.pop();
this.list[index] = socket;
} else {
index = this.list.push(socket) - 1;
}

return index;
};

SocketListSend.prototype.remove = function(index) {
var socket = this.list[index];
if (!socket) return;

// Create a hole in the list and move index to the freelist
this.list[index] = null;
this.freelist.push(index);

socket.destroy();
}; };


SocketListSend.prototype.flush = function() { SocketListSend.prototype.flush = function() {
var list = this.list; var list = this.list;
this.list = []; this.list = [];
this.freelist = [];


list.forEach(function(socket) { list.forEach(function(socket) {
socket.destroy(); if (socket) socket.destroy();
}); });
}; };


SocketListSend.prototype.update = function() { SocketListSend.prototype._request = function(msg, cmd, callback) {
if (this.slave.connected === false) return; var self = this;

if (!this.slave.connected) return process.nextTick(onclose);
this.slave.send(msg);

function onclose() {
self.slave.removeListener('internalMessage', onreply);
callback(new Error('Slave closed before reply'));
};

function onreply(msg) {
if (!(msg.cmd === cmd && msg.key === self.key)) return;
self.slave.removeListener('disconnect', onclose);
self.slave.removeListener('internalMessage', onreply);

callback(null, msg);
};

this.slave.once('disconnect', onclose);
this.slave.on('internalMessage', onreply);
};

SocketListSend.prototype.close = function close(callback) {
this._request({
cmd: 'NODE_SOCKET_NOTIFY_CLOSE',
key: this.key
}, 'NODE_SOCKET_ALL_CLOSED', callback);
};


this.slave.send({ SocketListSend.prototype.getConnections = function getConnections(callback) {
cmd: 'NODE_SOCKET_FETCH', this._request({
cmd: 'NODE_SOCKET_GET_COUNT',
key: this.key key: this.key
}, 'NODE_SOCKET_COUNT', function(err, msg) {
if (err) return callback(err);
callback(null, msg.count);
}); });
}; };


Expand All @@ -203,45 +272,59 @@ function SocketListReceive(slave, key) {


var self = this; var self = this;


this.connections = 0;
this.key = key; this.key = key;
this.list = [];
this.slave = slave; this.slave = slave;


slave.on('internalMessage', function(msg) { function onempty() {
if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return; if (!self.slave.connected) return;

if (self.list.length === 0) {
self.flush();
return;
}


self.on('itemRemoved', function removeMe() { self.slave.send({
if (self.list.length !== 0) return; cmd: 'NODE_SOCKET_ALL_CLOSED',
self.removeListener('itemRemoved', removeMe); key: self.key
self.flush();
}); });
}

this.slave.on('internalMessage', function(msg) {
if (msg.key !== self.key) return;

if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') {
// Already empty
if (self.connections === 0) return onempty();

// Wait for sockets to get closed
self.once('empty', onempty);
} else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') {
if (!self.slave.connected) return;
self.slave.send({
cmd: 'NODE_SOCKET_COUNT',
key: self.key,
count: self.connections
});
}
}); });
} }
util.inherits(SocketListReceive, EventEmitter); util.inherits(SocketListReceive, EventEmitter);


SocketListReceive.prototype.flush = function() { SocketListReceive.prototype.add = function(obj) {
this.list = []; var self = this;


if (this.slave.connected) { this.connections++;
this.slave.send({
cmd: 'NODE_SOCKET_CLOSED',
key: this.key
});
}
};


SocketListReceive.prototype.add = function(socket) { // Notify previous owner of socket about its state change
var self = this; obj.socket.once('close', function() {
this.list.push(socket); self.connections--;


socket.on('close', function() { if (obj.id !== undefined && self.slave.connected) {
self.list.splice(self.list.indexOf(socket), 1); // Master wants to keep eye on socket status
self.emit('itemRemoved'); self.slave.send({
cmd: 'NODE_SOCKET_CLOSED',
key: self.key,
id: obj.id
});
}

if (self.connections === 0) self.emit('empty');
}); });
}; };


Expand Down Expand Up @@ -366,17 +449,16 @@ function setupChannel(target, channel) {
var string = JSON.stringify(message) + '\n'; var string = JSON.stringify(message) + '\n';
var writeReq = channel.writeUtf8String(string, handle); var writeReq = channel.writeUtf8String(string, handle);


// Close the Socket handle after sending it
if (message && message.type === 'net.Socket') {
handle.close();
}

if (!writeReq) { if (!writeReq) {
var er = errnoException(errno, 'write', 'cannot write to IPC channel.'); var er = errnoException(errno, 'write', 'cannot write to IPC channel.');
this.emit('error', er); this.emit('error', er);
} }


writeReq.oncomplete = nop; if (obj && obj.postSend) {
writeReq.oncomplete = obj.postSend.bind(null, handle);
} else {
writeReq.oncomplete = nop;
}


/* If the master is > 2 read() calls behind, please stop sending. */ /* If the master is > 2 read() calls behind, please stop sending. */
return channel.writeQueueSize < (65536 * 2); return channel.writeQueueSize < (65536 * 2);
Expand Down Expand Up @@ -656,6 +738,7 @@ function ChildProcess() {


this._closesNeeded = 1; this._closesNeeded = 1;
this._closesGot = 0; this._closesGot = 0;
this.connected = false;


this.signalCode = null; this.signalCode = null;
this.exitCode = null; this.exitCode = null;
Expand Down

0 comments on commit bc0db5e

Please sign in to comment.