From e6a03d95d9c70c186b5151b48a139673bf8f6ded Mon Sep 17 00:00:00 2001
From: Kevin Wilson
Date: Thu, 1 Jun 2017 21:29:27 -0700
Subject: [PATCH] add disk-based table using AOF+snapshot persistence model
 (for future feature sets involving disk writes), tests, docs

---
 Gruntfile.js        |   1 +
 lib/consts.js       |  14 +-
 lib/dtable.js       | 740 ++++++++++++++++++++++++++++++++++++++++++++
 lib/utils.js        |  24 ++
 test/unit/dtable.js | 570 ++++++++++++++++++++++++++++++++++
 test/unit/index.js  |   1 +
 test/unit/utils.js  |  24 ++
 7 files changed, 1373 insertions(+), 1 deletion(-)
 create mode 100644 lib/dtable.js
 create mode 100644 test/unit/dtable.js

diff --git a/Gruntfile.js b/Gruntfile.js
index 2db6bb8..40fdbc0 100644
--- a/Gruntfile.js
+++ b/Gruntfile.js
@@ -12,6 +12,7 @@ module.exports = function (grunt) {
     "lib/kernel.js",
     "lib/node.js",
     "lib/vclock.js",
+    "lib/dtable.js"
   ];
 
   grunt.initConfig({
diff --git a/lib/consts.js b/lib/consts.js
index 8c3d333..4a286d4 100644
--- a/lib/consts.js
+++ b/lib/consts.js
@@ -13,6 +13,12 @@ var vclockOpts = {
   oldBound: 86400000000
 };
 
+var dtableOpts = {
+  writeThreshold: 100,
+  autoSave: 180000,
+  fsyncInterval: 1000
+};
+
 var networkHost = os.hostname();
 var networkPort = 7022;
 
@@ -52,5 +58,11 @@ module.exports = {
   }),
   // vector clock defaults, in the event we want direct creation/manipulation of vector
   // clocks
-  vclockOpts: Object.freeze(_.cloneDeep(vclockOpts))
+  vclockOpts: Object.freeze(_.cloneDeep(vclockOpts)),
+  dtableOpts: Object.freeze(dtableOpts),
+  dlmOpts: Object.freeze(_.extend({
+    rquorum: 0.51,
+    wquorum: 0.51,
+    disk: false
+  }, dtableOpts))
 };
diff --git a/lib/dtable.js b/lib/dtable.js
new file mode 100644
index 0000000..48af88d
--- /dev/null
+++ b/lib/dtable.js
@@ -0,0 +1,740 @@
+const _ = require("lodash"),
+      fs = require("fs"),
+      util = require("util"),
+      debug = require("debug")("clusterluck:lib:dtable"),
+      async = require("async"),
+      path = require("path"),
+      shortid = require("shortid"),
+      readline = require("readline"),
+      EventEmitter = require("events").EventEmitter;
+
+const utils = require("./utils"),
+      Queue = require("./queue"),
+      consts = require("./consts");
+
+const defaults = consts.dtableOpts;
+
+class DTable extends EventEmitter {
+  /**
+   *
+   * @class DTable
+   * @memberof Clusterluck
+   *
+   * @param {Object} opts
+   * @param {String} opts.path
+   * @param {Number} [opts.writeThreshold]
+   * @param {Number} [opts.autoSave]
+   * @param {Number} [opts.fsyncInterval]
+   *
+   */
+  constructor(opts = defaults) {
+    super();
+    opts = _.defaults(_.clone(opts), defaults);
+    if (!util.isString(opts.path)) {
+      throw new Error("Missing 'path' option in options object.");
+    }
+    this._path = path.join(opts.path, "DATA.SNAP");
+    this._aofPath = path.join(opts.path, "LATEST.LOG");
+    this._tmpAOFPath = path.join(opts.path, "PREV.LOG");
+    this._autoSave = opts.autoSave;
+    this._writeCount = 0;
+    this._writeThreshold = opts.writeThreshold;
+    this._table = new Map();
+    this._idleTicks = 0;
+    this._idleTickInterval = 1000;
+    this._idleTickMax = this._autoSave/this._idleTickInterval;
+    this._fsyncInterval = opts.fsyncInterval;
+    this._queue = new Queue();
+    this._flushing = false;
+  }
+
+  /**
+   *
+   * @method start
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {String} name
+   *
+   * @return {Clusterluck.DTable} This instance.
+   *
+   */
+  start(name) {
+    this._name = name;
+    this._id = shortid.generate();
+    this._setupDiskOps();
+    return this;
+  }
+
+  /**
+   *
+   * @method stop
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {Function} cb
+   *
+   */
+  stop(cb) {
+    if (this.idle() && this._fd) {
+      this.emit("stop");
+      this._id = null;
+      return cb();
+    } else if (this.idle() && !this._fd) {
+      return this.once("open", _.partial(this.stop, cb).bind(this));
+    }
+    this.once("idle", _.partial(this.stop, cb).bind(this));
+  }
+
+  /**
+   *
+   * @method load
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {Function} cb
+   *
+   */
+  load(cb) {
+    async.series([
+      _.partial(this._loadState).bind(this),
+      _.partial(this._loadAOF, this._tmpAOFPath).bind(this),
+      _.partial(this._loadAOF, this._aofPath).bind(this)
+    ], cb);
+  }
+
+  /**
+   *
+   * @method idle
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @return {Boolean}
+   *
+   */
+  idle() {
+    return this._flushing === false;
+  }
+
+  /**
+   *
+   * @method get
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {String} key
+   *
+   * @return {Any}
+   *
+   */
+  get(key) {
+    return this._table.get(key);
+  }
+
+  /**
+   *
+   * @method smember
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {String} key
+   * @param {String} val
+   *
+   * @return {Boolean}
+   *
+   */
+  smember(key, val) {
+    const out = this._table.get(key) || new Set();
+    if (!(out instanceof Set)) {
+      throw DTable.invalidTypeError("smember", key, typeof out);
+    }
+    return out.has(val);
+  }
+
+  /**
+   *
+   * @method hget
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {String} key
+   * @param {String} hkey
+   *
+   * @return {JSON}
+   *
+   */
+  hget(key, hkey) {
+    const out = this._table.get(key) || new Map();
+    if (!(out instanceof Map)) {
+      throw DTable.invalidTypeError("hget", key, typeof out);
+    }
+    return out.get(hkey);
+  }
+
+  /**
+   *
+   * @method set
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {String} key
+   * @param {Any} val
+   *
+   * @return {Any}
+   *
+   */
+  set(key, val) {
+    this._table.set(key, val);
+    this._writeToLog("set", key, val);
+    this._updateWriteCount();
+    return val;
+  }
+
+  /**
+   *
+   * @method sset
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {String} key
+   * @param {String} val
+   *
+   * @return {String}
+   *
+   */
+  sset(key, val) {
+    const out = this._table.get(key) || new Set();
+    if (!(out instanceof Set)) {
+      throw DTable.invalidTypeError("sset", key, typeof out);
+    }
+    out.add(val);
+    this._table.set(key, out);
+    this._writeToLog("sset", key, val);
+    this._updateWriteCount();
+    return out;
+  }
+
+  /**
+   *
+   * @method hset
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {String} key
+   * @param {String} hkey
+   * @param {JSON} val
+   *
+   * @return {JSON}
+   *
+   */
+  hset(key, hkey, val) {
+    const out = this._table.get(key) || new Map();
+    if (!(out instanceof Map)) {
+      throw DTable.invalidTypeError("hset", key, typeof out);
+    }
+    out.set(hkey, val);
+    this._table.set(key, out);
+    this._writeToLog("hset", key, hkey, val);
+    this._updateWriteCount();
+    return out;
+  }
+
+  /**
+   *
+   * @method del
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {String} key
+   *
+   * @return {Clusterluck.DTable}
+   *
+   */
+  del(key) {
+    this._table.delete(key);
+    this._writeToLog("del", key);
+    this._updateWriteCount();
+    return this;
+  }
+
+  /**
+   *
+   * @method sdel
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {String} key
+   * @param {String} val
+   *
+   * @return {Clusterluck.DTable}
+   *
+   */
+  sdel(key, val) {
+    const out = this._table.get(key) || new Set();
+    if (!(out instanceof Set)) {
+      throw DTable.invalidTypeError("sdel", key, typeof out);
+    }
+    out.delete(val);
+    if (out.size === 0) this._table.delete(key);
+    this._writeToLog("sdel", key, val);
+    this._updateWriteCount();
+    return this;
+  }
+
+  /**
+   *
+   * @method hdel
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {String} key
+   * @param {String} hkey
+   *
+   * @return {Clusterluck.DTable}
+   *
+   */
+  hdel(key, hkey) {
+    const out = this._table.get(key) || new Map();
+    if (!(out instanceof Map)) {
+      throw DTable.invalidTypeError("hdel", key, typeof out);
+    }
+    out.delete(hkey);
+    if (out.size === 0) this._table.delete(key);
+    this._writeToLog("hdel", key, hkey);
+    this._updateWriteCount();
+    return this;
+  }
+
+  /**
+   *
+   * @method forEach
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {Function} cb
+   * @param {Function} fin
+   *
+   */
+  forEach(cb, fin) {
+    const entries = this._table.entries();
+    let done = false;
+    async.whilst(() => {
+      return done !== true;
+    }, (next) => {
+      const val = entries.next();
+      if (val.done === true) {
+        done = true;
+        return next();
+      }
+      cb(val.value[0], val.value[1], next);
+    }, fin);
+  }
+
+  /**
+   *
+   * @method forEachSync
+   * @memberof Clusterluck.DTable
+   * @instance
+   *
+   * @param {Function} cb
+   *
+   * @return {Clusterluck.DTable}
+   *
+   */
+  forEachSync(cb) {
+    this._table.forEach((val, key) => {
+      cb(key, val);
+    });
+    return this;
+  }
+
+  /**
+   *
+   * @method _setupDiskOps
+   * @memberof Clusterluck.DTable
+   * @instance
+   * @private
+   *
+   * @return {Clusterluck.DTable}
+   *
+   */
+  _setupDiskOps() {
+    this._setupIdleFlushInterval();
+    this._setupAOFSyncInterval();
+
+    this.once("stop", () => {
+      this._fd = null;
+      this._fstream.end();
+      this._fstream = null;
+      clearInterval(this._idleInterval);
+      this._idleInterval = null;
+      clearInterval(this._syncInterval);
+      this._syncInterval = null;
+    });
+    return this;
+  }
+
+  /**
+   *
+   * @method _setupIdleFlushInterval
+   * @memberof Clusterluck.DTable
+   * @instance
+   * @private
+   *
+   * @return {Clusterluck.DTable}
+   *
+   */
+  _setupIdleFlushInterval() {
+    this._idleInterval = setInterval(() => {
+      this._idleTicks++;
+      if (this._idleTicks >= this._idleTickMax) {
+        this._idleTicks = 0;
+        this._flush();
+      }
+    }, this._idleTickInterval);
+    return this;
+  }
+
+  /**
+   *
+   * @method _setupAOFSyncInterval
+   * @memberof Clusterluck.DTable
+   * @instance
+   * @private
+   *
+   * @return {Clusterluck.DTable}
+   *
+   */
+  _setupAOFSyncInterval() {
+    this._fstream = fs.createWriteStream(this._aofPath, {
+      flags: "a"
+    });
+    this._fstream.once("open", (fd) => {
+      this._fd = fd;
+      this._queue.flush().forEach((el) => {
+        const args = [el.op].concat(el.args);
+        this._writeToLog.apply(this, args);
+      });
+      this.emit("open");
+    });
+    this._syncInterval = setInterval(() => {
+      if (!this._fd) return;
+      fs.fsync(this._fd, (err) => {
+        if (err) debug("failed to fsync underlying file descriptor for AOF file on table %s.", this._name);
+      });
+    }, this._fsyncInterval);
+    return this;
+  }
+
+  /**
+   *
+   * @method _updateWriteCount
+   * @memberof Clusterluck.DTable
+   * @instance
+   * @private
+   *
+   * @return {Clusterluck.DTable}
+   *
+   */
+  _updateWriteCount() {
+    this._idleTicks = 0;
+    this._writeCount++;
+    if (this._writeCount >= this._writeThreshold) {
+      this._writeCount = 0;
+      this._flush();
+    }
+    return this;
+  }
+
+  /**
+   *
+   * @method _flush
+   * @memberof Clusterluck.DTable
+   * @instance
+   * @private
+   *
+   */
+  _flush() {
+    if (this._flushing || !this._fd) return;
+    this._flushing = true;
+    clearInterval(this._syncInterval);
+
+    var obj = utils.mapToObject(this._table);
+    this._fstream.once("close", () => {
+      async.series([
+        _.partial(this._flushAOF).bind(this),
+        (next) => {
+          this._setupAOFSyncInterval();
+          async.nextTick(next);
+        },
+        _.partial(this._flushTable, obj).bind(this),
+        _.partial(fs.unlink, this._tmpAOFPath)
+      ], (err) => {
+        this._flushing = false;
+        this.emit("idle");
+      });
+    });
+    this._fstream.end();
+    this._fd = null;
+    this.emit("close");
+  }
+
+  /**
+   *
+   * @method _flushAOF
+   * @memberof Clusterluck.DTable
+   * @instance
+   * @private
+   *
+   * @param {Function} cb
+   *
+   */
+  _flushAOF(cb) {
+    let called = false;
+    const wstream = fs.createWriteStream(this._tmpAOFPath, {
+      flags: "a"
+    });
+    const rstream = fs.createReadStream(this._aofPath);
+    rstream.pipe(wstream);
+    wstream.on("close", () => {
+      if (called) return;
+      called = true;
+      fs.unlink(this._aofPath, cb);
+    });
+    wstream.on("error", (err) => {
+      if (called) return;
+      called = true;
+      rstream.resume();
+      return cb(err);
+    });
+  }
+
+  /**
+   *
+   * @method _flushTable
+   * @memberof Clusterluck.DTable
+   * @instance
+   * @private
+   *
+   * @param {Object} obj
+   * @param {Function} cb
+   *
+   */
+  _flushTable(obj, cb) {
+    let called = false;
+    const wstream = fs.createWriteStream(this._path);
+    wstream.on("error", (err) => {
+      if (called) return;
+      called = true;
+      wstream.end();
+      return cb(err);
+    });
+    wstream.on("close", () => {
+      if (called) return;
+      called = true;
+      return cb();
+    });
+    async.eachLimit(Object.keys(obj), 4, (key, next) => {
+      if (called) return next("Closed prematurely.");
+      const val = obj[key];
+      wstream.write(JSON.stringify({
+        key: key,
+        value: DTable.encodeValue(val)
+      }) + "\n");
+      async.nextTick(next);
+    }, (err) => {
+      if (called) return;
+      wstream.end();
+    });
+  }
+
+  /**
+   *
+   * @method _writeToLog
+   * @memberof Clusterluck.DTable
+   * @instance
+   * @private
+   *
+   * @param {String} op
+   *
+   * @return {Clusterluck.DTable}
+   *
+   */
+  _writeToLog(op, ...args) {
+    if (!this._fd) {
+      this._queue.enqueue({op: op, args: args});
+      return;
+    }
+    args = args.map((val) => {
+      return DTable.encodeValue(val);
+    });
+    const out = JSON.stringify({
+      op: op,
+      args: args
+    }) + "\n";
+    this._fstream.write(out);
+    return this;
+  }
+
+  /**
+   *
+   * @method _loadState
+   * @memberof Clusterluck.DTable
+   * @instance
+   * @private
+   *
+   * @param {Function} cb
+   *
+   */
+  _loadState(cb) {
+    fs.stat(this._path, (err) => {
+      if (err && err.code !== "ENOENT") return cb(err);
+      else if (err && err.code === "ENOENT") return cb();
+      const rstream = fs.createReadStream(this._path);
+      const rline = readline.createInterface({
+        input: rstream
+      });
+      let called = false;
+      rline.once("close", () => {
+        if (called) return;
+        called = true;
+        return cb();
+      });
+      rstream.on("error", (err) => {
+        if (called) return;
+        called = true;
+        rstream.resume();
+        rline.close();
+        return cb(err);
+      });
+      rline.on("line", (line) => {
+        const lineObj = JSON.parse(line);
+        const val = DTable.decodeValue(lineObj.value);
+        this._table.set(lineObj.key, val);
+      });
+    });
+  }
+
+  /**
+   *
+   * @method _loadAOF
+   * @memberof Clusterluck.DTable
+   * @instance
+   * @private
+   *
+   * @param {String} path
+   * @param {Function} cb
+   *
+   */
+  _loadAOF(path, cb) {
+    fs.stat(path, (err) => {
+      if (err && err.code !== "ENOENT") return cb(err);
+      else if (err && err.code === "ENOENT") return cb();
+      const rstream = fs.createReadStream(path);
+      const rline = readline.createInterface({
+        input: rstream
+      });
+      let called = false;
+      rline.once("close", () => {
+        if (called) return;
+        called = true;
+        return cb();
+      });
+      rstream.on("error", (err) => {
+        if (called) return;
+        called = true;
+        rstream.resume();
+        rline.close();
+        return cb(err);
+      });
+      rline.on("line", (line) => {
+        const lineObj = JSON.parse(line);
+        lineObj.args = lineObj.args.map((val) => {
+          return DTable.decodeValue(val);
+        });
+
+        try {
+          this[lineObj.op].apply(this, lineObj.args);
+        } catch (e) {
+          debug("failed to complete operation %s with args " + JSON.stringify(lineObj.args) + " from %s", lineObj.op, path);
+        }
+      });
+    });
+  }
+
+  /**
+   *
+   * @method invalidTypeError
+   * @memberof Clusterluck.DTable
+   * @static
+   *
+   * @param {String} command
+   * @param {String} key
+   * @param {String} type
+   *
+   * @return {Error}
+   *
+   */
+  static invalidTypeError(command, key, type) {
+    let msg = "Invalid command '" + command + "' against key '" + key + "'";
+    msg += " of type '" + type + "'";
+    return _.extend(new Error(msg), {
+      type: "INVALID_TYPE"
+    });
+  }
+
+  /**
+   *
+   * @method encodeValue
+   * @memberof Clusterluck.DTable
+   * @static
+   *
+   * @param {Map|Set|JSON} value
+   *
+   * @return {Object}
+   *
+   */
+  static encodeValue(value) {
+    if (value instanceof Set) {
+      return {
+        type: "Set",
+        data: utils.setToList(value)
+      };
+    } else if (value instanceof Map) {
+      return {
+        type: "Map",
+        data: utils.mapToList(value)
+      };
+    } else {
+      return {
+        type: typeof value,
+        data: value
+      };
+    }
+  }
+
+  /**
+   *
+   * @method decodeValue
+   * @memberof Clusterluck.DTable
+   * @static
+   *
+   * @param {Object} value
+   * @param {String} value.type
+   * @param {JSON} value.data
+   *
+   * @return {Map|Set|JSON}
+   *
+   */
+  static decodeValue(value) {
+    if (value.type === "Set") {
+      return new Set(value.data);
+    } else if (value.type === "Map") {
+      return new Map(value.data);
+    } else {
+      return value.data;
+    }
+  }
+}
+
+module.exports = DTable;
diff --git a/lib/utils.js b/lib/utils.js
index 37e34f5..cd24c63 100644
--- a/lib/utils.js
+++ b/lib/utils.js
@@ -101,6 +101,30 @@ var utils = {
     if (out instanceof Error) return out;
     data.nodes = out;
     return data;
+  },
+
+  mapToObject: (data) => {
+    var obj = {};
+    data.forEach((val, key) => {
+      obj[key] = val;
+    });
+    return obj;
+  },
+
+  setToList: (data) => {
+    var memo = [];
+    data.forEach((val) => {
+      memo.push(val);
+    });
+    return memo;
+  },
+
+  mapToList: (data) => {
+    var memo = [];
+    data.forEach((val, key) => {
+      memo.push([key, val]);
+    });
+    return memo;
   }
 };
 
diff --git a/test/unit/dtable.js b/test/unit/dtable.js
new file mode 100644
index 0000000..f61ac13
--- /dev/null
+++ b/test/unit/dtable.js
@@ -0,0 +1,570 @@
+var _ = require("lodash"),
+    async = require("async"),
+    sinon = require("sinon"),
+    stream = require("stream"),
+    fs = require("fs"),
+    assert = require("chai").assert;
+
+module.exports = function (mocks, lib) {
+  var DTable = lib.dtable;
+  var consts = lib.consts;
+
+  describe("DTable unit tests", function () {
+    var dtable;
+    beforeEach(function (done) {
+      dtable = new DTable({path: "./data"});
+      async.nextTick(done);
+    });
+
+    it("Should construct a DTable", function () {
+      assert.equal(dtable._autoSave, consts.dtableOpts.autoSave);
+      assert.equal(dtable._writeCount, 0);
+      assert.equal(dtable._writeThreshold, consts.dtableOpts.writeThreshold);
+      assert.deepEqual(dtable._table, new Map());
+      assert.equal(dtable._idleTicks, 0);
+      assert.equal(dtable._idleTickMax, consts.dtableOpts.autoSave/1000);
+      assert.equal(dtable._fsyncInterval, consts.dtableOpts.fsyncInterval);
+      assert.equal(dtable._queue.size(), 0);
+
+      var out;
+      try {
+        dtable = new DTable();
+      } catch (e) {
+        out = e;
+      }
+      assert.ok(out instanceof Error);
+    });
+
+    it("Should start dtable instance", function (done) {
+      dtable.start("foo");
+      dtable.once("open", () => {
+        assert.isString(dtable._id);
+        assert.isNumber(dtable._fd);
+        assert.ok(dtable._idleInterval);
+        assert.ok(dtable._syncInterval);
+        assert.ok(dtable._fstream);
+        done();
+      });
+    });
+
+    it("Should stop dtable instance", function (done) {
+      dtable.start("foo");
+      dtable.stop(() => {
+        assert.equal(dtable._id, null);
+        assert.equal(dtable._fstream, null);
+        assert.equal(dtable._syncInterval, null);
+        assert.equal(dtable._idleInterval, null);
+        done();
+      });
+    });
+
+    it("Should stop dtable instance once fd open", function (done) {
+      dtable.start("foo");
+      dtable.once("open", () => {
+        dtable.stop(() => {
+          assert.equal(dtable._id, null);
+          assert.equal(dtable._fstream, null);
+          assert.equal(dtable._syncInterval, null);
+          assert.equal(dtable._idleInterval, null);
+          done();
+        });
+      });
+    });
+
+    it("Should stop dtable instance, wait for idle", function (done) {
+      dtable.start("foo");
+      dtable.once("open", () => {
+        dtable._flushing = true;
+        dtable.stop(() => {
+          assert.equal(dtable._id, null);
+          assert.equal(dtable._fstream, null);
+          assert.equal(dtable._syncInterval, null);
+          assert.equal(dtable._idleInterval, null);
+          done();
+        });
+        dtable._flushing = false;
+        dtable.emit("idle");
+      });
+    });
+
+    it("Should load dtable instance", function (done) {
+      dtable.load((err) => {
+        assert.notOk(err);
+        done();
+      });
+    });
+
+    it("Should check if dtable instance is idle", function () {
+      assert.equal(dtable.idle(), true);
+      dtable._flushing = true;
+      assert.equal(dtable.idle(), false);
+      dtable._flushing = false;
+    });
+
+    it("Should get value in table", function () {
+      dtable._table.set("key", "val");
+      assert.equal(dtable.get("key"), "val");
+    });
+
+    it("Should smember value in table", function () {
+      dtable.sset("key", "val");
+      assert.equal(dtable.smember("key", "val"), true);
+
+      dtable.set("key", "val");
+      assert.throws(_.partial(dtable.smember, "key", "val").bind(dtable));
+    });
+
+    it("Should hget value in table", function () {
+      dtable.hset("key", "hkey", "val");
+      assert.equal(dtable.hget("key", "hkey"), "val");
+
+      dtable.set("key", "val");
+      assert.throws(_.partial(dtable.hget, "key", "hkey").bind(dtable));
+    });
+
+    it("Should set value in table", function () {
+      dtable.set("key", "val");
+      assert.equal(dtable.get("key"), "val");
+    });
+
+    it("Should sset value in table", function () {
+      dtable.sset("key", "val");
+      assert.equal(dtable.smember("key", "val"), true);
+
+      dtable.set("key", "val");
+      assert.throws(_.partial(dtable.sset, "key", "val").bind(dtable));
+    });
+
+    it("Should hset value in table", function () {
+      dtable.hset("key", "hkey", "val");
+      assert.equal(dtable.hget("key", "hkey"), "val");
+
+      dtable.set("key", "val");
+      assert.throws(_.partial(dtable.hset, "key", "hkey", "val").bind(dtable));
+    });
+
+    it("Should del value in table", function () {
+      dtable.set("key", "val");
+      dtable.del("key");
+      assert.notOk(dtable.get("key"));
+    });
+
+    it("Should sdel value in table", function () {
+      dtable.sset("key", "val");
+      dtable.sset("key", "val2");
+      dtable.sdel("key", "val");
+      assert.equal(dtable.smember("key", "val"), false);
+      assert.equal(dtable.smember("key", "val2"), true);
+      dtable.sdel("key", "val2");
+      assert.equal(dtable.smember("key", "val2"), false);
+
+      dtable.set("key", "val");
+      assert.throws(_.partial(dtable.sdel, "key", "val").bind(dtable));
+    });
+
+    it("Should hdel value in table", function () {
+      dtable.hset("key", "hkey", "val");
+      dtable.hset("key", "hkey2", "val2");
+      dtable.hdel("key", "hkey");
+      assert.notOk(dtable.hget("key", "hkey"));
+      assert.equal(dtable.hget("key", "hkey2"), "val2");
+      dtable.hdel("key", "hkey2");
+      assert.notOk(dtable.hget("key", "hkey2"));
+
+      dtable.set("key", "val");
+      assert.throws(_.partial(dtable.hdel, "key", "hkey").bind(dtable));
+    });
+
+    it("Should run an async forEach over table", function (done) {
+      dtable.set("key", "val");
+      dtable.set("key2", "val2");
+      const memo = {};
+      dtable.forEach((key, val, next) => {
+        memo[key] = val;
+        next();
+      }, (err) => {
+        assert.notOk(err);
+        assert.deepEqual(memo, {
+          key: "val",
+          key2: "val2"
+        });
+        done();
+      });
+    });
+
+    it("Should run sync forEach over table", function () {
+      dtable.set("key", "val");
+      dtable.set("key2", "val2");
+      const memo = {};
+      dtable.forEachSync((key, val) => {
+        memo[key] = val;
+      });
+      assert.deepEqual(memo, {
+        key: "val",
+        key2: "val2"
+      });
+    });
+
+    it("Should setup idle flush interval", function (done) {
+      dtable._idleTickInterval = 1;
+      dtable._setupIdleFlushInterval();
+      setTimeout(() => {
+        assert.equal(dtable._idleTicks, 1);
+        clearInterval(dtable._idleInterval);
+        done();
+      }, 1);
+    });
+
+    it("Should setup idle flush interval, execute flush", function (done) {
+      dtable._idleTickInterval = 1;
+      dtable._idleTickMax = 1;
+      sinon.stub(dtable, "_flush");
+      dtable._setupIdleFlushInterval();
+      setTimeout(() => {
+        assert.equal(dtable._idleTicks, 0);
+        assert.ok(dtable._flush.called);
+        dtable._flush.restore();
+        dtable._fd = null;
+        clearInterval(dtable._idleInterval);
+        async.nextTick(done);
+      }, 1);
+    });
+
+    it("Should setup AOF sync interval", function (done) {
+      sinon.stub(fs, "createWriteStream", () => {
+        var pstream = new stream.PassThrough();
+        setTimeout(() => {
+          pstream.emit("open", 1);
+        }, 2);
+        return pstream;
+      });
+      sinon.stub(fs, "fsync", (fd, cb) => {
+        return cb();
+      });
+      dtable._fsyncInterval = 1;
+      dtable._setupAOFSyncInterval();
+      assert.ok(dtable._fstream);
+      assert.ok(dtable._syncInterval);
+      dtable._fstream.once("open", (fd) => {
+        assert.equal(dtable._fd, fd);
+        setTimeout(() => {
+          assert.ok(fs.fsync.called);
+          clearInterval(dtable._syncInterval);
+          fs.createWriteStream.restore();
+          fs.fsync.restore();
+          done();
+        }, 1);
+      });
+    });
+
+    it("Should update write count", function () {
+      dtable._updateWriteCount();
+      assert.equal(dtable._idleTicks, 0);
+      assert.equal(dtable._writeCount, 1);
+      dtable._writeThreshold = 2;
+      sinon.stub(dtable, "_flush");
+      dtable._updateWriteCount();
+      assert.equal(dtable._writeCount, 0);
+      assert.ok(dtable._flush.called);
+      dtable._flush.restore();
+    });
+
+    it("Should flush state to disk", function (done) {
+      dtable._setupAOFSyncInterval();
+      sinon.stub(dtable, "_setupAOFSyncInterval");
+
+      dtable._flush();
+      assert.notOk(dtable._setupAOFSyncInterval.called);
+
+      dtable._flushing = true;
+      dtable._flush();
+      assert.notOk(dtable._setupAOFSyncInterval.called);
+      dtable._flushing = false;
+
+      dtable.once("open", () => {
+        dtable.once("idle", () => {
+          assert.ok(dtable._setupAOFSyncInterval);
+          dtable._setupAOFSyncInterval.restore();
+          fs.unlink(dtable._path, _.partial(async.nextTick, done));
+        });
+        dtable.once("close", () => {
+          assert.equal(dtable._fd, null);
+        });
+        dtable._flush();
+      });
+    });
+
+    it("Should flush AOF files to disk", function (done) {
+      fs.writeFile(dtable._aofPath, "", (err) => {
+        assert.notOk(err);
+        dtable._flushAOF((err) => {
+          assert.notOk(err);
+          fs.unlink(dtable._tmpAOFPath, done);
+        });
+      });
+    });
+
+    it("Should fail flushing AOF files to disk", function (done) {
+      fs.writeFile(dtable._aofPath, "", (err) => {
+        assert.notOk(err);
+        sinon.stub(fs, "createWriteStream", () => {
+          var pstream = new stream.PassThrough();
+          async.nextTick(() => {
+            pstream.emit("error", new Error("foo"));
+          });
+          return pstream;
+        });
+        dtable._flushAOF((err) => {
+          assert.ok(err);
+          fs.createWriteStream.restore();
+          fs.unlink(dtable._aofPath, _.partial(async.nextTick, done));
+        });
+      });
+    });
+
+    it("Should flush snapshot of table to disk", function (done) {
+      var out = new Map();
+      sinon.stub(fs, "createWriteStream", () => {
+        var pstream = new stream.PassThrough();
+        pstream.on("data", (data) => {
+          data = JSON.parse(data);
+          out.set(data.key, DTable.decodeValue(data.value));
+        });
+        pstream.once("end", () => {
+          pstream.emit("close");
+        });
+        return pstream;
+      });
+      dtable._flushTable({
+        foo: "bar"
+      }, (err) => {
+        assert.notOk(err);
+        assert.deepEqual(out, dtable._table);
+        fs.createWriteStream.restore();
+        async.nextTick(done);
+      });
+    });
+
+    it("Should fail flushing snapshot of table to disk", function (done) {
+      sinon.stub(fs, "createWriteStream", () => {
+        var pstream = new stream.PassThrough();
+        async.nextTick(() => {
+          pstream.emit("error", new Error("foo"));
+        });
+        return pstream;
+      });
+      dtable._flushTable({
+        foo: "bar"
+      }, (err) => {
+        assert.ok(err);
+        fs.createWriteStream.restore();
+        async.nextTick(done);
+      });
+    });
+
+    it("Should write action to log", function (done) {
+      dtable._writeToLog("command", "foo", "bar", "baz");
+      assert.deepEqual(dtable._queue.dequeue(), {op: "command", args: ["foo", "bar", "baz"]});
+
+      dtable._fd = 1;
+      dtable._fstream = new stream.PassThrough();
+      dtable._fstream.once("data", (data) => {
+        data = JSON.parse(data);
+        assert.deepEqual(data, {
+          op: "command",
+          args: [
+            {type: "string", data: "foo"},
+            {type: "string", data: "bar"},
+            {type: "string", data: "baz"}
+          ]
+        });
+        async.nextTick(done);
+      });
+      dtable._writeToLog("command", "foo", "bar", "baz");
+    });
+
+    it("Should fail to load snapshot if disk read fails", function (done) {
+      sinon.stub(fs, "stat", (path, cb) => {
+        async.nextTick(() => {
+          cb(new Error("foo"));
+        });
+      });
+      dtable._loadState((err) => {
+        assert.ok(err);
+        fs.stat.restore();
+        done();
+      });
+    });
+
+    it("Should skip loading snapshot if file doesn't exist", function (done) {
+      sinon.stub(fs, "stat", (path, cb) => {
+        async.nextTick(_.partial(cb, _.extend(new Error("foo"), {code: "ENOENT"})));
+      });
+      dtable._loadState((err) => {
+        assert.notOk(err);
+        assert.equal(dtable._table.size, 0);
+        fs.stat.restore();
+        done();
+      });
+    });
+
+    it("Should load snapshot from disk", function (done) {
+      sinon.stub(fs, "stat", (path, cb) => {
+        async.nextTick(cb);
+      });
+      sinon.stub(fs, "createReadStream", () => {
+        var pstream = new stream.PassThrough();
+        async.nextTick(() => {
+          pstream.write(JSON.stringify({
+            key: "key",
+            value: {type: "string", data: "val"}
+          }) + "\n");
+          pstream.end();
+        });
+        return pstream;
+      });
+      dtable._loadState((err) => {
+        assert.notOk(err);
+        assert.deepEqual(dtable._table, new Map([["key", "val"]]));
+        fs.stat.restore();
+        fs.createReadStream.restore();
+        done();
+      });
+    });
+
+    it("Should fail loading snapshot from disk if rstream emits error", function (done) {
+      sinon.stub(fs, "stat", (path, cb) => {
+        async.nextTick(cb);
+      });
+      sinon.stub(fs, "createReadStream", () => {
+        var pstream = new stream.PassThrough();
+        async.nextTick(() => {
+          pstream.emit("error", new Error("foo"));
+        });
+        return pstream;
+      });
+      dtable._loadState((err) => {
+        assert.ok(err);
+        fs.stat.restore();
+        fs.createReadStream.restore();
+        done();
+      });
+    });
+
+    it("Should fail to load AOF if disk read fails", function (done) {
+      sinon.stub(fs, "stat", (path, cb) => {
+        async.nextTick(() => {
+          cb(new Error("foo"));
+        });
+      });
+      dtable._loadAOF(dtable._aofPath, (err) => {
+        assert.ok(err);
+        fs.stat.restore();
+        done();
+      });
+    });
+
+    it("Should skip loading AOF if file doesn't exist", function (done) {
+      sinon.stub(fs, "stat", (path, cb) => {
+        async.nextTick(_.partial(cb, _.extend(new Error("foo"), {code: "ENOENT"})));
+      });
+      dtable._loadAOF(dtable._aofPath, (err) => {
+        assert.notOk(err);
+        assert.equal(dtable._table.size, 0);
+        fs.stat.restore();
+        done();
+      });
+    });
+
+    it("Should load AOF from disk", function (done) {
+      sinon.stub(fs, "stat", (path, cb) => {
+        async.nextTick(cb);
+      });
+      sinon.stub(fs, "createReadStream", () => {
+        var pstream = new stream.PassThrough();
+        async.nextTick(() => {
+          pstream.write(JSON.stringify({
+            op: "set",
+            args: [{type: "string", data: "key"}, {type: "string", data: "val"}]
+          }) + "\n");
+          pstream.end();
+        });
+        return pstream;
+      });
+      dtable._loadAOF(dtable._aofPath, (err) => {
+        assert.notOk(err);
+        assert.deepEqual(dtable._table, new Map([["key", "val"]]));
+        fs.stat.restore();
+        fs.createReadStream.restore();
+        done();
+      });
+    });
+
+    it("Should fail loading AOF from disk if rstream emits error", function (done) {
+      sinon.stub(fs, "stat", (path, cb) => {
+        async.nextTick(cb);
+      });
+      sinon.stub(fs, "createReadStream", () => {
+        var pstream = new stream.PassThrough();
+        async.nextTick(() => {
+          pstream.emit("error", new Error("foo"));
+        });
+        return pstream;
+      });
+      dtable._loadAOF(dtable._aofPath, (err) => {
+        assert.ok(err);
+        fs.stat.restore();
+        fs.createReadStream.restore();
+        done();
+      });
+    });
+  });
+
+  describe("DTable static unit tests", function () {
+    it("Should return invalid type error", function () {
+      var error = DTable.invalidTypeError("command", "key", "type");
+      assert.ok(error instanceof Error);
+      assert.equal(error.type, "INVALID_TYPE");
+    });
+
+    it("Should encode value", function () {
+      var out = DTable.encodeValue(new Set(["val", "val2"]));
+      assert.deepEqual(out, {
+        type: "Set",
+        data: ["val", "val2"]
+      });
+
+      out = DTable.encodeValue(new Map([["key", "val"]]));
+      assert.deepEqual(out, {
+        type: "Map",
+        data: [["key", "val"]]
+      });
+
+      out = DTable.encodeValue("foobar");
+      assert.deepEqual(out, {
+        type: "string",
+        data: "foobar"
+      });
+    });
+
+    it("Should decode value", function () {
+      var out = DTable.decodeValue({
+        type: "Set",
+        data: ["val", "val2"]
+      });
+      assert.deepEqual(out, new Set(["val", "val2"]));
+
+      out = DTable.decodeValue({
+        type: "Map",
+        data: [["key", "val"]]
+      });
+      assert.deepEqual(out, new Map([["key", "val"]]));
+
+      out = DTable.decodeValue({
+        type: "object",
+        data: {}
+      });
+      assert.deepEqual(out, {});
+    });
+  });
+};
diff --git a/test/unit/index.js b/test/unit/index.js
index e990852..d20ff04 100644
--- a/test/unit/index.js
+++ b/test/unit/index.js
@@ -11,5 +11,6 @@ module.exports = function (mocks, lib) {
     require("./command_server")(mocks, lib);
     require("./utils")(mocks, lib);
     require("./cluster_node")(mocks, lib);
+    require("./dtable")(mocks, lib);
   });
 };
diff --git a/test/unit/utils.js b/test/unit/utils.js
index be06f90..a7464a4 100644
--- a/test/unit/utils.js
+++ b/test/unit/utils.js
@@ -132,5 +132,29 @@ module.exports = function (mocks, lib) {
       var out = utils.parseNodeList({nodes: ""});
       assert.ok(out instanceof Error);
     });
+
+    it("Should transform map to object", function () {
+      var list = [["key", "val"], ["key2", "val2"]];
+      var map = new Map(list);
+      var out = utils.mapToObject(map);
+      assert.deepEqual(out, {
+        key: "val",
+        key2: "val2"
+      });
+    });
+
+    it("Should transform set to list", function () {
+      var list = ["val", "val2"];
+      var set = new Set(list);
+      var out = utils.setToList(set);
+      assert.deepEqual(out, list);
+    });
+
+    it("Should transform map to list", function () {
+      var list = [["key", "val"], ["key2", "val2"]];
+      var map = new Map(list);
+      var out = utils.mapToList(map);
+      assert.deepEqual(out, list);
+    });
   });
 };