Skip to content
This repository has been archived by the owner on May 18, 2023. It is now read-only.

Commit

Permalink
Merge pull request #53 from azuqua/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
kevinwilson541 committed Oct 13, 2017
2 parents 19fbb15 + d539f81 commit a32c13a
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 13 deletions.
24 changes: 21 additions & 3 deletions index.js
@@ -1,7 +1,6 @@
var _ = require("lodash"),
ipc = require("node-ipc"),
os = require("os"),
util = require("util"),
lib = require("./lib");

var utils = lib.utils;
Expand Down Expand Up @@ -222,6 +221,10 @@ function createGenServer(cluster, opts) {
* @param {Number} [opts.writeThreshold] - Number of write operations to the log file before triggering a snapshot flush to disk. Defaults to 100 writes.
* @param {Number} [opts.autoSave] - Number of milliseconds this table will wait in an idle state before triggering a snapshot flush to disk. Defaults to 180000 milliseconds.
* @param {Number} [opts.fsyncInterval] - Internval in milliseconds to fsync the log file. Defaults to 1000 milliseconds.
* @param {Boolean} [opts.compress] - Whether to run RDB snapshot streams through a GZIP compression stream. Defaults to `false`.
* @param {Function} [opts.encodeFn] - Encoding function to use when serializing writes to the AOF file and when saving to the RDB snapshot. Defaults to `DTable.encodeValue`.
* @param {Function} [opts.decodeFn] - Decoding function to use when loading contents from disk. Defaults to `DTable.decodeValue`.
* @param {String} [opts.name] - Name to start table with; can be used as a replacement for passing `name` to the start function. Required to be passed if you don't want a race condition between table loads and the idle interval that runs to trigger RDB snapshot logic. Defaults to `undefined`.
*
* @return {Clusterluck.DTable} A new dtable instance.
*
Expand All @@ -234,6 +237,19 @@ function createGenServer(cluster, opts) {
* });
* table.start("foo");
*
* @example
* let table = clusterluck.createDTable({
* path: "/path/to/dir",
* writeThreshold: 100,
* autoSave: 180000,
* fsyncInterval: 1000,
* name: "TABLE_NAME"
* });
* table.load((err) => {
* if (err) process.exit(1);
* table.start();
* });
*
*/
function createDTable(opts) {
opts = utils.isPlainObject(opts) ? _.cloneDeep(opts) : {};
Expand Down Expand Up @@ -276,6 +292,8 @@ function createMTable() {
* @param {Number} [opts.writeThreshold] - Write threshold of underlying DTable instance.
* @param {Number} [opts.autoSave] - Autosave interval of underlying DTable instance.
* @param {Number} [opts.fsyncInterval] - Fsync interval of underlying DTable instance.
* @param {Boolean} [opts.compress] - Whether to run RDB snapshot streams through a GZIP compression stream. Defaults to `false`.
* @param {String} [opts.name] - Name to start table with; can be used as a replacement for passing `name` to the start function. Required to be passed if you don't want a race condition between table loads and the idle interval that runs to trigger RDB snapshot logic. Defaults to `undefined`.
*
* @return {Clusterluck.DLMServer} A new generic server instance.
*
Expand Down Expand Up @@ -307,6 +325,8 @@ function createDLM(cluster, opts) {
* @param {Number} [opts.writeThreshold] - Write threshold of underlying DTable instance.
* @param {Number} [opts.autoSave] - Autosave interval of underlying DTable instance.
* @param {Number} [opts.fsyncInterval] - Fsync interval of underlying DTable instance.
* @param {Boolean} [opts.compress] - Whether to run RDB snapshot streams through a GZIP compression stream. Defaults to `false`.
* @param {String} [opts.name] - Name to start table with; can be used as a replacement for passing `name` to the start function. Required to be passed if you don't want a race condition between table loads and the idle interval that runs to trigger RDB snapshot logic. Defaults to `undefined`.
*
* @return {Clusterluck.DSMServer} A new generic server instance.
*
Expand All @@ -329,8 +349,6 @@ module.exports = {
GossipRing: lib.gossip,
NetKernel: lib.kernel,
Node: lib.node,
StateTable: lib.table,
TableTerm: lib.table_term,
VectorClock: lib.vclock,
DTable: lib.dtable,
MTable: lib.mtable,
Expand Down
6 changes: 5 additions & 1 deletion lib/dlm/dlm.js
Expand Up @@ -53,6 +53,8 @@ class DLMServer extends GenServer {
* @param {Number} [opts.writeThreshold] - Write threshold of underlying DTable instance.
* @param {Number} [opts.autoSave] - Autosave interval of underlying DTable instance.
* @param {Number} [opts.fsyncInterval] - Fsync interval of underlying DTable instance.
* @param {Boolean} [opts.compress] - Whether to feed RDB snapshot streams through a GZIP compression stream for the underlying DTable instance.
* @param {String} [opts.name] - Name of underlying DTable to write to.
*
*/
constructor(gossip, kernel, opts = serverDefaults) {
Expand All @@ -72,7 +74,9 @@ class DLMServer extends GenServer {
"path",
"writeThreshold",
"autoSave",
"compress"
"compress",
"fsyncInterval",
"name"
]));
} else {
this._disk = false;
Expand Down
6 changes: 5 additions & 1 deletion lib/dsem/dsm.js
Expand Up @@ -67,6 +67,8 @@ class DSMServer extends GenServer {
* @param {Number} [opts.writeThreshold] - Write threshold of underlying DTable instance.
* @param {Number} [opts.autoSave] - Autosave interval of underlying DTable instance.
* @param {Number} [opts.fsyncInterval] - Fsync interval of underlying DTable instance.
* @param {Boolean} [opts.compress] - Whether to feed RDB snapshot streams through a GZIP compression stream for the underlying DTable instance.
* @param {String} [opts.name] - Name of underlying DTable to write to.
*
*/
constructor(gossip, kernel, opts = serverDefaults) {
Expand All @@ -83,7 +85,9 @@ class DSMServer extends GenServer {
"path",
"writeThreshold",
"autoSave",
"compress"
"compress",
"fsyncInterval",
"name"
]));
} else {
this._disk = false;
Expand Down
63 changes: 56 additions & 7 deletions lib/dtable.js
Expand Up @@ -83,6 +83,10 @@ class DTable extends EventEmitter {
* @param {Number} [opts.writeThreshold]
* @param {Number} [opts.autoSave]
* @param {Number} [opts.fsyncInterval]
* @param {Boolean} [opts.compress]
* @param {Function} [opts.encodeFn]
* @param {Function} [opts.decodeFn]
* @param {String} [opts.name]
*
*/
constructor(opts = defaults) {
Expand All @@ -100,12 +104,21 @@ class DTable extends EventEmitter {
this._idleTicks = 0;
this._idleTickInterval = 1000;
this._idleTickMax = this._autoSave/this._idleTickInterval;
this._idleInterval = null;
this._syncInterval = null;
this._fsyncInterval = opts.fsyncInterval;
this._queue = new Queue();
this._flushing = false;
this._compress = opts.compress !== undefined ? opts.compress : false;
this._encodeFn = typeof opts.encodeFn === "function" ? opts.encodeFn : DTable.encodeValue;
this._decodeFn = typeof opts.decodeFn === "function" ? opts.decodeFn : DTable.decodeValue;
if (typeof opts.name === "string") {
this._name = opts.name;
this._path = path.join(this._prefix, this._name + "_DATA.SNAP");
this._tmpDumpPath = path.join(this._prefix, this._name + "_DATA_PREV.SNAP");
this._aofPath = path.join(this._prefix, this._name + "_LATEST.LOG");
this._tmpAOFPath = path.join(this._prefix, this._name + "_PREV.LOG");
}
}

/**
Expand All @@ -122,16 +135,20 @@ class DTable extends EventEmitter {
*
*/
start(name) {
this._name = name;
this._path = path.join(this._prefix, this._name + "_DATA.SNAP");
this._tmpDumpPath = path.join(this._prefix, this._name + "_DATA_PREV.SNAP");
this._aofPath = path.join(this._prefix, this._name + "_LATEST.LOG");
this._tmpAOFPath = path.join(this._prefix, this._name + "_PREV.LOG");
if (typeof name === "string" && this._name !== name) {
this._name = name;
this._path = path.join(this._prefix, this._name + "_DATA.SNAP");
this._tmpDumpPath = path.join(this._prefix, this._name + "_DATA_PREV.SNAP");
this._aofPath = path.join(this._prefix, this._name + "_LATEST.LOG");
this._tmpAOFPath = path.join(this._prefix, this._name + "_PREV.LOG");
}
this._id = shortid.generate();
this._setupDiskOps();
this._startDiskOps();
return this;
}



/**
*
* Stops the table, including all internal disk-based logic. If the table is idle and has an open file descriptor against LATEST.LOG, it will close immediately. If it's idle but the file descriptor against LATEST.LOG has been closed, this call will wait for a file descriptor to open again before continuing. Otherwise, the table isn't idle and therefore we wait for this condition.
Expand Down Expand Up @@ -212,6 +229,21 @@ class DTable extends EventEmitter {
return this._compress;
}

/**
*
* Returns the underlying in-memory map this table writes to.
*
* @method raw
* @memberof Clusterluck.DTable
* @instance
*
* @return {Map} Underlying in-memory map this table writes to.
*
*/
raw() {
return this._table;
}

/**
*
* Retrieves value stored at `key`, returning `undefined` if no such data exists.
Expand Down Expand Up @@ -464,6 +496,22 @@ class DTable extends EventEmitter {
return this;
}

/**
*
* @method _startDiskOps
* @memberof Clusterluck.DTable
* @instance
* @private
*
* @return {Clusterluck.DTable}
*
*/
_startDiskOps() {
if (this._idleInterval !== null) return this;
this._setupDiskOps();
return this;
}

/**
*
* @method _setupDiskOps
Expand All @@ -485,7 +533,7 @@ class DTable extends EventEmitter {
clearInterval(this._idleInterval);
this._idleInterval = null;
clearInterval(this._syncInterval);
this._syncInterval = null;
this._syncInterval = null;
});
return this;
}
Expand Down Expand Up @@ -574,6 +622,7 @@ class DTable extends EventEmitter {
if (this._flushing || !this._fd) return;
this._flushing = true;
clearInterval(this._syncInterval);
this._syncInterval = null;

const obj = utils.mapToObject(this._table);
this._fstream.once("close", () => {
Expand Down
70 changes: 69 additions & 1 deletion test/unit/dtable.js
Expand Up @@ -18,7 +18,11 @@ module.exports = function (mocks, lib) {
});

after(function (done) {
fs.unlink("./data/foo_LATEST.LOG", done);
async.parallel([
_.partial(fs.unlink, "./data/foo_LATEST.LOG"),
_.partial(fs.unlink, "./data/name_LATEST.LOG"),
_.partial(fs.unlink, "./data/bar_LATEST.LOG")
], done);
});

it("Should construct a DTable", function () {
Expand All @@ -43,6 +47,16 @@ module.exports = function (mocks, lib) {
assert.deepEqual(dtable._encodeFn, _.identity);
assert.deepEqual(dtable._decodeFn, _.identity);

dtable = new DTable({
path: "./data",
name: "FOO"
});
assert.isString(dtable._name);
assert.isString(dtable._path);
assert.isString(dtable._tmpDumpPath);
assert.isString(dtable._aofPath);
assert.isString(dtable._tmpAOFPath);

var out;
try {
dtable = new DTable();
Expand All @@ -64,6 +78,46 @@ module.exports = function (mocks, lib) {
});
});

it("Should start dtable instance with name found in opts", function (done) {
dtable._name = "name";
dtable._path = "./data/name_DATA.SNAP";
dtable._tmpDumpPath = "./data/name_DATA_PREV.SNAP";
dtable._aofPath = "./data/name_LATEST.LOG";
dtable._tmpAOFPath = "./data/name_PREV.LOG";
dtable.start();
dtable.once("open", () => {
assert.equal(dtable._name, "name");
assert.isString(dtable._id);
assert.isNumber(dtable._fd);
assert.ok(dtable._idleInterval);
assert.ok(dtable._syncInterval);
assert.ok(dtable._fstream);
done();
});
});

it("Should start dtable instance, change name if not maching one found in optss", function (done) {
dtable._name = "name";
dtable._path = "./data/name_DATA.SNAP";
dtable._tmpDumpPath = "./data/name_DATA_PREV.SNAP";
dtable._aofPath = "./data/name_LATEST.LOG";
dtable._tmpAOFPath = "./data/name_PREV.LOG";
dtable.start("bar");
dtable.once("open", () => {
assert.equal(dtable._name, "bar");
assert.equal(dtable._path, "data/bar_DATA.SNAP");
assert.equal(dtable._tmpDumpPath, "data/bar_DATA_PREV.SNAP");
assert.equal(dtable._aofPath, "data/bar_LATEST.LOG");
assert.equal(dtable._tmpAOFPath, "data/bar_PREV.LOG");
assert.isString(dtable._id);
assert.isNumber(dtable._fd);
assert.ok(dtable._idleInterval);
assert.ok(dtable._syncInterval);
assert.ok(dtable._fstream);
done();
});
});

it("Should stop dtable instance", function (done) {
dtable.start("foo");
dtable.stop(() => {
Expand Down Expand Up @@ -127,6 +181,11 @@ module.exports = function (mocks, lib) {
assert.equal(dtable.compress(), true);
});

it("Should return raw table for manual insertion", function () {
var table = dtable.raw();
assert.ok(_.isEqual(table, dtable._table));
});

it("Should get value in table", function () {
dtable._table.set("key", "val");
assert.equal(dtable.get("key"), "val");
Expand Down Expand Up @@ -239,6 +298,15 @@ module.exports = function (mocks, lib) {
});
});

it("Should only setup disk ops on 'start' if not already started", function () {
// set to anything other than null
dtable._idleInterval = "foo";
sinon.spy(dtable, "_setupDiskOps");
dtable._startDiskOps();
assert.notOk(dtable._setupDiskOps.called);
dtable._setupDiskOps.restore();
});

it("Should setup idle flush interval", function (done) {
dtable._idleTickInterval = 1;
dtable._setupIdleFlushInterval();
Expand Down

0 comments on commit a32c13a

Please sign in to comment.