Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
cleaned up zmq_proxy PR
Browse files Browse the repository at this point in the history
  • Loading branch information
reqshark committed Nov 6, 2014
1 parent e19cb8c commit c000633
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 0 deletions.
37 changes: 37 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -597,3 +597,40 @@ exports.Context.getMaxSockets = function() {
var defaultCtx = defaultContext();
return defaultCtx.getOpt(zmq.ZMQ_MAX_SOCKETS);
};


/**
* JS based on API characteristics of the native zmq_proxy()
*/

function proxy (frontend, backend, capture){
switch(frontend.type+'/'+backend.type){
case 'push/pull':
case 'pull/push':
case 'xpub/xsub':
if(capture){
frontend.on('message',function (msg){backend.send(msg)});
backend.on('message',function (msg){frontend.send(msg);capture.send(msg)});
} else {
frontend.on('message',function (msg){backend.send(msg)});
backend.on('message',function (msg){frontend.send(msg)});
}
break;
case 'router/dealer':
if(capture){
frontend.on('message',function (id,delimiter,msg){backend.send([id,delimiter,msg])});
backend.on('message',function (id,delimiter,msg){
frontend.send([id,delimiter,msg]);
capture.send(msg);
});
} else {
frontend.on('message',function (id,delimiter,msg){backend.send([id,delimiter,msg])});
backend.on('message',function (id,delimiter,msg){frontend.send([id,delimiter,msg])});
}
break;
default:
throw new Error('wrong socket order to proxy');
}
}

exports.proxy = proxy;
10 changes: 10 additions & 0 deletions test/zmq_proxy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

var zmq = require('..')
, should = require('should');

describe('proxy', function() {
it('should be a function off the module namespace', function (done) {
zmq.proxy.should.be.a.Function;
done();
});
});
95 changes: 95 additions & 0 deletions test/zmq_proxy.push-pull.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
var zmq = require('..')
, should = require('should')
, semver = require('semver');

var addr = 'tcp://127.0.0.1'
, frontendAddr = addr+':5501'
, backendAddr = addr+':5502'
, captureAddr = addr+':5503';

describe('proxy.push-pull', function() {

it('should proxy push-pull connected to pull-push',function (done) {

var frontend = zmq.socket('pull');
var backend = zmq.socket('push');

var pull = zmq.socket('pull');
var push = zmq.socket('push');

frontend.bindSync(frontendAddr);
backend.bindSync(backendAddr);

push.connect(frontendAddr);
pull.connect(backendAddr);

pull.on('message',function (msg) {

frontend.close();
backend.close();
push.close();
pull.close();

msg.should.be.an.instanceof(Buffer);
msg.toString().should.equal('foo');
done();
});

setTimeout(function() {
push.send('foo');
}, 100.0);

zmq.proxy(frontend,backend);

});

it('should proxy pull-push connected to push-pull with capture',function (done) {

var frontend = zmq.socket('push');
var backend = zmq.socket('pull');

var capture = zmq.socket('pub');
var capSub = zmq.socket('sub');

var pull = zmq.socket('pull');
var push = zmq.socket('push');

frontend.bindSync(frontendAddr);
backend.bindSync(backendAddr);
capture.bindSync(captureAddr);

pull.connect(frontendAddr);
push.connect(backendAddr);
capSub.connect(captureAddr);

pull.on('message',function (msg) {
msg.should.be.an.instanceof(Buffer);
msg.toString().should.equal('foo');
console.log(msg.toString());
});

capSub.subscribe('');
capSub.on('message',function (msg) {
capture.close();
capSub.close();

setTimeout(function() {
frontend.close();
backend.close();
push.close();
pull.close();

msg.should.be.an.instanceof(Buffer);
msg.toString().should.equal('foo');
done();
},100.0);
});

setTimeout(function() {
push.send('foo');
}, 100.0);

zmq.proxy(frontend,backend,capture);

});
});
97 changes: 97 additions & 0 deletions test/zmq_proxy.router-dealer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
var zmq = require('..')
, should = require('should')
, semver = require('semver');

var addr = 'tcp://127.0.0.1'
, frontendAddr = addr+':5504'
, backendAddr = addr+':5505'
, captureAddr = addr+':5506';

describe('proxy.router-dealer', function() {

it('should proxy req-rep connected over router-dealer', function (done){

var frontend = zmq.socket('router');
var backend = zmq.socket('dealer');

var rep = zmq.socket('rep');
var req = zmq.socket('req');

frontend.bindSync(frontendAddr);
backend.bindSync(backendAddr);

req.connect(frontendAddr);
rep.connect(backendAddr);

req.on('message',function(msg){
msg.should.be.an.instanceof(Buffer);
msg.toString().should.equal('foo bar');
frontend.close();
backend.close();
req.close();
rep.close();
done();
});

rep.on('message', function (msg) {
rep.send(msg+' bar');
});

setTimeout(function() {
req.send('foo');
}, 100.0);

zmq.proxy(frontend,backend);

});

it('should proxy rep-req connections with capture', function (done){

var frontend = zmq.socket('router');
var backend = zmq.socket('dealer');

var rep = zmq.socket('rep');
var req = zmq.socket('req');

var capture = zmq.socket('pub');
var capSub = zmq.socket('sub');

frontend.bindSync(frontendAddr);
backend.bindSync(backendAddr);
capture.bindSync(captureAddr);

req.connect(frontendAddr);
rep.connect(backendAddr);
capSub.connect(captureAddr);
capSub.subscribe('');

req.on('message',function (msg) {
req.close();
rep.close();
console.log(msg.toString());
});

rep.on('message', function (msg) {
rep.send(msg+' bar');
});

capSub.on('message',function (msg) {
backend.close();
frontend.close();
capture.close();
capSub.close();
setTimeout(function() {
msg.should.be.an.instanceof(Buffer);
msg.toString().should.equal('foo bar');
done();
},100.0)
});

setTimeout(function() {
req.send('foo');
},200.0)

zmq.proxy(frontend,backend,capture);

});
});
Loading

0 comments on commit c000633

Please sign in to comment.