Skip to content

Commit

Permalink
Add broadcast support.
Browse files Browse the repository at this point in the history
  • Loading branch information
demchenkoe committed Dec 4, 2013
1 parent 9dcc4b3 commit 9e297e1
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 17 deletions.
73 changes: 72 additions & 1 deletion README.md
Expand Up @@ -13,6 +13,11 @@ Tested with RabbitMQ under ubuntu.

npm install amqp-rpc

##round-robin

Example: Call remote function.
Run multiple servers.js for round-robin shared.


###server.js example

Expand Down Expand Up @@ -71,5 +76,71 @@ Tested with RabbitMQ under ubuntu.
rpc.call('withoutCB', {}); //output message on server side console


##broadcast

Example: Core receiving data from all workers.
Run multiple worker.js for broadcast witness.
The core.js must be launched after all worker.js instances.

###example/broadcast/worker.js

var os = require('os');
var worker_name = os.hostname() + ':' + process.pid;
var counter = 0;

var rpc = require('../../index').factory({
url: "amqp://guest:guest@localhost:5672"
});

rpc.onBroadcast('getWorkerStat', function(params, cb) {
if(params && params.type == 'fullStat') {
cb(null, {
pid: process.pid,
hostname: os.hostname(),
uptime: process.uptime(),
counter: counter++
});
}
else {
cb(null, { counter: counter++ })
}
});

###example/broadcast/core.js

var rpc = require('../../index').factory({
url: "amqp://guest:guest@localhost:5672"
});

var all_stats = {};

//rpc.callBroadcast() is rpc.call() + waiting multiple responses
//If remote handler without response data, you can use rpc.call() for initiate broadcast calls.

rpc.callBroadcast(
'getWorkerStat',
{ type: 'fullStat'}, //request parameters
{ //call options
ttl: 1000, //wait response time (1 seconds), after run onComplete
onResponse: function(err, stat) { //callback on each worker response
all_stats[ stat.hostname+':'+ stat.pid ] = stat;

},
onComplete: function() { //callback on ttl expired
console.log('----------------------- WORKER STATISTICS ----------------------------------------');
for(var worker in all_stats) {
s = all_stats[worker];
console.log(worker, '\tuptime=', s.uptime.toFixed(2) + ' seconds', '\tcounter=', s.counter);
}
}
});


results for three workers:

----------------------- WORKER STATISTICS ----------------------------------------
host1:2612 uptime= 2470.39 seconds counter= 2
host2:1615 uptime= 3723.53 seconds counter= 8
host2:2822 uptime= 2279.16 seconds counter= 3

Eugene Demchenko aka Goldy skype demchenkoe email it-bm@mail.ru
Eugene Demchenko aka Goldy skype demchenkoe email demchenkoev@gmail.com
26 changes: 26 additions & 0 deletions example/broadcast/core.js
@@ -0,0 +1,26 @@
var rpc = require('../../index').factory({
url: "amqp://guest:guest@localhost:5672"
});

var all_stats = {};

//rpc.callBroadcast() is rpc.call() + waiting multiple responses
//If remote handler without response data, you can use rpc.call() for initiate broadcast calls.

rpc.callBroadcast(
'getWorkerStat',
{ type: 'fullStat'}, //request parameters
{ //call options
ttl: 1000, //wait response time (1 seconds), after run onComplete
onResponse: function(err, stat) { //callback on each worker response
all_stats[ stat.hostname+':'+ stat.pid ] = stat;

},
onComplete: function() { //callback on ttl expired
console.log('----------------------- WORKER STATISTICS ----------------------------------------');
for(var worker in all_stats) {
s = all_stats[worker];
console.log(worker, '\tuptime=', s.uptime.toFixed(2) + ' seconds', '\tcounter=', s.counter);
}
}
});
24 changes: 24 additions & 0 deletions example/broadcast/worker.js
@@ -0,0 +1,24 @@
var os = require('os');
var worker_name = os.hostname() + ':' + process.pid;
var counter = 0;

var rpc = require('../../index').factory({
url: "amqp://guest:guest@localhost:5672"
});

rpc.onBroadcast('getWorkerStat', function(params, cb) {
if(params && params.type == 'fullStat') {
cb(null, {
pid: process.pid,
hostname: os.hostname(),
uptime: process.uptime(),
counter: counter++
});
}
else {
cb(null, { counter: counter++ })
}
});


rpc.call('log', { worker: worker_name, message: 'worker started' });
2 changes: 1 addition & 1 deletion example/client.js → example/round-robin/client.js
@@ -1,5 +1,5 @@

var rpc = require('../index').factory({
var rpc = require('../../index').factory({
url: "amqp://guest:guest@localhost:5672"
});

Expand Down
2 changes: 1 addition & 1 deletion example/server.js → example/round-robin/server.js
@@ -1,5 +1,5 @@

var rpc = require('../index').factory({
var rpc = require('../../index').factory({
url: "amqp://guest:guest@localhost:5672"
});

Expand Down
89 changes: 77 additions & 12 deletions index.js
Expand Up @@ -2,7 +2,8 @@

var amqp = require('amqp');
var uuid = require('node-uuid').v4;

var os = require('os');
var queueNo = 0;

function rpc(opt) {

Expand All @@ -26,6 +27,15 @@ function rpc(opt) {
this.__exchangeCbs = [];
}

/**
* generate unique name for new queue
* @returns {string}
*/

rpc.prototype.generateQueueName = function(type) {
return /*'njsListener:' +*/ os.hostname() + ':pid' + process.pid + ':' + type;
}


rpc.prototype._connect = function(cb) {

Expand Down Expand Up @@ -125,7 +135,7 @@ rpc.prototype._makeResultsQueue = function(cb) {

var $this = this;

this.__results_queue_name = uuid();
this.__results_queue_name = this.generateQueueName('callback');
this.__make_results_cb.push(cb);

$this._makeExchange(function() {
Expand Down Expand Up @@ -160,17 +170,17 @@ rpc.prototype.__onResult = function(message, headers, deliveryInfo) {

var args = [];
if(Array.isArray(message) ) {
for(var k in message) {
if(!message.hasOwnProperty(k)) continue;

args.push(message[k]);
for(var i=0; i< message.length; i++) {
args.push(message[i]);
}
}
else args.push(message);

cb.cb.apply(cb.context, args);

delete this.__results_cb[ deliveryInfo.correlationId ];
if(cb.autoDeleteCallback !== false)
delete this.__results_cb[ deliveryInfo.correlationId ];
}

/**
Expand All @@ -189,6 +199,7 @@ rpc.prototype.call = function(cmd, params, cb, context, options) {
if(!options) options = {};

options.contentType = 'application/json';
var corr_id = options.correlationId || uuid();

this._connect(function() {

Expand All @@ -198,8 +209,11 @@ rpc.prototype.call = function(cmd, params, cb, context, options) {

$this._makeResultsQueue(function() {

var corr_id = uuid();
$this.__results_cb[ corr_id ] = { cb: cb, context: context };
$this.__results_cb[ corr_id ] = {
cb: cb,
context: context,
autoDeleteCallback: !!options.autoDeleteCallback
};


options.mandatory = true;
Expand Down Expand Up @@ -235,6 +249,8 @@ rpc.prototype.call = function(cmd, params, cb, context, options) {
});
}
});

return corr_id;
}

/**
Expand All @@ -246,16 +262,16 @@ rpc.prototype.call = function(cmd, params, cb, context, options) {
*/


rpc.prototype.on = function(cmd, cb, context) {
rpc.prototype.on = function(cmd, cb, context, options) {

if(this.__cmds[ cmd ]) return false;
options || (options = {});

var $this = this;

this._connect(function() {

$this.__conn.queue(cmd, function(queue) {

$this.__conn.queue(options.queueName || cmd, function(queue) {
$this.__cmds[ cmd ] = { queue: queue };
queue.subscribe(function(message, d, headers, deliveryInfo) {

Expand All @@ -276,7 +292,7 @@ rpc.prototype.on = function(cmd, cb, context) {

$this.__exchange.publish(
deliveryInfo.replyTo,
arguments,
Array.prototype.slice.call(arguments),
options
);
}, cmdInfo);
Expand Down Expand Up @@ -349,6 +365,55 @@ rpc.prototype.off = function(cmd) {
return true;
}

/**
* call broadcast
* @param cmd
* @param params
* @param options
*/


rpc.prototype.callBroadcast = function(cmd, params, options) {

var $this = this;

options || (options = {});
options.broadcast = true;
options.autoDeleteCallback = options.ttl ? false : true;
var corr_id = this.call.call(this, cmd, params, options.onResponse, options.context, options);
if(options.ttl) {
setTimeout(function() {
//release cb
if($this.__results_cb[ corr_id ]) {
delete $this.__results_cb[ corr_id ];
}
options.onComplete.call(options.context, cmd, options);
}, options.ttl);
}
}

/**
* subscribe to broadcast commands
* @param cmd
* @param cb
* @param context
*/

rpc.prototype.onBroadcast = function (cmd, cb, context, options) {

options || (options = {});
options.queueName = this.generateQueueName('broadcast:q'+ (queueNo++) );
return this.on.call(this, cmd, cb, context, options);
}


/**
*
* @type {Function}
*/

rpc.prototype.offBroadcast = rpc.prototype.off;


module.exports.amqpRPC = rpc;

Expand Down
4 changes: 2 additions & 2 deletions package.json
@@ -1,10 +1,10 @@
{
"name": "amqp-rpc",
"description": "AMQP RPC driver for node",
"description": "AMQP RPC driver for node.js",
"keywords": [
"amqp", "rpc"
],
"version": "0.0.3",
"version": "0.0.4",
"preferGlobal": true,
"author": {"name": "Eugene Demchenko"},
"repository": {
Expand Down

0 comments on commit 9e297e1

Please sign in to comment.