Skip to content
Browse files

first commit

  • Loading branch information...
0 parents commit 1ced9f9f6772d10858ff4f568a866d26ad00e8ab Igor Urminček committed
Showing with 1,687 additions and 0 deletions.
  1. +386 −0 async.js
  2. +917 −0 redis-client.js
  3. +384 −0 redis2json.js
386 async.js
@@ -0,0 +1,386 @@
+(function(exports){
+
+ //// cross-browser compatiblity functions ////
+
+ var _forEach = function(arr, iterator){
+ if(arr.forEach) return arr.forEach(iterator);
+ for(var i=0; i<arr.length; i++){
+ iterator(arr[i], i, arr);
+ }
+ };
+
+ var _map = function(arr, iterator){
+ if(arr.map) return arr.map(iterator);
+ var results = [];
+ _forEach(arr, function(x, i, a){
+ results.push(iterator(x, i, a));
+ })
+ return results;
+ };
+
+ var _reduce = function(arr, iterator, memo){
+ if(arr.reduce) return arr.reduce(iterator, memo);
+ _forEach(arr, function(x, i, a){
+ memo = iterator(memo, x, i, a);
+ });
+ return memo;
+ };
+
+ var _keys = function(obj){
+ if(Object.keys) return Object.keys(obj);
+ var keys = [];
+ for(var k in obj){
+ if(obj.hasOwnProperty(k)) keys.push(k);
+ }
+ return keys;
+ };
+
+ var _indexOf = function(arr, item){
+ if(arr.indexOf) return arr.indexOf(item);
+ for(var i=0; i<arr.length; i++){
+ if(arr[i] === item) return i;
+ }
+ return -1;
+ };
+
+ //// nextTick implementation with browser-compatible fallback ////
+
+ var _nextTick;
+ if(typeof process === 'undefined' || !process.nextTick){
+ _nextTick = function(fn){
+ setTimeout(fn, 0);
+ };
+ }
+ else _nextTick = process.nextTick;
+
+
+ //// exported async module functions ////
+
+ exports.forEach = function(arr, iterator, callback){
+ if(!arr.length) return callback();
+ var completed = 0;
+ _forEach(arr, function(x){
+ iterator(x, function(err){
+ if(err){
+ callback(err);
+ callback = function(){};
+ }
+ else {
+ completed++;
+ if(completed == arr.length) callback();
+ }
+ });
+ });
+ };
+
+ exports.forEachSeries = function(arr, iterator, callback){
+ if(!arr.length) return callback();
+ var completed = 0;
+ var iterate = function(){
+ iterator(arr[completed], function(err){
+ if(err){
+ callback(err);
+ callback = function(){};
+ }
+ else {
+ completed++;
+ if(completed == arr.length) callback();
+ else iterate();
+ }
+ });
+ };
+ iterate();
+ };
+
+
+ var doParallel = function(fn){
+ return function(){
+ var args = Array.prototype.slice.call(arguments);
+ return fn.apply(null, [exports.forEach].concat(args));
+ };
+ };
+ var doSeries = function(fn){
+ return function(){
+ var args = Array.prototype.slice.call(arguments);
+ return fn.apply(null, [exports.forEachSeries].concat(args));
+ };
+ };
+
+
+ var _asyncMap = function(eachfn, arr, iterator, callback){
+ var results = [];
+ arr = _map(arr, function(x, i){
+ return {index: i, value: x};
+ });
+ eachfn(arr, function(x, callback){
+ iterator(x.value, function(err, v){
+ results[x.index] = v;
+ callback(err);
+ });
+ }, function(err){
+ callback(err, results);
+ });
+ };
+ exports.map = doParallel(_asyncMap);
+ exports.mapSeries = doSeries(_asyncMap);
+
+
+ // reduce only has a series version, as doing reduce in parallel won't
+ // work in many situations.
+ exports.reduce = function(arr, memo, iterator, callback){
+ exports.forEachSeries(arr, function(x, callback){
+ iterator(memo, x, function(err, v){
+ memo = v;
+ callback(err);
+ });
+ }, function(err){
+ callback(err, memo);
+ });
+ };
+ // inject alias
+ exports.inject = exports.reduce;
+ // foldl alias
+ exports.foldl = exports.reduce;
+
+ exports.reduceRight = function(arr, memo, iterator, callback){
+ var reversed = _map(arr, function(x){return x;}).reverse();
+ exports.reduce(reversed, memo, iterator, callback);
+ };
+ // foldr alias
+ exports.foldr = exports.reduceRight;
+
+ var _filter = function(eachfn, arr, iterator, callback){
+ var results = [];
+ arr = _map(arr, function(x, i){
+ return {index: i, value: x};
+ });
+ eachfn(arr, function(x, callback){
+ iterator(x.value, function(v){
+ if(v) results.push(x);
+ callback();
+ });
+ }, function(err){
+ callback(_map(results.sort(function(a,b){
+ return a.index - b.index;
+ }), function(x){
+ return x.value;
+ }));
+ });
+ };
+ exports.filter = doParallel(_filter);
+ exports.filterSeries = doSeries(_filter);
+ // select alias
+ exports.select = exports.filter;
+ exports.selectSeries = exports.filterSeries;
+
+ var _reject = function(eachfn, arr, iterator, callback){
+ var results = [];
+ arr = _map(arr, function(x, i){
+ return {index: i, value: x};
+ });
+ eachfn(arr, function(x, callback){
+ iterator(x.value, function(v){
+ if(!v) results.push(x);
+ callback();
+ });
+ }, function(err){
+ callback(_map(results.sort(function(a,b){
+ return a.index - b.index;
+ }), function(x){
+ return x.value;
+ }));
+ });
+ };
+ exports.reject = doParallel(_reject);
+ exports.rejectSeries = doSeries(_reject);
+
+ var _detect = function(eachfn, arr, iterator, main_callback){
+ eachfn(arr, function(x, callback){
+ iterator(x, function(result){
+ if(result) main_callback(x);
+ else callback();
+ });
+ }, function(err){
+ main_callback();
+ });
+ };
+ exports.detect = doParallel(_detect);
+ exports.detectSeries = doSeries(_detect);
+
+ exports.some = function(arr, iterator, main_callback){
+ exports.forEach(arr, function(x, callback){
+ iterator(x, function(v){
+ if(v){
+ main_callback(true);
+ main_callback = function(){};
+ }
+ callback();
+ });
+ }, function(err){
+ main_callback(false);
+ });
+ };
+ // any alias
+ exports.any = exports.some;
+
+ exports.every = function(arr, iterator, main_callback){
+ exports.forEach(arr, function(x, callback){
+ iterator(x, function(v){
+ if(!v){
+ main_callback(false);
+ main_callback = function(){};
+ }
+ callback();
+ });
+ }, function(err){
+ main_callback(true);
+ });
+ };
+ // all alias
+ exports.all = exports.every;
+
+ exports.sortBy = function(arr, iterator, callback){
+ exports.map(arr, function(x, callback){
+ iterator(x, function(err, criteria){
+ if(err) callback(err);
+ else callback(null, {value: x, criteria: criteria});
+ });
+ }, function(err, results){
+ if(err) return callback(err);
+ else callback(null, _map(results.sort(function(left, right){
+ var a = left.criteria, b = right.criteria;
+ return a < b ? -1 : a > b ? 1 : 0;
+ }), function(x){return x.value;}));
+ })
+ };
+
+ exports.auto = function(tasks, callback){
+ callback = callback || function(){};
+ var keys = _keys(tasks);
+ if(!keys.length) return callback(null);
+
+ var completed = [];
+
+ var listeners = [];
+ var addListener = function(fn){
+ listeners.unshift(fn);
+ };
+ var removeListener = function(fn){
+ for(var i=0; i<listeners.length; i++){
+ if(listeners[i] === fn){
+ listeners.splice(i, 1);
+ return;
+ }
+ }
+ };
+ var taskComplete = function(){
+ _forEach(listeners, function(fn){fn();});
+ };
+
+ addListener(function(){
+ if(completed.length == keys.length){
+ callback(null);
+ }
+ });
+
+ _forEach(keys, function(k){
+ var task = (tasks[k] instanceof Function)? [tasks[k]]: tasks[k];
+ var taskCallback = function(err){
+ if(err){
+ callback(err);
+ // stop subsequent errors hitting callback multiple times
+ callback = function(){};
+ }
+ else {
+ completed.push(k);
+ taskComplete();
+ }
+ };
+ var requires = task.slice(0, Math.abs(task.length-1)) || [];
+ var ready = function(){
+ return _reduce(requires, function(a,x){
+ return (a && _indexOf(completed, x) != -1);
+ }, true);
+ };
+ if(ready()) task[task.length-1](taskCallback);
+ else {
+ var listener = function(){
+ if(ready()){
+ removeListener(listener);
+ task[task.length-1](taskCallback);
+ }
+ };
+ addListener(listener);
+ }
+ });
+ };
+
+ exports.waterfall = function(tasks, callback){
+ if(!tasks.length) return callback();
+ callback = callback || function(){};
+ var wrapIterator = function(iterator){
+ return function(err){
+ if(err){
+ callback(err);
+ callback = function(){};
+ }
+ else {
+ var args = Array.prototype.slice.call(arguments, 1);
+ var next = iterator.next();
+ if(next) args.push(wrapIterator(next));
+ else args.push(callback);
+ _nextTick(function(){iterator.apply(null, args);});
+ }
+ };
+ };
+ wrapIterator(exports.iterator(tasks))();
+ };
+
+ exports.parallel = function(tasks, callback){
+ callback = callback || function(){};
+ exports.map(tasks, function(fn, callback){
+ if(fn){
+ fn(function(err){
+ var args = Array.prototype.slice.call(arguments,1);
+ if(args.length <= 1) args = args[0];
+ callback.call(null, err, args || null);
+ });
+ }
+ }, callback);
+ };
+
+ exports.series = function(tasks, callback){
+ callback = callback || function(){};
+ exports.mapSeries(tasks, function(fn, callback){
+ if(fn){
+ fn(function(err){
+ var args = Array.prototype.slice.call(arguments,1);
+ if(args.length <= 1) args = args[0];
+ callback.call(null, err, args || null);
+ });
+ }
+ }, callback);
+ };
+
+ exports.iterator = function(tasks){
+ var makeCallback = function(index){
+ var fn = function(){
+ if(tasks.length) tasks[index].apply(null, arguments);
+ return fn.next();
+ };
+ fn.next = function(){
+ return (index < tasks.length-1)? makeCallback(index+1): null;
+ };
+ return fn;
+ };
+ return makeCallback(0);
+ };
+
+ exports.apply = function(fn){
+ var args = Array.prototype.slice.call(arguments, 1);
+ return function(){
+ fn.apply(null, args.concat(Array.prototype.slice.call(arguments)));
+ };
+ };
+
+})((typeof exports == 'undefined') ? this['async']={}: exports);
917 redis-client.js
@@ -0,0 +1,917 @@
+/*
+
+© 2010 by Fictorial LLC
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+
+*/
+
+// To add support for new commands, edit the array called "commands" at the
+// bottom of this file.
+
+// Set this to true to aid in debugging wire protocol input/output,
+// parsing methods, etc.
+
+exports.debugMode = false;
+
+var net = require("net"),
+ sys = require("sys"),
+ Buffer = require('buffer').Buffer,
+ events = require('events'),
+
+ CRLF = "\r\n",
+ CRLF_LEN = 2,
+
+ PLUS = exports.PLUS = 0x2B, // +
+ MINUS = exports.MINUS = 0x2D, // -
+ DOLLAR = exports.DOLLAR = 0x24, // $
+ STAR = exports.STAR = 0x2A, // *
+ COLON = exports.COLON = 0x3A, // :
+ CR = exports.CR = 0x0D, // \r
+ LF = exports.LF = 0x0A, // \n
+
+ NONE = exports.NONE = "NONE",
+ BULK = exports.BULK = "BULK",
+ MULTIBULK = exports.MULTIBULK = "MULTIBULK",
+ INLINE = exports.INLINE = "INLINE",
+ INTEGER = exports.INTEGER = "INTEGER",
+ ERROR = exports.ERROR = "ERROR";
+
+exports.DEFAULT_HOST = '127.0.0.1';
+exports.DEFAULT_PORT = 6379;
+
+exports.COMMAND_ORPHANED_ERROR = "connection lost before reply received";
+exports.NO_CONNECTION_ERROR = "failed to establish a connection to Redis";
+
+function debugFilter(buffer, len) {
+ // Redis is binary-safe but assume for debug display that
+ // the encoding of textual data is UTF-8.
+
+ var filtered = buffer.utf8Slice(0, len || buffer.length);
+
+ filtered = filtered.replace(/\r\n/g, '<CRLF>');
+ filtered = filtered.replace(/\r/g, '<CR>');
+ filtered = filtered.replace(/\n/g, '<LF>');
+
+ return filtered;
+}
+
+// A fully interruptable, binary-safe Redis reply parser.
+// 'callback' is called with each reply parsed in 'feed'.
+// 'thisArg' is the "thisArg" for the callback "call".
+
+function ReplyParser(callback, thisArg) {
+ this.onReply = callback;
+ this.thisArg = thisArg;
+ this.clearState();
+ this.clearMultiBulkState();
+}
+
+exports.ReplyParser = ReplyParser;
+
+ReplyParser.prototype.clearState = function () {
+ this.type = NONE;
+ this.bulkLengthExpected = null;
+ this.valueBufferLen = 0;
+ this.skip = 0;
+ this.valueBuffer = new Buffer(4096);
+};
+
+ReplyParser.prototype.clearMultiBulkState = function () {
+ this.multibulkReplies = null;
+ this.multibulkRepliesExpected = null;
+};
+
+ReplyParser.prototype.feed = function (inbound) {
+ for (var i=0; i < inbound.length; ++i) {
+ if (this.skip > 0) {
+ this.skip--;
+ continue;
+ }
+
+ var typeBefore = this.type;
+
+ if (this.type === NONE) {
+ switch (inbound[i]) {
+ case DOLLAR: this.type = BULK; break;
+ case STAR: this.type = MULTIBULK; break;
+ case COLON: this.type = INTEGER; break;
+ case PLUS: this.type = INLINE; break;
+ case MINUS: this.type = ERROR; break;
+ }
+ }
+
+ // Just a state transition on '*', '+', etc.?
+
+ if (typeBefore != this.type)
+ continue;
+
+ // If the reply is a part of a multi-bulk reply. Save it. If we have
+ // received all the expected replies of a multi-bulk reply, then
+ // callback. If the reply is not part of a multi-bulk. Call back
+ // immediately.
+
+ var self = this;
+
+ var maybeCallbackWithReply = function (reply) {
+ if (self.multibulkReplies != null) {
+ self.multibulkReplies.push(reply);
+ if (--self.multibulkRepliesExpected == 0) {
+ self.onReply.call(self.thisArg, {
+ type: MULTIBULK,
+ value: self.multibulkReplies
+ });
+ self.clearMultiBulkState();
+ }
+ } else {
+ self.onReply.call(self.thisArg, reply);
+ }
+ self.clearState();
+ self.skip = 1; // Skip LF
+ };
+
+ switch (inbound[i]) {
+ case CR:
+ switch (this.type) {
+ case INLINE:
+ case ERROR:
+ // CR denotes end of the inline/error value.
+ // +OK\r\n
+ // ^
+
+ var inlineBuf = new Buffer(this.valueBufferLen);
+ this.valueBuffer.copy(inlineBuf, 0, 0, this.valueBufferLen);
+ maybeCallbackWithReply({ type:this.type, value:inlineBuf });
+ break;
+
+ case INTEGER:
+ // CR denotes the end of the integer value.
+ // :42\r\n
+ // ^
+
+ var n = parseInt(this.valueBuffer.asciiSlice(0, this.valueBufferLen), 10);
+ maybeCallbackWithReply({ type:INTEGER, value:n });
+ break;
+
+ case BULK:
+ if (this.bulkLengthExpected == null) {
+ // CR denotes end of first line of a bulk reply,
+ // which is the length of the bulk reply value.
+ // $5\r\nhello\r\n
+ // ^
+
+ var bulkLengthExpected =
+ parseInt(this.valueBuffer.asciiSlice(0, this.valueBufferLen), 10);
+
+ if (bulkLengthExpected <= 0) {
+ maybeCallbackWithReply({ type:BULK, value:null });
+ } else {
+ this.clearState();
+
+ this.bulkLengthExpected = bulkLengthExpected;
+ this.type = BULK;
+ this.skip = 1; // skip LF
+ }
+ } else if (this.valueBufferLen == this.bulkLengthExpected) {
+ // CR denotes end of the bulk reply value.
+ // $5\r\nhello\r\n
+ // ^
+
+ var bulkBuf = new Buffer(this.valueBufferLen);
+ this.valueBuffer.copy(bulkBuf, 0, 0, this.valueBufferLen);
+ maybeCallbackWithReply({ type:BULK, value:bulkBuf });
+ } else {
+ // CR is just an embedded CR and has nothing to do
+ // with the reply specification.
+ // $11\r\nhello\rworld\r\n
+ // ^
+
+ this.valueBuffer[this.valueBufferLen++] = inbound[i];
+ }
+ break;
+
+ case MULTIBULK:
+ // Parse the count which is the number of expected replies
+ // in the multi-bulk reply.
+ // *2\r\n$5\r\nhello\r\n$5\r\nworld\r\n
+ // ^
+
+ var multibulkRepliesExpected =
+ parseInt(this.valueBuffer.asciiSlice(0, this.valueBufferLen), 10);
+
+ if (multibulkRepliesExpected <= 0) {
+ maybeCallbackWithReply({ type:MULTIBULK, value:null });
+ } else {
+ this.clearState();
+ this.skip = 1; // skip LF
+ this.multibulkReplies = [];
+ this.multibulkRepliesExpected = multibulkRepliesExpected;
+ }
+ break;
+ }
+ break;
+
+ default:
+ this.valueBuffer[this.valueBufferLen++] = inbound[i];
+ break;
+ }
+
+ // If the current value buffer is too big, create a new buffer, copy in
+ // the old buffer, and replace the old buffer with the new buffer.
+
+ if (this.valueBufferLen === this.valueBuffer.length) {
+ var newBuffer = new Buffer(this.valueBuffer.length * 2);
+ this.valueBuffer.copy(newBuffer, 0, 0);
+ this.valueBuffer = newBuffer;
+ }
+ }
+};
+
+/**
+ * Emits:
+ *
+ * - 'connected' when connected (or on a reconnection, reconnected).
+ * - 'reconnecting' when about to retry to connect to Redis.
+ * - 'reconnected' when connected after a reconnection was established.
+ * - 'noconnection' when a connection (or reconnection) cannot be established.
+ * - 'drained' when no submitted commands are expecting a reply from Redis.
+ *
+ * Options:
+ *
+ * - maxReconnectionAttempts (default: 10)
+ */
+
+function Client(stream, options) {
+ events.EventEmitter.call(this);
+
+ this.stream = stream;
+ this.originalCommands = [];
+ this.queuedOriginalCommands = [];
+ this.queuedRequestBuffers = [];
+ this.channelCallbacks = {};
+ this.requestBuffer = new Buffer(512);
+ this.replyParser = new ReplyParser(this.onReply_, this);
+ this.reconnectionTimer = null;
+ this.maxReconnectionAttempts = 10;
+ this.reconnectionAttempts = 0;
+ this.reconnectionDelay = 500; // doubles, so starts at 1s delay
+ this.connectionsMade = 0;
+
+ if (options !== undefined)
+ this.maxReconnectionAttempts = Math.abs(options.maxReconnectionAttempts || 10);
+
+ var client = this;
+
+ stream.addListener("connect", function () {
+ if (exports.debugMode)
+ sys.debug("[CONNECT]");
+
+ stream.setNoDelay();
+ stream.setTimeout(0);
+
+ client.reconnectionAttempts = 0;
+ client.reconnectionDelay = 500;
+ if (client.reconnectionTimer) {
+ clearTimeout(client.reconnectionTimer);
+ client.reconnectionTimer = null;
+ }
+
+ var eventName = client.connectionsMade == 0
+ ? 'connected'
+ : 'reconnected';
+
+ client.connectionsMade++;
+ client.expectingClose = false;
+
+ // If this a reconnection and there were commands submitted, then they
+ // are gone! We cannot say with any confidence which were processed by
+ // Redis; perhaps some were processed but we never got the reply, or
+ // perhaps all were processed but Redis is configured with less than
+ // 100% durable writes, etc.
+ //
+ // We punt to the user by calling their callback with an I/O error.
+ // However, we provide enough information to allow the user to retry
+ // the interrupted operation. We are certainly not retrying anything
+ // for them as it is too dangerous and application-specific.
+
+ if (client.connectionsMade > 1 && client.originalCommands.length > 0) {
+ if (exports.debug) {
+ sys.debug("[RECONNECTION] some commands orphaned (" +
+ client.originalCommands.length + "). notifying...");
+ }
+
+ client.callbackOrphanedCommandsWithError();
+ }
+
+ client.originalCommands = [];
+ client.flushQueuedCommands();
+
+ client.emit(eventName, client);
+ });
+
+ stream.addListener('error', function (e) {
+ if (exports.debugMode)
+ sys.debug("[ERROR] Connection to redis encountered an error: " + e);
+ });
+
+ stream.addListener("data", function (buffer) {
+ if (exports.debugMode)
+ sys.debug("[RECV] " + debugFilter(buffer));
+
+ client.replyParser.feed(buffer);
+ });
+
+ stream.addListener("error", function (e) {
+ if (exports.debugMode)
+ sys.debug('[ERROR] ' + e);
+ client.replyParser.clearState();
+ client.maybeReconnect();
+ throw e;
+ });
+
+ stream.addListener("end", function () {
+ if (exports.debugMode && client.originalCommands.length > 0) {
+ sys.debug("Connection to redis closed with " +
+ client.originalCommands.length +
+ " commands pending replies that will never arrive!");
+ }
+
+ stream.end();
+ });
+
+ stream.addListener("close", function (hadError) {
+ if (exports.debugMode)
+ sys.debug("[NO CONNECTION]");
+
+ client.maybeReconnect();
+ });
+}
+
+sys.inherits(Client, events.EventEmitter);
+
+exports.Client = Client;
+
+exports.createClient = function (port, host, options) {
+ var port = port || exports.DEFAULT_PORT;
+ var host = host || exports.DEFAULT_HOST;
+
+ var client = new Client(net.createConnection(port, host), options);
+
+ client.port = port;
+ client.host = host;
+
+ return client;
+};
+
+Client.prototype.close = function () {
+ this.expectingClose = true;
+ this.stream.end();
+};
+
+Client.prototype.onReply_ = function (reply) {
+ this.flushQueuedCommands();
+
+ if (this.handlePublishedMessage_(reply))
+ return;
+
+ var originalCommand = this.originalCommands.shift();
+ var callback = originalCommand[originalCommand.length - 1];
+
+ // Callbacks expect (err, reply) as args.
+
+ if (typeof callback == "function") {
+ if (reply.type == ERROR) {
+ callback(reply.value.utf8Slice(0, reply.value.length), null);
+ } else {
+ callback(null, maybeConvertReplyValue(originalCommand[0], reply));
+ }
+ }
+
+ if (this.originalCommands.length == 0)
+ this.emit('drained', this);
+};
+
+Client.prototype.handlePublishedMessage_ = function (reply) {
+ // We're looking for a multibulk resembling
+ // ["message", "channelName", messageBuffer]; or
+ // ["pmessage", "matchingPattern", "channelName", messageBuffer]
+ // The latter is sent when the client subscribed to a channel by a pattern;
+ // the former when subscribed to a channel by name.
+ // If the client subscribes by name -and- by pattern and there's some
+ // overlap, the client -will- receive multiple p/message notifications.
+
+ if (reply.type != MULTIBULK || !(reply.value instanceof Array))
+ return false;
+
+ var isMessage = (reply.value.length == 3 &&
+ reply.value[0].value.length == 7 &&
+ reply.value[0].value.asciiSlice(0, 7) == 'message');
+
+ var isPMessage = (reply.value.length == 4 &&
+ reply.value[0].value.length == 8 &&
+ reply.value[0].value.asciiSlice(0, 8) == 'pmessage');
+
+ if (!isMessage && !isPMessage)
+ return false;
+
+ // This is tricky. We are returning true even though there
+ // might not be any callback called! This may happen when a
+ // caller subscribes then unsubscribes while a published
+ // message is in transit to us. When the message arrives, no
+ // one is there to consume it. In essence, as long as the
+ // reply type is a published message (see above), then we've
+ // "handled" the reply.
+
+ if (Object.getOwnPropertyNames(this.channelCallbacks).length == 0)
+ return true;
+
+ var channelName, channelPattern, channelCallback, payload;
+
+ if (isMessage) {
+ channelName = reply.value[1].value;
+ channelCallback = this.channelCallbacks[channelName];
+ payload = reply.value[2].value;
+ } else if (isPMessage) {
+ channelPattern = reply.value[1].value;
+ channelName = reply.value[2].value;
+ channelCallback = this.channelCallbacks[channelPattern];
+ payload = reply.value[3].value;
+ } else {
+ return false;
+ }
+
+ if (typeof channelCallback == "function") {
+ channelCallback(channelName, payload, channelPattern);
+ return true;
+ }
+
+ return false;
+}
+
+function maybeAsNumber(str) {
+ var value = parseInt(str, 10);
+
+ if (isNaN(value))
+ value = parseFloat(str);
+
+ if (isNaN(value))
+ return str;
+
+ return value;
+}
+
+function maybeConvertReplyValue(commandName, reply) {
+ if (reply.value === null)
+ return null;
+
+ // Redis' INFO command returns a BULK reply of the form:
+ // "redis_version:1.3.8
+ // arch_bits:64
+ // multiplexing_api:kqueue
+ // process_id:11604
+ // ..."
+ //
+ // We convert that to a JS object like:
+ // { redis_version: '1.3.8'
+ // , arch_bits: '64'
+ // , multiplexing_api: 'kqueue'
+ // , process_id: '11604'
+ // , ... }
+
+ if (commandName === 'info' && reply.type === BULK) {
+ var info = {};
+ reply.value.asciiSlice(0, reply.value.length).split(/\r\n/g)
+ .forEach(function (line) {
+ var parts = line.split(':');
+ if (parts.length === 2)
+ info[parts[0]] = parts[1];
+ });
+ return info;
+ }
+
+ // HGETALL returns a MULTIBULK where each consecutive reply-pair
+ // is a key and value for the Redis HASH. We convert this into
+ // a JS object.
+
+ if (commandName === 'hgetall' &&
+ reply.type === MULTIBULK &&
+ reply.value.length % 2 === 0) {
+
+ var hash = {};
+ for (var i=0; i<reply.value.length; i += 2)
+ hash[reply.value[i].value] = reply.value[i + 1].value;
+ return hash;
+ }
+
+ // Redis returns "+OK\r\n" to signify success.
+ // We convert this into a JS boolean with value true.
+
+ if (reply.type === INLINE && reply.value.asciiSlice(0,2) === 'OK')
+ return true;
+
+ // ZSCORE returns a string representation of a floating point number.
+ // We convert this into a JS number.
+
+ if (commandName === "zscore")
+ return maybeAsNumber(reply.value);
+
+ // Multibulk replies are returned from our reply parser as an
+ // array like: [ {type:BULK, value:"foo"}, {type:BULK, value:"bar"} ]
+ // But, end-users want the value and don't care about the
+ // Redis protocol reply types. We here extract the value from each
+ // object in the multi-bulk array.
+
+ if (reply.type === MULTIBULK)
+ return reply.value.map(function (element) { return element.value; });
+
+ // Otherwise, we have no conversions to offer.
+
+ return reply.value;
+}
+
+exports.maybeConvertReplyValue_ = maybeConvertReplyValue;
+
+var commands = [
+ "append",
+ "auth",
+ "bgsave",
+ "blpop",
+ "brpop",
+ "dbsize",
+ "decr",
+ "decrby",
+ "del",
+ "exists",
+ "expire",
+ "expireat",
+ "flushall",
+ "flushdb",
+ "get",
+ "getset",
+ "hdel",
+ "hexists",
+ "hget",
+ "hgetall",
+ "hincrby",
+ "hkeys",
+ "hlen",
+ "hmget",
+ "hmset",
+ "hset",
+ "hvals",
+ "incr",
+ "incrby",
+ "info",
+ "keys",
+ "lastsave",
+ "len",
+ "lindex",
+ "llen",
+ "lpop",
+ "lpush",
+ "lrange",
+ "lrem",
+ "lset",
+ "ltrim",
+ "mget",
+ "move",
+ "mset",
+ "msetnx",
+ "psubscribe",
+ "publish",
+ "punsubscribe",
+ "randomkey",
+ "rename",
+ "renamenx",
+ "rpop",
+ "rpoplpush",
+ "rpush",
+ "sadd",
+ "save",
+ "scard",
+ "sdiff",
+ "sdiffstore",
+ "select",
+ "set",
+ "setex",
+ "setnx",
+ "shutdown",
+ "sinter",
+ "sinterstore",
+ "sismember",
+ "smembers",
+ "smove",
+ "sort",
+ "spop",
+ "srandmember",
+ "srem",
+ "subscribe",
+ "sunion",
+ "sunionstore",
+ "ttl",
+ "type",
+ "unsubscribe",
+ "zadd",
+ "zcard",
+ "zcount",
+ "zincrby",
+ "zinter",
+ "zrange",
+ "zrangebyscore",
+ "zrank",
+ "zrem",
+ "zrembyrank",
+ "zremrangebyrank",
+ "zremrangebyscore",
+ "zrevrange",
+ "zrevrank",
+ "zscore",
+ "zunion",
+];
+
+// For internal use but maybe useful in rare cases or when the client command
+// set is not 100% up to date with Redis' latest commands.
+// client.sendCommand('GET', 'foo', function (err, value) {...});
+//
+// arguments[0] = commandName
+// arguments[1..N-2] = Redis command arguments
+// arguments[N-1] = callback function
+
+Client.prototype.sendCommand = function () {
+ var originalCommand = Array.prototype.slice.call(arguments);
+
+ // If this client has given up trying to connect/reconnect to Redis,
+ // just call the errback (if any). Regardless, don't enqueue the command.
+
+ if (this.noConnection) {
+ if (arguments.length > 0 && typeof arguments[arguments.length - 1] == 'function')
+ arguments[arguments.length - 1](this.makeErrorForCommand(originalCommand, exports.NO_CONNECTION_ERROR));
+ return;
+ }
+
+ this.flushQueuedCommands();
+
+ var commandName = arguments[0].toLowerCase();
+
+ // Invariant: number of queued callbacks == number of commands sent to
+ // Redis whose replies have not yet been received and processed. Thus,
+ // if no callback was given, we create a dummy callback.
+
+ var argCount = arguments.length;
+ if (typeof arguments[argCount - 1] == 'function')
+ --argCount;
+
+ // All requests are formatted as multi-bulk.
+ // The first line of a multi-bulk request is "*<number of parts to follow>\r\n".
+ // Next is: "$<length of the command name>\r\n<command name>\r\n".
+
+ // Write the request as we go into a request Buffer. Recall that buffers
+ // are fixed length. We thus guess at how much space is needed. If we
+ // need to grow beyond this, we create a new buffer, copy the old one, and
+ // continue. Once we're ready to write the buffer, we use a 0-copy slice
+ // to send just that which we've written to the buffer.
+ //
+ // We reuse the buffer after each request. When the buffer "grows" to
+ // accomodate a request, it stays that size until it needs to grown again,
+ // which may of course be never.
+
+ var offset = this.requestBuffer.utf8Write('*' + argCount.toString() + CRLF +
+ '$' + commandName.length + CRLF +
+ commandName + CRLF, 0);
+
+ var self = this;
+
+ function ensureSpaceFor(atLeast) {
+ var currentLength = self.requestBuffer.length;
+
+ if (offset + atLeast > currentLength) {
+ // If we know how much space we need, use that + 10%.
+ // Else double the size of the buffer.
+
+ var bufferLength = Math.max(currentLength * 2, atLeast * 1.1);
+ var newBuffer = new Buffer(Math.round(bufferLength));
+ self.requestBuffer.copy(newBuffer, 0, 0, offset); // target, targetStart, srcStart, srcEnd
+ self.requestBuffer = newBuffer;
+ }
+ }
+
+ // Serialize the arguments into the request buffer
+ // If the request is a Buffer, just copy. Else if
+ // the arg has a .toString() method, call it and write
+ // it to the request buffer as UTF8.
+
+ var extrasLength = 5; // '$', '\r\n', '\r\n'
+
+ for (var i=1; i < argCount; ++i) {
+ var arg = arguments[i];
+ if (arg instanceof Buffer) {
+ ensureSpaceFor(arg.length + arg.length.toString().length + extrasLength);
+ offset += this.requestBuffer.asciiWrite('$' + arg.length + CRLF, offset);
+ offset += arg.copy(this.requestBuffer, offset, 0); // target, targetStart, srcStart
+ offset += this.requestBuffer.asciiWrite(CRLF, offset);
+ } else if (arg.toString) {
+ var asString = arg.toString();
+ var serialized = '$' + Buffer.byteLength(asString, "binary") + CRLF + asString + CRLF;
+ ensureSpaceFor(Buffer.byteLength(serialized, "binary"));
+ offset += this.requestBuffer.binaryWrite(serialized, offset);
+ }
+ }
+
+ // If the stream is writable, write the command. Else enqueue the command
+ // for when we first establish a connection or reconnect.
+
+ if (this.stream.writable) {
+ this.originalCommands.push(originalCommand);
+ var outBuffer = new Buffer(offset);
+ this.requestBuffer.copy(outBuffer, 0, 0, offset);
+ this.stream.write(outBuffer, 'binary');
+
+ if (exports.debugMode)
+ sys.debug("[SEND] " + debugFilter(this.requestBuffer, offset) +
+ " originalCommands = " + this.originalCommands.length);
+ } else {
+ var toEnqueue = new Buffer(offset);
+ this.requestBuffer.copy(toEnqueue, 0, 0, offset); // dst, dstStart, srcStart, srcEnd
+ this.queuedRequestBuffers.push(toEnqueue);
+ this.queuedOriginalCommands.push(originalCommand);
+
+ if (exports.debugMode) {
+ sys.debug("[ENQUEUE] Not connected. Request queued. There are " +
+ this.queuedRequestBuffers.length + " requests queued.");
+ }
+ }
+};
+
+commands.forEach(function (commandName) {
+ Client.prototype[commandName] = function () {
+ var args = Array.prototype.slice.call(arguments);
+ // [[1,2,3],function(){}] => [1,2,3,function(){}]
+ if (args.length > 0 && Array.isArray(args[0]))
+ args = args.shift().concat(args);
+ args.unshift(commandName);
+ this.sendCommand.apply(this, args);
+ };
+});
+
+// Send any commands that were queued while we were not connected.
+
+Client.prototype.flushQueuedCommands = function () {
+ if (exports.debugMode && this.queuedRequestBuffers.length > 0)
+ sys.debug("[FLUSH QUEUE] " + this.queuedRequestBuffers.length +
+ " queued request buffers.");
+
+ for (var i=0; i<this.queuedRequestBuffers.length && this.stream.writable; ++i) {
+ var buffer = this.queuedRequestBuffers.shift();
+ this.stream.write(buffer, 'binary');
+ this.originalCommands.push(this.queuedOriginalCommands.shift());
+
+ if (exports.debugMode)
+ sys.debug("[DEQUEUE/SEND] " + debugFilter(buffer) +
+ ". queued buffers remaining = " +
+ this.queuedRequestBuffers.length);
+ }
+};
+
+Client.prototype.makeErrorForCommand = function (command, errorMessage) {
+ var err = new Error(errorMessage);
+ err.originalCommand = command;
+ return err;
+};
+
+Client.prototype.callbackCommandWithError = function (command, errorMessage) {
+ var callback = command[command.length - 1];
+ if (typeof callback == "function")
+ callback(this.makeErrorForCommand(command, errorMessage));
+};
+
+Client.prototype.callbackOrphanedCommandsWithError = function () {
+ for (var i=0, n=this.originalCommands.length; i<n; ++i)
+ this.callbackCommandWithError(this.originalCommands[i], exports.COMMAND_ORPHANED_ERROR);
+ this.originalCommands = [];
+};
+
+Client.prototype.callbackQueuedCommandsWithError = function () {
+ for (var i=0, n=this.queuedOriginalCommands.length; i<n; ++i)
+ this.callbackCommandWithError(this.queuedOriginalCommands[i], exports.NO_CONNECTION_ERROR);
+ this.queuedOriginalCommands = [];
+ this.queuedRequestBuffers = [];
+};
+
+Client.prototype.giveupConnectionAttempts = function () {
+ this.callbackOrphanedCommandsWithError();
+ this.callbackQueuedCommandsWithError();
+ this.noConnection = true;
+ this.emit('noconnection', this);
+};
+
+Client.prototype.maybeReconnect = function () {
+ if (this.stream.writable && this.stream.readable)
+ return;
+
+ if (this.expectingClose)
+ return;
+
+ // Do not reconnect on first connection failure.
+ // Else try to reconnect if we're asked to.
+
+ if (this.connectionsMade == 0) {
+ this.giveupConnectionAttempts();
+ } else if (this.maxReconnectionAttempts > 0) {
+ if (this.reconnectionAttempts++ >= this.maxReconnectionAttempts) {
+ this.giveupConnectionAttempts();
+ } else {
+ this.reconnectionDelay *= 2;
+
+ if (exports.debugMode) {
+ sys.debug("[RECONNECTING " + this.reconnectionAttempts + "/" +
+ this.maxReconnectionAttempts + "]");
+
+ sys.debug("[WAIT " + this.reconnectionDelay + " ms]");
+ }
+
+ var self = this;
+
+ this.reconnectionTimer = setTimeout(function () {
+ self.emit('reconnecting', self);
+ self.stream.connect(self.port, self.host);
+ }, this.reconnectionDelay);
+ }
+ }
+};
+
+// Wraps 'subscribe' and 'psubscribe' methods to manage a single
+// callback function per subscribed channel name/pattern.
+//
+// 'nameOrPattern' is a channel name like "hello" or a pattern like
+// "h*llo", "h?llo", or "h[ae]llo".
+//
+// 'callback' is a function that is called back with 2 args:
+// channel name/pattern and message payload.
+//
+// Note: You are not permitted to do anything but subscribe to
+// additional channels or unsubscribe from subscribed channels
+// when there are >= 1 subscriptions active. Should you need to
+// issue other commands, use a second client instance.
+
+Client.prototype.subscribeTo = function (nameOrPattern, callback) {
+ if (typeof this.channelCallbacks[nameOrPattern] === 'function')
+ return;
+
+ if (typeof(callback) !== 'function')
+ throw new Error("requires a callback function");
+
+ this.channelCallbacks[nameOrPattern] = callback;
+
+ var method = nameOrPattern.match(/[\*\?\[]/)
+ ? "psubscribe"
+ : "subscribe";
+
+ this[method](nameOrPattern);
+};
+
+Client.prototype.unsubscribeFrom = function (nameOrPattern) {
+ if (typeof this.channelCallbacks[nameOrPattern] === 'undefined')
+ return;
+
+ delete this.channelCallbacks[nameOrPattern];
+
+ var method = nameOrPattern.match(/[\*\?\[]/)
+ ? "punsubscribe"
+ : "unsubscribe";
+
+ this[method](nameOrPattern);
+};
+
+// Multi-bulk replies return an array of other replies. Perhaps all you care
+// about is the representation of such buffers as UTF-8 encoded strings? Use
+// this to convert each such Buffer to a (UTF-8 encoded) String in-place.
+
+exports.convertMultiBulkBuffersToUTF8Strings = function (o) {
+ if (o instanceof Array) {
+ for (var i=0; i<o.length; ++i)
+ if (o[i] instanceof Buffer)
+ o[i] = o[i].utf8Slice(0, o[i].length);
+ } else if (o instanceof Object) {
+ var props = Object.getOwnPropertyNames(o);
+ for (var i=0; i<props.length; ++i)
+ if (o[props[i]] instanceof Buffer)
+ o[props[i]] = o[props[i]].utf8Slice(0, o[props[i]].length);
+ }
+};
+
384 redis2json.js
@@ -0,0 +1,384 @@
+var sys = require('sys'),
+ async = require('./async'),
+ redislib = require("./redis-client"),
+ redis = redislib.createClient();
+
+sys.debug("Starting");
+
+function fillVariables(text, variables) {
+ var newText = text;
+ for(var prop in variables) {
+ // sys.debug("EXPANDING: " + prop);
+ if(variables.hasOwnProperty(prop)) {
+ // sys.debug("EXPANDING replacing: " + prop + " with " + variables[prop]);
+ newText = newText.replace("{" + prop + "}", variables[prop]);
+ }
+ }
+ return newText;
+}
+
+
+var map = {
+ text: "post:{postId}:text",
+ created: "post:{postId}:created",
+ $authorId: "post:{postId}:author",
+ author: {
+ username: "user:{authorId}:username",
+ password: "user:{authorId}:password"
+ },
+ // $$comments: ["commentId", "lrange", "post:{postId}:comments", 1, 20],
+ $$comments: { variable: "commentId", cmd: "lrange", key: "post:{postId}:comments", args: [1, 20] },
+ comments: [
+ {
+ text: "comment:{commentId}:text",
+ created: "comment:{commentId}:create",
+ $commentAuthorId: "comment:{commentId}:author",
+ author: {
+ username: "user:{commentAuthorId}:username",
+ password: "user:{commentAuthorId}:password"
+ }
+ }
+ ]
+}
+
+map = {
+ $$posts: { variable: "postId", cmd: "lrange", key: "posts", args: [5, 10] },
+ posts: [ map ]
+}
+
+var variables = {
+ // postId: 1
+}
+
+/*
+
+async.parallel([
+ loadRedis("text", "post:2:text"),
+ function(callback){
+ setTimeout(function(){
+ callback(null, 'two');
+ }, 100);
+ },
+],
+// optional callback
+function(err, results){
+ sys.debug("Result: " + sys.inspect(results));
+});
+ */
+/*
+async.waterfall([
+ function(callback){
+ callback(null, 'one', 'two');
+ },
+ function(arg1, arg2, callback){
+ callback(null, 'three');
+ },
+ function(arg1, callback){
+ // arg1 now equals 'three'
+ callback(null, arg1);
+ }
+], function(err, results){
+ sys.debug("waterfall Result: " + sys.inspect(results));
+});*/
+
+
+function loadValue(key, redisKey, variables) {
+ return function (callback) {
+ var expandedRedisKey = fillVariables(redisKey, variables);
+ sys.debug("REDIS EXPANDED " + redisKey + " to " + expandedRedisKey + " with: " + sys.inspect(variables));
+ redis.get(expandedRedisKey, function (error, value) {
+ // sys.debug("REDIS LOADED key " + expandedRedisKey + ": " + value);
+ if (key) {
+ var o = {};
+ o[key] = value;
+ redislib.convertMultiBulkBuffersToUTF8Strings(o);
+ callback(error, o);
+ } else {
+ callback(error, value)
+ }
+ });
+ }
+}
+
+function loadArray(key, map, variables, arrayCommand) {
+ return function (callback) {
+ // sys.debug("ARRAY LOADING " + sys.inspect(arrayCommand));
+ var expandedRedisKey = fillVariables(arrayCommand.key, variables);
+ var args = arrayCommand.args || [];
+ args.unshift(expandedRedisKey);
+ args.push(function (error, array) {
+ array = array || []; // avoid errors if array is empty
+ if (array) { // array is not empty
+ redislib.convertMultiBulkBuffersToUTF8Strings(array);
+ // sys.debug("ARRAY LOADED " + sys.inspect(array));
+ for (var i=0; i < array.length; i++) {
+ variables[arrayCommand.variable] = array[i];
+
+
+ // collect actions that will be loaded
+ var actions = [];
+ for (var prop in map) {
+ if(map.hasOwnProperty(prop) && prop.substr(0, 1) != "$") {
+ sys.debug("Property: " + prop + ", type: " + typeof map[prop]);
+ if (typeof map[prop] === "string") {
+ actions.push(loadValue(null, map[prop], variables));
+ } else if (typeof map[prop] === "object" && !Array.isArray(map[prop])) {
+ actions.push(loadObject(null, map[prop], variables));
+ } else if (typeof map[prop] === "object" && Array.isArray(map[prop])) {
+ actions.push(loadArray(null, map[prop], variables, map["$$" + prop]));
+ } else {
+ sys.debug("Property: " + prop + ", type: " + typeof map[prop]);
+ }
+ }
+ }
+ async.parallel(actions, function (error, results) {
+ sys.debug("ARRAY LOADED Result: " + sys.inspect(results));
+ if (key) {
+ var o = {};
+ o[key] = results;
+ callback(error, o);
+ } else {
+ callback(error, o)
+ }
+ });
+ }
+ } else {
+ if (key) {
+ var o = {};
+ o[key] = [];
+ callback(null, o);
+ } else {
+ callback(error, [])
+ }
+ }
+ });
+ redis[arrayCommand.cmd].apply(redis, args);
+ // redis.lrange(redis, args);
+ }
+}
+
+function loadObject(key, map, variables) {
+ // sys.debug("LOAD OBJECT: " + key + "; ");
+ return function (callback) {
+ var actions = [];
+ var newVariablesActions = [];
+
+ // collect actions that load new variables
+ for (var prop in map) {
+ if(map.hasOwnProperty(prop) && prop.substr(0, 1) == "$") {
+ // sys.debug("Property: " + prop + ", type: " + typeof map[prop]);
+ if (typeof map[prop] === "string") {
+ newVariablesActions.push(loadValue(prop.substr(1), map[prop], variables));
+ }
+ }
+ }
+
+ // load variables
+ sys.debug("load variables: Executing parallel: " + sys.inspect(newVariablesActions));
+ async.parallel(newVariablesActions, function (error, results) {
+ for (var i=0; i < results.length; i++) {
+ for (var prop in results[i]) {
+ variables[prop] = results[i][prop];
+ }
+ };
+ // sys.debug("VARIABLES LOADED Result: " + sys.inspect(variables));
+
+ // collect actions that will be loaded
+ for (var prop in map) {
+ if(map.hasOwnProperty(prop) && prop.substr(0, 1) != "$") {
+ sys.debug("Property: " + prop + ", type: " + typeof map[prop]);
+ if (typeof map[prop] === "string") {
+ actions.push(loadValue(prop, map[prop], variables));
+ } else if (typeof map[prop] === "object" && !Array.isArray(map[prop])) {
+ actions.push(loadObject(prop, map[prop], variables));
+ } else if (typeof map[prop] === "object" && Array.isArray(map[prop])) {
+ actions.push(loadArray(prop, map[prop], variables, map["$$" + prop]));
+ } else {
+ sys.debug("Property: " + prop + ", type: " + typeof map[prop]);
+ }
+ }
+ }
+
+ // load values, objects, arrays
+ sys.debug("Executing parallel: " + sys.inspect(actions));
+ async.parallel(actions, function (error, results) {
+ var o = {};
+ for (var i=0; i < results.length; i++) {
+ for (var prop in results[i]) {
+ o[prop] = results[i][prop];
+ }
+ };
+ sys.debug("OBJECT LOADED Result: " + sys.inspect(o));
+ if (key) {
+ var o2 = {};
+ o2[key] = o;
+ callback(error, o2);
+ } else {
+ callback(error, o)
+ }
+ });
+ // callback(error, o2);
+ });
+
+ }
+}
+
+function load(map, variables, callback) {
+ loadObject("object", map, variables)(function (error, result) {
+ callback(error, result.object)
+ });
+}
+
+load(map, variables, function (error, result) {
+ sys.debug("LOADED Result: " + sys.inspect(result) + "ERROR" + sys.inspect(error));
+ sys.debug("LOADED COMMENTS Result: " + sys.inspect(result.comments) + "ERROR" + sys.inspect(error));
+ // sys.debug("LOADED FINALE VARIABLES: " + sys.inspect(variables) + "MAP" + sys.inspect(map));
+});
+
+
+
+sys.debug(sys.inspect(map));
+
+
+
+function loadRedisOld(key, redisKey) {
+ return function(callback) {
+ redis.get(redisKey, function (error, value) {
+
+ sys.debug("redis loaded key " + redisKey + ": " + value);
+ callback(value);
+ });
+ }
+}
+
+function loadRedis(key, redisKey) {
+ return function(callback) {
+ redis.get(redisKey, function (error, value) {
+ sys.debug("redis loaded key " + redisKey + ": " + value);
+ var o = {};
+ o[key] = value;
+ redislib.convertMultiBulkBuffersToUTF8Strings(o);
+ callback(error, o);
+ });
+ }
+}
+
+// loadRedis("text", "post:2:text")(function(obj){
+// sys.debug("OBJ: " + sys.inspect(obj));
+// });
+
+
+function loadString(key, redisKey, variables) {
+ return function(callback) {
+ return loadRedis(key, fillVariables(redisKey, variables))(callback);
+ }
+}
+
+// loadString("text", "post:{postId}:text", {postId: 3})(function(obj){
+// sys.debug("SSSSSS: " + sys.inspect(obj));
+// });
+
+
+function loadObject2(map, variables) {
+ return function(callback) {
+ var actions = [];
+ for (var prop in map) {
+ if (map.hasOwnProperty(prop)) {
+ if (typeof map[prop] === "string") {
+ actions.push(loadString(prop, map[prop], variables));
+ }
+ }
+ }
+ Do.parallel(actions)(function (values) {
+ var object = {};
+ var vars = {};
+ for (var i=0; i < values.length; i++) {
+ for (prop in values[i]) {
+ if (prop.substr(0, 1) == "$") {
+ vars[prop.substr(1)] = values[i][prop];
+ } else {
+ object[prop] = values[i][prop];
+ }
+
+ }
+ };
+ callback({object: object, vars: vars});
+ // sys.debug("STRINGS: " + sys.inspect(obj));
+ });
+ }
+}
+
+
+function loadData(map, variables, callback) {
+ var keys = [];
+ var actions = [];
+
+ var strings = [];
+ for(var prop in map) {
+ if(map.hasOwnProperty(prop)) {
+ if (typeof map[prop] === "string") {
+ strings.push();
+ actions.push(loadRedis(prop, fillVariables(map[prop], variables)));
+ }
+ }
+ }
+ // loadStrings();
+}
+
+function nieco() {
+ function load(op, key) {
+ redis[op](key, function (error, value) {
+ // sys.debug(key + ": " + value);
+ });
+ }
+
+ var keys = [];
+ var actions = [];
+ var result = o || {};
+ for(var prop in map) {
+ if(map.hasOwnProperty(prop)) {
+ if (typeof map[prop] === "string") {
+ keys.push(prop);
+ actions.push(loadRedis(prop, fillVariables(map[prop], variables)));
+ } else if (typeof map[prop] === "object") {
+ // loadData(map[prop], variables);
+ }
+ }
+ }
+
+ Do.parallel(actions)(function (values) {
+ redislib.convertMultiBulkBuffersToUTF8Strings(values);
+ sys.debug("Keys: " + sys.inspect(keys));
+ sys.debug("Values: " + sys.inspect(values));
+ for (var i=0; i < keys.length; i++) {
+ sys.debug("$: " + keys[i].substr(0, 1));
+ if (keys[i].substr(0, 1) == "$") {
+ variables[keys[i].substr(1)] = values[i];
+ } else {
+ result[keys[i]] = values[i];
+ }
+ };
+
+ sys.debug("Result: " + sys.inspect(result));
+ sys.debug("Variables: " + sys.inspect(variables));
+
+
+
+ for(var prop in map) {
+ if(map.hasOwnProperty(prop)) {
+ if (typeof map[prop] === "object") {
+ result[prop] = {};
+ loadData(map[prop], variables, result[prop]);
+ }
+ }
+ }
+
+
+
+
+ });
+
+}
+
+sys.debug("Done");
+

0 comments on commit 1ced9f9

Please sign in to comment.
Something went wrong with that request. Please try again.