From 4946e155f6e99444fe5149021c8b94ea2001dba7 Mon Sep 17 00:00:00 2001 From: Timothy J Fontaine Date: Thu, 12 Jul 2012 12:40:54 -0400 Subject: [PATCH] cleanup some logic in pending and add comments --- lib/pending.js | 165 ++++++++++++++++++++++++++++--------------------- 1 file changed, 94 insertions(+), 71 deletions(-) diff --git a/lib/pending.js b/lib/pending.js index 4132919..72fc343 100644 --- a/lib/pending.js +++ b/lib/pending.js @@ -27,26 +27,29 @@ var dgram = require('dgram'), TCPMessage = require('./utils').TCPMessage, Socket = require('./utils').Socket; -var SocketCache = function(parent) { - this._pending = {}; - this._socket = {}; - this._parent = parent; -}; - -SocketCache.prototype._hash = function(server) { +var serverHash = function(server) { if (server.type === 'tcp') return server.address + ':' + server.port; else return 'udp' + net.isIP(server.address); }; +// Data structure that creates sockets and listens for new packets +var SocketCache = function(parent) { + // This holds callbacks for people trying to get a socket + this._pending = {}; + // Holds actual sockets + this._socket = {}; + this._parent = parent; +}; + SocketCache.prototype._getPending = function(server) { - var name = this._hash(server); + var name = serverHash(server); return this._pending[name]; }; SocketCache.prototype._pendingAdd = function(server, cb) { - var name = this._hash(server); + var name = serverHash(server); if (!this._pending[name]) { this._pending[name] = []; } @@ -54,38 +57,38 @@ SocketCache.prototype._pendingAdd = function(server, cb) { }; SocketCache.prototype._pendingRemove = function(server) { - var name = this._hash(server); + var name = serverHash(server); delete this._pending[name]; }; -SocketCache.prototype._toInternalSocket = function(server, socket) { - var S; - - if (server.type === 'tcp') { - S = new Socket(null, socket); - } else { - S = new Socket(socket, server); - } - - return S; -}; - SocketCache.prototype._pendingEmit = function(server, socket) { - var S, pending, self = this; + var dnssocket, pending; + pending = this._getPending(server); + if (pending) { - self._socketAdd(server, socket); + this._socketAdd(server, socket); + this._pendingRemove(server); - S = this._toInternalSocket(server, socket); + + if (server.type === 'tcp') { + dnssocket = new Socket(null, socket); + } else { + dnssocket = new Socket(socket, server); + } + pending.forEach(function(cb) { - cb(S); + cb(dnssocket); }); } }; SocketCache.prototype._getSocket = function(server) { - var name = this._hash(server); - return this._socket[name]; + var name = serverHash(server); + var socket = this._socket[name] + if (socket) + socket.last = Date.now(); + return socket; }; SocketCache.prototype._socketRemoveInternal = function(shash, socket) { @@ -100,18 +103,21 @@ SocketCache.prototype._socketRemoveInternal = function(shash, socket) { }; SocketCache.prototype._socketRemove = function(server) { - var cache_name = this._hash(server); + var cache_name = serverHash(server); var socket = this._getSocket(server); this._socketRemoveInternal(cache_name, socket); }; SocketCache.prototype._socketAdd = function(server, socket) { - var self = this; - var cache_name = this._hash(server); - this._socket[cache_name] = { - last: new Date().getTime(), - socket: socket - }; + var cache_name = serverHash(server); + var cached = this._getSocket(server); + if (!cached) { + cached = this._socket[cache_name] = { + last: Date.now(), + socket: socket + }; + } + return cached; }; SocketCache.prototype._createTcp = function(server) { @@ -152,11 +158,11 @@ SocketCache.prototype._createUdp = function(server) { }; var onClose = function() { + self._pendingRemove(server); self._socketRemove(server); }; var onListening = function() { - //self._socketAdd(server, socket); self._pendingEmit(server, socket); }; @@ -168,25 +174,24 @@ SocketCache.prototype._createUdp = function(server) { } }; +// Request a socket, if it's not currently open create it SocketCache.prototype.get = function(server, cb) { - var socket, pending, S; + var socket, self = this; + + this._pendingAdd(server, cb); socket = this._getSocket(server); - pending = this._getPending(server); if (!socket) { - this._pendingAdd(server, cb); - if (!pending) { - if (server.type === 'tcp') { - this._createTcp(server); - } else { - this._createUdp(server); - } + if (server.type === 'tcp') { + this._createTcp(server); + } else { + this._createUdp(server); } } else { - socket.last = new Date().getTime(); - S = this._toInternalSocket(server, socket.socket); - cb(S); + process.nextTick(function () { + self._pendingEmit(server, socket.socket); + }); } }; @@ -199,54 +204,54 @@ var random_integer = function() { return Math.floor(Math.random() * 50000 + 1); }; +// TODO XXX FIXME -- Until we can unref a socket, close a socket if no +// requests have come in for in 300ms var SOCKET_TIMEOUT = 300; +// Data strucutre that manages making sure there are only so many requests in +// in flight at a given time, as well closing sockets as needed var ServerQueue = module.exports = function(parent, active) { var self = this; + // Where requests are stored if the socket isn't ready or if the socket is + // full this._queue = {}; + // requests active by socket and type this._active = {}; + // this handles socket creation and message catching this._socketCache = new SocketCache(parent); this._max_queue = active; var check_sockets = function() { var s, now; - now = new Date().getTime(); + now = Date.now(); Object.keys(self._socketCache._socket).forEach(function(s) { var socket = self._socketCache._socket[s]; var delta = now - socket.last; - var m = { server: s, delta: delta }; - - if (self._queue[s]) - m.queue = self._queue[s].order.length; - - if (self._active[s]) - m.active = self._active[s].count; - + // If it's been longer than SOCKET_TIMEOUT and there are no requests + // queued or still in flight we're safe to close the socket if (delta > SOCKET_TIMEOUT && self._queue[s].order.length === 0 && self._active[s].count === 0) { self._socketCache.close(s); } }); + // only readd the timer if we actually have sockets to pay attention to if (Object.keys(self._socketCache._socket).length) { self._timer = setTimeout(check_sockets, SOCKET_TIMEOUT); } }; + // TODO XXX FIXME -- When we can unref sockets we'll ref on active requests + // and unref when empty, that way these sockets won't hold the loop self._timer = setTimeout(check_sockets, SOCKET_TIMEOUT); }; -ServerQueue.prototype._hash = function(server) { - if (server.type === 'tcp') - return server.address + ':' + server.port; - else - return 'udp' + net.isIP(server.address); -}; - ServerQueue.prototype._getQueue = function(server) { - var name = this._hash(server); + var name = serverHash(server); + // order allows us to preserve the order since iterating the object keys + // may not preserve that if (!this._queue[name]) { this._queue[name] = { order: [] @@ -257,7 +262,7 @@ ServerQueue.prototype._getQueue = function(server) { }; ServerQueue.prototype._getActive = function(server) { - var name = this._hash(server); + var name = serverHash(server); if (!this._active[name]) { this._active[name] = { @@ -269,12 +274,14 @@ ServerQueue.prototype._getActive = function(server) { }; ServerQueue.prototype.add = function(server, request, cb) { - var name, id, queue, active; + var id, queue, active; - name = this._hash(server); queue = this._getQueue(server); active = this._getActive(server); + // create request id, make sure it's not going to collide with a queued + // request or request already in flight for this server + // ids are unique per server not system wide id = random_integer(); while (queue[id] || active[id]) id = random_integer(); @@ -282,8 +289,12 @@ ServerQueue.prototype.add = function(server, request, cb) { request: request, cb: cb }; + + // append to head of the order queue.order.splice(0, 0, id); + request.id = id; + this.fill(server); }; @@ -321,6 +332,7 @@ ServerQueue.prototype.pop = function(server) { } }; +// While we have space go ahead and send as many requests as we can fit ServerQueue.prototype.fill = function(server) { var active, cb; active = this._getActive(server); @@ -339,15 +351,21 @@ ServerQueue.prototype.getRequest = function(server, id) { }; var PendingRequests = function() { + // 100 requests in flight per server + // 100 for 8.8.8.8 udp, 100 for 8.8.8.8 tcp this._server_queue = new ServerQueue(this, 100); - this.autopromote = true; }; - PendingRequests.prototype.send = function(request) { + // The socket may not be created yet, or there may already be too many + // requests in flight, packets won't be generated until the socket is + // actually ready for sending. + // Request IDs aren't created until we're sure we're not going to collide + // with an existing request this._server_queue.add(request.server, request, function(socket) { var packet; + // TODO -- We might want to slab allocate these? or perhaps in .send try { packet = new Packet(socket); packet.header.id = request.id; @@ -366,14 +384,19 @@ PendingRequests.prototype.send = function(request) { }); }; +// Stop caring about this request, deregister it from the queue PendingRequests.prototype.remove = function(request) { - if (request.server && request.id) + if (request && request.server && request.id) this._server_queue.remove(request.server, request.id); }; +// Proxy response back to Request object +// TODO -- Should this be handled in ServerQueue instead? PendingRequests.prototype.handleMessage = function(server, msg, socket) { var err, request, answer, start, end; + // TODO -- Handle parse failure, we may have enough information to pass that + // information back to Request answer = Packet.parse(msg, socket); request = this._server_queue.getRequest(server, answer.header.id);