From 1c6ce4a9da09d2dd3ed61283ab29f08099169d9f Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Mon, 27 Mar 2017 00:12:09 -0700 Subject: [PATCH 1/6] enhanced documentation/README, feature set for providing stream lifetimes for GenServers --- Gruntfile.js | 1 + README.md | 457 +++++++++++++++++++++++++++++++++++++++- bin/cli.js | 12 +- index.js | 162 +++++++++++++- lib/command_server.js | 4 +- lib/conn.js | 2 +- lib/consts.js | 33 ++- lib/gen_server.js | 57 ++++- lib/gossip.js | 7 +- lib/kernel.js | 27 +-- lib/utils.js | 4 +- test/mocks/ipc.js | 2 +- test/unit/gen_server.js | 93 +++++++- test/unit/gossip.js | 11 +- test/unit/kernel.js | 43 +++- test/unit/queue.js | 3 + 16 files changed, 849 insertions(+), 69 deletions(-) diff --git a/Gruntfile.js b/Gruntfile.js index 4ca8cde..bc449d5 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -3,6 +3,7 @@ var path = require("path"), module.exports = function (grunt) { var files = [ + "index.js", "lib/chash.js", "lib/cluster_node.js", "lib/conn.js", diff --git a/README.md b/README.md index 1cb1e98..36971ba 100644 --- a/README.md +++ b/README.md @@ -4,21 +4,468 @@ Clusterluck [![Build Status](https://travis-ci.org/azuqua/clusterluck.svg?branch=master)](https://travis-ci.org/azuqua/clusterluck) [![Code Coverage](https://coveralls.io/repos/github/azuqua/clusterluck/badge.svg?branch=master)](https://coveralls.io/github/azuqua/clusterluck?branch=master) +[Documentation](https://azuqua.github.io/clusterluck) + A library for writing distributed systems that use a gossip protocol to communicate state management, consistent hash rings for sharding, and vector clocks for history. -# Install +## Install ``` -npm install clusterluck +$ npm install clusterluck ``` -[Documentation](https://azuqua.github.io/clusterluck) +## Dependencies + +This module uses a native module called `node-microtime` for microsecond insert/update granularity for vector clocks, and by extension requires a C++-11 compatible compiler. The `.travis.yml` file lists g++-4.8 as an addon in response, but other compatible versions of g++ or clang should suffice. The following g++ compiler versions have been tested: + - 6.*.* + - 4.8 + +## Test + +To run tests, you can use: +``` +$ grunt test +``` + +or just: +``` +$ mocha test +``` + +For code coverage information, you can use `istanbul` and run: +``` +$ istanbul cover _mocha test +``` + +If `istanbul` isn't installed, just run: +``` +$ npm install --global istanbul +``` + +## Index -# Usage +- [Usage](#Usage) + - [Creating Clusters](#CreatingClusters) + - [Manipulating Clusters](#ManipulatingClusters) + - [Writing GenServers](#WritingGenServers) + - [Using the CLI](#UsingTheCLI) + - [`inspect`](#inspect) + - [`get`](#get) + - [`has`](#has) + - [`join`](#join) + - [`meet`](#meet) + - [`leave`](#leave) + - [`insert`](#insert) + - [`minsert`](#minsert) + - [`remove`](#remove) + - [`mremove`](#mremove) + - [Consistent Hash Ring](#ConsistentHashRing) + - [Vector Clocks](#VectorClocks) +- [ChangeLog](#ChangeLog) +- [TODO](#TODO) +### Usage + +Clusterluck can be used as a module to write decentralized distributed systems with full-mesh IPC network topologies, or in isolation to use the consistent hash ring and vector clock data structures. 
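+
+As a rough sketch of the standalone use case (relying only on the factory functions documented in `index.js`; the identifiers and values below are purely illustrative):
+
+```javascript
+const cl = require("clusterluck");
+
+// a consistent hash ring with replication factor 3 and persistence factor 2
+let chash = cl.createCHash(3, 2);
+
+// a vector clock seeded with a single actor entry
+let vclock = cl.createVClock("actor_id", 1);
+```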
+
+#### Creating Clusters
+
+To get started, we can use the following code:
+```javascript
+const cl = require("clusterluck");
+
+let node = cl.createCluster("foo", "localhost", 7022);
+node.start("cookie", "ring", () => {
+  console.log("Listening on port 7022!");
+});
+```
+
+This will create a single node in cluster `ring` with name `foo`, using cookie `cookie` to sign messages within the cluster.
+If another program tries to communicate with this node, but doesn't sign requests with this cookie, the message will be ignored with an `INVALID_CHECKSUM` error emitted in the debug logs.
+When nodes are added to `ring`, each node will attempt to form TCP-based IPC connections to any new node added into the cluster.
+Similarly, each IPC server will generate new IPC-client connections. Each external connection will queue up messages if the socket goes down, sending all queued up messages once the socket reconnects.
+Once nodes are removed, both ends of the connection are closed forcibly.
+
+In the background, several listeners attached to the network kernel for this node will be created, including a gossip ring listener and a command line server listener.
+In short, the command line server listener exists to handle requests made by the CLI tool under `bin/cli.js`, while the gossip ring listens for ring manipulations on the cluster. On additions to the cluster, new IPC connections to external nodes will be created, and vice versa for removals from the cluster.
+
+#### Manipulating Clusters
+
+To manipulate the cluster, we interact with the gossip property of the cluster node. For example:
+
+```javascript
+// meeting another node
+let gossip = node.gossip();
+let kernel = node.kernel();
+let nNode = new Node("bar", "localhost", 7023);
+gossip.meet(nNode);
+// wait some time, eventually the node will show up in this node's ring...
+assert.ok(gossip.ring().has(nNode));
+
+// inserting nodes
+gossip.insert(nNode);
+assert.ok(gossip.ring().has(nNode));
+// after some time, node "bar" should have "foo" in its ring...
+
+// removing nodes
+gossip.remove(nNode);
+assert.notOk(gossip.ring().has(nNode));
+// after some time, node "bar" should remove "foo" from its ring...
+
+// leaving a cluster
+gossip.leave();
+gossip.once("leave", () => {
+  assert.lengthOf(gossip.ring().nodes(), 1);
+  assert.ok(gossip.ring().nodes()[0].equals(kernel.self()));
+});
+
+// joining a cluster
+gossip.join("another_ring_id");
+// after some time, this node will receive messages from the existing nodes in the cluster
+// (if any exist)
+```
+
+For documentation on available methods/inputs for cluster manipulation, visit the documentation for the `GossipRing` class.
+
+#### Writing GenServers
+
+The `GenServer` class is used to create actors that send messages around and receive messages from the rest of the cluster.
+They're the basic unit of logic handling in clusterluck, and heavily derived off of Erlang's gen_server's, but incorporated into node.js' EventEmitter model.
+To start a `GenServer` with no event handling, we can use the following code:
+
+```
+let serve = cl.createGenServer(cluster);
+serve.start("name_to_listen_for");
+```
+
+This will tell the node's network kernel that any messages with id `name_to_listen_for` received on this node should be routed to `serve` for processing.
+Names for `GenServer`s on the same node have a uniqueness property, so trying to declare multiple instances listening on the same name will raise an error.
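+
+For example, a quick sketch of the uniqueness property (the second instance, `dupe`, is purely illustrative, and the exact error surfaced may differ):
+
+```javascript
+let serve = cl.createGenServer(cluster);
+serve.start("name_to_listen_for");
+
+// starting another GenServer under a name that's already taken on this node
+// raises an error
+let dupe = cl.createGenServer(cluster);
+dupe.start("name_to_listen_for");
+```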
+ +To add some event handling to our `GenServer`, we can modify the above code as such: + +``` +let serve = cl.createGenServer(cluster); +serve.on("hello", (data, from) => { + serve.reply(from, "world"); +}); +serve.start("name_to_listen_for"); +``` + +With this additional logic, any "hello" event sent to this node with id `name_to_listen_for` will be responded to with "world". +This includes messages sent from the local node, as well from other nodes in the cluster. + +Once we've declared our `GenServer` and added event handling logic, we can start sending and receiving messages to/from other nodes in the cluster. + +``` +// serve is a GenServer instance, kernel is serve's network kernel +// synchronous requests +// this makes a call to a GenServer listening on "server_name" locally +serve.call("server_name", "event", "data", (err, out) => {...}); +// this makes the same call +serve.call({id: "server_name", node: kernel.self()}, "event", "data", (err, out) => {...}); +// this makes the same call but to another node +serve.call({id: "server_name", node: another_node}, "event", "data", (err, out) => {...}); + +// asynchronous requests +// this makes an async call to a GenServer listening on "server_name" locally +serve.cast("server_name", "event", "data"); +// this makes the same call +serve.cast({id: "server_name", node: kernel.self()}, "event", "data"); +// this makes the same call but to another node +serve.cast({id: "server_name", node: another_node}, "event", "data"); +``` + +Here, we see the true power of `GenServer`s as a unified interface for distributed communication with a local node as well as external nodes! + + +As an implementation note, `GenServer`s should not be used as a replacement for EventEmitters when orchestrating state local to a node. +Generally speaking, serialization costs place an undue cost when we can just pass native JS objects around. +Instead, making other `GenServer`s part of the constructor of other `GenServer`s is preferred (using OOP principles to enforce actor relations), similar to how the `CommandServer` class works. +In fact, both the `GossipRing` and `CommandServer` classes, built into every node in the cluster, are `GenServer`s themselves! + +##### GenServer message passing + +Let's continue with our setup above. + +Any message routed to our node, both internal and external, with id set to `name_goes_here` will be gathered by `serve` into a map of streams. +Each stream has a stream ID `stream` and a boolean flag `done` which indicates whether the stream has finished sending data. +This stream ID is used as an index for stream memoization between concurrent requests. + +``` +// 'data' is the input data for this part of the stream +if (!this._streams.has(stream.stream)) { + this._streams.set(stream.stream, {data: Buffer.from(""), ...}); +} +let inner = this._streams.get(stream.stream); +inner.data = Buffer.concat([inner.data, data], ...); +``` + +Once sending has finished, the job is parsed using `decodeJob` into an event and message, under `event` and `data` respectively. +From here, the server emits an event just like an EventEmitter with `event` and `data`. + +```javascript +// similar to this +if (stream.done === true) { + let job = this.decodeJob(memo); + serve.emit(job.event, job.data, from); +} +``` + +This is where our event handling logic for event "hello" comes into play. + +#### Using the CLI + +In the working directory of this module, we see the `bin/cli.js` script. 
This node script communicates with a single node in a cluster to manipulate the ring.
+The following options specify which node to communicate with and how:
+ - `-I, --instance`: The unique instance identifier of the node being connected to.
+ - `-H, --hostname`: Server hostname of the node being connected to.
+ - `-p, --port`: Server port of the node being connected to.
+ - `-a, --key`: Distributed cookie to use for signing requests to the connecting node.
+
+Once run, a CLI session is created that provides the following commands. For any given command, help documentation can be printed to the console by typing `help `.
+
+##### inspect
+
+In the CLI session, type `inspect`. This command will print the targeted node's view of the ring on the console. For example, if we've just started a new node with id `foo` at hostname `localhost` with port `7022`, we'd see the following output:
+
+```
+> inspect
+{ ok: true,
+  data:
+   { rfactor: 3,
+     pfactor: 2,
+     tree:
+      [ { key: 'avmox6bKHfmLdzmObwjwIrh2WC6XM471ods56FWbDo0=',
+          value: { id: 'foo', host: 'localhost', port: 7022 } },
+        { key: 'kL2YfHLEuxHGaEz4nOxWYyPSiFlGBsFMzoYDXXxuXK0=',
+          value: { id: 'foo', host: 'localhost', port: 7022 } },
+        { key: 'kzMt7C+SJZbxNQmrL3vhpfJ+a0RgPiGlRhrxwS57RWI=',
+          value: { id: 'foo', host: 'localhost', port: 7022 } } ] } }
+```
+
+##### get
+
+This command will print metadata about an input node in the cluster, or will return an error if the node doesn't exist in the cluster (according to the node our session targets). For example, given the previous setup:
+
+```
+> get foo
+{ ok: true,
+  data: { id: 'foo', host: 'localhost', port: 7022 } }
+
+> get bar
+{ ok: false,
+  error:
+   { message: '\'bar\' is not defined in this ring.',
+     _error: true } }
+```
+
+##### has
+
+This command will print whether an input node exists in the cluster (according to the node our session targets). For example, given the previous setup:
+
+```
+> has foo
+{ ok: true,
+  data: true }
+
+> has bar
+{ ok: true,
+  data: false }
+```
+
+##### join
+
+This command will have the targeted node attempt to join a cluster, provided it doesn't already belong to one. For example, given the previous setup:
+
+```
+// assuming 'foo' isn't a part of a ring
+> join ring
+{ ok: true,
+  data: true }
+
+// if it's in a ring
+> join ring
+{ ok: false,
+  error:
+   { message: 'Node already belongs to ring \'ring\'',
+     _error: true } }
+```
+
+##### meet
+
+This command will tell the node targeted by this session to meet another node in the cluster. Currently, this is the only way to make ring insertions transitive.
+If, as a result, two nodes are meeting for the first time, a ring merge will occur.
+The resulting state will be gossiped around the cluster, eventually resulting in every node seeing the input node as a member of the cluster.
+For example, given the previous setup:
+
+```
+> meet bar localhost 7023
+{ ok: true,
+  data: true }
+
+// wait some time...
+> get bar
+// metadata about node bar...
+```
+
+##### leave
+
+This command will tell the node targeted by this session to leave its current cluster (if it belongs to one). For example:
+
+```
+> leave
+{ ok: true,
+  data: true }
+// immediately following this command...
+> has bar
+{ ok: true,
+  data: false }
+
+// leave is done forcefully
+> leave --force
+{ ok: true,
+  data: true }
+```
+
+For documentation on how the `--force` option works for this command, just run `help leave`.
+
+##### insert
+
+This command will tell the node targeted by this session to insert a node into its cluster (as it currently views it).
+Subsequently, this information will be gossiped around the cluster, eventually resulting in every node seeing the input node as a member of the cluster.
+This differs from `meet` in that insertions are not transitive between nodes; an insertion is a new event on the ring state, and therefore takes precedence when state conflicts occur between nodes sharing ring history.
+For example:
+
+```
+> insert bar localhost 7023
+{ ok: true,
+  data: true }
+> get bar
+{ ok: true,
+  data: { id: 'bar', host: 'localhost', port: 7023 } }
+
+// insert is done forcefully
+> insert --force bar localhost 7023
+{ ok: true,
+  data: { id: 'bar', host: 'localhost', port: 7023 } }
+```
+
+For documentation on how the `--force` option works for this command, or any other option, just run `help insert`.
+
+##### minsert
+
+This command will tell the node targeted by this session to insert multiple nodes into its cluster (as it currently views it).
+It works just like `insert`, but allows batch insertion.
+For example:
+
+```
+> minsert bar localhost 7023 baz localhost 7024
+{ ok: true,
+  data: true }
+> get bar
+{ ok: true,
+  data: { id: 'bar', host: 'localhost', port: 7023 } }
+> get baz
+{ ok: true,
+  data: { id: 'baz', host: 'localhost', port: 7024 } }
+
+// minsert is done forcefully
+> minsert --force bar localhost 7023 baz localhost 7024
+{ ok: true,
+  data: true }
+> get bar
+{ ok: true,
+  data: { id: 'bar', host: 'localhost', port: 7023 } }
+> get baz
+{ ok: true,
+  data: { id: 'baz', host: 'localhost', port: 7024 } }
+```
+
+For documentation on how the `--force` option works for this command, or any other option, just run `help minsert`.
+
+##### remove
+
+This command will tell the node targeted by this session to remove a node from its cluster (as it currently views it).
+Subsequently, this information will be gossiped around the cluster, eventually resulting in every node thinking the input node no longer belongs in the cluster.
+For example:
+
+```
+> remove bar localhost 7023
+{ ok: true,
+  data: true }
+> has bar
+{ ok: true,
+  data: false }
+
+// remove is done forcefully
+> remove --force bar localhost 7023
+{ ok: true,
+  data: true }
+```
+
+For documentation on how the `--force` option works for this command, or any other option, just run `help remove`.
+
+##### mremove
+
+This command will tell the node targeted by this session to remove multiple nodes from its cluster (as it currently views it).
+It works just like `remove`, but allows batch removal.
+For example:
+
+```
+> mremove bar localhost 7023 baz localhost 7024
+{ ok: true,
+  data: true }
+> has bar
+{ ok: true,
+  data: false }
+> has baz
+{ ok: true,
+  data: false }
+
+// mremove is done forcefully
+> mremove --force bar localhost 7023 baz localhost 7024
+{ ok: true,
+  data: true }
+> has bar
+{ ok: true,
+  data: false }
+> has baz
+{ ok: true,
+  data: false }
+```
+
+For documentation on how the `--force` option works for this command, or any other option, just run `help mremove`.
+
+#### Consistent Hash Ring
+
+Some helpful resources for learning about consistent hash rings:
+
+ - [Wikipedia Entry](https://en.wikipedia.org/wiki/Consistent_hashing)
+ - [libketama](https://github.com/RJ/ketama), which has a corresponding blog post and provides an alternative way of implementing a consistent hash ring.
+
+From here, you can reference the documentation found on the github pages for the CHash class.
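+
+As a rough sketch of how `rfactor` plays out (reusing the `Node` class from the cluster examples above, and assuming `size()` counts ring entries rather than physical nodes, as the `inspect` output above suggests):
+
+```javascript
+let chash = cl.createCHash(3, 2);
+chash.insert(new Node("foo", "localhost", 7022));
+
+// one physical node...
+assert.lengthOf(chash.nodes(), 1);
+// ...but rfactor entries on the ring itself
+assert.equal(chash.size(), 3);
+```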
+ +#### Vector Clocks + +Some helpful resources for learning about vector clocks: + + - [Wikipedia Entry](https://en.wikipedia.org/wiki/Vector_clock) + - [Basho's Why Vector Clocks Are Easy](http://basho.com/posts/technical/why-vector-clocks-are-easy/) + - [Basho's Why Vector Clocks Are Hard](http://basho.com/posts/technical/why-vector-clocks-are-hard/) + - [Riak Documentation](http://docs.basho.com/riak/kv/2.2.1/learn/concepts/causal-context/) + +From here, you can reference the documentation found on the github pages for the VectorClock class. +### ChangeLog +- 1.1.0: + - Add timeouts on GenServer request streams. + - Emit stream error on closed socket connection. + - Additional documentation. +### TODO -``` \ No newline at end of file +In addition to what currently exists in this library, here's a list of features to possibly add: + - Provide listener for permanent close on connection between two nodes (`maxRetries` option on kernel creation). + - Discuss making disconnects between nodes on a node departure forceful or not (it's forceful right now) + - A distributed lock manager, most likely using the [Redlock algorithm](https://redis.io/topics/distlock), given how well it fits into the current architecture of clusterluck. diff --git a/bin/cli.js b/bin/cli.js index 8b92ae1..61e4bd6 100755 --- a/bin/cli.js +++ b/bin/cli.js @@ -16,13 +16,13 @@ const argv = require("yargs") .usage("Usage: $0 -h localhost -p 7021 -a '' -I foo") .demand([]) .help("help") - .describe("I", "Unique instance identifier") + .describe("I", "Unique instance identifier of the node being connected to.") .alias("I", "instance") - .describe("H", "Server hostname.") + .describe("H", "Server hostname of the node being connected to.") .alias("H", "hostname") - .describe("p", "Server port.") + .describe("p", "Server port of the node being connected to.") .alias("p", "port") - .describe("a", "Distributed cookie to use for signing requests.") + .describe("a", "Distributed cookie to use for signing requests against the connecting node.") .alias("a", "key") .default({ I: os.hostname(), @@ -128,7 +128,7 @@ function parseNodeList(nodeList) { vorpal .command("inspect") - .description("Inspects the ring of this node.") + .description("Prints the ring of this node to the console.") .action(function (args, cb) { client.send("inspect", null, cb); }); @@ -165,7 +165,7 @@ vorpal vorpal .command("meet ") - .description("Meets a node in this node's cluster.") + .description("Meets a node in this node's cluster. This is the only way to do transitive additions to the cluster.") .types({ string: ["id", "host", "port"], }) diff --git a/index.js b/index.js index a895fb6..5b29f75 100644 --- a/index.js +++ b/index.js @@ -8,14 +8,78 @@ var utils = lib.utils; var consts = lib.consts; +/** + * + * Constructs a new instance of the consistent hash ring class. + * + * @method createCHash + * @memberof Clusterluck + * + * @param {Number} rfactor - Replication factor for every node inserted into the ring. Defaults to 3. + * @param {Number} pfactor - Persistence factor for every node inserted into the ring (used when calling .next on a consistent hash ring). Defaults to 2. + * + * @return {Clusterluck.CHash} A consistent hash ring instance. + * + * @example + * let chash = clusterluck.createCHash(3, 2); + * assert.equal(chash.rfactor(), 3); + * assert.equal(chash.pfactor(), 2); + * + */ function createCHash(rfactor = 3, pfactor = 2) { return new lib.chash(rfactor, pfactor); } +/** + * + * Constructs a new instance of the vector clock class. 
+ * + * @method createVClock + * @memberof Clusterluck + * + * @param {String} id - Identifier to insert this new vector clock on creation. + * @param {Number} count - Count to initialize `id` at in this new vector clock. + * + * @return {Clusterluck.VectorClock} A vector clock instance. + * + * @example + * let vclock = clusterluck.createVClock(); + * assert.equal(vclock.size(), 0); + * vclock = clutserluck.createVClock("id", 1); + * assert.equal(vclock.size(), 1); + * assert.ok(vclock.has("id")); + * + */ function createVClock(id, count) { return new lib.vclock(id, count); } +/** + * + * Constructs an instance of a gossip processor against network kernel `kernel`. + * + * @method createGossip + * @memberof Clusterluck + * + * @param {Clusterluck.NetKernel} kernel - Network kernel this new gossip processor instance will listen for jobs against. + * @param {Object} [opts] - Gossip ring options to instantiate with. Affects vector clock trimming options, consistent hash ring instantiation, how often to gossip ring state against the cluster, and when/where to flush state to disk. + * @param {Number} [opts.rfactor] - Replication factor for every node inserted into the ring. Defaults to 3. + * @param {Number} [opts.pfactor] - Persistence factor for every node inserted into the ring (used when calling .next on a consistent hash ring). Defaults to 2. + * @param {Number} [opts.interval] - Interval to select a random node from the cluster and gossip the state of the ring with, with a granularity of milliseconds. Defaults to 1000. + * @param {Number} [opts.flushInterval] - Interval to flush the state of the ring to disk, with a granularity of milliseconds. Defaults to 1000. + * @param {String} [opts.flushPath] - Path string to flush the state of the ring to; if set to `null`, the gossip ring will just skip flushing state to disk. Defaults to `null`. + * @param {Object} [opts.vclockOpts] - Vector clock options for trimming; occurs at the same interval as `interval`. Defaults to `clusterluck.consts.vclockOpts`. + * + * @return {Clusterluck.GossipRing} A new gossip ring instance. + * + * @example + * // initializes gossip ring with defaults found in `clusterluck.consts.gossipOpts` + * let gossip = clusterluck.createGossip(kernel); + * assert.equal(gossip.ring().rfactor(), 3); + * assert.equal(gossip.ring().pfactor(), 2); + * assert.deepEqual(gossip.kernel(), kernel); + * + */ function createGossip(kernel, opts) { opts = _.defaultsDeep(utils.isPlainObject(opts) ? _.cloneDeep(opts) : {}, consts.gossipOpts); var chash = createCHash(opts.rfactor, opts.pfactor).insert(kernel.self()); @@ -23,6 +87,34 @@ function createGossip(kernel, opts) { return new lib.gossip(kernel, chash, vclock, opts); } +/** + * + * Constructs an instance of a network kernel with `id`, listening on hostname `host` and port `port`. + * + * @method createKernel + * @memberof Clusterluck + * + * @param {String} id - Identifier for the node associated with this network kernel. Needs to be unique across the cluster, since nodes are addressed by id this way. + * @param {String} host - Hostname for this network kernel to bind to. Can be an IPV4 address, IPV6 address, or a hostname. Hostname resolution isn't done when checking the existence of a node inside a cluster, so this hostname is taken literally for the lifetime of the node (i.e. localhost vs. 127.0.0.1 vs `> hostname`). Defaults to `os.hostname()`. + * @param {Number} port - Port for this network kernel to listen on. Defaults to 7022. 
+ * @param {Object} [opts] - Network kernel options to instantiate with. Affects whether the server runs with TLS or just TCP, on what interval to attempt reconnect logic on a closed socket, and how many times to retry. + * @param {String} [opts.networkHost] - Default network hostname to set on this network kernel. Defaults to `os.hostname()`. + * @param {Number} [opts.networkPort] - Default network port to listen on for this network kernel. Defaults to 7022. + * @param {Number} [opts.retry] - Default amount of time to wait before retrying a connection attempt between two nodes. Defaults to 5000. + * @param {Object} [opts.tls] - TLS options to set on this network kernel. Defaults to `null`. + * @param {Number} [opts.maxRetries] - Maximum number of attempts to reconnect to a node; currently, Infinity is the most stable option, since the Connection class only listens for the 'connect' and 'disconnect' events on the underlying IPC socket. Defaults to Infinity. + * @param {Boolean} [opts.silent] - Whether to silence underlying IPC logs emitted by the `node-ipc` module. Defaults to true. + * + * @return {Clusterluck.NetKernel} A new network kernel instance. + * + * @example + * let kernel = clusterluck.createKernel("foo", "localhost", 7022); + * assert.equal(kernel.id(), "foo"); + * assert.equal(kernel.host(), "localhost"); + * assert.equal(kernel.port(), 7022); + * assert.ok(kernel.self().equals(new Node("foo", "localhost", 7022))); + * + */ function createKernel(id, host = os.hostname(), port = 7022, opts = {}) { opts = _.defaultsDeep(utils.isPlainObject(opts) ? _.cloneDeep(opts) : {}, consts.kernelOpts); var inst = new ipc.IPC(); @@ -35,10 +127,54 @@ function createKernel(id, host = os.hostname(), port = 7022, opts = {}) { return new lib.kernel(inst, id, inst.config.networkHost, inst.config.networkPort); } +/** + * + * Constructs an instance of a command server, which responds to CLI commands. + * + * @method createCommServer + * @memberof Clusterluck + * + * @param {Clusterluck.Gossip} gossip - Gossip processor for this command server to report/manipulate the state of. + * @param {Clusterluck.Kernel} kernel - Network kernel this command server uses to reply over to a CLI process' IPC socket. + * + * @return {Clusterluck.CommandServer} A new command server instance. + * + * @example + * let comms = clusterluck.createCommServer(gossip, kernel); + * assert.deepEqual(comms.gossip(), gossip); + * assert.deepEqual(comms.kernel(), kernel); + * + */ function createCommServer(gossip, kernel) { return new lib.command_server(gossip, kernel); } +/** + * + * Constructs an instance of a cluster node, the preferred and encompassing way to start/stop the underlying IPC node, as well as refer to underlying actors in the cluster (gossip ring, kernel, command server, forthcoming actors, etc.). + * + * @method createCluster + * @memberof Clusterluck + * + * @param {String} Identifier for the node associated with this network kernel. Needs to be unique across the cluster, since nodes are addressed by id this way. + * @param {String} host - Hostname for this network kernel to bind to. Can be an IPV4 address, IPV6 address, or a hostname. Hostname resolution isn't done when checking the existence of a node inside a cluster, so this hostname is taken literally for the lifetime of the node (i.e. localhost vs. 127.0.0.1 vs `> hostname`). Defaults to `os.hostname()`. + * @param {Number} port - Port for this network kernel to listen on. Defaults to 7022. 
+ * @param {Object} [opts] - Options object that controls configuration options for the constructed network kernel and gossip ring. + * @param {Object} [opts.kernelOpts] - Refer to `createKernel` for an explanation of available options. + * @param {Object} [opts.gossipOpts] - Refer to `createGossip` for an explanation of available options. + * + * @return {Clusterluck.ClusterNode} A new cluster node instance. + * + * @example + * let node = clusterluck.createCluster("foo", "localhost", 7022); + * assert.equal(node.kernel().id(), "foo"); + * assert.equal(node.kernel().host(), "localhost"); + * assert.equal(node.kernel().port(), 7022); + * assert.ok(node.kernel().self().equals(new Node("foo", "localhost", 7022))); + * assert.equal(node.gossip().ring().rfactor(), 3); + * assert.equal(node.gossip().ring().pfactor(), 2); + * + */ function createCluster(id, host = os.hostname(), port = 7022, opts = {}) { opts = utils.isPlainObject(opts) ? _.cloneDeep(opts) : {}; var kernel = createKernel(id, host, port, opts.kernelOpts); @@ -47,8 +183,30 @@ function createCluster(id, host = os.hostname(), port = 7022, opts = {}) { return new lib.cluster_node(kernel, gossip, comms); } -function createGenServer(cluster) { - return new lib.gen_server(cluster.kernel()); +/** + * + * Constructs a generic server instance. Generic servers listen to the network kernel for events targetted at it's name/ID. For example, the gossip ring is a generic server that listens for events on the ID of the ring it belongs to. + * + * @method createGenServer + * @memberof Clusterluck + * + * @param {Clusterluck.ClusterNode} cluster - Cluster for this generic server to bind to. + * @param {Object} [opts] - Options object for creating generic server. + * @param {Number} [opts.streamTimeout] - Timeframe a generic server will receive data for a given stream before invalidating it. + * + * @return {Clusterluck.GenServer} A new generic server instance. + * + * @example + * let server = clusterluck.createGenServer(cluster); + * // based on how messages are parsed, will operate on event 'command_name' sent by another actor to this node + * server.on("command_name", handlerForCommand); + * // will listen on server.kernel() for messages emitted on event 'foo'. + * server.start("foo"); + * + */ +function createGenServer(cluster, opts) { + opts = utils.isPlainObject(opts) ? 
_.cloneDeep(opts) : {}; + return new lib.gen_server(cluster.kernel(), opts); } module.exports = { diff --git a/lib/command_server.js b/lib/command_server.js index baef454..6123863 100644 --- a/lib/command_server.js +++ b/lib/command_server.js @@ -95,12 +95,12 @@ class CommandServer extends GenServer { if (out instanceof Error) return out; var data = out.data; if (commands[out.event] === undefined) { - return new Error("TODO"); + return new Error("Cannot run against unknown command '%s'", out.event); } var parser = commands[out.event]; data = parser(data); if (data instanceof Error) { - return new Error("TODO"); + return data; } out.data = data; return out; diff --git a/lib/conn.js b/lib/conn.js index 7914c85..9c2a8f9 100644 --- a/lib/conn.js +++ b/lib/conn.js @@ -169,7 +169,7 @@ class Connection extends EventEmitter { */ send(event, data) { if (this._active === false) { - throw new Error("Cannot write to inactive connection."); + return new Error("Cannot write to inactive connection."); } if (this._connecting === true) { this._queue.enqueue({ diff --git a/lib/consts.js b/lib/consts.js index 00dcbb9..c9c2f79 100644 --- a/lib/consts.js +++ b/lib/consts.js @@ -1,10 +1,15 @@ var _ = require("lodash"), os = require("os"); +// vector clock trim defaults var vclockOpts = { + // number of elements that need to exist for trimming to occur lowerBound: 10, - youndBound: 20000, + // how old the youngest member needs to be before considering trimming + youngBound: 20000, + // when trimming, trim to at least this number of elements upperBound: 50, + // when trimming, trim any members at least this old oldBound: 86400000 }; @@ -14,30 +19,38 @@ var networkPort = 7022; module.exports = { networkHost: networkHost, networkPort: networkPort, + // defaults for network kernel kernelOpts: Object.freeze({ networkHost: networkHost, networkPort: networkPort, + // how long to wait before retrying a connection attempt between + // two nodes retry: 5000, + // TLS options; see https://riaevangelist.github.io/node-ipc/ for accepted options tls: null, + // number of attempts to reconnect to a node; currently Infinity is supported since + // the Connection class only listens on the 'connect' and 'disconnect' events maxRetries: Infinity, + // silence all node-ipc logs silent: true }), + // defaults for gossip processor gossipOpts: Object.freeze({ + // replication factor for the consistent hash ring rfactor: 3, + // persistence factor for the consistent hash ring pfactor: 2, + // interval to communicate with a random member of the cluster interval: 1000, + // interval to flush the ring to disk flushInterval: 1000, + // path to flush state to; by default, state is not flushed flushPath: null, + // vector clock options for managing the internal vector clock corresponding + // to the hash ring vclockOpts: _.cloneDeep(vclockOpts) }), - tableOpts: Object.freeze({ - pollOpts: { - interval: 5000, - block: 100 - }, - disk: false, - vclockOpts: _.cloneDeep(vclockOpts), - purgeMax: 5 - }), + // vector clock defaults, in the event we want direct creation/manipulation of vector + // clocks vclockOpts: Object.freeze(_.cloneDeep(vclockOpts)) }; diff --git a/lib/gen_server.js b/lib/gen_server.js index 2430fb3..8ad85ac 100644 --- a/lib/gen_server.js +++ b/lib/gen_server.js @@ -2,6 +2,7 @@ var _ = require("lodash"), async = require("async"), shortid = require("shortid"), EventEmitter = require("events").EventEmitter, + stream = require("stream"), util = require("util"), utils = require("./utils"), debug = 
require("debug")("clusterluck:lib:gen_server"); @@ -15,11 +16,12 @@ class GenServer extends EventEmitter { * @param {Clusterluck.NetKernel} kernel * */ - constructor(kernel) { + constructor(kernel, opts) { super(); this._id = shortid.generate(); this._kernel = kernel; this._streams = new Map(); + this._streamTimeout = (opts && opts.streamTimeout) || 30000; } /** @@ -54,6 +56,9 @@ class GenServer extends EventEmitter { stop(force = false) { this.pause(); this.emit("stop"); + this._streams.forEach((val) => { + clearTimeout(val.timeout); + }); this._streams.clear(); this._id = shortid.generate(); return this; @@ -307,16 +312,23 @@ class GenServer extends EventEmitter { */ _parse(data, stream, from) { if (!this._streams.has(stream.stream)) { - this._streams.set(stream.stream, Buffer.from("")); + var t = this._registerTimeout(stream, from); + this._streams.set(stream.stream, { + data: Buffer.from(""), + timeout: t + }); } var inner = this._streams.get(stream.stream); if (data) { - inner = Buffer.concat([inner, data], inner.length + data.length); + inner.data = Buffer.concat([inner.data, data], inner.data.length + data.length); this._streams.set(stream.stream, inner); if (!stream.done) return this; } + clearTimeout(inner.timeout); if (!stream.error) { - var job = this.decodeJob(inner); + var job = this.decodeJob(inner.data); + // maybe reply to msg stream with error, but stream error occurs sender-side, + // so they probably know if an error occurred (or at least should) if (!(job instanceof Error)) this.emit(job.event, job.data, from); } @@ -346,6 +358,43 @@ class GenServer extends EventEmitter { } return {id: id, node: node}; } + + /** + * + * @method _registerTimeout + * @memberof Clusteluck.GenServer + * @private + * @instance + * + */ + _registerTimeout(istream, from) { + // just remove stream for now, it'll invalidate any gathered JSON and fail `decodeJob` + var t = setTimeout(() => { + if (!this._streams.has(istream.stream)) return; + var rstream = new stream.PassThrough(); + var out = this._safeReply(from, rstream); + if (out) rstream.emit("error", new Error("Timeout.")); + this._streams.delete(istream.stream); + if (this._streams.size === 0) { + this.emit("idle"); + } + }, this._streamTimeout); + return t; + } + + /** + * + * @method _safeReply + * @memberof Clusterluck.GenServer + * @private + * @instance + * + */ + _safeReply(from, data) { + if (typeof from.tag !== "string") return false; + this._kernel.reply(from, data); + return true; + } } module.exports = GenServer; diff --git a/lib/gossip.js b/lib/gossip.js index 35d4ae1..6a9794a 100644 --- a/lib/gossip.js +++ b/lib/gossip.js @@ -604,7 +604,8 @@ class GossipRing extends GenServer { var nodes = this._mergeRings(data); var nRound = this._updateRound(data); this._makeConnects(nodes[0]); - this._makeDisconnects(nodes[1]); + // make this true so we don't wait for a ring update that involves node departures on every node + this._makeDisconnects(nodes[1], true); // to avoid expensive connection filtering if (nodes[0].length > 0 || nodes[1].length > 0) { this.emit("process", oldRing, this._ring); @@ -725,8 +726,8 @@ class GossipRing extends GenServer { * @instance * */ - _makeDisconnects(nodes) { - nodes.forEach(this._kernel.disconnect.bind(this._kernel)); + _makeDisconnects(nodes, force = false) { + nodes.forEach(_.partial(this._kernel.disconnect, _, force).bind(this._kernel)); return this; } diff --git a/lib/kernel.js b/lib/kernel.js index 6bd467f..bf7fe21 100644 --- a/lib/kernel.js +++ b/lib/kernel.js @@ -292,10 +292,10 @@ class 
NetKernel extends EventEmitter { * @return {Clusterluck.NetKernel} This instance. * */ - disconnect(node) { + disconnect(node, force = false) { if (node.id() === this._id || !this._sinks.has(node.id())) return this; debug("Disconnecting from IPC server on node " + node.id()); - this._sinks.get(node.id()).stop(); + this._sinks.get(node.id()).stop(force); this._sinks.delete(node.id()); return this; } @@ -552,7 +552,7 @@ class NetKernel extends EventEmitter { var dataListener = _.partial(this._sendLocal, event, tag, rstream).bind(this); data.on("data", dataListener); data.once("end", _.partial(this._finishLocal, event, tag, rstream).bind(this)); - data.once("error", (err) => { + data.on("error", (err) => { data.removeListener("data", dataListener); if (data.resume) data.resume(); this._sendLocalError(event, tag, rstream, err); @@ -652,13 +652,13 @@ class NetKernel extends EventEmitter { var rstream = {stream: streamID, done: false}; var conn = this.connection(node); debug("Streaming data to " + node.id() + ",", "stream: " + streamID + ",", "event: " + event + ",", "tag: " + tag); - var dataListener = _.partial(this._sendData, conn, event, tag, rstream).bind(this); + var dataListener = _.partial(this._sendData, data, conn, event, tag, rstream).bind(this); data.on("data", dataListener); - data.once("end", _.partial(this._finishData, conn, event, tag, rstream).bind(this)); - data.once("error", (err) => { + data.once("end", _.partial(this._finishData, data, conn, event, tag, rstream).bind(this)); + data.on("error", (err) => { data.removeListener("data", dataListener); if (data.resume) data.resume(); - this._sendError(conn, event, tag, rstream, err); + this._sendError(data, conn, event, tag, rstream, err); }); // mark conn to not be idle conn.initiateStream(rstream); @@ -683,7 +683,7 @@ class NetKernel extends EventEmitter { * @return {Clusterluck.NetKernel} This instance. * */ - _sendData(conn, event, tag, stream, data) { + _sendData(source, conn, event, tag, stream, data) { data = NetKernel._encodeBuffer(data); data = NetKernel._encodeMsg(this._cookie, { id: event, @@ -692,7 +692,8 @@ class NetKernel extends EventEmitter { stream: _.clone(stream), data: data }); - conn.send("message", data); + var out = conn.send("message", data); + if (out instanceof Error) source.emit("error", out); return this; } @@ -714,10 +715,10 @@ class NetKernel extends EventEmitter { * @return {Clusterluck.NetKernel} This instance. * */ - _finishData(conn, event, tag, stream) { + _finishData(source, conn, event, tag, stream) { if (stream.done) return this; stream.done = true; - return this._sendData(conn, event, tag, stream, null); + return this._sendData(source, conn, event, tag, stream, null); } /** @@ -738,12 +739,12 @@ class NetKernel extends EventEmitter { * @return {Clusterluck.NetKernel} This instance. 
* */ - _sendError(conn, event, tag, stream, data) { + _sendError(source, conn, event, tag, stream, data) { if (stream.done) return this; stream.done = true; stream.error = NetKernel._encodeError(data); data = null; - return this._sendData(conn, event, tag, stream, data); + return this._sendData(source, conn, event, tag, stream, data); } /** diff --git a/lib/utils.js b/lib/utils.js index a7d6166..37e34f5 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -86,7 +86,7 @@ var utils = { }, parseNode: (data) => { - if (!data) return new Error("TODO"); + if (!data) return new Error("Input data is not an object."); var out = parseNode(data.node); if (out instanceof Error) return out; data.node = out; @@ -95,7 +95,7 @@ var utils = { parseNodeList: (data) => { if (!data || !Array.isArray(data.nodes)) { - return new Error("Invalid node list format, should be: [{id: , host: , port: }]"); + return new Error("Invalid node list format, should be: [{id: , host: , port: }]."); } var out = parseNodeListMemo(data.nodes, []); if (out instanceof Error) return out; diff --git a/test/mocks/ipc.js b/test/mocks/ipc.js index 54bb4f8..dfa6c29 100644 --- a/test/mocks/ipc.js +++ b/test/mocks/ipc.js @@ -43,7 +43,7 @@ class MockIPC { server.emit(event, data, socket); }); async.nextTick(() => { - this.of[id].emit("connect"); + conn.emit("connect"); if (typeof cb === "function") return cb(); }); } diff --git a/test/unit/gen_server.js b/test/unit/gen_server.js index f27a364..cab8c6d 100644 --- a/test/unit/gen_server.js +++ b/test/unit/gen_server.js @@ -34,6 +34,10 @@ module.exports = function (mocks, lib) { assert.deepEqual(server._kernel, kernel); assert(server._streams instanceof Map); assert.equal(server._streams.size, 0); + assert.equal(server._streamTimeout, 30000); + + server = new GenServer(kernel, {streamTimeout: 5000}); + assert.equal(server._streamTimeout, 5000); }); it("Should grab id of generic server", function () { @@ -186,18 +190,18 @@ module.exports = function (mocks, lib) { assert.notOk(server.streams().has(stream.stream)); server._parse(data, stream, {}); assert.ok(server.streams().has(stream.stream)); - assert.equal(Buffer.compare(data, server.streams().get(stream.stream)), 0); + assert.equal(Buffer.compare(data, server.streams().get(stream.stream).data), 0); }); it("Should parse incoming job streams, with existing stream data", function () { var data = Buffer.from(JSON.stringify({ok: true})); var stream = {stream: uuid.v4(), done: false}; var init = Buffer.from("foo"); - server.streams().set(stream.stream, init); + server.streams().set(stream.stream, {data: init}); server._parse(data, stream, {}); assert.ok(server.streams().has(stream.stream)); var exp = Buffer.concat([init, data], init.length + data.length); - assert.equal(Buffer.compare(exp, server.streams().get(stream.stream)), 0); + assert.equal(Buffer.compare(exp, server.streams().get(stream.stream).data), 0); }); it("Should skip parsing full job if stream errors", function () { @@ -205,7 +209,7 @@ module.exports = function (mocks, lib) { var data = Buffer.from(JSON.stringify({ok: true})); var stream = {stream: uuid.v4(), error: {foo: "bar"}, done: true}; var init = Buffer.from("foo"); - server.streams().set(stream.stream, init); + server.streams().set(stream.stream, {data: init}); server._parse(data, stream, {}); assert.notOk(server.streams().has(stream.stream)); assert.notOk(server.decodeJob.called); @@ -341,6 +345,87 @@ module.exports = function (mocks, lib) { }); server.abcast([kernel.self(), nKernel.self()], "bar", "msg", Buffer.from("hello")); 
}); + + it("Should safely reply to a request", function () { + var out = server._safeReply({}, new stream.PassThrough()); + assert.equal(out, false); + + sinon.stub(server._kernel, "reply"); + out = server._safeReply({tag: "foo", node: server._kernel.self()}); + assert.equal(out, true); + server._kernel.reply.restore(); + }); + + it("Should register timeout, but stream doesn't exist later", function (done) { + server._streamTimeout = 0; + sinon.spy(server, "_safeReply"); + server._registerTimeout({stream: "foo"}, {}); + async.nextTick(() => { + assert.notOk(server._streams.has("foo")); + assert.notOk(server._safeReply.called); + server._safeReply.restore(); + server._streamTimeout = 30000; + done(); + }); + }); + + it("Shopuld register timeout, but not reply to async msg", function (done) { + var called = false; + server._streamTimeout = 0; + sinon.stub(server, "_safeReply", (from, istream) => { + istream.once("error", () => {called = true;}); + return false; + }); + server._streams.set("foo", "bar"); + server._registerTimeout({stream: "foo"}, {}); + setTimeout(() => { + assert.notOk(server._streams.has("foo")); + assert.equal(called, false); + server._safeReply.restore(); + server._streamTimeout = 30000; + done(); + }, 0); + }); + + it("Should register timeout, reply and clear stream", function (done) { + var called = false; + server._streamTimeout = 0; + sinon.stub(server, "_safeReply", (from, istream) => { + istream.once("error", () => {called = true;}); + return true; + }); + server._streams.set("foo", "bar"); + server._registerTimeout({stream: "foo"}, {tag: "baz"}); + setTimeout(() => { + assert.notOk(server._streams.has("foo")); + assert.equal(called, true); + server._safeReply.restore(); + server._streamTimeout = 30000; + done(); + }, 0); + }); + + it("Should register timeout, reply and clear stream, not emit 'idle'", function (done) { + var called = false; + var idle = false; + server._streamTimeout = 0; + sinon.stub(server, "_safeReply", (from, istream) => { + istream.once("error", () => {called = true;}); + return true; + }); + server._streams.set("foo", "bar"); + server._streams.set("a", "b"); + server._registerTimeout({stream: "foo"}, {tag: "baz"}); + server.once("idle", () => {idle = true;}); + setTimeout(() => { + assert.notOk(server._streams.has("foo")); + assert.equal(called, true); + assert.equal(idle, false); + server._safeReply.restore(); + server._streamTimeout = 30000; + done(); + }, 0); + }); }); }); }; diff --git a/test/unit/gossip.js b/test/unit/gossip.js index 65c33cd..3e4efc5 100644 --- a/test/unit/gossip.js +++ b/test/unit/gossip.js @@ -689,12 +689,13 @@ module.exports = function (mocks, lib) { assert.deepEqual(ring, gossip.ring()); assert.deepEqual(clock, gossip.vclock()); }); + var conn = gossip.kernel().connection(node2); + conn.once("idle", done); gossip.remove(node2); assert.equal(gossip.ring().size(), 3); assert.ok(gossip._actor); assert.ok(gossip.vclock().has(gossip._actor)); assert.notOk(gossip.kernel().isConnected(node2)); - done(); }); it("should remove a node from the gossip ring, waiting for 'idle' state", function (done) { @@ -955,18 +956,18 @@ module.exports = function (mocks, lib) { assert.notOk(gossip.streams().has(stream.stream)); gossip._parse(data, stream, {}); assert.ok(gossip.streams().has(stream.stream)); - assert.equal(Buffer.compare(data, gossip.streams().get(stream.stream)), 0); + assert.equal(Buffer.compare(data, gossip.streams().get(stream.stream).data), 0); }); it("Should parse incoming job streams, with existing stream data", function 
() { var data = Buffer.from(JSON.stringify(chash.toJSON(true))); var stream = {stream: uuid.v4(), done: false}; var init = Buffer.from("foo"); - gossip.streams().set(stream.stream, init); + gossip.streams().set(stream.stream, {data: init}); gossip._parse(data, stream, {}); assert.ok(gossip.streams().has(stream.stream)); var exp = Buffer.concat([init, data], init.length + data.length); - assert.equal(Buffer.compare(exp, gossip.streams().get(stream.stream)), 0); + assert.equal(Buffer.compare(exp, gossip.streams().get(stream.stream).data), 0); }); it("Should skip parsing full job if stream errors", function () { @@ -974,7 +975,7 @@ module.exports = function (mocks, lib) { var data = Buffer.from(JSON.stringify(chash.toJSON(true))); var stream = {stream: uuid.v4(), error: {foo: "bar"}, done: true}; var init = Buffer.from("foo"); - gossip.streams().set(stream.stream, init); + gossip.streams().set(stream.stream, {data: init}); gossip._parse(data, stream, {}); assert.notOk(gossip.streams().has(stream.stream)); assert.notOk(gossip.decodeJob.called); diff --git a/test/unit/kernel.js b/test/unit/kernel.js index e643a36..d6e32ac 100644 --- a/test/unit/kernel.js +++ b/test/unit/kernel.js @@ -4,6 +4,7 @@ var _ = require("lodash"), stream = require("stream"), crypto = require("crypto"), sinon = require("sinon"), + stream = require("stream"), assert = require("chai").assert; module.exports = function (mocks, lib) { @@ -899,7 +900,7 @@ module.exports = function (mocks, lib) { var job = uuid.v4(); var buf = Buffer.from("foo"); var tag = "tag"; - var stream = {stream: uuid.v4(), done: false}; + var istream = {stream: uuid.v4(), done: false}; var called = 0; var conn = kernel.connection(node); conn.once("send", (event, data) => { @@ -908,7 +909,7 @@ module.exports = function (mocks, lib) { id: job, tag: tag, from: kernel.self().toJSON(true), - stream: stream, + stream: istream, data: buf.toJSON() }); conn._streams = new Map(); @@ -916,7 +917,7 @@ module.exports = function (mocks, lib) { if (called === 2) return done(); }); nKernel.once(job, (data, rstream, from) => { - assert.deepEqual(stream, rstream); + assert.deepEqual(istream, rstream); assert.deepEqual(_.pick(from, ["node", "tag"]), { tag: tag, node: kernel.self() @@ -925,14 +926,34 @@ module.exports = function (mocks, lib) { called++; if (called === 2) return done(); }); - kernel._sendData(conn, job, tag, stream, buf); + kernel._sendData(new stream.PassThrough(), conn, job, tag, istream, buf); + }); + + it("Should send data externally, but fail with 'error'", function (done) { + var node = nKernel.self(); + var job = uuid.v4(); + var buf = Buffer.from("foo"); + var tag = "tag"; + var istream = {stream: uuid.v4(), done: false}; + var called = 0; + var conn = kernel.connection(node); + var pstream = new stream.PassThrough(); + pstream.once("error", (error) => { + assert.ok(error); + done(); + }); + sinon.stub(conn, "send", () => { + conn.send.restore(); + return new Error("error"); + }); + kernel._sendData(pstream, conn, job, tag, istream, buf); }); it("Should send end externally", function (done) { var node = nKernel.self(); var job = uuid.v4(); var tag = "tag"; - var stream = {stream: uuid.v4(), done: false}; + var istream = {stream: uuid.v4(), done: false}; var called = 0; var conn = kernel.connection(node); conn.once("send", (event, data) => { @@ -941,7 +962,7 @@ module.exports = function (mocks, lib) { id: job, tag: tag, from: kernel.self().toJSON(true), - stream: stream, + stream: istream, data: null }); conn._streams = new Map(); @@ -949,7 +970,7 
@@ module.exports = function (mocks, lib) { if (called === 2) return done(); }); nKernel.once(job, (data, rstream, from) => { - assert.deepEqual(stream, rstream); + assert.deepEqual(istream, rstream); assert.deepEqual(_.pick(from, ["node", "tag"]), { tag: tag, node: kernel.self() @@ -958,7 +979,7 @@ module.exports = function (mocks, lib) { called++; if (called === 2) return done(); }); - kernel._finishData(conn, job, tag, stream); + kernel._finishData(new stream.PassThrough(), conn, job, tag, istream); }); it("Should skip end sending if 'error' already emitted", function () { @@ -966,7 +987,7 @@ module.exports = function (mocks, lib) { var job = uuid.v4(); var tag = "tag"; var pstream = {stream: uuid.v4(), done: true}; - kernel._finishData(kernel.connection(node), job, tag, pstream); + kernel._finishData(new stream.PassThrough(), kernel.connection(node), job, tag, pstream); }); it("Should send error externally", function (done) { @@ -1006,7 +1027,7 @@ module.exports = function (mocks, lib) { called++; if (called === 2) return done(); }); - kernel._sendError(kernel.connection(node), job, tag, pstream, err); + kernel._sendError(new stream.PassThrough(), kernel.connection(node), job, tag, pstream, err); }); it("Should skip error sending if 'done' already emitted", function () { @@ -1015,7 +1036,7 @@ module.exports = function (mocks, lib) { var err = new Error("foo"); var tag = "tag"; var pstream = {stream: uuid.v4(), done: true}; - kernel._sendError(kernel.connection(node), job, tag, pstream, err); + kernel._sendError(new stream.PassThrough(), kernel.connection(node), job, tag, pstream, err); }); it("Should skip message", function (done) { diff --git a/test/unit/queue.js b/test/unit/queue.js index abe2162..344d608 100644 --- a/test/unit/queue.js +++ b/test/unit/queue.js @@ -54,8 +54,11 @@ module.exports = function (mocks, lib) { it("Should return first entry when peeking", function () { queue.enqueue("foo"); + queue.enqueue("bar"); assert.equal(queue.peek(), "foo"); queue.dequeue(); + assert.equal(queue.peek(), "bar"); + queue.dequeue(); assert.equal(queue.peek(), undefined); }); }); From 97ac9ef8e3a1a21dd251ab11ec6f80dcd798de4e Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Mon, 27 Mar 2017 00:14:40 -0700 Subject: [PATCH 2/6] add js tags onto code snippets for README --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 36971ba..7897b68 100644 --- a/README.md +++ b/README.md @@ -133,7 +133,7 @@ The `GenServer` class is used to create actors that send messages around and rec They're the basic unit of logic handling in clusterluck, and heavily derived off of Erlang's gen_server's, but incorporated into node.js' EventEmitter model. To start a `GenServer` with no event handling, we can use the following code: -``` +```javascript let serve = cl.createGenServer(cluster); serve.start("name_to_listen_for"); ``` @@ -143,7 +143,7 @@ Names for `GenServer`s on the same node have a uniqueness property, so trying to To add some event handling to our `GenServer`, we can modify the above code as such: -``` +```javascript let serve = cl.createGenServer(cluster); serve.on("hello", (data, from) => { serve.reply(from, "world"); @@ -156,7 +156,7 @@ This includes messages sent from the local node, as well from other nodes in the Once we've declared our `GenServer` and added event handling logic, we can start sending and receiving messages to/from other nodes in the cluster. 
-``` +```javascript // serve is a GenServer instance, kernel is serve's network kernel // synchronous requests // this makes a call to a GenServer listening on "server_name" locally @@ -191,7 +191,7 @@ Any message routed to our node, both internal and external, with id set to `name Each stream has a stream ID `stream` and a boolean flag `done` which indicates whether the stream has finished sending data. This stream ID is used as an index for stream memoization between concurrent requests. -``` +```javascript // 'data' is the input data for this part of the stream if (!this._streams.has(stream.stream)) { this._streams.set(stream.stream, {data: Buffer.from(""), ...}); From 1c7254963068d4c7d86aa05af21e1d0412b7e008 Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Mon, 27 Mar 2017 11:24:14 -0700 Subject: [PATCH 3/6] add contributing doc, updated README, package.json keywords, examples directory --- CONTRIBUTING.md | 19 +++++++++++++++++++ README.md | 2 ++ examples/.gitkeep | 0 package.json | 13 ++++++++++++- 4 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 CONTRIBUTING.md create mode 100644 examples/.gitkeep diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..75eae77 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,19 @@ + +# Forth Eorlingas! + +## Pull Requests + +For most PR's (read: additive feature requests), please submit against the `develop` branch; +this is to ensure that we may quickly merge the changes in and allow the community to critique/modify +the proposed changes from a common source. + +## Code Format + +Follow the same coding format seen in the source code; the one hard requirement is that code indentation +**must** be two hard spaces (no soft tabs), this is to ensure that diff views of code submission remains legible. + +## Tests + +There is an exhaustive unit test suite under `/test`, which can be run using both `mocha test` or `grunt test`. + +PR's that provide additional functionality should also provide corresponding unit test cases. diff --git a/README.md b/README.md index 7897b68..0cdd1d4 100644 --- a/README.md +++ b/README.md @@ -462,6 +462,8 @@ From here, you can reference the documentation found on the github pages for the - Add timeouts on GenServer request streams. - Emit stream error on closed socket connection. - Additional documentation. +- Dawn of time (unversioned up to 1.0.0): + - Check the commit history for details (about code changes, dawn of time undocumented) ### TODO diff --git a/examples/.gitkeep b/examples/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/package.json b/package.json index 4d914f2..56776ed 100644 --- a/package.json +++ b/package.json @@ -19,7 +19,18 @@ "bugs": { "url": "https://github.com/azuqua/clusterluck/issues" }, - "keywords": [], + "keywords": [ + "consistent hashing", + "vector clocks", + "gossip protocol", + "distributed", + "decentralized", + "communication", + "IPC", + "cluster", + "full-mesh topology", + "actor model" + ], "bin": { "ncl": "./bin/cli.js" }, From b578aae6cbd94cb2db9813f00babc8fb1a3d61a3 Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Mon, 27 Mar 2017 11:36:05 -0700 Subject: [PATCH 4/6] fix typo for command_name --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0cdd1d4..51a0747 100644 --- a/README.md +++ b/README.md @@ -222,7 +222,7 @@ The following options specify which node to communicate with and how: - `-p, --port`: Server port of the node being connected to. 
- `-a, --key`: Distributed cookie to use for signing requests to the connecting node. -Once run, a CLI session is created that provides the following commands. For any given command, help documentation can be printed to the console by typing `help `. +Once run, a CLI session is created that provides the following commands. For any given command, help documentation can be printed to the console by typing `help `. ##### inspect From 7498a00cc3d940f61847db6bebc29f05f4ab6b7c Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Tue, 28 Mar 2017 15:41:03 -0700 Subject: [PATCH 5/6] more documentation on user-facing classes, updated gossip ring state loading logic --- README.md | 67 ++++++++------ lib/chash.js | 15 +++ lib/gen_server.js | 219 +++++++++++++++++++++++++++++++++++++------- lib/gossip.js | 49 +++++++++- lib/vclock.js | 4 + test/unit/gossip.js | 20 ++++ 6 files changed, 312 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index 51a0747..185aa4b 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ $ npm install --global istanbul - [Usage](#Usage) - [Creating Clusters](#CreatingClusters) - [Manipulating Clusters](#ManipulatingClusters) + - [Example Cluster](#ExampleCluster) - [Writing GenServers](#WritingGenServers) - [Using the CLI](#UsingTheCLI) - [`inspect`](#inspect) @@ -127,6 +128,41 @@ gossip.join("another_ring_id"); For documentation on available methods/inputs for cluster manipulation, visit the documentation for the `GossipRing` class. +#### Example Cluster + +In an example.js file, insert the following: + +```javascript +const cl = require("clusterluck"), + os = require("os"); + +let id = process.argv[2], + port = parseInt(process.argv[3]); + +let node = cl.createCluster(id, os.hostname(), port); +node.start("cookie", "ring", () => { + console.log("Listening on port %s!", port); +}); +``` + +Then, in one terminal, run: +``` +$ node example.js foo 7022 +``` + +And in another terminal, run: +``` +$ node example.js bar 7023 +``` + +Now, if we spin up the CLI and connect to `foo`, we can then run: +``` +// whatever os.hostname() resolves to, replace localhost with that +$ meet bar localhost 7023 +``` + +If we then go to inspect the ring on each node, we should see both node `foo` and node `bar` in the ring. + #### Writing GenServers The `GenServer` class is used to create actors that send messages around and receive messages from the rest of the cluster. @@ -183,36 +219,6 @@ Generally speaking, serialization costs place an undue cost when we can just pas Instead, making other `GenServer`s part of the constructor of other `GenServer`s is preferred (using OOP principles to enforce actor relations), similar to how the `CommandServer` class works. In fact, both the `GossipRing` and `CommandServer` classes, built into every node in the cluster, are `GenServer`s themselves! -##### GenServer message passing - -Let's continue with our setup above. - -Any message routed to our node, both internal and external, with id set to `name_goes_here` will be gathered by `serve` into a map of streams. -Each stream has a stream ID `stream` and a boolean flag `done` which indicates whether the stream has finished sending data. -This stream ID is used as an index for stream memoization between concurrent requests. 
- -```javascript -// 'data' is the input data for this part of the stream -if (!this._streams.has(stream.stream)) { - this._streams.set(stream.stream, {data: Buffer.from(""), ...}); -} -let inner = this._streams.get(stream.stream); -inner.data = Buffer.concat([inner.data, data], ...); -``` - -Once sending has finished, the job is parsed using `decodeJob` into an event and message, under `event` and `data` respectively. -From here, the server emits an event just like an EventEmitter with `event` and `data`. - -```javascript -// similar to this -if (stream.done === true) { - let job = this.decodeJob(memo); - serve.emit(job.event, job.data, from); -} -``` - -This is where our event handling logic for event "hello" comes into play. - #### Using the CLI In the working directory of this module, we see the `bin/cli.js` script. This node script communicates with a single node in a cluster to manipulate the ring. @@ -469,5 +475,6 @@ From here, you can reference the documentation found on the github pages for the In addition to what currently exists in this library, here's a list of features to possibly add: - Provide listener for permanent close on connection between two nodes (`maxRetries` option on kernel creation). + - Add a GenStream class similar to GenServer, but strictly uses streams for communication instead of JS natives (will also require a protocol definition for indicating stream start, etc) - Discuss making disconnects between nodes on a node departure forceful or not (it's forceful right now) - A distributed lock manager, most likely using the [Redlock algorithm](https://redis.io/topics/distlock), given how well it fits into the current architecture of clusterluck. diff --git a/lib/chash.js b/lib/chash.js index fb925fe..50c2664 100644 --- a/lib/chash.js +++ b/lib/chash.js @@ -328,6 +328,21 @@ class CHash { return this._pfactor; } + /** + * + * Returns whether two consistent hash rings are equal. This involves checking the following: + * - The hash rings have the same size + * - For every entry in this instance's RBT, the corresponding entry exists in the RBT of `chash` + * + * @method equals + * @memberof Clusterluck.CHash + * @instance + * + * @param {Clusterluck.CHash} chash - CHash to derive equality against. + * + * @return {Boolean} Whether this chash instance equals `chash`. + * + */ equals(chash) { var dims = this._rfactor === chash.rfactor() && this._pfactor === chash.pfactor() && diff --git a/lib/gen_server.js b/lib/gen_server.js index 8ad85ac..5937993 100644 --- a/lib/gen_server.js +++ b/lib/gen_server.js @@ -9,11 +9,15 @@ var _ = require("lodash"), class GenServer extends EventEmitter { /** + * + * Generic server implementation. Basic unit of logic handling in clusterluck. Provides the ability to send/receive messages across a cluster using `call`, `cast`, `multicall`, and `abcast`. Largely derived from Erlang's gen_server model, but incorporated into node.js' EventEmitter model. Internally, message streams are memoized until finished, which then triggers the firing of an event for given event (say we're listening for event "hello", if this server receives a message stream with event "hello", we'll fire it under the hood). + * + * Think of it like event emitters that are targetable across a cluster, as well as internal to a node. * * @class GenServer * @memberof Clusterluck * - * @param {Clusterluck.NetKernel} kernel + * @param {Clusterluck.NetKernel} kernel - Network kernel this instance will listen for messages on. 
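+   *
+   * @example
+   * // Illustrative sketch only; "ping_server" and the "ping" event are placeholder names.
+   * const cl = require("clusterluck");
+   *
+   * let node = cl.createCluster("foo", "localhost", 7022);
+   * node.start("cookie", "ring", () => {
+   *   // the factory binds the new GenServer to this node's network kernel
+   *   let serve = cl.createGenServer(node);
+   *   serve.on("ping", (data, from) => {serve.reply(from, "pong");});
+   *   serve.start("ping_server");
+   * });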
* */ constructor(kernel, opts) { @@ -25,13 +29,18 @@ class GenServer extends EventEmitter { } /** + * + * Starts this GenServer. Use this to start processing messages. As a result of this function call, any message with an id equal to this server's id (or name, if provided) will be routed by the network kernel to this server. If called multiple times in a row, this function will throw, since name uniqueness is enforced by the network kernel. If no name is provided, the generated short id at construction-time will be used as the routing name. * * @method start * @memberof Clusterluck.GenServer * @abstract * @instance + * + * @listens Clusterluck.GenServer#GenServer:pause + * @listens Clusterluck.NetKernel#NetKernel:user_defined * - * @param {String} [name] - Name to register this handler with instead of the unique id attached. + * @param {String} [name] - Name to register this handler with instead of the unique id attached. Any message received on the network kernel with id `name` will be routed to this instance for message stream parsing, and possible event firing. * */ start(name) { @@ -46,15 +55,35 @@ class GenServer extends EventEmitter { } /** + * + * Stops the processing of messages on this GenServer. As a result, any further messages sent to the network kernel will fail to route to this GenServer. Additionally, all current message streams will be discarded, and the current name will be regenerated to another short id. * * @method stop * @memberof Clusterluck.GenServer * @abstract * @instance * + * @fires Clusterluck.GenServer#GenServer:stop + * */ stop(force = false) { this.pause(); + + /** + * + * Emitted when the server has completely stopped executing handler logic on any user-defined events it's listening for. + * + * @event Clusterluck.GenServer#GenServer:stop + * @memberof Clusterluck.GenServer + * + * @example + * // in any subclasses og GenServer, we could make stop fire asynchronously + * server.on("stop", () => { + * console.log("We've stopped!"); + * }); + * server.stop(); + * + */ this.emit("stop"); this._streams.forEach((val) => { clearTimeout(val.timeout); @@ -65,41 +94,94 @@ class GenServer extends EventEmitter { } /** + * + * Pauses the GenServer's message processing. As a result, any messages routed from the network kernel to this GenServer will be missed until `this.resume()` is called. * * @method pause * @memberof Clusterluck.GenServer * @abstract * @instance * + * @fires Clusterluck.GenServer#GenServer:pause + * + * @return {Clusterluck.GenServer} This instance. + * */ pause() { + /** + * + * Emitted when the server has paused. On 'pause', if the server is started with name `name`, the network kernel will remove this server's event handler for `name`. + * + * @event Clusterluck.GenServer#GenServer:pause + * @memberof Clusterluck.GenServer + * + * @example + * let name = server.id(); + * let old = server.kernel().listeners(name).length; + * server.once("pause", () => { + * assert.lengthOf(server.kernel().listeners(name), old-1); + * }); + * server.pause(); + * + */ this.emit("pause"); return this; } /** + * + * Resumes the processing of message streams on this GenServer. Any messages missed in between pausing and now will result in failed message parsing (since all JSON contents are sent in one message). * * @method resume * @memberof Clusterluck.GenServer * @abstract * @instance * + * @fires Clusterluck.GenServer#GenServer:resume + * @listens Clusterluck.GenServer#GenServer:pause + * + * @return {Clusterluck.GenServer} This instance. 
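+   *
+   * @example
+   * // Illustrative sketch: briefly stop routing messages to this server, then restore it.
+   * serve.pause();   // the kernel listener for this server's name is removed and "pause" fires
+   * // messages sent to this name in this window are missed, and any partial streams will
+   * // fail to parse once traffic resumes
+   * serve.resume();  // the listener is re-registered and "resume" fires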
+ * */ resume() { var handler = this._parse.bind(this); this._kernel.on(this._id, handler); this.once("pause", _.partial(this._kernel.removeListener, this._id, handler).bind(this._kernel)); + + /** + * + * Emitted when the server has been resumed. Before 'resume' is emitted, the network kernel restarts the server's event handler for `name`, where `name` is the name of the server. + * + * @event Clusterluck.GenServer#GenServer:resume + * @memberof Clusterluck.GenServer + * + * @example + * let name = server.id(); + * let old = server.kernel().listeners(name).length; + * server.once("resume", () => { + * assert.lengthOf(server.kernel().listeners(name), old+1); + * }); + * server.resume(); + * + */ this.emit("resume"); return this; } /** + * + * Parses a fully memoized message stream into an object containing a key/value pair. If we fail to parse the job buffer (invalid JSON, etc), we just return an error and this GenServer will skip emitting an event. Otherwise, triggers user-defined logic for the parsed event. * * @method decodeJob * @memberof Clusterluck.GenServer + * @private * @abstract * @instance * + * @param {Buffer} job - Memoized buffer that represents complete message stream. + * + * @return {Object} Object containing an event and data key/value pair, which are used to emit an event for user-defined logic. + * */ decodeJob(job) { var out = utils.safeParse(job, (k, v) => { @@ -194,13 +276,17 @@ class GenServer extends EventEmitter { } /** + * + * Replies to the sending node for an event stream. Used when a triggered event contains synchronous logic, in which we need to respond to the calling node of this event. * * @method reply * @memberof Clusterluck.GenServer * @instance * - * @param {Object} from - * @param {Stream|Buffer} data + * @param {Object} from - Object received on synchronous message. This contains a tag to uniquely identify the request on the sender's end. + * @param {String} from.tag - Unique identifer for request. + * @param {Clusterluck.Node} from.node - Node representing the sender. + * @param {Stream|Buffer} data - Data to reply to synchronous message with. * * @return {Clusterluck.GenServer} This instance. * @@ -211,62 +297,66 @@ class GenServer extends EventEmitter { } /** + * + * Makes a synchronous request against `id` with event `event` and event args `data`. If `id` is an object, it will be parsed into a GenServer id and a receiving node. If a callback is supplied, the response will be aggregated and returned on success as `cb(null, res)` or on error as `cb(err)`. In either case, the response stream is returned, which can be used to pipe to other streams. * * @method call * @memberof Clusterluck.GenServer * @instance * - * @param {String|Object} id - * @param {String} event - * @param {Buffer|String|Number|Boolean|Object|Array} data - * @param {Function} cb - * @param {Number} [timeout] + * @param {String|Object} id - Receiving GenServer. Can be a string, in which case this message will be sent locally to `id`. If an object, will be parsed into a receiving node and GenServer id. + * @param {String} event - Event this message will be emitted under on the receiving GenServer. + * @param {Buffer|String|Number|Boolean|Object|Array} data - Data to send with this message. + * @param {Function} [cb] - Optional callback for response/error handling. + * @param {Number} [timeout] - Timeout to wait for response before returning an error. If no callback is supplied, the response stream will emit an equivalent error. Defaults to Infinity, meaning no timeout will occur. 
* - * @return {Clusterluck.GenServer} This instance. + * @return {Stream} Response stream for request. * */ call(id, event, data, cb, timeout=Infinity) { var out = this._parseRecipient(id); - this._kernel.call(out.node, out.id, JSON.stringify({ + return this._kernel.call(out.node, out.id, JSON.stringify({ event: event, data: data }), cb, timeout); - return this; } /** + * + * Analogous to `call`, but makes the call to the GenServer with name `id` on each node in `nodes`. * * @method multicall * @memberof Clusterluck.GenServer * @instance * - * @param {Array} nodes - * @param {String} id - * @param {String} event - * @param {Stream|Buffer} data - * @param {Function} cb - * @param {Number} [timeout] + * @param {Array} nodes - Array of nodes to send this message to. + * @param {String} id - Receiving GenServer name. + * @param {String} event - Event this message will be emitted under on the receiving GenServer. + * @param {Buffer|String|Number|Boolean|Object|Array} data - Data to send with this message. + * @param {Function} [cb] - Optional callback for response/error handling. + * @param {Number} [timeout] - Timeout to wait for response before returning an error. If no callback is supplied, the response stream will emit an equivalent error. Defaults to Infinity, meaning no timeout will occur. * - * @return {Clusterluck.GenServer} This instance. + * @return {Array} Array of response streams. * */ multicall(nodes, id, event, data, cb, timeout=Infinity) { - this._kernel.multicall(nodes, id, JSON.stringify({ + return this._kernel.multicall(nodes, id, JSON.stringify({ event: event, data }), cb, timeout); - return this; } /** + * + * Makes an asynchronous request against `id` with event `event` and event args `data`. If `id` is an object, it will be parsed into a GenServer id and a receiving node. * * @method cast * @memberof Clusterluck.GenServer * @instance * - * @param {String|Object} id - * @param {String} event - * @param {Stream|Buffer} data + * @param {String|Object} id - Receiving GenServer. Can be a string, in which case this message will be sent locally to `id`. If an object, will be parsed into a receiving node and GenServer id. + * @param {String} event - Event this message will be emitted under on the receiving GenServer. + * @param {Buffer|String|Number|Boolean|Object|Array} data - Data to send with this message. * * @return {Clusterluck.GenServer} This instance. * @@ -281,15 +371,17 @@ class GenServer extends EventEmitter { } /** + * + * Analogous to `cast`, but makes the cast to the GenServer with name `id` on each node in `nodes`. * * @method abcast * @memberof Clusterluck.GenServer * @instance * - * @param {Array} nodes - * @param {String} id - * @param {String} event - * @param {Stream|Buffer} data + * @param {Array} nodes - Array of nodes to send this message to. + * @param {String} id - Receiving GenServer. + * @param {String} event - Event this message will be emitted under on the receiving GenServer. + * @param {Buffer|String|Number|Boolean|Object|Array} data - Data to send with this message. * * @return {Clusterluck.GenServer} This instance. 
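+   *
+   * @example
+   * // Illustrative sketch: fan a cast out to every node currently in the gossip ring.
+   * // Assumes `node` is this process's ClusterNode and "ping_server" is a placeholder name.
+   * let nodes = node.gossip().ring().nodes();
+   * server.abcast(nodes, "ping_server", "ping", "hello everyone");
+   * // for the synchronous variant, multicall (documented above) collects a response or
+   * // timeout error, here assumed to be aggregated per node
+   * server.multicall(nodes, "ping_server", "ping", "hi", (err, responses) => {
+   *   if (err) return console.error(err);
+   *   console.log(responses);
+   * }, 5000);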
* @@ -309,6 +401,9 @@ class GenServer extends EventEmitter { * @private * @instance * + * @fires Clusterluck.GenServer#GenServer:user_defined + * @fires Clusterluck.GenServer#GenServer:idle + * */ _parse(data, stream, from) { if (!this._streams.has(stream.stream)) { @@ -333,7 +428,20 @@ class GenServer extends EventEmitter { } this._streams.delete(stream.stream); - if (this._streams.size === 0) { + if (this.idle()) { + /** + * + * Emitted when the number of active messages streams on this server is zero (or, if extending the GenServer class, when this.idle() returns true). + * + * @event Clusterluck.GenServer#GenServer:idle + * @memberof Clusterluck.GenServer + * + * @example + * server.once("idle", () => { + * // now we could safely stop the server w/o killing any message streams + * }); + * + */ this.emit("idle"); } return this; @@ -366,6 +474,8 @@ class GenServer extends EventEmitter { * @private * @instance * + * @fires Clusterluck.GenServer#GenServer:idle + * */ _registerTimeout(istream, from) { // just remove stream for now, it'll invalidate any gathered JSON and fail `decodeJob` @@ -375,7 +485,7 @@ class GenServer extends EventEmitter { var out = this._safeReply(from, rstream); if (out) rstream.emit("error", new Error("Timeout.")); this._streams.delete(istream.stream); - if (this._streams.size === 0) { + if (this.idle()) { this.emit("idle"); } }, this._streamTimeout); @@ -396,5 +506,52 @@ class GenServer extends EventEmitter { return true; } } - + +/** + * + * Events defined by users for message handling. + * + * Suppose we have a GenServer named `server` started with name `name_goes_here`. Any message routed to our node, both internal and external, with id set to `name_goes_here` will be gathered by `serve` into a map of streams. Each stream has a stream ID `stream` and a boolean flag `done` which indicates whether the stream has finished sending data. This stream ID is used as an index for stream memoization between concurrent requests. + * + * ```javascript + * // 'data' is the input data for this part of the stream + * if (!this._streams.has(stream.stream)) { + * this._streams.set(stream.stream, {data: Buffer.from(""), ...}); + * } + * let inner = this._streams.get(stream.stream); + * inner.data = Buffer.concat([inner.data, data], ...); + * ``` + * + * Once sending has finished, the job is parsed using `decodeJob` into an event and message, under `event` and `data` respectively. From here, the server emits an event just like an EventEmitter with `event` and `data`. + * + * ```javascript + * // similar to this + * if (stream.done === true) { + * let job = this.decodeJob(memo); + * serve.emit(job.event, job.data, from); + * } + * ``` + * + * This is where our own event handling logic comes into play. Everything else is handled under the hood by this class. + * + * Diagram of data flow: + * ``` + * Messages comes into node with id set to `name_goes_here` ---------------> this._parse + * Message comes into node with `stream.done` set to true -----------------> this.decodeJob ---> this.emit(event, data,from) ---> this.on(event, handlerLogic...) + * ``` + * + * @event Clusterluck.GenServer#GenServer:user_defined + * @memberof Clusterluck.GenServer + * @type {Object} + * @property {Any} data - Data emitted with this user-defined event. Analogous to how an EventEmitter emits events. + * @property {Object} from - Object received on synchronous message. This contains a tag to uniquely identify the request on the sender's end. + * @property {String} from.tag - Unique identifer for request. 
+ * @property {Clusterluck.Node} from.node - Node representing the sender. + * + * @example + * server.on("hello", (data, from) => { + * server.reply(from, "world"); + * }); + * + */ module.exports = GenServer; diff --git a/lib/gossip.js b/lib/gossip.js index 6a9794a..f553a0f 100644 --- a/lib/gossip.js +++ b/lib/gossip.js @@ -92,6 +92,19 @@ class GossipRing extends GenServer { this.leave(force); } + /** + * + * Pauses the GossipRing's message processing. As a reuslt, any messages routed from the network kernel to this instance will be missed until `this.resume()` is called. + * + * @method pause + * @memberof Clusterluck.GossipRing + * @instance + * + * @fires Clusterluck.GenServer#GenServer:pause + * + * @return {Clusterluck.GossipRing} This instance. + * + */ pause() { debug("Pausing gossip event handler"); clearInterval(this._interval); @@ -102,6 +115,20 @@ class GossipRing extends GenServer { return this; } + /** + * + * Resumes the processing of message streams on this instance. Any messages missed in between pausing and now will result in failed message parsing (since all JSON contents are sent in one message). + * + * @method resume + * @memberof Clusterluck.GossipRing + * @instance + * + * @fires Clusterluck.GenServer#GenServer:resume + * @listens Clusterluck.GenServer#GenServer:pause + * + * @return {Clusterluck.GossipRing} This instance. + * + */ resume() { this._interval = setInterval(this.poll.bind(this), this._poll); this._flush = setInterval(this.flush.bind(this), this._flushPoll); @@ -110,12 +137,18 @@ class GossipRing extends GenServer { } /** + * + * Parses a fully memoized message stream into an object containing a key/value pair. If we fail to parse the job buffer (invalid JSON, etc), we just return an error and this GenServer will skip emitting an event. Otherwise, triggers user-defined logic for the parsed event. * * @method decodeJob * @memberof Clusterluck.GossipRing * @private * @instance * + * @param {Buffer} job - Memoized buffer that represents complete message stream. + * + * @return {Object} Object containing an event and data key/value pair, which are used to emit an event for user-defined logic. + * */ decodeJob(buf) { var out = super.decodeJob(buf); @@ -131,12 +164,24 @@ class GossipRing extends GenServer { return out; } + /** + * + * Loads existing gossip ring from disk. If no path is specified or the file does not exist, we simple do nothing. Otherwise, the result JSON is parsed read into this instance's hash ring, vector clock, ring ID, and current actor. + * + * @method load + * @memberof Clusterluck.GossipRing + * @instance + * + * @param {Function} cb - Called when loading state has finished. On error, will operate as `cb(err)`. Otherwise, will operate as `cb()`. + * + */ load(cb) { if (typeof this._flushPath !== "string") { return async.nextTick(cb); } fs.readFile(this._flushPath, (err, data) => { - if (err) return cb(err); + if (err && err.code !== "ENOENT") return cb(err); + if (err && err.code === "ENOENT") return cb(); data = utils.safeParse(data); if (data instanceof Error || !util.isObject(data)) { return cb(new Error("File at path '" + this._flushPath + "' contains invalid JSON blob.")); @@ -210,6 +255,8 @@ class GossipRing extends GenServer { } /** + * + * Joins the ring `ringID` if not already a member of a ring. Otherwise, will return an error noting that this instance already belongs to a ring. 
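+   *
+   * @example
+   * // Illustrative sketch: join a named ring at startup; per the description above,
+   * // the return value is an Error if this node already belongs to a ring.
+   * let res = gossip.join("ring");
+   * if (res instanceof Error) {
+   *   // already in a ring; leave() first if a re-join is really intended
+   * }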
* * @method join * @memberof Clusterluck.GossipRing diff --git a/lib/vclock.js b/lib/vclock.js index d8f6ab7..48c47ed 100644 --- a/lib/vclock.js +++ b/lib/vclock.js @@ -371,6 +371,8 @@ class VectorClock { } /** + * + * Serializes this instance into a JSON object. * * @method toJSON * @memberof Netkernel.VectorClock @@ -386,6 +388,8 @@ class VectorClock { } /** + * + * Populates data for this instance from JSON object `ent`. * * @method fromJSON * @memberof Netkernel.VectorClock diff --git a/test/unit/gossip.js b/test/unit/gossip.js index 3e4efc5..c7fb385 100644 --- a/test/unit/gossip.js +++ b/test/unit/gossip.js @@ -220,6 +220,26 @@ module.exports = function (mocks, lib) { }); }); + it("Should skip loading state from disk if call results in ENOENT error", function (done) { + gossip._flushPath = "/foo/bar"; + sinon.stub(fs, "readFile", (path, cb) => { + async.nextTick(() => { + return cb(_.extend(new Error("error"), { + code: "ENOENT" + })); + }); + }); + gossip.load((err) => { + assert.notOk(err); + assert.deepEqual(gossip.ring(), chash); + assert.deepEqual(gossip.vclock(), vclock); + assert.notOk(gossip._actor); + assert.notOk(gossip._ringID); + fs.readFile.restore(); + done(); + }); + }); + it("Should fail to load state from disk if call results in error", function (done) { sinon.stub(fs, "readFile", (path, cb) => { async.nextTick(() => { From d84ef5a92a2d3840ccaa012a01cc41f2fb658cf9 Mon Sep 17 00:00:00 2001 From: Kevin Wilson Date: Tue, 28 Mar 2017 17:33:41 -0700 Subject: [PATCH 6/6] event documentation for netkernel, gossip ring, gen server, and cluster node --- lib/cluster_node.js | 22 +++++++++ lib/gen_server.js | 1 - lib/gossip.js | 113 ++++++++++++++++++++++++++++++++++++++++++++ lib/kernel.js | 54 +++++++++++++++++++++ 4 files changed, 189 insertions(+), 1 deletion(-) diff --git a/lib/cluster_node.js b/lib/cluster_node.js index 2f8ac71..f25bfed 100644 --- a/lib/cluster_node.js +++ b/lib/cluster_node.js @@ -131,6 +131,15 @@ class ClusterNode extends EventEmitter { this._comms.start("command"); this._gossip.start(ringID); this._kernel.start({cookie: cookie}); + + /** + * + * Emitted when the command line server, gossip ring server, and network kernel have all started and are ready to start processing messages. + * + * @event Clusterluck.ClusterNode#ClusterNode:ready + * @memberof Clusterluck.ClusterNode + * + */ this._kernel.once("_ready", () => {this.emit("ready");}); if (typeof cb === "function") { this.on("ready", cb); @@ -146,6 +155,10 @@ class ClusterNode extends EventEmitter { * @memberof Clusterluck.ClusterNode * @instance * + * @fires Clusterluck.ClusterNode#ClusterNode:stop + * @listens Clusterluck.CommandServer#CommandServer:stop + * @listens Clusterluck.GossipRing#GossipRing:stop + * * @param {Boolean} [force] - Whether to forcibly stop this node or not. * * @return {Clusterluck.ClusterNode} This instance. @@ -166,6 +179,15 @@ class ClusterNode extends EventEmitter { this._kernel.sinks().forEach((val) => { this._kernel.disconnect(val.node(), true); }); + + /** + * + * Emitted when the command line server, gossip ring server, and network kernel have all stopped processing messages. 
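+     *
+     * @example
+     * // Illustrative sketch: exit the process only once everything has stopped.
+     * process.on("SIGINT", () => {
+     *   node.once("stop", () => process.exit(0));
+     *   node.stop(); // pass `true` to stop forcefully instead of waiting
+     * });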
+ * + * @event Clusterluck.ClusterNode#ClusterNode:stop + * @memberof Clusterluck.ClusterNode + * + */ this.emit("stop"); } ]); diff --git a/lib/gen_server.js b/lib/gen_server.js index 5937993..a22e20d 100644 --- a/lib/gen_server.js +++ b/lib/gen_server.js @@ -542,7 +542,6 @@ class GenServer extends EventEmitter { * * @event Clusterluck.GenServer#GenServer:user_defined * @memberof Clusterluck.GenServer - * @type {Object} * @property {Any} data - Data emitted with this user-defined event. Analogous to how an EventEmitter emits events. * @property {Object} from - Object received on synchronous message. This contains a tag to uniquely identify the request on the sender's end. * @property {String} from.tag - Unique identifer for request. diff --git a/lib/gossip.js b/lib/gossip.js index f553a0f..1585509 100644 --- a/lib/gossip.js +++ b/lib/gossip.js @@ -53,6 +53,9 @@ class GossipRing extends GenServer { * * @param {String} ringID - Ring ID to listen for events on. * + * @listens Clusterluck.GenServer#GenServer:stop + * @listens Clusterluck.GossipRing#GossipRing:ring + * * @return {Clusterluck.GossipRing} This instance. * */ @@ -62,6 +65,19 @@ class GossipRing extends GenServer { this._interval = setInterval(this.poll.bind(this), this._poll); this._flush = setInterval(this.flush.bind(this), this._flushPoll); + /** + * + * Event emitted when any ring updates occur and are gossipped around the cluster. Since the GossipRing class extends the GenServer class, this event is emitted whenever a message stream is parsed with event "ring". + * + * @event Clusterluck.GossipRing#GossipRing:ring + * @memberof Clusterluck.GossipRing + * @type {Object} + * @property {Clusterluck.CHash} data - Consistent hash ring sent with this ring update message. + * @property {Clusterluck.VectorClock} vclock - Vector clock associated with this message. Based on comparison between this value and this instance's own vector clock, the message may be ignored. + * @property {String} actor - Actor associated with the most recent update to `vclock`. + * @property {String} type - Update type. Can be either "join" (meet request) or "update" (anything else). + * + */ var events = [ {event: "ring", method: "_updateRing"}, ]; @@ -81,6 +97,9 @@ class GossipRing extends GenServer { * @memberof Clusterluck.GossipRing * @instance * + * @fires Clusterluck.GenServer#GenServer:stop + * @listens Clusterluck.GossipRing#GossipRing:close + * * @param {Boolean} [force] - Whether to forcibly stop this process. If false, will wait for an idle state before leaving the ring and clearing the internal message stream map. Otherwise, will immediately clear any pending updates and leave the ring. Defaults to false. * * @return {Clusterluck.GossipRing} This instance. @@ -262,6 +281,9 @@ class GossipRing extends GenServer { * @memberof Clusterluck.GossipRing * @instance * + * @listens Clusterluck.NetKernel#NetKernel:user_defined + * @listens Clusterluck.GenServer#GenServer:pause + * * @param {String} ringID - Ring ID for this node to join. * * @return {Clusterluck.GossipRing} This instance. @@ -287,6 +309,8 @@ class GossipRing extends GenServer { * @memberof Clusterluck.GossipRing * @instance * + * @fires Clusterluck.GossipRing#GossipRing:send + * * @param {Clusterluck.Node} node - Node to seed ring-join with. * * @return {Clusterluck.GossipRing} This instance. 
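+   *
+   * @example
+   * // Illustrative sketch; `seedNode` is assumed to be a Clusterluck.Node for a live
+   * // member of the target cluster (construction elided).
+   * gossip.on("process", (oldRing, newRing) => {
+   *   console.log("ring size: %d -> %d", oldRing.nodes().length, newRing.nodes().length);
+   * });
+   * gossip.meet(seedNode);
+   * // membership converges over subsequent gossip rounds rather than immediately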
@@ -307,6 +331,18 @@ class GossipRing extends GenServer { vclock: this._vclock.toJSON(true), round: 0 }); + + /** + * + * Emitted whenever this instance sends a message to another node in the cluster. + * + * @event Clusterluck.GossipRing#GossipRing:send + * @memberof Clusterluck.GossipRing + * @property {Clusterluck.VectorClock} vclock - Vector clock of message sent. + * @property {String} event - Event name of message sent. + * @property {Object} msg - Message contents. + * + */ this.emit("send", this._vclock, "ring", msg); return this; } @@ -319,6 +355,9 @@ class GossipRing extends GenServer { * @memberof Clusterluck.GossipRing * @instance * + * @fires Clusterluck.GossipRing#GossipRing:process + * @listens Clusterluck.GenServer#GenServer:idle + * * @param {Clusterluck.Node} node - Node to insert into this gossip ring's cluster. * @param {Boolean} [force] - Whether to forcibly add `node` into the current state of this ring, or wait for an idle state. Defaults to false. * @@ -335,6 +374,18 @@ class GossipRing extends GenServer { this._vclock.increment(this._actor); this._kernel.connect(node); this.sendRing(GossipRing.maxMsgRound(this._ring)); + + /** + * + * Emitted whenever this instance changes it's consistent hash ring, either based on events generated by the CLI or + * by gossipped messages. + * + * @event Clusterluck.GossipRing#GossipRing:process + * @memberof Clusterluck.GossipRing + * @property {Clusterluck.CHash} oldRing - Previous ring before this event is fired. + * @property {Clusterluck.CHash} currRing - Current ring when this event is fired. + * + */ this.emit("process", oldRing, this._ring); return this; } @@ -350,6 +401,9 @@ class GossipRing extends GenServer { * @memberof Clusterluck.GossipRing * @instance * + * @fires Clusterluck.GossipRing#GossipRing:process + * @listens Clusterluck.GenServer#GenServer:idle + * * @param {Array} nodes - Nodes to insert into this gossip ring's cluster. * @param {Boolean} [force] - Whether to forcibly add `nodes` into the current state of this ring, or wait for an idle state. Defaults to false. * @@ -381,6 +435,10 @@ class GossipRing extends GenServer { * @memberof Clusterluck.GossipRing * @instance * + * @fires Clusterluck.GossipRing#GossipRing:leave + * @fires Clusterluck.GossipRing#GossipRing:close + * @listens Clusterluck.GenServer#GenServer:idle + * * @param {Boolean} [force] - Whether to forcibly leave this ring or not. If false, will wait for an idle state before leaving the ring and clearing the internal message stream map. Otherwise, will immediately clear any pending updates and leave the ring. Defaults to false. * * @return {Clusterluck.GossipRing} This instance. @@ -413,6 +471,9 @@ class GossipRing extends GenServer { * @memberof Clusterluck.GossipRing * @instance * + * @fires Clusterluck.GossipRing#GossipRing:process + * @listens Clusterluck.GenServer#GenServer:idle + * * @param {Node} node - Node to remove from this gossip ring's cluster. * @param {Boolean} [force] - Whether to forcibly remove `node` from the current state of this ring, or wait for an idle state. Defaults to false. * @@ -444,6 +505,9 @@ class GossipRing extends GenServer { * @memberof Clusterluck.GossipRing * @instance * + * @fires Clusterluck.GossipRing#GossipRing:process + * @listens Clusterluck.GenServer#GenServer:idle + * * @param {Array} nodes - Nodes to remove from this gossip ring's cluster. * @param {Boolean} [force] - Whether to forcibly remove `nodes` from the current state of this ring, or wait for an idle state. Defaults to false. 
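+   *
+   * @example
+   * // Illustrative sketch: n1 and n2 are placeholder Clusterluck.Node instances.
+   * gossip.minsert([n1, n2]);        // batch insert, waits for an idle state by default
+   * gossip.mremove([n1, n2], true);  // batch remove, forced rather than waiting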
* @@ -512,10 +576,20 @@ class GossipRing extends GenServer { * @memberof Clusterluck.GossipRing * @instance * + * @fires Clusterluck.GossipRing#GossipRing:flushing + * * @return {Clusterluck.GossipRing} This instance. * */ flush() { + /** + * + * Emitted when this instance begins to flush state to disk. NOT a reliable indicator for when state is actually flushed to disk, as this event is fired whenever `this.flush()` is called (as opposed to firing when it makes an fs call). + * + * @event Clusterluck.GossipRing#GossipRing:flushing + * @memberof Clusterluck.GossipRing + * + */ this.emit("flushing"); if (this._actor === null || !this._flushPath) return this; debug("Flushing gossip ring to disk"); @@ -614,6 +688,8 @@ class GossipRing extends GenServer { * @memberof Clusterluck.GossipRing * @instance * + * @fires Clusterluck.GossipRing#GossipRing:send + * * @param {Array} nodes - Nodes to send this message to. * @param {String} event - Event to send. * @param {Object} msg - Value to send with this message. @@ -645,6 +721,8 @@ class GossipRing extends GenServer { * @private * @instance * + * @fires Clusterluck.GossipRing#GossipRing:process + * */ _updateRing(data) { var oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree())); @@ -724,8 +802,20 @@ class GossipRing extends GenServer { * @private * @instance * + * @fires Clusterluck.GossipRing#GossipRing:conflict + * */ _handleRingConflict(data) { + /** + * + * Emitted when the ring state of another node, based off a received message off the network kernel, is in conflict with the current state of this instance. More for documenting purposes in this class, since all ring conflicts are handled with a "Last Write Wins" approach. However, in any new classes that may use similar logic to this, will be useful for custom state conflict. + * + * @event Clusterluck.GossipRing#GossipRing:conflict + * @memberof Clusterluck.GossipRing + * @property {Clusterluck.CHash} RecRing - Received ring in conflict with the current ring. + * @property {Clusterluck.VectorClock} RecClock - Received vector clock conflicting with the current vector clock. + * + */ this.emit("conflict", data.data, data.vclock); var oldRing = this._ring; // use LWW to handle conflict automatically, but this can be abstracted @@ -785,6 +875,10 @@ class GossipRing extends GenServer { * @private * @instance * + * @fires Clusterluck.GossipRing#GossipRing:send + * @fires Clusterluck.GossipRing#GossipRing:leave + * @fires Clusterluck.GossipRing#GossipRing:close + * */ _closeRing() { var nodes = this.selectRandom(2); @@ -800,9 +894,28 @@ class GossipRing extends GenServer { round: GossipRing.maxMsgRound(sendRing)-1 }); this.emit("send", sendClock, "ring", msg); + + /** + * + * Emitted when the ring state on this instance has been cleared, and the resulting ring with this node removed has been sent. + * + * @event Clusterluck.GossipRing#GossipRing:leave + * @memberof Clusterluck.GossipRing + * @property {Clusterluck.CHash} sendRing - Ring that will be sent to all nodes in the old cluster except this node (since we're leaving when this event has fired). + * + */ this.emit("leave", sendRing); this._ringID = null; this._makeDisconnects(sendRing.nodes()); + + /** + * + * Emitted when this instance has left it's current ring, overwritten it's ring ID to `null`, and disconnected from all other nodes in the cluster. 
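+     *
+     * @example
+     * // Illustrative sketch: coordinate node shutdown with the ring departure.
+     * gossip.once("close", () => {
+     *   // ring ID is now null and connections to former peers are closed
+     *   node.stop();
+     * });
+     * gossip.leave(); // pass `true` to leave without waiting for an idle state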
+ * + * @event Clusterluck.GossipRing#GossipRing:close + * @memberof Clusterluck.GossipRing + * + */ this.emit("close"); } diff --git a/lib/kernel.js b/lib/kernel.js index bf7fe21..d2cdf29 100644 --- a/lib/kernel.js +++ b/lib/kernel.js @@ -75,6 +75,8 @@ class NetKernel extends EventEmitter { * @memberof Clusterluck.NetKernel * @instance * + * @fires Clusterluck.NetKernel#NetKernel:_stop + * * @return {Clusterluck.NetKernel} This instance. * */ @@ -82,6 +84,15 @@ class NetKernel extends EventEmitter { debug("Stopping network kernel on " + this._id); this._ipc.server.stop(); this._srcs = new Map(); + + /** + * + * Emitted when this instance has stopped it's IPC server and stopped receiving messages from other nodes. + * + * @event Clusterluck.NetKernel#NetKernel:_stop + * @memberof Clusterluck.NetKernel + * + */ this.emit("_stop"); return this; } @@ -511,6 +522,9 @@ class NetKernel extends EventEmitter { * @instance * @private * + * @fires Clusterluck.NetKernel#NetKernel:_ready + * @fires Clusterluck.NetKernel#NetKernel:user_defined + * */ _startData() { this._ipc.server.on("message", (data, socket) => { @@ -521,6 +535,23 @@ class NetKernel extends EventEmitter { this._addSocket(socket); var nData = NetKernel._decodeBuffer(data.data); debug("Received message on net kernel with stream:", data.stream.stream + ",", "event:", data.id + ",", "from:", data.from.id); + + /** + * + * Emitted whenever this instance has received a message that has passed security checks and ready to be routed to a GenServer/event handler. + * + * @event Clusterluck.NetKernel#NetKernel:user_defined + * @memberof Clusterluck.NetKernel + * @property {String} id - Local target of this message (corresponding to a GenServer). + * @property {Buffer} data - Data buffer of message. + * @property {Object} stream - Stream object for this message. + * @property {String} stream.stream - ID of stream. + * @property {Boolean} stream.done - Whether this stream has more data coming or not. + * @property {Object} from - From object received on a message. This contains a tag to uniquely identity the request on the sender's end. + * @property {String} from.tag - Unique identifier for request. + * @property {Clusterluck.Node} from.node - Node representing the sender. + * + */ this.emit(data.id, nData, data.stream, { tag: data.tag || null, socket: socket, @@ -528,6 +559,15 @@ class NetKernel extends EventEmitter { }); }); this._ipc.server.on("socket.disconnected", this._disconnectSocket.bind(this)); + + /** + * + * Emitted when this instance's IPC server has started, and ready to receive messages from other nodes. + * + * @event Clusterluck.NetKernel#NetKernel:_ready + * @memberof Clusterluck.NetKernel + * + */ this.emit("_ready"); } @@ -567,6 +607,8 @@ class NetKernel extends EventEmitter { * @instance * @private * + * @fires Clusterluck.NetKernel#NetKernel:user_defined + * * @param {String} event * @param {String} tag * @param {Object} stream @@ -852,6 +894,8 @@ class NetKernel extends EventEmitter { * @instance * @private * + * @fires Clusterluck.NetKernel#NetKernel:_skip + * * @param {Object} data * * @return {Clusterluck.NetKernel} This instance. @@ -859,6 +903,16 @@ class NetKernel extends EventEmitter { */ _skipMsg(data) { debug("Skipping message", data); + + /** + * + * Emitted whenever this instance has received a message that a) contained invalid JSON or b) had a mismatched HMAC checksum in the body. 
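+     *
+     * @example
+     * // Illustrative sketch: surface rejected messages (bad JSON or a cookie/HMAC mismatch).
+     * node.kernel().on("_skip", (data) => {
+     *   console.warn("kernel skipped a message:", data);
+     * });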
+ * + * @event Clusterluck.NetKernel#NetKernel:_skip + * @memberof Clusterluck.NetKernel + * @property {Object} data - Contents of skipped message. + * + */ this.emit("_skip", data); return this; }