Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

enable discovery with advertisement #2

Closed
wants to merge 5 commits into from

2 participants

@ashleybrener

discovery is a UDP multicast standard used by many protocols such as ws-discovery and UPnP.

this simple add-on allows collective to advertise its own identity and listen for new identities on the local server / LAN (if UDP multicast is supported).

the code addition provides an enableDiscovery function with option parameter "ttl"
example:

var collective = new Collective({ host: 'localhost', port: 8124 }, all_hosts, function (collective) {
    collective.set('foo.bar', 7);
    var foo_bar = collective.get('foo.bar'); // = 7;
});
collective.enableDiscovery(10000);

this means that collective will listen for new collective nodes and will advertise its own identity every 10sec.

if ttl is not specified, collective will be in listen mode only and will need to advertise itself manually by calling:

collective.advertise(true);

if collective advertises that it is online, the UDP message that is broadcast is:

collective +host:port

if collective advertises that it is offline, the UDP message that is broadcast is:

collective -host:port
@ashleybrener

does not work with cluster yet since the discoveryServer socket is shared.
however it does work in multiple node processes using child_process.fork.

there is a node core patch on the roadmap for testing with cluster on the following branch (only at this stage):
https://github.com/strongloop/node/tree/udp-cluster-test

then bind the discoveryServer socket using:

discoveryServer.bind(1900, {shared: false}); 
@arch1t3ct
Owner

Hey,

I'm not familiar with this kind of auto discovery, therefore I will have to look into it to properly understand realization and benefits/downsides (will try this month, no promises though). One thing that stands out, if I understand correctly, that this will not work outside of LAN?

Overall, I think this needs to be thought through from every possible angle before considering an implementation.

@ashleybrener

of course, I have provided this for your consideration.
you are correct, discovery is valuable only on the local machine and LAN.
I have not employed a discovery protocol standard such as ssdp or oasis since I felt the requirement here is much simpler.
also, you would need to correct the logic on new collective nodes being discovered, I am not as familiar with your object model as you are.

enjoy.

@arch1t3ct arch1t3ct closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 269 additions and 169 deletions.
  1. +269 −169 index.js
View
438 index.js
@@ -1,275 +1,375 @@
+var dgram = require('dgram');
var net = require('net');
function Collective(local, all, callback) {
- var self = this;
+ var self = this;
- self.TYPES = ['New', 'Accept', 'Data'];
- self.DELIMITER = '\n';
+ self.TYPES = ['New', 'Accept', 'Data'];
+ self.DELIMITER = '\n';
- self.local = local;
- self.remote = self.parseHosts(all);
- self.connections = {};
- self.active = 0;
- self.data = {};
- self.history = {};
+ self.local = local;
+ self.remote = self.parseHosts(all);
+ self.connections = {};
+ self.active = 0;
+ self.data = {};
+ self.history = {};
- var server = net.createServer({allowHalfOpen: true}).listen(self.local.port, self.local.host);
+ var server = net.createServer({ allowHalfOpen: true }).listen(self.local.port, self.local.host);
- server.on('connection', function (connection) {
- self.listenData(connection);
- });
+ server.on('connection', function (connection) {
+ self.listenData(connection);
+ });
- server.on('listening', function () {
- self.makeConnections(function () {
- callback(self);
- });
- });
+ server.on('listening', function () {
+ self.makeConnections(function () {
+ callback(self);
+ });
+ });
}
Collective.prototype.parseHosts = function (all) {
- var self = this;
+ var self = this;
- var i = 0;
- var remote = [];
+ var i = 0;
+ var remote = [];
- for (i = 0; i < all.length; i++) {
- if (all[i].host !== self.local.host || all[i].port !== self.local.port) {
- remote.push(all[i]);
- }
- }
+ for (i = 0; i < all.length; i++) {
+ if (all[i].host !== self.local.host || all[i].port !== self.local.port) {
+ remote.push(all[i]);
+ }
+ }
- return remote;
+ return remote;
};
Collective.prototype.listenData = function (connection) {
- var self = this;
+ var self = this;
- var buffer = '';
+ var buffer = '';
- connection.on('data', function (data) {
- buffer += data;
+ connection.on('data', function (data) {
+ buffer += data;
- var position = -1;
- var command = {};
+ var position = -1;
+ var command = {};
- do {
- position = buffer.indexOf(self.DELIMITER);
+ do {
+ position = buffer.indexOf(self.DELIMITER);
- if (-1 !== position) {
- command = JSON.parse(buffer.substr(0, position));
+ if (-1 !== position) {
+ command = JSON.parse(buffer.substr(0, position));
- buffer = buffer.substr(position + 1);
+ buffer = buffer.substr(position + 1);
- self['parse' + self.TYPES[command[0]]](command);
- }
- } while (-1 !== position && '' !== buffer);
- });
+ self['parse' + self.TYPES[command[0]]](command);
+ }
+ } while (-1 !== position && '' !== buffer);
+ });
};
Collective.prototype.parseNew = function (command) {
- var self = this;
+ var self = this;
- self.makeConnection(command[1][0], command[1][1], function () {
- var ident = self.makeIdent(command[1][0], command[1][1]);
+ self.makeConnection(command[1][0], command[1][1], function () {
+ var ident = self.makeIdent(command[1][0], command[1][1]);
- if ('undefined' !== typeof self.connections[ident]) {
- var local_data = null;
+ if ('undefined' !== typeof self.connections[ident]) {
+ var local_data = null;
- if (true === command[1][2]) {
- local_data = self.data;
- }
+ if (true === command[1][2]) {
+ local_data = self.data;
+ }
- self.sendMessage(ident, 1, local_data);
- }
- });
+ self.sendMessage(ident, 1, local_data);
+ }
+ });
};
Collective.prototype.parseData = function (command) {
- var self = this;
+ var self = this;
- self.assign(command[1][0], command[1][1], command[1][2], command[1][3]);
+ self.assign(command[1][0], command[1][1], command[1][2], command[1][3]);
};
Collective.prototype.parseAccept = function (command) {
- var self = this;
+ var self = this;
- if (null !== command[1]) {
- self.data = command[1];
- }
+ if (null !== command[1]) {
+ self.data = command[1];
+ }
};
Collective.prototype.makeConnections = function (callback) {
- var self = this;
+ var self = this;
- var x = self.remote.length;
+ var x = self.remote.length;
- self.remote.forEach(function (item) {
- self.makeConnection(item.host, item.port, function () {
- x--;
+ self.remote.forEach(function (item) {
+ self.makeConnection(item.host, item.port, function () {
+ x--;
- if (0 === x) {
- self.notifyConnections(function () {
- callback();
- });
- }
- });
- });
+ if (0 === x) {
+ self.notifyConnections(function () {
+ callback();
+ });
+ }
+ });
+ });
};
Collective.prototype.makeConnection = function (host, port, callback) {
- var self = this;
-
- var options = {allowHalfOpen: true, host: host, port: port};
- var connection = net.connect(options);
- var ident = self.makeIdent(host, port);
-
- connection.on('connect', function () {
- self.addConnection(ident, connection, function () {
- callback();
- });
- });
-
- connection.on('end', function () {
- self.removeConnection(ident, function () {
- callback();
- });
- });
-
- connection.on('error', function (error) {
- self.removeConnection(ident, function () {
- callback();
- });
- });
+ var self = this;
+
+ var options = { allowHalfOpen: true, host: host, port: port };
+ var connection = net.connect(options);
+ var ident = self.makeIdent(host, port);
+
+ connection.on('connect', function () {
+ self.addConnection(ident, connection, function () {
+ callback();
+ });
+ });
+
+ connection.on('end', function () {
+ self.removeConnection(ident, function () {
+ callback();
+ });
+ });
+
+ connection.on('error', function (error) {
+ self.removeConnection(ident, function () {
+ callback();
+ });
+ });
};
Collective.prototype.addConnection = function (ident, connection, callback) {
- var self = this;
+ var self = this;
- self.connections[ident] = connection;
- self.active++;
+ self.connections[ident] = connection;
+ self.active++;
- callback();
+ callback();
};
Collective.prototype.removeConnection = function (ident, callback) {
- var self = this;
+ var self = this;
- if ('undefined' !== typeof self.connections[ident]) {
- delete self.connections[ident];
- self.active--;
- }
+ if ('undefined' !== typeof self.connections[ident]) {
+ delete self.connections[ident];
+ self.active--;
+ }
- callback();
+ callback();
};
Collective.prototype.notifyConnections = function (callback) {
- var self = this;
+ var self = this;
- var ident = '';
- var command = [self.local.host, self.local.port, false];
- var i = 0;
- var random_connection = Math.floor(Math.random() * self.active);
+ var ident = '';
+ var command = [self.local.host, self.local.port, false];
+ var i = 0;
+ var random_connection = Math.floor(Math.random() * self.active);
- for (ident in self.connections) {
- if (self.connections.hasOwnProperty(ident)) {
- command[2] = false;
+ for (ident in self.connections) {
+ if (self.connections.hasOwnProperty(ident)) {
+ command[2] = false;
- if (i === random_connection) {
- command[2] = true;
- }
+ if (i === random_connection) {
+ command[2] = true;
+ }
- self.sendMessage(ident, 0, command);
+ self.sendMessage(ident, 0, command);
- i++;
- }
- }
+ i++;
+ }
+ }
- callback();
+ callback();
};
Collective.prototype.sendMessage = function (ident, type, data) {
- var self = this;
+ var self = this;
- self.connections[ident].write(JSON.stringify([type, data]) + self.DELIMITER);
+ self.connections[ident].write(JSON.stringify([type, data]) + self.DELIMITER);
};
Collective.prototype.makeIdent = function (host, port) {
- return host + ':' + port;
+ return host + ':' + port;
};
Collective.prototype.get = function (key) {
- var self = this;
+ var self = this;
- var reference = self.traverse(key);
+ var reference = self.traverse(key);
- return reference[0][reference[1]];
+ return reference[0][reference[1]];
};
Collective.prototype.set = function (key, value, math) {
- var self = this;
+ var self = this;
- math = math || false;
+ math = math || false;
- var time = +new Date();
+ var time = +new Date();
- self.assign(key, value, math, time);
+ self.assign(key, value, math, time);
- var ident = '';
+ var ident = '';
- for (ident in self.connections) {
- if (self.connections.hasOwnProperty(ident)) {
- self.sendMessage(ident, 2, [key, value, math, time]);
- }
- }
+ for (ident in self.connections) {
+ if (self.connections.hasOwnProperty(ident)) {
+ self.sendMessage(ident, 2, [key, value, math, time]);
+ }
+ }
};
Collective.prototype.traverse = function (key) {
- var self = this;
+ var self = this;
- var notations = key.split('.');
- var i = '';
- var object = self.data;
- var tmp = '';
+ var notations = key.split('.');
+ var i = '';
+ var object = self.data;
+ var tmp = '';
- while (1 < notations.length) {
- i = notations.shift();
+ while (1 < notations.length) {
+ i = notations.shift();
- if ('undefined' === typeof object[i]) {
- object[i] = {};
- }
+ if ('undefined' === typeof object[i]) {
+ object[i] = {};
+ }
- if ('object' !== typeof object[i]) {
- tmp = object[i];
- object[i] = {};
- object[i][tmp] = {};
- }
+ if ('object' !== typeof object[i]) {
+ tmp = object[i];
+ object[i] = {};
+ object[i][tmp] = {};
+ }
- object = object[i];
- }
+ object = object[i];
+ }
- return [object, notations.shift()];
+ return [object, notations.shift()];
};
Collective.prototype.assign = function (key, value, math, time) {
- var self = this;
+ var self = this;
- var reference = self.traverse(key);
+ var reference = self.traverse(key);
- if (true === math) {
- if ('undefined' === typeof reference[0][reference[1]]) {
- reference[0][reference[1]] = 0;
- }
+ if (true === math) {
+ if ('undefined' === typeof reference[0][reference[1]]) {
+ reference[0][reference[1]] = 0;
+ }
- reference[0][reference[1]] += value;
- } else {
- if ('undefined' === typeof self.history[key]) {
- self.history[key] = time;
- }
+ reference[0][reference[1]] += value;
+ } else {
+ if ('undefined' === typeof self.history[key]) {
+ self.history[key] = time;
+ }
- if (self.history[key] <= time) {
- reference[0][reference[1]] = value;
- }
- }
+ if (self.history[key] <= time) {
+ reference[0][reference[1]] = value;
+ }
+ }
};
+Collective.prototype.enableDiscovery = function (ttl) {
+ var self = this;
+
+ var discoveryServer = dgram.createSocket('udp4');
+
+ discoveryServer.on('message', function (data, rinfo) {
+ self.parseMulticast(data);
+ });
+
+ discoveryServer.bind(1900);
+
+ discoveryServer.on('listening', function () {
+ discoveryServer.addMembership('239.255.255.250');
+ });
+
+ self.discoveryClient = dgram.createSocket('udp4');
+
+ self.discoveryClient.on('listening', function () {
+ self.discoveryClient.setTTL(1);
+ self.discoveryClient.setBroadcast(true);
+ self.discoveryClient.setMulticastTTL(1);
+ self.discoveryClient.setMulticastLoopback(true);
+
+ if (ttl) {
+ self.advertise(true);
+ setInterval(function () {
+ self.advertise(true);
+ }, ttl);
+ }
+ });
+
+ self.discoveryClient.bind();
+}
+
+Collective.prototype.parseMulticast = function (data) {
+ var self = this;
+
+ var message = data.toString('ascii');
+
+ var match = message.match(/collective (\+|-)((.+):(\d+))/);
+
+ if (match != null) {
+ var op = match[1];
+ var ident = match[2];
+ var host = match[3];
+ var port = match[4];
+
+ if (self.local.host != host || self.local.port != port) {
+ var exists = -1;
+
+ for (var i = 0; i < self.remote.length; i++) {
+ var r = self.remote[i];
+
+ if (r.host === host && r.port === port) {
+ exists = i;
+ break;
+ }
+ }
+
+ switch (op) {
+ case '+': {
+ if (exists == -1) {
+ if ('undefined' === typeof self.connections[ident]) {
+ self.makeConnection(host, port, function () {
+ self.notifyConnections(function () { });
+ });
+ }
+
+ self.remote.push({ host: host, port: port });
+ }
+
+ break;
+ }
+
+ case '-': {
+ if (exists > -1) {
+ if ('undefined' === typeof self.connections[ident]) {
+ self.removeConnection(ident, function () {
+ self.notifyConnections(function () { });
+ });
+ }
+
+ self.remote.splice(exists, 1);
+ }
+
+ break;
+ }
+ }
+ }
+ }
+}
+
+Collective.prototype.advertise = function (online) {
+ var self = this;
+
+ var ad = new Buffer('collective ' + (online ? '+' : '-') + self.local.host + ':' + self.local.port, 'ascii');
+ self.discoveryClient.send(ad, 0, ad.length, 1900, '239.255.255.250');
+}
+
module.exports = Collective;
Something went wrong with that request. Please try again.