Skip to content
Browse files

The start of redis_util.

  • Loading branch information...
0 parents commit 1cc6dff30191f79fabaa424c535247f7b22ed100 @godsflaw committed
Showing with 325 additions and 0 deletions.
  1. +13 −0 LICENSE
  2. +37 −0 README.md
  3. +12 −0 examples/queue_client.js
  4. +7 −0 examples/queue_worker.js
  5. +170 −0 index.js
  6. +19 −0 package.json
  7. +16 −0 test/hash.js
  8. +51 −0 test/queue.js
13 LICENSE
@@ -0,0 +1,13 @@
+Copyright [2012] [craigslist Inc.]
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
37 README.md
@@ -0,0 +1,37 @@
+Redis Utils
+===========
+
+A collection of utilities for node-redis, such as a hashing frontend and
+queue interface.
+
+Installation
+============
+
+To install with [npm](http://github.com/isaacs/npm):
+
+ npm install redis_util
+
+Examples
+========
+
+Insert a key/value pair into the appropriate server after hasing a key.
+
+var key = 'test';
+var redis_hash = new RedisHash([{host: '10.0.0.1'}, {host: '10.0.0.2'}]);
+redis_hash.getServers(key)[0].set(key, 'some data for key');
+
+Insert data into a queue and wait for a response:
+
+var cluster = [{host: '10.0.0.1'}, {host: '10.0.0.2'}];
+var queue = new RedisQueue('test', cluster);
+queue.on('ready', function () {
+ queue.sync('payload', null, function (result) {
+ console.log(result);
+ queue.quit();
+ });
+});
+
+Run a worker and return uppercase version of payload.
+
+var worker = new RedisQueue('test', cluster);
+worker.work(function (key, data, next) { next(data.toUpperCase()); });
12 examples/queue_client.js
@@ -0,0 +1,12 @@
+#!/usr/bin/env node
+
+var redis_util = require('redis_util');
+
+var queue = new redis_util.RedisQueue('test',
+ [{port: 6379}, {port: 6380}, {port: 6381}]);
+queue.on('ready', function () {
+ queue.sync('payload', null, function (result) {
+ console.log(result);
+ queue.quit();
+ });
+});
7 examples/queue_worker.js
@@ -0,0 +1,7 @@
+#!/usr/bin/env node
+
+var redis_util = require('redis_util');
+
+var worker = new redis_util.RedisQueue('test',
+ [{port: 6379}, {port: 6380}, {port: 6381}]);
+worker.work(function (key, data, next) { next(data.toUpperCase()); });
170 index.js
@@ -0,0 +1,170 @@
+"use strict;"
+
+var DEFAULT_MAX_SERVERS = 3;
+
+var crypto = require('crypto');
+var events = require('events');
+var redis = require('redis');
+var util = require('util');
+
+function RedisHash(servers, max_servers) {
+ this.max_servers = max_servers | DEFAULT_MAX_SERVERS;
+ if (this.max_servers > servers.length)
+ this.max_servers = servers.length;
+ this.servers = []
+ this.ready = false;
+ var self = this;
+ for (var x = 0; x < servers.length; x++) {
+ var server = redis.createClient(servers[x].port, servers[x].host,
+ servers[x].options);
+ server.on('error', function(message) { self.emit('error', message); });
+ server.on('ready', function () {
+ if (self.ready == false) {
+ self.ready = true;
+ self.emit('ready');
+ }});
+ this.servers.push(server);
+ }
+ events.EventEmitter.call(this);
+}
+
+util.inherits(RedisHash, events.EventEmitter);
+
+RedisHash.prototype.getServers = function (key) {
+ var hash = crypto.createHash('md5').update('' + key).digest('hex');
+ hash = parseInt(hash.substring(0, 8), 16);
+ var servers = [];
+ var remaining = [];
+ for (var x = 0; x < this.servers.length; x++)
+ remaining.push(x);
+ for (var x = 0; x < this.max_servers; x++) {
+ var server = remaining.splice(hash % remaining.length, 1)[0];
+ server = this.servers[server];
+ if (server.ready)
+ servers.push(server);
+ }
+ return servers;
+}
+
+RedisHash.prototype.all = function (command, args, callback) {
+ for (var x = 0; x < this.servers.length; x++)
+ this.servers[x][command].apply(this.servers[x], args);
+}
+
+function RedisQueue(queue, servers, max_servers) {
+ this.hash = new RedisHash(servers, max_servers);
+ var self = this;
+ this.hash.on('error', function (message) { self.emit('error', message); });
+ this.hash.on('ready', function () { self.emit('ready'); });
+ this.queue = queue;
+ this.data = queue + '-data';
+ this.broken = queue + '-broken';
+ events.EventEmitter.call(this);
+}
+
+util.inherits(RedisQueue, events.EventEmitter);
+
+RedisQueue.prototype.quit = function () {
+ this.hash.all('quit');
+}
+
+RedisQueue.prototype.end = function () {
+ this.hash.all('end');
+}
+
+RedisQueue.prototype.async = function (value, key, callback) {
+ if (key == null || key == undefined)
+ key = Math.random() * 4294967296;
+ this._async(value, key, this.hash.getServers(key), callback);
+}
+
+RedisQueue.prototype._async = function (value, key, servers, callback) {
+ if (servers.length == 0) {
+ this.emit('error', new Error('No servers available for key'));
+ return;
+ }
+ server = servers.splice(0, 1)[0];
+ var self = this;
+ server.hexists(this.data, key, function (err, reply) {
+ if (err)
+ self._async(value, key, servers, callback);
+ else {
+ server.hset(self.data, key, value, function (err, reply) {
+ if (err)
+ self._async(value, key, servers, callback);
+ });
+ if (!reply) {
+ server.lpush(self.queue, key, function (err, reply) {
+ if (err)
+ self._async(value, key, servers, callback);
+ else if (callback)
+ callback();
+ });
+ }
+ }
+ });
+}
+
+RedisQueue.prototype.sync = function (value, key, callback) {
+ if (key == null || key == undefined)
+ key = Math.random() * 4294967296;
+ this._sync(value, key, this.hash.getServers(key), callback);
+}
+
+RedisQueue.prototype._sync = function (value, key, servers, callback) {
+ if (servers.length == 0) {
+ this.emit('error', new Error('No servers available for key'));
+ return;
+ }
+ var server = servers.splice(0, 1)[0];
+ var self = this;
+ server.hexists(this.data, key, function (err, reply) {
+ if (err)
+ self._sync(value, key, servers, callback);
+ else {
+ var sub = redis.createClient(server.port, server.host,
+ server.options);
+ sub.subscribe(key, function () {
+ sub.on('message', function (channel, message) {
+ callback(message);
+ sub.end();
+ });
+ server.hset(self.data, key, value, function (err, reply) {
+ if (err)
+ self._sync(value, key, servers, callback);
+ });
+ if (!reply) {
+ server.lpush(self.queue, key, function (err, reply) {
+ if (err) {
+ sub.end();
+ self._sync(value, key, servers, callback);
+ }
+ });
+ }
+ });
+ }
+ });
+}
+
+RedisQueue.prototype.work = function (callback) {
+ for (var x = 0; x < this.hash.servers.length; x++)
+ this._work(this.hash.servers[x], callback);
+}
+
+RedisQueue.prototype._work = function (server, callback) {
+ var self = this;
+ server.brpop(self.queue, 0, function (err, key) {
+ key = key[1];
+ server.hget(self.data, key, function (err, data) {
+ server.hdel(self.data, key);
+ callback(key, data, function (result) {
+ if (result != undefined)
+ server.publish(key, result);
+ self._work(server, callback);
+ });
+ });
+ });
+}
+
+exports.RedisHash = RedisHash;
+exports.RedisQueue = RedisQueue;
19 package.json
@@ -0,0 +1,19 @@
+{
+ "name" : "redis_util",
+ "version" : "0.0.1",
+ "description" : "A collection of utilities for node-redis.",
+ "main" : "./index.js",
+ "keywords": [
+ "redis",
+ "hash",
+ "queue"
+ ],
+ "repository": {
+ "type": "git",
+ "url": "http://github.com/godsflaw/redis_util.git"
+ },
+ "licenses": [ {
+ "type": "Apache2",
+ "url": "https://github.com/godsflaw/diablow/blob/master/LICENSE"
+ } ]
+}
16 test/hash.js
@@ -0,0 +1,16 @@
+var redis_util = require('../');
+var cluster = [{port: 6379}, {port: 6379}, {port: 6379}, {port: 6379}];
+
+exports.basic = function (test) {
+ test.expect(1);
+ var redis_hash = new redis_util.RedisHash(cluster);
+ redis_hash.on('error', function (message) {
+ redis_hash.all('end');
+ test.done();
+ });
+ setTimeout(function () {
+ test.equal(3, redis_hash.getServers('test').length);
+ redis_hash.all('quit');
+ test.done();
+ }, 100);
+};
51 test/queue.js
@@ -0,0 +1,51 @@
+var redis_util = require('../');
+var cluster = [{port: 6379}];
+
+exports.sync_queue = function (test) {
+ test.expect(3);
+ var client = new redis_util.RedisQueue('sync_queue', cluster);
+ var worker = new redis_util.RedisQueue('sync_queue', cluster);
+ var on_error = function (message) {
+ client.end();
+ worker.end();
+ test.done();
+ }
+ client.on('error', on_error);
+ worker.on('error', on_error);
+ worker.work(function (key, data, next) {
+ test.equal(key, '12345');
+ test.equal(data, 'payload');
+ next('Result: ' + key + data);
+ });
+ client.on('ready', function () {
+ client.sync('payload', 12345, function (result) {
+ test.equal(result, 'Result: 12345payload');
+ client.quit();
+ worker.end();
+ test.done();
+ });
+ });
+};
+
+exports.async_queue = function (test) {
+ test.expect(2);
+ var client = new redis_util.RedisQueue('async_queue', cluster);
+ var worker = new redis_util.RedisQueue('async_queue', cluster);
+ var on_error = function (message) {
+ client.end();
+ worker.end();
+ test.done();
+ }
+ client.on('error', on_error);
+ worker.on('error', on_error);
+ worker.work(function (key, data, next) {
+ test.equal(key, '12345');
+ test.equal(data, 'payload');
+ client.quit();
+ worker.end();
+ test.done();
+ });
+ client.on('ready', function () {
+ client.async('payload', 12345);
+ });
+};

0 comments on commit 1cc6dff

Please sign in to comment.
Something went wrong with that request. Please try again.