From 15db7aac289a8799839809d5abf7e954872905aa Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Wed, 24 May 2017 14:45:32 -0700 Subject: [PATCH 1/6] add example dlm gen_server implementation for reference --- examples/gen_server/dlm/README.md | 45 +++ examples/gen_server/dlm/dlm.js | 487 ++++++++++++++++++++++++++ examples/gen_server/dlm/multi_test.js | 105 ++++++ examples/gen_server/dlm/test.js | 85 +++++ 4 files changed, 722 insertions(+) create mode 100644 examples/gen_server/dlm/README.md create mode 100644 examples/gen_server/dlm/dlm.js create mode 100644 examples/gen_server/dlm/multi_test.js create mode 100644 examples/gen_server/dlm/test.js diff --git a/examples/gen_server/dlm/README.md b/examples/gen_server/dlm/README.md new file mode 100644 index 0000000..df17f57 --- /dev/null +++ b/examples/gen_server/dlm/README.md @@ -0,0 +1,45 @@ +DLM example +=========== + +A distributed lock manager `gen_server` implementation for reference, using the [Redlock algorithm](https://redis.io/topics/distlock). May be added into clusterluck as an optional feature if there's community interest. + +Example Usage +------------- + +A simple example usage of this `gen_server` follows. For a more detailed integration test on a single node, refer to `test.js`. For a multi-node integration test, refer to `multi_test.js`. In this test, we try to lock the same resource with two different requesters. 
+ +``` javascript +const cl = require("../../index"), + os = require("os"), + assert = require("chai").assert; + +const nodeID = "name", + port = 7022, + host = os.hostname(); + +const node = cl.createCluster(nodeID, host, port), + gossip = node.gossip(), + kernel = node.kernel(); + +const DLMServer = require("./dlm"); +const dlm = new DLMServer(gossip, kernel, { + rquorum: 0.51, + wquorum: 0.51 +}); + +// load node state +node.load(() => { + // first start dlm, then start network kernel + dlm.start("locker"); + node.start("cookie", "ring", () => { + console.log("Node %s listening on hostname %s, port %s", nodeID, host, port); + dlm.wlock("id", "holder", 30000, (err, nodes) => { + assert.notOk(err); + assert.isArray(nodes); + dlm.wlock("id", "holder", 30000, (err, wNodes) => { + assert.ok(err); + }); + }); + }); +}); +``` diff --git a/examples/gen_server/dlm/dlm.js b/examples/gen_server/dlm/dlm.js new file mode 100644 index 0000000..f6cbfd7 --- /dev/null +++ b/examples/gen_server/dlm/dlm.js @@ -0,0 +1,487 @@ +var _ = require("lodash"), + microtime = require("microtime"), + cl = require("../../../index"), + util = require("util"), + debug = require("debug")("clusterluck:examples:dlm"); + +var GenServer = cl.GenServer; + +class Lock { + constructor(type, id, holder, timeout) { + this._type = type; + this._id = id; + this._holder = holder; + this._timeout = timeout; + } + + type(type) { + if (type !== undefined) { + this._type = type; + } + return this._type; + } + + id(id) { + if (id !== undefined) { + this._id = id; + } + return this._id; + } + + holder(holder) { + if (holder !== undefined) { + this._holder = holder; + } + return this._holder; + } + + timeout(timeout) { + if (timeout !== undefined) { + this._timeout = timeout; + } + return this._timeout; + } +} + +class DLMServer extends GenServer { + /** + * + * @class DLMServer + * @memberof Clusterluck + * + * @param {Clusterluck.GossipRing} gossip + * @param {Clusterluck.NetKernel} kernel + * @param {Object} 
[opts] + * @param {Number} [opts.rquorum] + * @param {Number} [opts.wquorum] + * + */ + constructor(gossip, kernel, opts = {rquorum: 0.51, wquorum: 0.51}) { + super(kernel); + this._gossip = gossip; + this._kernel = kernel; + this._locks = new Map(); + this._rquorum = opts.rquorum; + this._wquorum = opts.wquorum; + } + + /** + * + * @method start + * @memberof Clusterluck.DLMServer + * @instance + * + * @param {String} [name] + * + * @return {Clusterluck.DLMServer} + * + */ + start(name) { + super.start(name); + + var jobs = [ + {event: "rlock", method: "doRLock"}, + {event: "wlock", method: "doWLock"}, + {event: "runlock", method: "doRUnlock"}, + {event: "wunlock", method: "doWUnlock"} + ]; + jobs.forEach((job) => { + var handler = this[job.method].bind(this); + this.on(job.event, handler); + this.once("stop", _.partial(this.removeListener, job.event, handler).bind(this)); + }); + return this; + } + + /** + * + * @method stop + * @memberof Clusterluck.DLMServer + * @instance + * + * @param {Boolean} [force] + * + * @return {Clusterluck.DLMServer} + * + */ + stop(force = false) { + if (this.idle() || force === true) { + this._locks.clear(); + super.stop(); + return this; + } + this.once("idle", _.partial(this.stop, force).bind(this)); + return this; + } + + /** + * + * @method rlock + * @memberof Clusterluck.DLMServer + * @instance + * + * @param {String} id + * @param {String} holder + * @param {Number} timeout + * @param {Function} cb + * @param {Number} [reqTimeout] + * + */ + rlock(id, holder, timeout, cb, reqTimeout=Infinity) { + var nodes = this._gossip.find(id); + var time = microtime.now(); + this.multicall(nodes, this._id, "rlock", { + id: id, + holder: holder, + timeout: timeout + }, (err, data) => { + if (err) return cb(err); + var delta = microtime.now()-time; + var nData = DLMServer.findLockPasses(nodes, data); + if (nData.length/data.length >= this._rquorum && delta < timeout) { + return cb(null, nData); + } else { + this.runlockAsync(nData, id, 
holder); + return cb(new Error("Failed to achieve rlock quorum.")); + } + }, reqTimeout); + } + + /** + * + * @method wlock + * @memberof Clusterluck.DLMServer + * @instance + * + * @param {String} id + * @param {String} holderr + * @param {Number} timeout + * @param {Function} cb + * @param {Number} [reqTimeout] + * + */ + wlock(id, holder, timeout, cb, reqTimeout=Infinity) { + var nodes = this._gossip.find(id); + var time = microtime.now(); + this.multicall(nodes, this._id, "wlock", { + id: id, + holder: holder, + timeout: timeout + }, (err, data) => { + if (err) return cb(err); + var delta = microtime.now()-time; + var nData = DLMServer.findLockPasses(nodes, data); + if (nData.length/data.length >= this._wquorum && delta < timeout) { + return cb(null, nData); + } else { + this.wunlockAsync(nData, id, holder); + return cb(new Error("Failed to achieve wlock quorum.")); + } + }, reqTimeout); + } + + /** + * + * @method runlock + * @memberof Clusterluck.DLMServer + * @instance + * + * @param {Array} nodes + * @param {String} id + * @param {String} holder + * @param {Function} cb + * @param {Number} [reqTimeout] + * + */ + runlock(nodes, id, holder, cb, reqTimeout=Infinity) { + this.multicall(nodes, this._id, "runlock", { + id: id, + holder: holder + }, (err, res) => { + if (err) return cb(err); + return cb(); + }, reqTimeout); + } + + /** + * + * @method runlockAsync + * @memberof Clusterluck.DLMServer + * @instance + * + * @param {Array} nodes + * @param {String} id + * @param {String} holder + * + */ + runlockAsync(nodes, id, holder) { + this.abcast(nodes, this._id, "runlock", { + id: id, + holder: holder + }); + } + + /** + * + * @method wunlock + * @memberof Clusterluck.DLMServer + * @instance + * + * @param {Array} nodes + * @param {String} id + * @param {String} holder + * @param {Function} cb + * @param {Number} [reqTimeout] + * + */ + wunlock(nodes, id, holder, cb, reqTimeout=Infinity) { + this.multicall(nodes, this._id, "wunlock", { + id: id, + holder: 
holder + }, (err, res) => { + if (err) return cb(err); + return cb(); + }, reqTimeout); + } + + /** + * + * @method wunlockAsync + * @memberof Clusterluck.DLMServer + * @instance + * + * @param {Array} nodes + * @param {String} id + * @param {String} holder + * + */ + wunlockAsync(nodes, id, holder) { + this.abcast(nodes, this._id, "wunlock", { + id: id, + holder: holder + }); + } + + /** + * + * @method decodeJob + * @memberof Clusterluck.DLMServer + * @instance + * + * @param {Buffer} buf + * + * @return {Object|Error} + * + */ + decodeJob(buf) { + var out = super.decodeJob(buf); + if (out instanceof Error) return out; + var data = out.data; + if (out.event.endsWith("unlock")) { + data = DLMServer.parseUnlockJob(data); + } else { + data = DLMServer.parseLockJob(data); + } + if (data instanceof Error) { + return data; + } + out.data = data; + return out; + } + + /** + * + * @method doRLock + * @memberof Clusterluck.DLMServer + * @instance + * @private + * + * @param {Object} data + * @param {Object} from + * + */ + doRLock(data, from) { + var lock = this._locks.get(data.id); + if (lock && lock.type() === "write") { + return this.reply(from, DLMServer.encodeResp({ok: false})); + } + var timeout = setTimeout(() => { + var lock = this._locks.get(data.id); + if (!lock || lock.type() === "write") { + return; + } + var holder = lock.holder(); + holder.delete(data.holder); + if (holder.size === 0) { + this._locks.delete(data.id); + } + }, data.timeout); + if (!lock) { + lock = new Lock("read", data.id, new Set(), new Map()); + } + lock.holder().add(data.holder); + lock.timeout().set(data.holder, timeout); + this._locks.set(data.id, lock); + this.reply(from, DLMServer.encodeResp({ok: true})); + } + + /** + * + * @method doWLock + * @memberof Clusterluck.DLMServer + * @instance + * @private + * + * @param {Object} data + * @param {Object} from + * + */ + doWLock(data, from) { + if (this._locks.has(data.id)) { + return this.reply(from, DLMServer.encodeResp({ok: false})); + 
} + var timeout = setTimeout(() => { + var lock = this._locks.get(data.id); + if (!lock || lock.holder() !== data.holder()) { + return; + } + this._locks.delete(data.id); + }, data.timeout); + this._locks.set(data.id, new Lock("write", data.id, data.holder, timeout)); + this.reply(from, DLMServer.encodeResp({ok: true})); + } + + /** + * + * @method doRUnlock + * @memberof Clusterluck.DLMServer + * @instance + * @private + * + * @param {Object} data + * @param {Object} from + * + */ + doRUnlock(data, from) { + var lock = this._locks.get(data.id); + if (!lock || lock.type() !== "read") { + return this._safeReply(from, DLMServer.encodeResp({ok: false})); + } + + var holders = lock.holder(); + var timeouts = lock.timeout(); + holders.delete(data.holder); + clearTimeout(timeouts.get(data.holder)); + timeouts.delete(data.holder); + + if (holders.size === 0) { + this._locks.delete(data.id); + } + this._safeReply(from, DLMServer.encodeResp({ok: true})); + } + + /** + * + * @method doWUnlock + * @memberof Clusterluck.DLMServer + * @instance + * @private + * + * @param {Object} data + * @param {Object} from + * + */ + doWUnlock(data, from) { + var lock = this._locks.get(data.id); + if (!lock || lock.type() !== "write") { + return this._safeReply(from, DLMServer.encodeResp({ok: false})); + } + + var holder = lock.holder(); + if (holder === data.holder) { + clearTimeout(lock.timeout()); + this._locks.delete(data.id); + } + this._safeReply(from, DLMServer.encodeResp({ok: true})); + } + + /** + * + * @method parseLockJob + * @memberof Clusterluck.DLMServer + * @static + * + * @param {Object} job + * + * @return {Object|Error} + * + */ + static parseLockJob(job) { + if (!(util.isObject(job) && + util.isString(job.id) && + util.isString(job.holder) && + util.isNumber(job.timeout))) { + return new Error("Malformed lock job."); + } + return job; + } + + /** + * + * @method parseUnlockJob + * @memberof Clusterluck.DLMServer + * @static + * + * @param {Object} job + * + * @return 
{Object|Error} + * + */ + static parseUnlockJob(job) { + if (!(util.isObject(job) && + util.isString(job.id) && + util.isString(job.holder))) { + return new Error("Malformed unlock job."); + } + return job; + } + + /** + * + * @method encodeResp + * @memberof Clusterluck.DLMServer + * @static + * + * @param {Any} res + * + * @return {String} + * + */ + static encodeResp(res) { + return JSON.stringify(res); + } + + /** + * + * @method findLockPasses + * @memberof Clusterluck.DLMServer + * @static + * + * @param {Array} nodes + * @param {Array} data + * + * @return {Array} + * + */ + static findLockPasses(nodes, data) { + return data.reduce((memo, val, idx) => { + val = JSON.parse(val); + if (util.isObject(val) && val.ok === true) { + memo.push(nodes[idx]); + } + return memo; + }, []); + } +} + +module.exports = DLMServer; diff --git a/examples/gen_server/dlm/multi_test.js b/examples/gen_server/dlm/multi_test.js new file mode 100644 index 0000000..7a59832 --- /dev/null +++ b/examples/gen_server/dlm/multi_test.js @@ -0,0 +1,105 @@ +const cl = require("../../../index"), + _ = require("lodash"), + os = require("os"), + assert = require("chai").assert, + debug = require("debug")("examples:gen_server:dlm:multi_test"), + async = require("async"), + DLMServer = require("./dlm"); + +const nodeID = process.argv[2], + port = parseInt(process.argv[3]), + nodeID2 = process.argv[4], + port2 = parseInt(process.argv[5]), + host = os.hostname(); + +var nodes = []; +var dlms = []; +async.each([[nodeID, port], [nodeID2, port2]], (config, next) => { + const node = cl.createCluster(config[0], host, config[1]), + gossip = node.gossip(), + kernel = node.kernel(); + + const dlm = new DLMServer(gossip, kernel, { + rquorum: 0.51, + wquorum: 0.51 + }); + nodes.push(node); + dlms.push(dlm); + node.load(() => { + dlm.start("locker"); + node.start("cookie", "ring", () => { + debug("Node %s listening on hostname %s, port %s!", config[0], host, config[1]); + next(); + }); + }); +}, () => { + 
setTimeout(() => { + // make sure nodes know about each other + assert.ok(_.find(nodes[0].gossip().ring().nodes(), (node) => { + return node.equals(nodes[1].kernel().self()); + })); + var dlm = dlms[0]; + var holdingNodes; + async.series([ + (next) => { + dlm.wlock("id", "holder", 30000, (err, nodes) => { + assert.notOk(err); + debug("Successfully grabbed write lock on id '%s' with holder '%s'", "id", "holder"); + assert.lengthOf(nodes, 2); + holdingNodes = nodes; + next(); + }); + }, + (next) => { + dlm.wlock("id", "holder2", 30000, (err, wNodes) => { + assert.ok(err); + debug("Failed to grab write lock id '%s' with holder '%s'", "id", "holder2"); + next(); + }); + }, + (next) => { + dlm.wunlock(holdingNodes, "id", "holder", (err) => { + assert.notOk(err); + next(); + }); + }, + (next) => { + dlm.rlock("id", "holder", 30000, (err, nodes) => { + assert.notOk(err); + debug("Successfully grabbed read lock on id '%s' with holder '%s'", "id", "holder"); + assert.lengthOf(nodes, 2); + holdingNodes = nodes; + next(); + }); + }, + (next) => { + dlm.rlock("id", "holder2", 30000, (err, nodes) => { + assert.notOk(err); + debug("Successfully grabbed read lock on id '%s' with holder '%s'", "id", "holder2"); + next(); + }); + }, + (next) => { + dlm.wlock("id", "holder3", 30000, (err, wNodes) => { + assert.ok(err); + debug("Failed to grab write lock id '%s' with holder '%s'", "id", "holder2"); + next(); + }); + }, + (next) => { + dlm.runlockAsync(holdingNodes, "id", "holder"); + dlm.runlockAsync(holdingNodes, "id", "holder2"); + next(); + } + ], () => { + dlms.forEach((dlm) => { + dlm.stop(true); + }); + debug("Done!"); + process.exit(0); + }); + }, 1000); + // make nodes meet each other first + nodes[0].gossip().meet(nodes[1].kernel().self()); + debug("Waiting for nodes to meet each other, swear I had something for this..."); +}); diff --git a/examples/gen_server/dlm/test.js b/examples/gen_server/dlm/test.js new file mode 100644 index 0000000..d2e2018 --- /dev/null +++ 
b/examples/gen_server/dlm/test.js @@ -0,0 +1,85 @@ +const cl = require("../../../index"), + os = require("os"), + assert = require("chai").assert, + debug = require("debug")("examples:gen_server:dlm:test"), + async = require("async"); + +const nodeID = process.argv[2], + port = parseInt(process.argv[3]), + host = os.hostname(); + +const node = cl.createCluster(nodeID, host, port), + gossip = node.gossip(), + kernel = node.kernel(); + +const DLMServer = require("./dlm"); +const dlm = new DLMServer(gossip, kernel, { + rquorum: 0.51, + wquorum: 0.51 +}); + +// load node state +node.load(() => { + // first start dlm, then start network kernel + dlm.start("locker"); + node.start("cookie", "ring", () => { + debug("Node %s listening on hostname %s, port %s!", nodeID, host, port); + var holdingNodes; + async.series([ + (next) => { + dlm.wlock("id", "holder", 30000, (err, nodes) => { + assert.notOk(err); + debug("Successfully grabbed write lock on id '%s' with holder '%s'", "id", "holder"); + assert.isArray(nodes); + holdingNodes = nodes; + next(); + }); + }, + (next) => { + dlm.wlock("id", "holder2", 30000, (err, wNodes) => { + assert.ok(err); + debug("Failed to grab write lock id '%s' with holder '%s'", "id", "holder2"); + next(); + }); + }, + (next) => { + dlm.wunlock(holdingNodes, "id", "holder", (err) => { + assert.notOk(err); + next(); + }); + }, + (next) => { + dlm.rlock("id", "holder", 30000, (err, nodes) => { + assert.notOk(err); + debug("Successfully grabbed read lock on id '%s' with holder '%s'", "id", "holder"); + assert.isArray(nodes); + holdingNodes = nodes; + next(); + }); + }, + (next) => { + dlm.rlock("id", "holder2", 30000, (err, nodes) => { + assert.notOk(err); + debug("Successfully grabbed read lock on id '%s' with holder '%s'", "id", "holder2"); + next(); + }); + }, + (next) => { + dlm.wlock("id", "holder3", 30000, (err, wNodes) => { + assert.ok(err); + debug("Failed to grab write lock id '%s' with holder '%s'", "id", "holder2"); + next(); + }); + }, + 
(next) => { + dlm.runlockAsync(holdingNodes, "id", "holder"); + dlm.runlockAsync(holdingNodes, "id", "holder2"); + next(); + } + ], () => { + dlm.stop(true); + debug("Done!"); + process.exit(0); + }); + }); +}); From f14b727d516dab630807199c917e3ece25f3f7c4 Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Thu, 25 May 2017 11:53:18 -0700 Subject: [PATCH 2/6] fix delta calculation to use milliseconds on example dlm server --- examples/gen_server/dlm/dlm.js | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/gen_server/dlm/dlm.js b/examples/gen_server/dlm/dlm.js index f6cbfd7..8efb3d1 100644 --- a/examples/gen_server/dlm/dlm.js +++ b/examples/gen_server/dlm/dlm.js @@ -5,6 +5,7 @@ var _ = require("lodash"), debug = require("debug")("clusterluck:examples:dlm"); var GenServer = cl.GenServer; +const mcsToMs = 1000; class Lock { constructor(type, id, holder, timeout) { @@ -136,7 +137,7 @@ class DLMServer extends GenServer { timeout: timeout }, (err, data) => { if (err) return cb(err); - var delta = microtime.now()-time; + var delta = (microtime.now()-time)/mcsToMs; var nData = DLMServer.findLockPasses(nodes, data); if (nData.length/data.length >= this._rquorum && delta < timeout) { return cb(null, nData); @@ -169,7 +170,7 @@ class DLMServer extends GenServer { timeout: timeout }, (err, data) => { if (err) return cb(err); - var delta = microtime.now()-time; + var delta = (microtime.now()-time)/mcsToMs; var nData = DLMServer.findLockPasses(nodes, data); if (nData.length/data.length >= this._wquorum && delta < timeout) { return cb(null, nData); @@ -304,6 +305,8 @@ class DLMServer extends GenServer { var lock = this._locks.get(data.id); if (lock && lock.type() === "write") { return this.reply(from, DLMServer.encodeResp({ok: false})); + } else if (lock && lock.holder().has(data.holder)) { + return this.reply(from, DLMServer.encodeResp({ok: true})); } var timeout = setTimeout(() => { var lock = this._locks.get(data.id); From 
e79f9bd66073b0fd286170d9affe738f34056676 Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Fri, 26 May 2017 18:29:39 -0700 Subject: [PATCH 3/6] fix dlm example to reference data.holder correctly on lock timeout --- examples/gen_server/dlm/dlm.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/gen_server/dlm/dlm.js b/examples/gen_server/dlm/dlm.js index 8efb3d1..be0ba30 100644 --- a/examples/gen_server/dlm/dlm.js +++ b/examples/gen_server/dlm/dlm.js @@ -345,7 +345,7 @@ class DLMServer extends GenServer { } var timeout = setTimeout(() => { var lock = this._locks.get(data.id); - if (!lock || lock.holder() !== data.holder()) { + if (!lock || lock.holder() !== data.holder) { return; } this._locks.delete(data.id); From e6a03d95d9c70c186b5151b48a139673bf8f6ded Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Thu, 1 Jun 2017 21:29:27 -0700 Subject: [PATCH 4/6] add disk-based table using AOF+snapshot persistence model (for future feature sets involving disk writes), tests, docs --- Gruntfile.js | 1 + lib/consts.js | 14 +- lib/dtable.js | 740 ++++++++++++++++++++++++++++++++++++++++++++ lib/utils.js | 24 ++ test/unit/dtable.js | 570 ++++++++++++++++++++++++++++++++++ test/unit/index.js | 1 + test/unit/utils.js | 24 ++ 7 files changed, 1373 insertions(+), 1 deletion(-) create mode 100644 lib/dtable.js create mode 100644 test/unit/dtable.js diff --git a/Gruntfile.js b/Gruntfile.js index 2db6bb8..40fdbc0 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -12,6 +12,7 @@ module.exports = function (grunt) { "lib/kernel.js", "lib/node.js", "lib/vclock.js", + "lib/dtable.js" ]; grunt.initConfig({ diff --git a/lib/consts.js b/lib/consts.js index 8c3d333..4a286d4 100644 --- a/lib/consts.js +++ b/lib/consts.js @@ -13,6 +13,12 @@ var vclockOpts = { oldBound: 86400000000 }; +var dtableOpts = { + writeThreshold: 100, + autoSave: 180000, + fsyncInterval: 1000 +}; + var networkHost = os.hostname(); var networkPort = 7022; @@ -52,5 +58,11 @@ module.exports = { }), // 
vector clock defaults, in the event we want direct creation/manipulation of vector // clocks - vclockOpts: Object.freeze(_.cloneDeep(vclockOpts)) + vclockOpts: Object.freeze(_.cloneDeep(vclockOpts)), + dtableOpts: Object.freeze(dtableOpts), + dlmOpts: Object.freeze(_.extend({ + rquorum: 0.51, + wquorum: 0.51, + disk: false + }, dtableOpts)) }; diff --git a/lib/dtable.js b/lib/dtable.js new file mode 100644 index 0000000..48af88d --- /dev/null +++ b/lib/dtable.js @@ -0,0 +1,740 @@ +const _ = require("lodash"), + fs = require("fs"), + util = require("util"), + debug = require("debug")("clusterluck:lib:dtable"), + async = require("async"), + path = require("path"), + shortid = require("shortid"), + readline = require("readline"), + EventEmitter = require("events").EventEmitter; + +const utils = require("./utils"), + Queue = require("./queue"), + consts = require("./consts"); + +const defaults = consts.dtableOpts; + +class DTable extends EventEmitter { + /** + * + * @class DTable + * @memberof Clusterluck + * + * @param {Object} opts + * @param {String} opts.path + * @param {Number} [opts.writeThreshold] + * @param {Number} [opts.autoSave] + * @param {Number} [opts.fsyncInterval] + * + */ + constructor(opts = defaults) { + super(); + opts = _.defaults(_.clone(opts), defaults); + if (!util.isString(opts.path)) { + throw new Error("Missing 'path' option in options object."); + } + this._path = path.join(opts.path, "DATA.SNAP"); + this._aofPath = path.join(opts.path, "LATEST.LOG"); + this._tmpAOFPath = path.join(opts.path, "PREV.LOG"); + this._autoSave = opts.autoSave; + this._writeCount = 0; + this._writeThreshold = opts.writeThreshold; + this._table = new Map(); + this._idleTicks = 0; + this._idleTickInterval = 1000; + this._idleTickMax = this._autoSave/this._idleTickInterval; + this._fsyncInterval = opts.fsyncInterval; + this._queue = new Queue(); + this._flushing = false; + } + + /** + * + * @method start + * @memberof Clusterluck.DTable + * @instance + * + * @param 
{String} name + * + * @return {Clusterluck.DTable} This instance. + * + */ + start(name) { + this._name = name; + this._id = shortid.generate(); + this._setupDiskOps(); + return this; + } + + /** + * + * @method stop + * @memberof Clusterluck.DTable + * @instance + * + * @param {Function} cb + * + */ + stop(cb) { + if (this.idle() && this._fd) { + this.emit("stop"); + this._id = null; + return cb(); + } else if (this.idle() && !this._fd) { + return this.once("open", _.partial(this.stop, cb).bind(this)); + } + this.once("idle", _.partial(this.stop, cb).bind(this)); + } + + /** + * + * @method load + * @memberof Clusterluck.DTable + * @instance + * + * @param {Function} cb + * + */ + load(cb) { + async.series([ + _.partial(this._loadState).bind(this), + _.partial(this._loadAOF, this._tmpAOFPath).bind(this), + _.partial(this._loadAOF, this._aofPath).bind(this) + ], cb); + } + + /** + * + * @method idle + * @memberof Clusterluck.DTable + * @instance + * + * @return {Boolean} + * + */ + idle() { + return this._flushing === false; + } + + /** + * + * @method get + * @memberof Clusterluck.DTable + * @instance + * + * @param {String} key + * + * @return {Any} + * + */ + get(key) { + return this._table.get(key); + } + + /** + * + * @method smember + * @memberof Clusterluck.DTable + * @instance + * + * @param {String} key + * @param {String} val + * + * @return {Boolean} + * + */ + smember(key, val) { + const out = this._table.get(key) || new Set(); + if (!(out instanceof Set)) { + throw DTable.invalidTypeError("smember", key, typeof out); + } + return out.has(val); + } + + /** + * + * @method hget + * @memberof Clusterluck.DTable + * @instance + * + * @param {String} key + * @param {String} hkey + * + * @return {JSON} + * + */ + hget(key, hkey) { + var out = this._table.get(key) || new Map(); + if (!(out instanceof Map)) { + throw new DTable.invalidTypeError("hget", key, typeof out); + } + return out.get(hkey); + } + + /** + * + * @method set + * @memberof 
Clusterluck.DTable + * @instance + * + * @param {String} key + * @param {Any} val + * + * @return {Any} + * + */ + set(key, val) { + this._table.set(key, val); + this._writeToLog("set", key, val); + this._updateWriteCount(); + return val; + } + + /** + * + * @method sset + * @memberof Clusterluck.DTable + * @instance + * + * @param {String} key + * @param {String} val + * + * @return {String} + * + */ + sset(key, val) { + const out = this._table.get(key) || new Set(); + if (!(out instanceof Set)) { + throw DTable.invalidTypeError("sset", key, typeof out); + } + out.add(val); + this._table.set(key, out); + this._writeToLog("sset", key, val); + this._updateWriteCount(); + return out; + } + + /** + * + * @method hset + * @memberof Clusterluck.DTable + * @instance + * + * @param {String} key + * @param {String} hkey + * @param {JSON} val + * + * @return {JSON} + * + */ + hset(key, hkey, val) { + const out = this._table.get(key) || new Map(); + if (!(out instanceof Map)) { + throw DTable.invalidTypeError("hset", key, typeof out); + } + out.set(hkey, val); + this._table.set(key, out); + this._writeToLog("hset", key, hkey, val); + this._updateWriteCount(); + return out; + } + + /** + * + * @method del + * @memberof Clusterluck.DTable + * @instance + * + * @param {String} key + * + * @return {Clusterluck.DTable} + * + */ + del(key) { + this._table.delete(key); + this._writeToLog("del", key); + this._updateWriteCount(); + return this; + } + + /** + * + * @method sdel + * @memberof Clusterluck.DTable + * @instance + * + * @param {String} key + * @param {String} val + * + * @return {Clusterluck.DTable} + * + */ + sdel(key, val) { + const out = this._table.get(key) || new Set(); + if (!(out instanceof Set)) { + throw DTable.invalidTypeError("sdel", key, typeof out); + } + out.delete(val); + if (out.size === 0) this._table.delete(key); + this._writeToLog("sdel", key, val); + this._updateWriteCount(); + return this; + } + + /** + * + * @method hdel + * @memberof 
Clusterluck.DTable + * @instance + * + * @param {String} key + * @param {String} hkey + * + * @return {Clusterluck.DTable} + * + */ + hdel(key, hkey) { + const out = this._table.get(key) || new Map(); + if (!(out instanceof Map)) { + throw DTable.invalidTypeError("hdel", key, typeof out); + } + out.delete(hkey); + if (out.size === 0) this._table.delete(key); + this._writeToLog("hdel", key, hkey); + this._updateWriteCount(); + return this; + } + + /** + * + * @method forEach + * @memberof Clusterluck.DTable + * @instance + * + * @param {Function} cb + * @param {Function} fin + * + */ + forEach(cb, fin) { + const entries = this._table.entries(); + let done = false; + async.whilst(() => { + return done !== true; + }, (next) => { + const val = entries.next(); + if (val.done === true) { + done = true; + return next(); + } + cb(val.value[0], val.value[1], next); + }, fin); + } + + /** + * + * @method forEachSync + * @memberof Clusterluck.DTable + * @instance + * + * @param {Function} cb + * + * @return {Clusterluck.DTable} + * + */ + forEachSync(cb) { + this._table.forEach((val, key) => { + cb(key, val); + }); + return this; + } + + /** + * + * @method _setupDiskOps + * @memberof Clusterluck.DTable + * @instance + * @private + * + * @return {Clusterluck.DTable} + * + */ + _setupDiskOps() { + this._setupIdleFlushInterval(); + this._setupAOFSyncInterval(); + + this.once("stop", () => { + this._fd = null; + this._fstream.end(); + this._fstream = null; + clearInterval(this._idleInterval); + this._idleInterval = null; + clearInterval(this._syncInterval); + this._syncInterval = null; + }); + return this; + } + + /** + * + * @method _setupIdleFlushInterval + * @memberof Clusterluck.DTable + * @instance + * @private + * + * @return {Clusterluck.DTable} + * + */ + _setupIdleFlushInterval() { + this._idleInterval = setInterval(() => { + this._idleTicks++; + if (this._idleTicks >= this._idleTickMax) { + this._idleTicks = 0; + this._flush(); + } + }, this._idleTickInterval); + 
return this; + } + + /** + * + * @method _setupAOFSyncInterval + * @memberof Clusterluck.DTable + * @instance + * @private + * + * @return {Clusterluck.DTable} + * + */ + _setupAOFSyncInterval() { + this._fstream = fs.createWriteStream(this._aofPath, { + flags: "a" + }); + this._fstream.once("open", (fd) => { + this._fd = fd; + this._queue.flush().forEach((el) => { + const args = [el.op].concat(el.args); + this._writeToLog.apply(this, args); + }); + this.emit("open"); + }); + this._syncInterval = setInterval(() => { + if (!this._fd) return; + fs.fsync(this._fd, (err) => { + if (err) debug("failed to fsync underlying file descriptor for AOF file on table %s.", this._name); + }); + }, this._fsyncInterval); + return this; + } + + /** + * + * @method _updateWriteCount + * @memberof Clusterluck.DTable + * @instance + * @private + * + * @return {Clusterluck.DTable} + * + */ + _updateWriteCount() { + this._idleTicks = 0; + this._writeCount++; + if (this._writeCount >= this._writeThreshold) { + this._writeCount = 0; + this._flush(); + } + return this; + } + + /** + * + * @method _flush + * @memberof Clusterluck.DTable + * @instance + * @private + * + */ + _flush() { + if (this._flushing || !this._fd) return; + this._flushing = true; + clearInterval(this._syncInterval); + + var obj = utils.mapToObject(this._table); + this._fstream.once("close", () => { + async.series([ + _.partial(this._flushAOF).bind(this), + (next) => { + this._setupAOFSyncInterval(); + async.nextTick(next); + }, + _.partial(this._flushTable, obj).bind(this), + _.partial(fs.unlink, this._tmpAOFPath) + ], (err) => { + this._flushing = false; + this.emit("idle"); + }); + }); + this._fstream.end(); + this._fd = null; + this.emit("close"); + } + + /** + * + * @method _flushAOF + * @memberof Clusterluck.DTable + * @instance + * @private + * + * @param {Function} cb + * + */ + _flushAOF(cb) { + let called = false; + const wstream = fs.createWriteStream(this._tmpAOFPath, { + flags: "a" + }); + const rstream = 
fs.createReadStream(this._aofPath); + rstream.pipe(wstream); + wstream.on("close", () => { + if (called) return; + called = true; + fs.unlink(this._aofPath, cb); + }); + wstream.on("error", (err) => { + if (called) return; + called = true; + rstream.resume(); + return cb(err); + }); + } + + /** + * + * @method _flushTable + * @memberof Clusterluck.DTable + * @instance + * @private + * + * @param {Object} obj + * @param {Function} cb + * + */ + _flushTable(obj, cb) { + let called = false; + const wstream = fs.createWriteStream(this._path); + wstream.on("error", (err) => { + if (called) return; + called = true; + wstream.end(); + return cb(err); + }); + wstream.on("close", () => { + if (called) return; + called = true; + return cb(); + }); + async.eachLimit(Object.keys(obj), 4, (key, next) => { + if (called) return next("Closed prematurely."); + const val = obj[key]; + wstream.write(JSON.stringify({ + key: key, + value: DTable.encodeValue(val) + }) + "\n"); + async.nextTick(next); + }, (err) => { + if (called) return; + wstream.end(); + }); + } + + /** + * + * @method _writeToLog + * @memberof Clusterluck.DTable + * @instance + * @private + * + * @param {String} op + * + * @return {Clusterluck.DTable} + * + */ + _writeToLog(op, ...args) { + if (!this._fd) { + this._queue.enqueue({op: op, args: args}); + return; + } + args = args.map((val) => { + return DTable.encodeValue(val); + }); + const out = JSON.stringify({ + op: op, + args: args + }) + "\n"; + this._fstream.write(out); + return this; + } + + /** + * + * @method _loadState + * @memberof Clusterluck.DTable + * @instance + * @private + * + * @param {Function} cb + * + */ + _loadState(cb) { + fs.stat(this._path, (err) => { + if (err && err.code !== "ENOENT") return cb(err); + else if (err && err.code === "ENOENT") return cb(); + const rstream = fs.createReadStream(this._path); + const rline = readline.createInterface({ + input: rstream + }); + let called = false; + rline.once("close", () => { + if (called) return; 
+ called = true; + return cb(); + }); + rstream.on("error", (err) => { + if (called) return; + called = true; + rstream.resume(); + rline.close(); + return cb(err); + }); + rline.on("line", (line) => { + const lineObj = JSON.parse(line); + const val = DTable.decodeValue(lineObj.value); + this._table.set(lineObj.key, val); + }); + }); + } + + /** + * + * @method _loadAOF + * @memberof Clusterluck.DTable + * @instance + * @private + * + * @param {String} path + * @param {Function} cb + * + */ + _loadAOF(path, cb) { + fs.stat(path, (err) => { + if (err && err.code !== "ENOENT") return cb(err); + else if (err && err.code === "ENOENT") return cb(); + const rstream = fs.createReadStream(path); + const rline = readline.createInterface({ + input: rstream + }); + let called = false; + rline.once("close", () => { + if (called) return; + called = true; + return cb(); + }); + rstream.on("error", (err) => { + if (called) return; + called = true; + rstream.resume(); + rline.close(); + return cb(err); + }); + rline.on("line", (line) => { + const lineObj = JSON.parse(line); + lineObj.args = lineObj.args.map((val) => { + return DTable.decodeValue(val); + }); + + try { + this._table[lineObj.op].apply(this, lineObj.args); + } catch (e) { + debug("failed to complete operation %s with args " + JSON.stringify(lineObj.args) + " from %s", lineObj.op, path); + } + }); + }); + } + + /** + * + * @method invalidTypeError + * @memberof Clusterluck.DTable + * @static + * + * @param {String} command + * @param {String} key + * @param {String} type + * + * @return {Error} + * + */ + static invalidTypeError(command, key, type) { + let msg = "Invalid command '" + command + "' against key '" + key + "'"; + msg += " of type '" + type +"'"; + return _.extend(new Error(msg), { + type: "INVALID_TYPE" + }); + } + + /** + * + * @method encodeValue + * @memberof Clusterluck.DTable + * @static + * + * @param {Map|Set|JSON} value + * + * @return {Object} + * + */ + static encodeValue(value) { + if (value 
instanceof Set) { + return { + type: "Set", + data: utils.setToList(value) + }; + } else if (value instanceof Map) { + return { + type: "Map", + data: utils.mapToList(value) + }; + } else { + return { + type: typeof value, + data: value + }; + } + } + + /** + * + * @method decodeValue + * @memberof Clusterluck.DTable + * @static + * + * @param {Object} value + * @param {String} value.type + * @param {JSON} value.data + * + * @return {Map|Set|JSON} + * + */ + static decodeValue(value) { + if (value.type === "Set") { + return new Set(value.data); + } else if (value.type === "Map") { + return new Map(value.data); + } else { + return value.data; + } + } +} + +module.exports = DTable; diff --git a/lib/utils.js b/lib/utils.js index 37e34f5..cd24c63 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -101,6 +101,30 @@ var utils = { if (out instanceof Error) return out; data.nodes = out; return data; + }, + + mapToObject: (data) => { + var obj = {}; + data.forEach((val, key) => { + obj[key] = val; + }); + return obj; + }, + + setToList: (data) => { + var memo = []; + data.forEach((val) => { + memo.push(val); + }); + return memo; + }, + + mapToList: (data) => { + var memo = []; + data.forEach((val, key) => { + memo.push([key, val]); + }); + return memo; } }; diff --git a/test/unit/dtable.js b/test/unit/dtable.js new file mode 100644 index 0000000..f61ac13 --- /dev/null +++ b/test/unit/dtable.js @@ -0,0 +1,570 @@ +var _ = require("lodash"), + async = require("async"), + sinon = require("sinon"), + stream = require("stream"), + fs = require("fs"), + assert = require("chai").assert; + +module.exports = function (mocks, lib) { + var DTable = lib.dtable; + var consts = lib.consts; + + describe("DTable unit tests", function () { + var dtable; + beforeEach(function (done) { + dtable = new DTable({path: "./data"}); + async.nextTick(done); + }); + + it("Should construct a DTabel", function () { + assert.equal(dtable._autoSave, consts.dtableOpts.autoSave); + 
assert.equal(dtable._writeCount, 0); + assert.equal(dtable._writeThreshold, consts.dtableOpts.writeThreshold); + assert.deepEqual(dtable._table, new Map()); + assert.equal(dtable._idleTicks, 0); + assert.equal(dtable._idleTickMax, consts.dtableOpts.autoSave/1000); + assert.equal(dtable._fsyncInterval, consts.dtableOpts.fsyncInterval); + assert.equal(dtable._queue.size(), 0); + + var out; + try { + dtable = new DTable(); + } catch (e) { + out = e; + } + assert.ok(out instanceof Error); + }); + + it("Should start dtable instance", function (done) { + dtable.start("foo"); + dtable.once("open", () => { + assert.isString(dtable._id); + assert.isNumber(dtable._fd); + assert.ok(dtable._idleInterval); + assert.ok(dtable._syncInterval); + assert.ok(dtable._fstream); + done(); + }); + }); + + it("Should stop dtable instance", function (done) { + dtable.start("foo"); + dtable.stop(() => { + assert.equal(dtable._id, null); + assert.equal(dtable._fstream, null); + assert.equal(dtable._syncInterval, null); + assert.equal(dtable._idleInterval, null); + done(); + }); + }); + + it("Should stop dtable instance once fd open", function (done) { + dtable.start("foo"); + dtable.once("open", () => { + dtable.stop(() => { + assert.equal(dtable._id, null); + assert.equal(dtable._fstream, null); + assert.equal(dtable._syncInterval, null); + assert.equal(dtable._idleInterval, null); + done(); + }); + }); + }); + + it("Should stop dtable instance, wait for idle", function (done) { + dtable.start("foo"); + dtable.once("open", () => { + dtable._flushing = true; + dtable.stop(() => { + assert.equal(dtable._id, null); + assert.equal(dtable._fstream, null); + assert.equal(dtable._syncInterval, null); + assert.equal(dtable._idleInterval, null); + done(); + }); + dtable._flushing = false; + dtable.emit("idle"); + }); + }); + + it("Should load dtable instance", function (done) { + dtable.load((err) => { + assert.notOk(err); + done(); + }); + }); + + it("Should check if dtable instance is idle", 
function () { + assert.equal(dtable.idle(), true); + dtable._flushing = true; + assert.equal(dtable.idle(), false); + dtable._flushing = false; + }); + + it("Should get value in table", function () { + dtable._table.set("key", "val"); + assert.equal(dtable.get("key"), "val"); + }); + + it("Should smember value in table", function () { + dtable.sset("key", "val"); + assert.equal(dtable.smember("key", "val"), true); + + dtable.set("key", "val"); + assert.throws(_.partial(dtable.smember, "key", "val").bind(dtable)); + }); + + it("Should hget value in table", function () { + dtable.hset("key", "hkey", "val"); + assert.equal(dtable.hget("key", "hkey"), "val"); + + dtable.set("key", "val"); + assert.throws(_.partial(dtable.hget, "key", "hkey").bind(dtable)); + }); + + it("Should set value in table", function () { + dtable.set("key", "val"); + assert.equal(dtable.get("key"), "val"); + }); + + it("Should sset value in table", function () { + dtable.sset("key", "val"); + assert.equal(dtable.smember("key", "val"), true); + + dtable.set("key", "val"); + assert.throws(_.partial(dtable.sset, "key", "val").bind(dtable)); + }); + + it("Should hset value in table", function () { + dtable.hset("key", "hkey", "val"); + assert.equal(dtable.hget("key", "hkey"), "val"); + + dtable.set("key", "val"); + assert.throws(_.partial(dtable.hset, "key", "hkey", "val").bind(dtable)); + }); + + it("Should del value in table", function () { + dtable.set("key", "val"); + dtable.del("key"); + assert.notOk(dtable.get("key")); + }); + + it("Should sdel value in table", function () { + dtable.sset("key", "val"); + dtable.sset("key", "val2"); + dtable.sdel("key", "val"); + assert.equal(dtable.smember("key", "val"), false); + assert.equal(dtable.smember("key", "val2"), true); + dtable.sdel("key", "val2"); + assert.equal(dtable.smember("key", "val2"), false); + + dtable.set("key", "val"); + assert.throws(_.partial(dtable.sdel, "key", "val").bind(dtable)); + }); + + it("Should hdel value in table", 
function () { + dtable.hset("key", "hkey", "val"); + dtable.hset("key", "hkey2", "val2"); + dtable.hdel("key", "hkey"); + assert.notOk(dtable.hget("key", "hkey")); + assert.equal(dtable.hget("key", "hkey2"), "val2"); + dtable.hdel("key", "hkey2"); + assert.notOk(dtable.hget("key", "hkey2")); + + dtable.set("key", "val"); + assert.throws(_.partial(dtable.hdel, "key", "hkey").bind(dtable)); + }); + + it("Should run an async forEach over table", function (done) { + dtable.set("key", "val"); + dtable.set("key2", "val2"); + const memo = {}; + dtable.forEach((key, val, next) => { + memo[key] = val; + next(); + }, (err) => { + assert.notOk(err); + assert.deepEqual(memo, { + key: "val", + key2: "val2" + }); + done(); + }); + }); + + it("Should run sync forEach over table", function () { + dtable.set("key", "val"); + dtable.set("key2", "val2"); + const memo = {}; + dtable.forEachSync((key, val) => { + memo[key] = val; + }); + assert.deepEqual(memo, { + key: "val", + key2: "val2" + }); + }); + + it("Should setup idle flush interval", function (done) { + dtable._idleTickInterval = 1; + dtable._setupIdleFlushInterval(); + setTimeout(() => { + assert.equal(dtable._idleTicks, 1); + clearInterval(dtable._idleInterval); + done(); + }, 1); + }); + + it("Should setup idle flush interval, execute flush", function (done) { + dtable._idleTickInterval = 1; + dtable._idleTickMax = 1; + sinon.stub(dtable, "_flush"); + dtable._setupIdleFlushInterval(); + setTimeout(() => { + assert.equal(dtable._idleTicks, 0); + assert.ok(dtable._flush.called); + dtable._flush.restore(); + dtable._fd = null; + clearInterval(dtable._idleInterval); + async.nextTick(done); + }, 1); + }); + + it("Should setup AOF sync interval", function (done) { + sinon.stub(fs, "createWriteStream", () => { + var pstream = new stream.PassThrough(); + setTimeout(() => { + pstream.emit("open", 1); + }, 2); + return pstream; + }); + sinon.stub(fs, "fsync", (fd, cb) => { + return cb(); + }); + dtable._fsyncInterval = 1; + 
dtable._setupAOFSyncInterval(); + assert.ok(dtable._fstream); + assert.ok(dtable._syncInterval); + dtable._fstream.once("open", (fd) => { + assert.equal(dtable._fd, fd); + setTimeout(() => { + assert.ok(fs.fsync.called); + clearInterval(dtable._syncInterval); + fs.createWriteStream.restore(); + fs.fsync.restore(); + done(); + }, 1); + }); + }); + + it("Should update write count", function () { + dtable._updateWriteCount(); + assert.equal(dtable._idleTicks, 0); + assert.equal(dtable._writeCount, 1); + dtable._writeThreshold = 2; + sinon.stub(dtable, "_flush"); + dtable._updateWriteCount(); + assert.equal(dtable._writeCount, 0); + assert.ok(dtable._flush.called); + dtable._flush.restore(); + }); + + it("Should flush state to disk", function (done) { + dtable._setupAOFSyncInterval(); + sinon.stub(dtable, "_setupAOFSyncInterval"); + + dtable._flush(); + assert.notOk(dtable._setupAOFSyncInterval.called); + + dtable._flushing = true; + dtable._flush(); + assert.notOk(dtable._setupAOFSyncInterval.called); + dtable._flushing = false; + + dtable.once("open", () => { + dtable.once("idle", () => { + assert.ok(dtable._setupAOFSyncInterval); + dtable._setupAOFSyncInterval.restore(); + fs.unlink(dtable._path, _.partial(async.nextTick, done)); + }); + dtable.once("close", () => { + assert.equal(dtable._fd, null); + }); + dtable._flush(); + }); + }); + + it("Should flush AOF files to disk", function (done) { + fs.writeFile(dtable._aofPath, "", (err) => { + assert.notOk(err); + dtable._flushAOF((err) => { + assert.notOk(err); + fs.unlink(dtable._tmpAOFPath, done); + }); + }); + }); + + it("Should fail flushing AOF files to disk", function (done) { + fs.writeFile(dtable._aofPath, "", (err) => { + assert.notOk(err); + sinon.stub(fs, "createWriteStream", () => { + var pstream = new stream.PassThrough(); + async.nextTick(() => { + pstream.emit("error", new Error("foo")); + }); + return pstream; + }); + dtable._flushAOF((err) => { + assert.ok(err); + fs.createWriteStream.restore(); + 
fs.unlink(dtable._aofPath, _.partial(async.nextTick, done)); + }); + }); + }); + + it("Should flush snapshot of table to disk", function (done) { + var out = new Map(); + sinon.stub(fs, "createWriteStream", () => { + var pstream = new stream.PassThrough(); + pstream.on("data", (data) => { + data = JSON.parse(data); + out.set(data.key, DTable.decodeValue(data.value)); + }); + pstream.once("end", () => { + pstream.emit("close"); + }); + return pstream; + }); + dtable._flushTable({ + foo: "bar" + }, (err) => { + assert.notOk(err); + assert.deepEqual(out, dtable._table); + fs.createWriteStream.restore(); + async.nextTick(done); + }); + }); + + it("Should fail flushing snapshot of table to disk", function (done) { + sinon.stub(fs, "createWriteStream", () => { + var pstream = new stream.PassThrough(); + async.nextTick(() => { + pstream.emit("error", new Error("foo")); + }); + return pstream; + }); + dtable._flushTable({ + foo: "bar" + }, (err) => { + assert.ok(err); + fs.createWriteStream.restore(); + async.nextTick(done); + }); + }); + + it("Should write action to log", function (done) { + dtable._writeToLog("command", "foo", "bar", "baz"); + assert.deepEqual(dtable._queue.dequeue(), {op: "command", args: ["foo", "bar", "baz"]}); + + dtable._fd = 1; + dtable._fstream = new stream.PassThrough(); + dtable._fstream.once("data", (data) => { + data = JSON.parse(data); + assert.deepEqual(data, { + op: "command", + args: [ + {type: "string", data: "foo"}, + {type: "string", data: "bar"}, + {type: "string", data: "baz"} + ] + }); + async.nextTick(done); + }); + dtable._writeToLog("command", "foo", "bar", "baz"); + }); + + it("Should fail to load snapshot if disk read fails", function (done) { + sinon.stub(fs, "stat", (path, cb) => { + async.nextTick(() => { + cb(new Error("foo")); + }); + }); + dtable._loadState((err) => { + assert.ok(err); + fs.stat.restore(); + done(); + }); + }); + + it("Should skip loading snapshot if file doesn't exist", function (done) { + sinon.stub(fs, 
"stat", (path, cb) => { + async.nextTick(_.partial(cb, _.extend(new Error("foo"), {code: "ENOENT"}))); + }); + dtable._loadState((err) => { + assert.notOk(err); + assert.equal(dtable._table.size, 0); + fs.stat.restore(); + done(); + }); + }); + + it("Should load snapshot from disk", function (done) { + sinon.stub(fs, "stat", (path, cb) => { + async.nextTick(cb); + }); + sinon.stub(fs, "createReadStream", () => { + var pstream = new stream.PassThrough(); + async.nextTick(() => { + pstream.write(JSON.stringify({ + key: "key", + value: {type: "string", data: "val"} + }) + "\n"); + pstream.end(); + }); + return pstream; + }); + dtable._loadState((err) => { + assert.notOk(err); + assert.deepEqual(dtable._table, new Map([["key", "val"]])); + fs.stat.restore(); + fs.createReadStream.restore(); + done(); + }); + }); + + it("Should fail loading snapshot from disk if rstream emits error", function (done) { + sinon.stub(fs, "stat", (path, cb) => { + async.nextTick(cb); + }); + sinon.stub(fs, "createReadStream", () => { + var pstream = new stream.PassThrough(); + async.nextTick(() => { + pstream.emit("error", new Error("foo")); + }); + return pstream; + }); + dtable._loadState((err) => { + assert.ok(err); + fs.stat.restore(); + fs.createReadStream.restore(); + done(); + }); + }); + + it("Should fail to load AOF if disk read fails", function (done) { + sinon.stub(fs, "stat", (path, cb) => { + async.nextTick(() => { + cb(new Error("foo")); + }); + }); + dtable._loadAOF(dtable._aofPath, (err) => { + assert.ok(err); + fs.stat.restore(); + done(); + }); + }); + + it("Should skip loading AOF if file doesn't exist", function (done) { + sinon.stub(fs, "stat", (path, cb) => { + async.nextTick(_.partial(cb, _.extend(new Error("foo"), {code: "ENOENT"}))); + }); + dtable._loadAOF(dtable._aofPath, (err) => { + assert.notOk(err); + assert.equal(dtable._table.size, 0); + fs.stat.restore(); + done(); + }); + }); + + it("Should load AOF from disk", function (done) { + sinon.stub(fs, "stat", 
(path, cb) => { + async.nextTick(cb); + }); + sinon.stub(fs, "createReadStream", () => { + var pstream = new stream.PassThrough(); + async.nextTick(() => { + pstream.write(JSON.stringify({ + op: "set", + args: [{type: "string", data: "key"}, {type: "string", data: "val"}] + }) + "\n"); + pstream.end(); + }); + return pstream; + }); + dtable._loadAOF(dtable._aofPath, (err) => { + assert.notOk(err); + assert.deepEqual(dtable._table, new Map([["key", "val"]])); + fs.stat.restore(); + fs.createReadStream.restore(); + done(); + }); + }); + + it("Should fail loading AOF from disk if rstream emits error", function (done) { + sinon.stub(fs, "stat", (path, cb) => { + async.nextTick(cb); + }); + sinon.stub(fs, "createReadStream", () => { + var pstream = new stream.PassThrough(); + async.nextTick(() => { + pstream.emit("error", new Error("foo")); + }); + return pstream; + }); + dtable._loadAOF(dtable._aofPath, (err) => { + assert.ok(err); + fs.stat.restore(); + fs.createReadStream.restore(); + done(); + }); + }); + }); + + describe("DTable static unit tests", function () { + it("Should return invalid type error", function () { + var error = DTable.invalidTypeError("command", "key", "type"); + assert.ok(error instanceof Error); + assert.equal(error.type, "INVALID_TYPE"); + }); + + it("Should encode value", function () { + var out = DTable.encodeValue(new Set(["val", "val2"])); + assert.deepEqual(out, { + type: "Set", + data: ["val", "val2"] + }); + + out = DTable.encodeValue(new Map([["key", "val"]])); + assert.deepEqual(out, { + type: "Map", + data: [["key", "val"]] + }); + + out = DTable.encodeValue("foobar"); + assert.deepEqual(out, { + type: "string", + data: "foobar" + }); + }); + + it("Should decode value", function () { + var out = DTable.decodeValue({ + type: "Set", + data: ["val", "val2"] + }); + assert.deepEqual(out, new Set(["val", "val2"])); + + out = DTable.decodeValue({ + type: "Map", + data: [["key", "val"]] + }); + assert.deepEqual(out, new Map([["key", 
"val"]])); + + out = DTable.decodeValue({ + type: "object", + data: {} + }); + assert.deepEqual(out, {}); + }); + }); +}; diff --git a/test/unit/index.js b/test/unit/index.js index e990852..d20ff04 100644 --- a/test/unit/index.js +++ b/test/unit/index.js @@ -11,5 +11,6 @@ module.exports = function (mocks, lib) { require("./command_server")(mocks, lib); require("./utils")(mocks, lib); require("./cluster_node")(mocks, lib); + require("./dtable")(mocks, lib); }); }; diff --git a/test/unit/utils.js b/test/unit/utils.js index be06f90..a7464a4 100644 --- a/test/unit/utils.js +++ b/test/unit/utils.js @@ -132,5 +132,29 @@ module.exports = function (mocks, lib) { var out = utils.parseNodeList({nodes: ""}); assert.ok(out instanceof Error); }); + + it("Should transform map to object", function () { + var list = [["key", "val"], ["key2", "val2"]]; + var map = new Map(list); + var out = utils.mapToObject(map); + assert.deepEqual(out, { + key: "val", + key2: "val2" + }); + }); + + it("Should transform set to list", function () { + var list = ["val", "val2"]; + var set = new Set(list); + var out = utils.setToList(set); + assert.deepEqual(out, list); + }); + + it("Should transform map to list", function () { + var list = [["key", "val"], ["key2", "val2"]]; + var map = new Map(list); + var out = utils.mapToList(map); + assert.deepEqual(out, list); + }); }); }; From 8cbe837f4793a8c1af48f7e6e3183dc4c3da6693 Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Thu, 8 Jun 2017 17:35:34 -0700 Subject: [PATCH 5/6] add more documentation to dtable class --- lib/consts.js | 2 + lib/dtable.js | 108 +++++++++++++++++++++++++++++++++++--------------- 2 files changed, 77 insertions(+), 33 deletions(-) diff --git a/lib/consts.js b/lib/consts.js index 4a286d4..33f46db 100644 --- a/lib/consts.js +++ b/lib/consts.js @@ -63,6 +63,8 @@ module.exports = { dlmOpts: Object.freeze(_.extend({ rquorum: 0.51, wquorum: 0.51, + minWaitTimeout: 10, + maxWaitTimeout: 100, disk: false }, dtableOpts)) }; diff --git 
a/lib/dtable.js b/lib/dtable.js index 48af88d..00bfc1f 100644 --- a/lib/dtable.js +++ b/lib/dtable.js @@ -16,6 +16,15 @@ const defaults = consts.dtableOpts; class DTable extends EventEmitter { /** + * + * In-memory key/value storage. Uses a combination of an AOF file and storage snapshot for persistence. For every "write" action, a corresponding write to the AOF file LATEST.LOG occurs. On an interval, this file is fsynced to disk (by default, 1000 milliseconds). In addition, a storage snapshot is kept and rewritten based on usage of the table. If the table remains inactive for a configurable amount of time (default 180000 milliseconds) or receives a configurable number of write operations before this idle time occurs (default 100 writes), the storage snapshot is resaved based on current state at that point in time. The algorithm is as follows: + * - Close the file descriptor on the AOF file LATEST.LOG + * - Append contents of LATEST.LOG (AOF file) to PREV.LOG (backup AOF file) + * - Delete LATEST.LOG and reopen with a new file descriptor + * - Dump state of table to DATA.SNAP (storage snapshot) + * - Delete PREV.LOG + * + * In the time that the AOF file LATEST.LOG is closed, log writes are stored in an internal queue. Once the file descriptor is reopened, the queue is flushed. * * @class DTable * @memberof Clusterluck @@ -49,12 +58,14 @@ class DTable extends EventEmitter { } /** + * + * Starts dtable instance, which triggers an fopen call to LATEST.LOG, the fsync interval for this log file, as well as other internal intervals to check for storage snapshot flush conditions. * * @method start * @memberof Clusterluck.DTable * @instance * - * @param {String} name + * @param {String} name - Name of table, meant for debugging purposes. * * @return {Clusterluck.DTable} This instance. * @@ -67,12 +78,14 @@ class DTable extends EventEmitter { } /** + * + * Stops the table, including all internal disk-based logic. 
If the table is idle and has an open file descriptor against LATEST.LOG, it will close immediately. If it's idle but the file descriptor against LATEST.LOG has been closed, this call will wait for a file descriptor to open again before continuing. Otherwise, the table isn't idle and therefore we wait for this condition. * * @method stop * @memberof Clusterluck.DTable * @instance * - * @param {Function} cb + * @param {Function} cb - Callback called once the table has been stopped. * */ stop(cb) { @@ -87,12 +100,17 @@ class DTable extends EventEmitter { } /** + * + * Loads state from disk. The algorithm is as follows: + * - DATA.SNAP is loaded and read into the current state of table. + * - PREV.LOG is loaded and rerun against the current state. + * - LATEST.LOG is loaded and also rerun against the current state. * * @method load * @memberof Clusterluck.DTable * @instance * - * @param {Function} cb + * @param {Function} cb - Callback called once the snapshot and AOF files have been loaded and rerun. * */ load(cb) { @@ -104,12 +122,14 @@ class DTable extends EventEmitter { } /** + * + * Returns whether this table is in an idle state or not. * * @method idle * @memberof Clusterluck.DTable * @instance * - * @return {Boolean} + * @return {Boolean} Whether this table is idle or not. * */ idle() { @@ -117,14 +137,16 @@ class DTable extends EventEmitter { } /** + * + * Retrieves value stored at `key`, returning `undefined` if no such data exists. * * @method get * @memberof Clusterluck.DTable * @instance * - * @param {String} key + * @param {String} key - Key to retrieve data from. * - * @return {Any} + * @return {Map|Set|JSON} Value stored at `key`. * */ get(key) { @@ -132,15 +154,17 @@ class DTable extends EventEmitter { } /** + * + * Returns whether `val` is a member of the set stored at `key`. * * @method smember * @memberof Clusterluck.DTable * @instance * - * @param {String} key - * @param {String} val + * @param {String} key - Key to retrieve set from. 
+ * @param {String} val - Value to check existence of in the set. * - * @return {Boolean} + * @return {Boolean} Whether `val` is a member of the set stored at `key`. * */ smember(key, val) { @@ -152,15 +176,17 @@ class DTable extends EventEmitter { } /** + * + * Retrieves value stored at hash key `hkey` under storage key `key`, returning `undefined` if no such data exists. * * @method hget * @memberof Clusterluck.DTable * @instance * - * @param {String} key - * @param {String} hkey + * @param {String} key - Key to retrieve hash map from. + * @param {String} hkey - Hash key to retrieve data from. * - * @return {JSON} + * @return {JSON} - Value stored under hash key `hkey` at the hash map stored under `key`. * */ hget(key, hkey) { @@ -172,15 +198,17 @@ class DTable extends EventEmitter { } /** + * + * Sets value `value` under key `key`. * * @method set * @memberof Clusterluck.DTable * @instance * * @param {String} key - * @param {Any} val + * @param {Map|Set|JSON} val * - * @return {Any} + * @return {Map|Set|JSON} * */ set(key, val) { @@ -191,15 +219,17 @@ class DTable extends EventEmitter { } /** + * + * Inserts `val` into the set stored at key `key`. * * @method sset * @memberof Clusterluck.DTable * @instance * - * @param {String} key - * @param {String} val + * @param {String} key - Key which holds the set to insert `val` under. + * @param {String} val - Value to insert into the set. * - * @return {String} + * @return {String} The set stored at `key`. * */ sset(key, val) { @@ -215,16 +245,18 @@ class DTable extends EventEmitter { } /** + * + * Sets `value` under the hash key `hkey` in the hash map stored at `key`. * * @method hset * @memberof Clusterluck.DTable * @instance * - * @param {String} key - * @param {String} hkey - * @param {JSON} val + * @param {String} key - Key which holds the hash map. + * @param {String} hkey - Hash key to insert `val` under. + * @param {JSON} val - Value to set under `hkey` in the hash map. 
 *
- * @return {JSON}
+ * @return {Map} The map stored at `key`.
 *
 */
 hset(key, hkey, val) {
@@ -240,14 +272,16 @@ class DTable extends EventEmitter {
 }
 
 /**
+ *
+ * Removes key `key` from this table.
 *
 * @method del
 * @memberof Clusterluck.DTable
 * @instance
 *
- * @param {String} key
+ * @param {String} key - Key to remove from this table.
 *
- * @return {Clusterluck.DTable}
+ * @return {Clusterluck.DTable} This instance.
 *
 */
 del(key) {
@@ -258,15 +292,17 @@ class DTable extends EventEmitter {
 }
 
 /**
+ *
+ * Deletes `val` from the set stored under `key`.
 *
 * @method sdel
 * @memberof Clusterluck.DTable
 * @instance
 *
- * @param {String} key
- * @param {String} val
+ * @param {String} key - Key which holds the set to remove `val` from.
+ * @param {String} val - Value to remove from the set.
 *
- * @return {Clusterluck.DTable}
+ * @return {Clusterluck.DTable} This instance.
 *
 */
 sdel(key, val) {
@@ -282,15 +318,17 @@ class DTable extends EventEmitter {
 }
 
 /**
+ *
+ * Removes the hash key `hkey` from the hash map stored under `key`.
 *
 * @method hdel
 * @memberof Clusterluck.DTable
 * @instance
 *
- * @param {String} key
- * @param {String} hkey
+ * @param {String} key - Key which holds the hash map that `hkey` will be removed from.
+ * @param {String} hkey - The hash key to remove from the hash map.
 *
- * @return {Clusterluck.DTable}
+ * @return {Clusterluck.DTable} This instance.
 *
 */
 hdel(key, hkey) {
@@ -306,13 +344,15 @@ class DTable extends EventEmitter {
 }
 
 /**
+ *
+ * Asynchronously iterates over each key/value pair stored in this table at the point of this call.
 *
 * @method forEach
 * @memberof Clusterluck.DTable
 * @instance
 *
- * @param {Function} cb
- * @param {Function} fin
+ * @param {Function} cb - Function to call on each key/value pair. Has the signature `function (key, val, next) {...}`.
+ * @param {Function} fin - Finishing callback to call once iteration has completed. 
Has the signature `function (err) {...}`, where `err` is populated if passed into the `next` callback at any point of iteration.
 *
 */
 forEach(cb, fin) {
@@ -331,14 +371,16 @@ class DTable extends EventEmitter {
 }
 
 /**
+ *
+ * Synchronously iterates over each key/value pair stored in this table.
 *
 * @method forEachSync
 * @memberof Clusterluck.DTable
 * @instance
 *
- * @param {Function} cb
+ * @param {Function} cb - Function to call on each key/value pair. Has the signature `function (key, val) {...}`.
 *
- * @return {Clusterluck.DTable}
+ * @return {Clusterluck.DTable} This instance.
 *
 */
 forEachSync(cb) {
From b20f771bd3d63648af8462ec05909fbad44f88d4 Mon Sep 17 00:00:00 2001
From: Kevin Wilson
Date: Fri, 9 Jun 2017 00:04:15 -0700
Subject: [PATCH 6/6] keep data directory for tests

---
 data/.gitkeep | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 data/.gitkeep

diff --git a/data/.gitkeep b/data/.gitkeep
new file mode 100644
index 0000000..e69de29