Skip to content

Commit

Permalink
cleanup some logic in pending and add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tjfontaine committed Jul 12, 2012
1 parent e1e11a6 commit 4946e15
Showing 1 changed file with 94 additions and 71 deletions.
165 changes: 94 additions & 71 deletions lib/pending.js
Expand Up @@ -27,65 +27,68 @@ var dgram = require('dgram'),
TCPMessage = require('./utils').TCPMessage,
Socket = require('./utils').Socket;

var SocketCache = function(parent) {
this._pending = {};
this._socket = {};
this._parent = parent;
};

SocketCache.prototype._hash = function(server) {
var serverHash = function(server) {
if (server.type === 'tcp')
return server.address + ':' + server.port;
else
return 'udp' + net.isIP(server.address);
};

// Data structure that creates sockets and listens for new packets
var SocketCache = function(parent) {
// This holds callbacks for people trying to get a socket
this._pending = {};
// Holds actual sockets
this._socket = {};
this._parent = parent;
};

SocketCache.prototype._getPending = function(server) {
var name = this._hash(server);
var name = serverHash(server);
return this._pending[name];
};

SocketCache.prototype._pendingAdd = function(server, cb) {
var name = this._hash(server);
var name = serverHash(server);
if (!this._pending[name]) {
this._pending[name] = [];
}
this._pending[name].push(cb);
};

SocketCache.prototype._pendingRemove = function(server) {
var name = this._hash(server);
var name = serverHash(server);
delete this._pending[name];
};

SocketCache.prototype._toInternalSocket = function(server, socket) {
var S;

if (server.type === 'tcp') {
S = new Socket(null, socket);
} else {
S = new Socket(socket, server);
}

return S;
};

SocketCache.prototype._pendingEmit = function(server, socket) {
var S, pending, self = this;
var dnssocket, pending;

pending = this._getPending(server);

if (pending) {
self._socketAdd(server, socket);
this._socketAdd(server, socket);

this._pendingRemove(server);
S = this._toInternalSocket(server, socket);

if (server.type === 'tcp') {
dnssocket = new Socket(null, socket);
} else {
dnssocket = new Socket(socket, server);
}

pending.forEach(function(cb) {
cb(S);
cb(dnssocket);
});
}
};

SocketCache.prototype._getSocket = function(server) {
var name = this._hash(server);
return this._socket[name];
var name = serverHash(server);
var socket = this._socket[name]
if (socket)
socket.last = Date.now();
return socket;
};

SocketCache.prototype._socketRemoveInternal = function(shash, socket) {
Expand All @@ -100,18 +103,21 @@ SocketCache.prototype._socketRemoveInternal = function(shash, socket) {
};

SocketCache.prototype._socketRemove = function(server) {
var cache_name = this._hash(server);
var cache_name = serverHash(server);
var socket = this._getSocket(server);
this._socketRemoveInternal(cache_name, socket);
};

SocketCache.prototype._socketAdd = function(server, socket) {
var self = this;
var cache_name = this._hash(server);
this._socket[cache_name] = {
last: new Date().getTime(),
socket: socket
};
var cache_name = serverHash(server);
var cached = this._getSocket(server);
if (!cached) {
cached = this._socket[cache_name] = {
last: Date.now(),
socket: socket
};
}
return cached;
};

SocketCache.prototype._createTcp = function(server) {
Expand Down Expand Up @@ -152,11 +158,11 @@ SocketCache.prototype._createUdp = function(server) {
};

var onClose = function() {
self._pendingRemove(server);
self._socketRemove(server);
};

var onListening = function() {
//self._socketAdd(server, socket);
self._pendingEmit(server, socket);
};

Expand All @@ -168,25 +174,24 @@ SocketCache.prototype._createUdp = function(server) {
}
};

// Request a socket, if it's not currently open create it
SocketCache.prototype.get = function(server, cb) {
var socket, pending, S;
var socket, self = this;

this._pendingAdd(server, cb);

socket = this._getSocket(server);
pending = this._getPending(server);

if (!socket) {
this._pendingAdd(server, cb);
if (!pending) {
if (server.type === 'tcp') {
this._createTcp(server);
} else {
this._createUdp(server);
}
if (server.type === 'tcp') {
this._createTcp(server);
} else {
this._createUdp(server);
}
} else {
socket.last = new Date().getTime();
S = this._toInternalSocket(server, socket.socket);
cb(S);
process.nextTick(function () {
self._pendingEmit(server, socket.socket);
});
}
};

Expand All @@ -199,54 +204,54 @@ var random_integer = function() {
return Math.floor(Math.random() * 50000 + 1);
};

// TODO XXX FIXME -- Until we can unref a socket, close a socket if no
// requests have come in for in 300ms
var SOCKET_TIMEOUT = 300;

// Data strucutre that manages making sure there are only so many requests in
// in flight at a given time, as well closing sockets as needed
var ServerQueue = module.exports = function(parent, active) {
var self = this;

// Where requests are stored if the socket isn't ready or if the socket is
// full
this._queue = {};
// requests active by socket and type
this._active = {};
// this handles socket creation and message catching
this._socketCache = new SocketCache(parent);
this._max_queue = active;

var check_sockets = function() {
var s, now;
now = new Date().getTime();
now = Date.now();
Object.keys(self._socketCache._socket).forEach(function(s) {
var socket = self._socketCache._socket[s];
var delta = now - socket.last;

var m = { server: s, delta: delta };

if (self._queue[s])
m.queue = self._queue[s].order.length;

if (self._active[s])
m.active = self._active[s].count;

// If it's been longer than SOCKET_TIMEOUT and there are no requests
// queued or still in flight we're safe to close the socket
if (delta > SOCKET_TIMEOUT && self._queue[s].order.length === 0 &&
self._active[s].count === 0) {
self._socketCache.close(s);
}
});
// only readd the timer if we actually have sockets to pay attention to
if (Object.keys(self._socketCache._socket).length) {
self._timer = setTimeout(check_sockets, SOCKET_TIMEOUT);
}
};

// TODO XXX FIXME -- When we can unref sockets we'll ref on active requests
// and unref when empty, that way these sockets won't hold the loop
self._timer = setTimeout(check_sockets, SOCKET_TIMEOUT);
};

ServerQueue.prototype._hash = function(server) {
if (server.type === 'tcp')
return server.address + ':' + server.port;
else
return 'udp' + net.isIP(server.address);
};

ServerQueue.prototype._getQueue = function(server) {
var name = this._hash(server);
var name = serverHash(server);

// order allows us to preserve the order since iterating the object keys
// may not preserve that
if (!this._queue[name]) {
this._queue[name] = {
order: []
Expand All @@ -257,7 +262,7 @@ ServerQueue.prototype._getQueue = function(server) {
};

ServerQueue.prototype._getActive = function(server) {
var name = this._hash(server);
var name = serverHash(server);

if (!this._active[name]) {
this._active[name] = {
Expand All @@ -269,21 +274,27 @@ ServerQueue.prototype._getActive = function(server) {
};

ServerQueue.prototype.add = function(server, request, cb) {
var name, id, queue, active;
var id, queue, active;

name = this._hash(server);
queue = this._getQueue(server);
active = this._getActive(server);

// create request id, make sure it's not going to collide with a queued
// request or request already in flight for this server
// ids are unique per server not system wide
id = random_integer();
while (queue[id] || active[id]) id = random_integer();

queue[id] = {
request: request,
cb: cb
};

// append to head of the order
queue.order.splice(0, 0, id);

request.id = id;

this.fill(server);
};

Expand Down Expand Up @@ -321,6 +332,7 @@ ServerQueue.prototype.pop = function(server) {
}
};

// While we have space go ahead and send as many requests as we can fit
ServerQueue.prototype.fill = function(server) {
var active, cb;
active = this._getActive(server);
Expand All @@ -339,15 +351,21 @@ ServerQueue.prototype.getRequest = function(server, id) {
};

var PendingRequests = function() {
// 100 requests in flight per server
// 100 for 8.8.8.8 udp, 100 for 8.8.8.8 tcp
this._server_queue = new ServerQueue(this, 100);
this.autopromote = true;
};


PendingRequests.prototype.send = function(request) {
// The socket may not be created yet, or there may already be too many
// requests in flight, packets won't be generated until the socket is
// actually ready for sending.
// Request IDs aren't created until we're sure we're not going to collide
// with an existing request
this._server_queue.add(request.server, request, function(socket) {
var packet;

// TODO -- We might want to slab allocate these? or perhaps in .send
try {
packet = new Packet(socket);
packet.header.id = request.id;
Expand All @@ -366,14 +384,19 @@ PendingRequests.prototype.send = function(request) {
});
};

// Stop caring about this request, deregister it from the queue
PendingRequests.prototype.remove = function(request) {
if (request.server && request.id)
if (request && request.server && request.id)
this._server_queue.remove(request.server, request.id);
};

// Proxy response back to Request object
// TODO -- Should this be handled in ServerQueue instead?
PendingRequests.prototype.handleMessage = function(server, msg, socket) {
var err, request, answer, start, end;

// TODO -- Handle parse failure, we may have enough information to pass that
// information back to Request
answer = Packet.parse(msg, socket);

request = this._server_queue.getRequest(server, answer.header.id);
Expand Down

0 comments on commit 4946e15

Please sign in to comment.