Skip to content
This repository has been archived by the owner on May 18, 2023. It is now read-only.

Commit

Permalink
Merge pull request #36 from azuqua/kevin/develop
Browse files Browse the repository at this point in the history
DLM/DSM gen_servers, more examples, MTable class for non-persistent DTable
  • Loading branch information
kevinwilson541 committed Jul 11, 2017
2 parents 26b4acb + 35c225f commit bb77360
Show file tree
Hide file tree
Showing 34 changed files with 4,645 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ Follow the same coding format seen in the source code; the one hard requirement
There is an exhaustive unit test suite under `/test`, which can be run using both `mocha test` or `grunt test`.

PR's that provide additional functionality should also provide corresponding unit test cases.

## Documentation

Anybody and everybody can help with documentation on this project, and there's a lot to be done. Specifically with documenting events and when they're fired/listened to, and also with examples of how to use different modules in this library. So if you find some topic confusing or lacking in documentation, please make a PR or an issue (and thanks in advance).
7 changes: 6 additions & 1 deletion Gruntfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ module.exports = function (grunt) {
"lib/kernel.js",
"lib/node.js",
"lib/vclock.js",
"lib/dtable.js"
"lib/dtable.js",
"lib/mtable.js",
"lib/dlm/dlm.js",
"lib/dlm/lock.js",
"lib/dsem/semaphore.js",
"lib/dsem/dsm.js"
];

grunt.initConfig({
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -461,4 +461,3 @@ In addition to what currently exists in this library, here's a list of features
- Provide listener for permanent close on connection between two nodes (`maxRetries` option on kernel creation).
- Add a GenStream class similar to GenServer, but strictly uses streams for communication instead of JS natives (will also require a protocol definition for indicating stream start, etc).
- Discuss making disconnects between nodes on a node departure forceful or not (it's forceful right now).
- A distributed lock manager, most likely using the [Redlock algorithm](https://redis.io/topics/distlock), given how well it fits into the current architecture of clusterluck.
20 changes: 20 additions & 0 deletions examples/usage/dlm/example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const cl = require("../../../index"),
DLMServer = require("./index");

const node = cl.createCluster("node_id");
const server = new DLMServer(node.gossip(), node.kernel());

server.start("dlm_server");
server.load((err) => {
if (err) process.exit(1);
server.on("rlock", (data, from) => {
console.log("Received rlock request on %s with holder %s", data.id, data.holder);
});
server.on("wlock", (data, from) => {
console.log("Received rlock request on %s with holder %s", data.id, data.holder);
});

node.start("cookie", "ring", () => {
console.log("Node %s listening!", node.kernel().self().id());
});
});
1 change: 1 addition & 0 deletions examples/usage/dlm/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require("../../../lib/dlm/dlm");
102 changes: 102 additions & 0 deletions examples/usage/dlm/test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
const cl = require("../../../index"),
_ = require("lodash"),
os = require("os"),
assert = require("chai").assert,
debug = require("debug")("examples:usage:dlm:test"),
async = require("async"),
DLMServer = cl.DLMServer;

const nodeID = process.argv[2],
port = parseInt(process.argv[3]),
nodeID2 = process.argv[4],
port2 = parseInt(process.argv[5]),
host = os.hostname();

var nodes = [];
var dlms = [];
async.each([[nodeID, port], [nodeID2, port2]], (config, next) => {
const node = cl.createCluster(config[0], host, config[1]),
gossip = node.gossip(),
kernel = node.kernel();

const dlm = new DLMServer(gossip, kernel, {
rquorum: 0.51,
wquorum: 0.51
});
nodes.push(node);
dlms.push(dlm);
node.load(() => {
dlm.start("locker");
node.start("cookie", "ring", () => {
debug("Node %s listening on hostname %s, port %s!", config[0], host, config[1]);
next();
});
});
}, () => {
setTimeout(() => {
// make sure nodes know about each other
assert.ok(_.find(nodes[0].gossip().ring().nodes(), (node) => {
return node.equals(nodes[1].kernel().self());
}));
var dlm = dlms[0];
async.series([
(next) => {
dlm.wlock("id", "holder", 30000, (err, nodes) => {
assert.notOk(err);
debug("Successfully grabbed write lock on id '%s' with holder '%s'", "id", "holder");
assert.lengthOf(nodes, 2);
next();
}, 1000, 1);
},
(next) => {
dlm.wlock("id", "holder2", 30000, (err, wNodes) => {
assert.ok(err);
debug("Failed to grab write lock id '%s' with holder '%s'", "id", "holder2");
next();
}, 1000, 1);
},
(next) => {
dlm.wunlock("id", "holder", (err) => {
assert.notOk(err);
next();
});
},
(next) => {
dlm.rlock("id", "holder", 30000, (err, nodes) => {
assert.notOk(err);
debug("Successfully grabbed read lock on id '%s' with holder '%s'", "id", "holder");
assert.lengthOf(nodes, 2);
next();
}, 1000, 1);
},
(next) => {
dlm.rlock("id", "holder2", 30000, (err, nodes) => {
assert.notOk(err);
debug("Successfully grabbed read lock on id '%s' with holder '%s'", "id", "holder2");
next();
}, 1000, 1);
},
(next) => {
dlm.wlock("id", "holder3", 30000, (err, wNodes) => {
assert.ok(err);
debug("Failed to grab write lock id '%s' with holder '%s'", "id", "holder2");
next();
}, 1000, 1);
},
(next) => {
dlm.runlockAsync("id", "holder");
dlm.runlockAsync("id", "holder2");
next();
}
], () => {
dlms.forEach((dlm) => {
dlm.stop();
});
debug("Done!");
process.exit(0);
});
}, 1000);
// make nodes meet each other first
nodes[0].gossip().meet(nodes[1].kernel().self());
debug("Waiting for nodes to meet each other, swear I had something for this...");
});
23 changes: 23 additions & 0 deletions examples/usage/dsm/example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
const cl = require("../../../index"),
DSMServer = require("./index");

const node = cl.createCluster("node_id");
const server = new DSMServer(node.gossip(), node.kernel());

server.start("dsm_server");
server.load((err) => {
if (err) process.exit(1);
server.on("create", (data, from) => {
console.log("Received create request on semaphore %s with concurrency limit %i", data.id, data.n);
});
server.on("post", (data, from) => {
console.log("Received post request on %s with holder %s", data.id, data.holder);
});
server.on("close", (data, from) => {
console.log("Received close request on %s with holder %s", data.id, data.holder);
});

node.start("cookie", "ring", () => {
console.log("Node %s listening!", node.kernel().self().id());
});
});
1 change: 1 addition & 0 deletions examples/usage/dsm/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require("../../../lib/dsem/dsm");
120 changes: 120 additions & 0 deletions examples/usage/dsm/test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
const cl = require("../../../index"),
_ = require("lodash"),
os = require("os"),
assert = require("chai").assert,
debug = require("debug")("examples:usage:dsm:test"),
async = require("async"),
DSMServer = cl.DSMServer;

const nodeID = process.argv[2],
port = parseInt(process.argv[3]),
nodeID2 = process.argv[4],
port2 = parseInt(process.argv[5]),
host = os.hostname();

var nodes = [];
var dsms = [];
async.each([[nodeID, port], [nodeID2, port2]], (config, next) => {
const node = cl.createCluster(config[0], host, config[1]),
gossip = node.gossip(),
kernel = node.kernel();

const dsm = new DSMServer(gossip, kernel, {
rquorum: 0.51,
wquorum: 0.51
});
nodes.push(node);
dsms.push(dsm);
node.load(() => {
dsm.start("sem_server");
node.start("cookie", "ring", () => {
debug("Node %s listening on hostname %s, port %s!", config[0], host, config[1]);
next();
});
});
}, () => {
setTimeout(() => {
// make sure nodes know about each other
assert.ok(_.find(nodes[0].gossip().ring().nodes(), (node) => {
return node.equals(nodes[1].kernel().self());
}));
var dsm = dsms[0];
async.series([
(next) => {
dsm.create("id", 2, (err) => {
assert.notOk(err);
debug("Successfully created semaphore 'id' with concurrency limit of 3");
next();
});
},
(next) => {
dsm.read("id", (err, out) => {
assert.notOk(err);
assert.deepEqual(out, {
n: 2,
active: 0
});
next();
});
},
(next) => {
dsm.post("id", "holder", 10000, (err) => {
assert.notOk(err);
debug("Successfully grabbed semaphore 'id' with holder 'holder'");
next();
});
},
(next) => {
dsm.post("id", "holder2", 10000, (err) => {
assert.notOk(err);
debug("Successfully grabbed semaphore 'id' with holder 'holder2'");
next();
});
},
(next) => {
dsm.post("id", "holder3", 10000, (err) => {
assert.ok(err);
debug("Failed to grab semaphore 'id' with holder 'holder3', limit reached");
next();
}, 1000, 0);
},
(next) => {
dsm.read("id", (err, out) => {
assert.notOk(err);
assert.deepEqual(out, {
n: 2,
active: 2
});
next();
});
},
(next) => {
async.each(["holder", "holder2"], (holder, done) => {
dsm.close("id", holder, done);
}, next);
},
(next) => {
dsm.read("id", (err, out) => {
assert.notOk(err);
assert.deepEqual(out, {
n: 2,
active: 0
});
next();
});
},
(next) => {
dsm.destroy("id", next);
}
], () => {
dsms.forEach((dsm) => {
dsm.stop();
});
debug("Done!");
process.exit(0);
});
}, 1000);
// make nodes meet each other first
nodes[0].gossip().meet(nodes[1].kernel().self());
debug("Waiting for nodes to meet each other, swear I had something for this...");
});
14 changes: 14 additions & 0 deletions examples/usage/ping/example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const cl = require("../../../index"),
PingServer = require("./ping");

const node = cl.createCluster("node_id");
const pingServe = new PingServer(node.kernel());

pingServe.start("ping_server");
pingServe.on("ping", (data, from) => {
console.log("Received ping request from:", from);
});

node.start("cookie", "ring", () => {
console.log("Node %s listening!", node.kernel().self().id());
});
1 change: 1 addition & 0 deletions examples/usage/ping/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require("./ping");
28 changes: 28 additions & 0 deletions examples/usage/ping/ping.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
const cl = require("../../../index"),
_ = require("lodash");

class PingServer extends cl.GenServer {
constructor(kernel) {
super(kernel);
}

start(name) {
super.start(name);

const handler = this._doPing.bind(this);
this.on("ping", handler);
this.once("stop", _.partial(this.removeListener, "ping", handler).bind(this));

return this;
}

ping(node, cb) {
this.call({node: node, id: this._id}, "ping", null, cb);
}

_doPing(data, from) {
return this.reply(from, "pong");
}
}

module.exports = PingServer;
48 changes: 48 additions & 0 deletions examples/usage/ping/test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const cl = require("../../../index"),
_ = require("lodash"),
os = require("os"),
assert = require("chai").assert,
debug = require("debug")("examples:usage:ping:test"),
async = require("async"),
PingServer = require("./ping");

const nodeID = process.argv[2],
port = parseInt(process.argv[3]),
nodeID2 = process.argv[4],
port2 = parseInt(process.argv[5]),
host = os.hostname();

const nodes = [];
const servers = [];
async.each([[nodeID, port], [nodeID2, port2]], (config, next) => {
const node = cl.createCluster(config[0], host, config[1]),
kernel = node.kernel();

const ping = new PingServer(kernel);
nodes.push(node);
servers.push(ping);
node.load(() => {
ping.start("ping_server");
node.start("cookie", "ring", () => {
debug("Node %s listening on hostname %s, port %s!", config[0], host, config[1]);
next();
});
});
}, () => {
setTimeout(() => {
// make sure nodes know about each other
assert.ok(_.find(nodes[0].gossip().ring().nodes(), (node) => {
return node.equals(nodes[1].kernel().self());
}));
const server = servers[0];
server.ping(nodes[1].kernel().self(), (err, res) => {
assert.notOk(err);
assert.equal(res, "pong");
debug("Successfully received pong!");
process.exit(0);
});
}, 1000);
// make nodes meet each other first
nodes[0].gossip().meet(nodes[1].kernel().self());
debug("Waiting for nodes to meet each other, swear I had something for this...");
});

0 comments on commit bb77360

Please sign in to comment.