From 2885d603f27dcf2e5b18cdc0bebab346b7576a83 Mon Sep 17 00:00:00 2001 From: 3rd-Eden Date: Fri, 14 Sep 2012 10:14:10 +0200 Subject: [PATCH] [minor] Implement 3rd-Eden/jackpot as connection pool, see #54 --- lib/connection.js | 66 ---------------- lib/memcached.js | 191 ++++++++++++++++++++++++---------------------- package.json | 3 +- 3 files changed, 101 insertions(+), 159 deletions(-) diff --git a/lib/connection.js b/lib/connection.js index 5553471..fe4b0c8 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -4,7 +4,6 @@ var EventEmitter = require('events').EventEmitter , spawn = require('child_process').spawn , Utils = require('./utils'); -exports.Manager = ConnectionManager; // connection pooling exports.IssueLog = IssueLog; // connection issue handling exports.Available = ping; // connection availablity @@ -104,68 +103,3 @@ issues.attemptReconnect = function attemptReconnect () { Utils.merge(issue, JSON.parse(JSON.stringify(issue.config))); }); }; - -function ConnectionManager (name, limit, constructor) { - this.name = name; - this.total = limit; - this.factory = constructor; - this.connections = []; -} - -var Manager = ConnectionManager.prototype; - -Manager.allocate = function allocate (callback) { - var total, i - , Manager = this; - - i = total = this.connections.length; - - // check for available - while (i--){ - if (this.isAvailable(this.connections[i])) { - return callback(false, this.connections[i]); - } - } - - // create new - if (total < this.total) { - return this.connections.push(this.factory.apply(this, arguments)); - } - - // give up and don't saturate the node.js process by retying #43 - var full = new Error("All the connections in the memcached pool are busy"); - full.connectionPool = true; - callback(full); -}; - -Manager.isAvailable = function isAvailable (connection) { - var readyState = connection.readyState; - return (readyState === 'open' || readyState === 'writeOnly') - && !(connection._writeQueue && connection._writeQueue.length) - && !(connection._handle && connection._handle.writeQueueSize); -}; - -Manager.remove = function remove (connection) { - var position = this.connections.indexOf(connection); - - if (position !== -1) this.connections.splice(position, 1); - if (connection.readyState && connection.readyState !== 'closed' && connection.end) { - connection.end(); - } -}; - -Manager.free = function freemymemories (keep) { - var save = 0 - , connection; - - while (this.connections.length) { - connection = this.connections.shift(); - - if (save < keep && this.isAvailable(this.connection[0])) { - save++; - continue; - } - - this.remove(connection); - } -}; diff --git a/lib/memcached.js b/lib/memcached.js index a8fbde9..e2f54c7 100644 --- a/lib/memcached.js +++ b/lib/memcached.js @@ -3,7 +3,6 @@ /** * Node's native modules */ - var EventEmitter = require('events').EventEmitter , Stream = require('net').Stream , Socket = require('net').Socket; @@ -11,17 +10,15 @@ var EventEmitter = require('events').EventEmitter /** * External or custom modules */ - var HashRing = require('hashring') , Connection = require('./connection') + , Jackpot = require('jackpot') , Utils = require('./utils') - , Manager = Connection.Manager , IssueLog = Connection.IssueLog; /** * Variable lookups */ - var curry = Utils.curry; /** @@ -32,7 +29,6 @@ var curry = Utils.curry; * @param {Object} options options * @api public */ - function Client (args, options) { var servers = [] , weights = {} @@ -72,21 +68,21 @@ function Client (args, options) { // Allows users to configure the memcached globally or per memcached client Client.config = { - maxKeySize: 251 // max key size allowed by Memcached -, maxExpiration: 2592000 // max expiration duration allowed by Memcached -, maxValue: 1048576 // max length of value allowed by Memcached - -, algorithm: 'crc32' // hashing algorithm that is used for key mapping - -, poolSize: 10 // maximal parallel connections -, reconnect: 18000000 // if dead, attempt reconnect each xx ms -, timeout: 5000 // after x ms the server should send a timeout if we can't connect -, retries: 5 // amount of retries before server is dead -, retry: 30000 // timeout between retries, all call will be marked as cache miss -, remove: false // remove server if dead if false, we will attempt to reconnect -, redundancy: false // allows you do re-distribute the keys over a x amount of servers -, keyCompression: true // compress keys if they are to large (md5) -, debug: false // Output the commands and responses + maxKeySize: 251 // max key size allowed by Memcached + , maxExpiration: 2592000 // max expiration duration allowed by Memcached + , maxValue: 1048576 // max length of value allowed by Memcached + + , algorithm: 'crc32' // hashing algorithm that is used for key mapping + + , poolSize: 10 // maximal parallel connections + , reconnect: 18000000 // if dead, attempt reconnect each xx ms + , timeout: 5000 // after x ms the server should send a timeout if we can't connect + , retries: 5 // amount of retries before server is dead + , retry: 30000 // timeout between retries, all call will be marked as cache miss + , remove: false // remove server if dead if false, we will attempt to reconnect + , redundancy: false // allows you do re-distribute the keys over a x amount of servers + , keyCompression: true // compress keys if they are to large (md5) + , debug: false // Output the commands and responses }; // There some functions we don't want users to touch so we scope them @@ -105,7 +101,7 @@ Client.config = { // Creates or generates a new connection for the give server, the callback // will receive the connection if the operation was successful - memcached.connect = function connect (server, callback) { + memcached.connect = function connect(server, callback) { // server is dead, bail out if (server in this.issues && this.issues[server].failed) { return callback(false, false); @@ -113,7 +109,7 @@ Client.config = { // fetch from connection pool if (server in this.connections) { - return this.connections[server].allocate(callback); + return this.connections[server].pull(callback); } // No connection factory created yet, so we must build one @@ -132,7 +128,8 @@ Client.config = { * Generate a new connection pool manager. */ - manager = new Manager(server, this.poolSize, function factory (callback) { + manager = new Jackpot(this.poolSize); + manager.factory(function factory() { var S = Array.isArray(serverTokens) ? new Stream : new Socket @@ -152,17 +149,14 @@ Client.config = { // Add the event listeners Utils.fuse(S, { - connect: function streamConnect () { - callback(false, this); - } - , close: function streamClose () { + close: function streamClose() { Manager.remove(this); } - , error: function streamError (err) { + , error: function streamError(err) { memcached.connectionIssue(err, S, callback); } , data: curry(memcached, privates.buffer, S) - , timeout: function streamTimeout () { + , timeout: function streamTimeout() { Manager.remove(this); } , end: S.end @@ -177,12 +171,12 @@ Client.config = { // now that we have setup our connection factory we can allocate a new // connection - this.connections[server].allocate(callback); + this.connections[server].pull(callback); }; // Creates a multi stream, so it's easier to query agains multiple memcached // servers. - memcached.multi = function memcachedMulti (keys, callback) { + memcached.multi = function memcachedMulti(keys, callback) { var map = {} , memcached = this , servers @@ -191,7 +185,7 @@ Client.config = { // gets all servers based on the supplied keys, // or just gives all servers if we don't have keys if (keys) { - keys.forEach(function fetchMultipleServers (key) { + keys.forEach(function fetchMultipleServers(key) { var server = memcached.servers.length === 1 ? memcached.servers[0] : memcached.HashRing.getNode(key); @@ -218,7 +212,7 @@ Client.config = { // Executes the command on the net.Stream, if no server is supplied it will // use the query.key to get the server from the HashRing - memcached.command = function memcachedCommand (queryCompiler, server) { + memcached.command = function memcachedCommand(queryCompiler, server) { // generate a regular query, var query = queryCompiler() , redundancy = this.redundancy && this.redundancy < this.servers.length @@ -249,9 +243,9 @@ Client.config = { return query.callback && query.callback('Server not available'); } - this.connect(server, function allocateMemcachedConnection (error, S) { + this.connect(server, function allocateMemcachedConnection(error, S) { if (memcached.debug) { - query.command.split(LINEBREAK).forEach(function errors (line) { + query.command.split(LINEBREAK).forEach(function errors(line) { console.log(S.streamID + ' << ' + line); }); } @@ -279,10 +273,10 @@ Client.config = { if (redundancy && queryRedundancy) { queryRedundancy = queryCompiler(queryRedundancy); - redundancy.forEach(function each (server) { + redundancy.forEach(function each(server) { if (server in memcached.issues && memcached.issues[server].failed) return; - memcached.connect(server, function allocateMemcachedConnection (error, S) { + memcached.connect(server, function allocateMemcachedConnection(error, S) { if (!S || error || S.readyState !== 'open') return; S.write(queryRedundancy.command + LINEBREAK); }); @@ -292,7 +286,7 @@ Client.config = { // Logs all connection issues, and handles them off. Marking all requests as // cache misses. - memcached.connectionIssue = function connectionIssue (error, S, callback) { + memcached.connectionIssue = function connectionIssue(error, S, callback) { // end connection and mark callback as cache miss if (S && S.end) S.end(); if (callback) callback(false, false); @@ -316,19 +310,19 @@ Client.config = { // proxy the events Utils.fuse(issues, { - issue: function issue (details) { + issue: function issue(details) { memcached.emit('issue', details); } - , failure: function failure (details) { + , failure: function failure(details) { memcached.emit('failure', details); } - , reconnecting: function reconnect (details) { + , reconnecting: function reconnect(details) { memcached.emit('reconnecting', details); } - , reconnected: function reconnected (details) { + , reconnected: function reconnected(details) { memcached.emit('reconnect', details); } - , remove: function remove (details) { + , remove: function remove(details) { // emit event and remove servers memcached.emit('remove', details); memcached.connections[server].end(); @@ -347,10 +341,10 @@ Client.config = { }; // Kills all active connections - memcached.end = function endMemcached () { + memcached.end = function endMemcached() { var memcached = this; - Object.keys(this.connections).forEach(function closeConnection (key) { + Object.keys(this.connections).forEach(function closeConnection(key) { memcached.connections[key].free(0); }); }; @@ -359,45 +353,53 @@ Client.config = { // parts of the whole client, the parser commands: privates.parsers = { // handle error responses - 'NOT_FOUND': function notfound (tokens, dataSet, err) { + 'NOT_FOUND': function notfound(tokens, dataSet, err) { return [CONTINUE, false]; } - , 'NOT_STORED': function notstored (tokens, dataSet, err) { + + , 'NOT_STORED': function notstored(tokens, dataSet, err) { return [CONTINUE, false]; } - , 'ERROR': function error (tokens, dataSet, err) { + + , 'ERROR': function error(tokens, dataSet, err) { err.push('Received an ERROR response'); return [FLUSH, false]; } - , 'CLIENT_ERROR': function clienterror (tokens, dataSet, err) { + + , 'CLIENT_ERROR': function clienterror(tokens, dataSet, err) { err.push(tokens.splice(1).join(' ')); return [CONTINUE, false]; } - , 'SERVER_ERROR': function servererror (tokens, dataSet, err, queue, S, memcached) { + + , 'SERVER_ERROR': function servererror(tokens, dataSet, err, queue, S, memcached) { (memcached || this.memcached).connectionIssue(tokens.splice(1).join(' '), S); return [CONTINUE, false]; } // keyword based responses - , 'STORED': function stored (tokens, dataSet) { + , 'STORED': function stored(tokens, dataSet) { return [CONTINUE, true]; } - , 'DELETED': function deleted (tokens, dataSet) { + + , 'DELETED': function deleted(tokens, dataSet) { return [CONTINUE, true]; } - , 'OK': function ok (tokens, dataSet) { + + , 'OK': function ok(tokens, dataSet) { return [CONTINUE, true]; } - , 'EXISTS': function exists (tokens, dataSet) { + + , 'EXISTS': function exists(tokens, dataSet) { return [CONTINUE, false]; } - , 'END': function end (tokens, dataSet, err, queue) { + + , 'END': function end(tokens, dataSet, err, queue) { if (!queue.length) queue.push(false); return [FLUSH, true]; } // value parsing: - , 'VALUE': function value (tokens, dataSet, err, queue) { + , 'VALUE': function value(tokens, dataSet, err, queue) { var key = tokens[1] , flag = +tokens[2] , expire = tokens[3] @@ -430,14 +432,16 @@ Client.config = { return [BUFFER, false]; } - , 'INCRDECR': function incrdecr (tokens) { + , 'INCRDECR': function incrdecr(tokens) { return [CONTINUE, +tokens[1]]; } - , 'STAT': function stat (tokens, dataSet, err, queue) { + + , 'STAT': function stat(tokens, dataSet, err, queue) { queue.push([tokens[1], /^\d+$/.test(tokens[2]) ? +tokens[2] : tokens[2]]); return [BUFFER, true]; } - , 'VERSION': function version (tokens, dataSet) { + + , 'VERSION': function version(tokens, dataSet) { var versionTokens = /(\d+)(?:\.)(\d+)(?:\.)(\d+)$/.exec(tokens.pop()); return [CONTINUE, { @@ -448,7 +452,8 @@ Client.config = { , bugfix: versionTokens[3] || 0 }]; } - , 'ITEM': function item (tokens, dataSet, err, queue) { + + , 'ITEM': function item(tokens, dataSet, err, queue) { queue.push({ key: tokens[1] , b: +tokens[2].substr(1) @@ -459,13 +464,13 @@ Client.config = { } }; - function resultSetIsEmpty (resultSet) { + function resultSetIsEmpty(resultSet) { return !resultSet || (resultSet.length === 1 && !resultSet[0]); } // Parses down result sets privates.resultParsers = { // combines the stats array, in to an object - 'stats': function stats (resultSet) { + 'stats': function stats(resultSet) { var response = {}; if (resultSetIsEmpty(resultSet)) return response; @@ -473,7 +478,7 @@ Client.config = { response.server = this.serverAddress; // Fill the object - resultSet.forEach(function each (statSet) { + resultSet.forEach(function each(statSet) { if (statSet) response[statSet[0]] = statSet[1]; }); @@ -481,11 +486,12 @@ Client.config = { } // the settings uses the same parse format as the regular stats - , 'stats settings': function settings () { + , 'stats settings': function settings() { return privates.resultParsers.stats.apply(this, arguments); } + // Group slabs by slab id - , 'stats slabs': function slabs (resultSet) { + , 'stats slabs': function slabs(resultSet) { var response = {}; if (resultSetIsEmpty(resultSet)) return response; @@ -493,7 +499,7 @@ Client.config = { response.server = this.serverAddress; // Fill the object - resultSet.forEach(function each (statSet) { + resultSet.forEach(function each(statSet) { if (statSet) { var identifier = statSet[0].split(':'); @@ -504,7 +510,8 @@ Client.config = { return response; } - , 'stats items': function items (resultSet) { + + , 'stats items': function items(resultSet) { var response = {}; if (resultSetIsEmpty(resultSet)) return response; @@ -512,7 +519,7 @@ Client.config = { response.server = this.serverAddress; // Fill the object - resultSet.forEach(function each (statSet) { + resultSet.forEach(function each(statSet) { if (statSet) { var identifier = statSet[0].split(':'); @@ -538,7 +545,7 @@ Client.config = { // a line-break, so we need to make sure, the last piece in the buffer is // a LINEBREAK because that is all what is sure about the Memcached Protocol, // all responds end with them. - privates.buffer = function BufferBuffer (S, BufferStream) { + privates.buffer = function BufferBuffer(S, BufferStream) { S.responseBuffer += BufferStream; // only call transform the data once we are sure, 100% sure, that we valid @@ -547,7 +554,7 @@ Client.config = { var chunks = S.responseBuffer.split(LINEBREAK); if (this.debug) { - chunks.forEach(function each (line) { + chunks.forEach(function each(line) { console.log(S.streamID + ' >> ' + line); }); } @@ -561,7 +568,7 @@ Client.config = { // Memcached response identifiers. Once we have found one, we will send it to // the dedicated parsers that will transform the data in a human readable // format, deciding if we should queue it up, or send it to a callback fn. - memcached.rawDataReceived = function rawDataReceived (S) { + memcached.rawDataReceived = function rawDataReceived(S) { var queue = [] , token , tokenSet @@ -656,7 +663,7 @@ Client.config = { }; // Small wrapper function that only executes errors when we have a callback - privates.errorResponse = function errorResponse (error, callback) { + privates.errorResponse = function errorResponse(error, callback) { if (typeof callback === 'function') callback(error, false); return false; @@ -666,7 +673,7 @@ Client.config = { memcached.get = function get(key, callback) { if (Array.isArray(key)) return this.getMulti.apply(this, arguments); - this.command(function getCommand (noreply) { + this.command(function getCommand(noreply) { return { key: key , callback: callback @@ -699,23 +706,23 @@ Client.config = { , calls; // handle multiple responses and cache them untill we receive all. - function handle (err, results) { + function handle(err, results) { if (err) { errors.push(err); } // add all responses to the array - (Array.isArray(results) ? results : [results]).forEach(function each (value) { + (Array.isArray(results) ? results : [results]).forEach(function each(value) { Utils.merge(responses, value); }); if (!--calls) callback(errors.length ? errors : false, responses); } - this.multi(keys, function multi (server, key, index, totals) { + this.multi(keys, function multi(server, key, index, totals) { if (!calls) calls = totals; - memcached.command(function getMultiCommand (noreply) { + memcached.command(function getMultiCommand(noreply) { return { callback: handle , multi:true @@ -731,7 +738,7 @@ Client.config = { // commands will use the same syntax for the Memcached server. Some commands // do not require a lifetime and a flag, but the memcached server is smart // enough to ignore those. - privates.setters = function setters (type, validate, key, value, lifetime, callback, cas) { + privates.setters = function setters(type, validate, key, value, lifetime, callback, cas) { var flag = 0 , valuetype = typeof value , length; @@ -751,7 +758,7 @@ Client.config = { return privates.errorResponse('The length of the value is greater than ' + this.maxValue, callback); } - this.command(function settersCommand (noreply) { + this.command(function settersCommand(noreply) { return { key: key , callback: callback @@ -800,7 +807,7 @@ Client.config = { ] ); - memcached.cas = function checkandset (key, value, cas, lifetime, callback) { + memcached.cas = function checkandset(key, value, cas, lifetime, callback) { privates.setters.call(this , 'cas' , [ @@ -817,7 +824,7 @@ Client.config = { ); }; - memcached.append = function append (key, value, callback) { + memcached.append = function append(key, value, callback) { privates.setters.call(this , 'append' , [ @@ -833,7 +840,7 @@ Client.config = { ); }; - memcached.prepend = function prepend (key, value, callback) { + memcached.prepend = function prepend(key, value, callback) { privates.setters.call(this , 'prepend' , [ @@ -850,8 +857,8 @@ Client.config = { }; // Small handler for incr and decr's - privates.incrdecr = function incrdecr (type, key, value, callback) { - this.command(function incredecrCommand (noreply) { + privates.incrdecr = function incrdecr(type, key, value, callback) { + this.command(function incredecrCommand(noreply) { return { key: key , callback: callback @@ -874,8 +881,8 @@ Client.config = { memcached.decrement = memcached.decr = curry(undefined, privates.incrdecr, 'decr'); // Deletes the keys from the servers - memcached.del = function del (key, callback){ - this.command(function deleteCommand (noreply) { + memcached.del = function del(key, callback){ + this.command(function deleteCommand(noreply) { return { key: key , callback: callback @@ -893,14 +900,14 @@ Client.config = { memcached['delete'] = memcached.del; // Small wrapper that handle single keyword commands such as FLUSH ALL, VERSION and STAT - privates.singles = function singles (type, callback) { + privates.singles = function singles(type, callback) { var memcached = this , responses = [] , errors , calls; // handle multiple servers - function handle (err, results) { + function handle(err, results) { if (err) { errors = errors || []; errors.push(err); @@ -911,10 +918,10 @@ Client.config = { if (!--calls) callback(errors && errors.length ? errors.pop() : undefined, responses); } - this.multi(false, function multi (server, keys, index, totals) { + this.multi(false, function multi(server, keys, index, totals) { if (!calls) calls = totals; - memcached.command(function singlesCommand (noreply) { + memcached.command(function singlesCommand(noreply) { return { callback: handle , type: type @@ -941,8 +948,8 @@ Client.config = { // You need to use the items dump to get the correct server and slab settings // see simple_cachedump.js for an example - memcached.cachedump = function cachedump (server, slabid, number, callback) { - this.command(function cachedumpCommand (noreply) { + memcached.cachedump = function cachedump(server, slabid, number, callback) { + this.command(function cachedumpCommand(noreply) { return { callback: callback , number: number diff --git a/package.json b/package.json index 78552e6..3ef7933 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,8 @@ , "url" : "http://github.com/3rd-Eden/node-memcached.git" } , "dependencies": { - "hashring": "*" + "hashring": "0.0.x" + , "jackpot": "0.0.x" } , "devDependencies": { "mocha": "*"