Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
add stream socket type
Browse files Browse the repository at this point in the history
  • Loading branch information
reqshark committed Jan 6, 2015
1 parent a38479c commit ef5fa1c
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 1 deletion.
5 changes: 4 additions & 1 deletion binding.cc
Expand Up @@ -1227,7 +1227,10 @@ namespace zmq {
NODE_DEFINE_CONSTANT(target, ZMQ_PUSH);
NODE_DEFINE_CONSTANT(target, ZMQ_PULL);
NODE_DEFINE_CONSTANT(target, ZMQ_PAIR);

#if ZMQ_VERSION_MAJOR >= 4
NODE_DEFINE_CONSTANT(target, ZMQ_STREAM);
#endif

NODE_DEFINE_CONSTANT(target, ZMQ_POLLIN);
NODE_DEFINE_CONSTANT(target, ZMQ_POLLOUT);
NODE_DEFINE_CONSTANT(target, ZMQ_POLLERR);
Expand Down
1 change: 1 addition & 0 deletions lib/index.js
Expand Up @@ -36,6 +36,7 @@ var types = exports.types = {
, dealer: zmq.ZMQ_DEALER
, router: zmq.ZMQ_ROUTER
, pair: zmq.ZMQ_PAIR
, stream: zmq.ZMQ_STREAM
};

var longOptions = {
Expand Down
65 changes: 65 additions & 0 deletions test/socket.stream.js
@@ -0,0 +1,65 @@
var zmq = require('..')
, http = require('http')
, should = require('should')
, semver = require('semver');

describe('socket.stream', function(){

//since its for libzmq4+, we target versions > 4.0.0
var version = semver.lte(zmq.version, '4.0.0');

it('should support a streaming socket', function (done){

if (!version) {
done();
return console.warn('stream socket type in libzmq v4+');
}

var stream = zmq.socket('stream');
stream.on('message', function (id,msg){

msg.should.be.an.instanceof(Buffer);

var raw_header = String(msg).split('\r\n');
var method = raw_header[0].split(' ')[0];
method.should.equal('GET');

//due to HTTP GET method, prepare HTTP response for TCP socket
var httpProtocolString = 'HTTP/1.0 200 OK\r\n' //status code
+ 'Content-Type: text/html\r\n' //headers
+ '\r\n'
+ '<!DOCTYPE html>' //response body
+ '<head>' //make it xml, json, html or something else
+ '<meta charset="UTF-8">'
+ '</head>'
+ '<body>'
+'<p>derpin over protocols</p>'
+ '</body>'
+'</html>'

//zmq streaming prefixed by envelope's routing identifier
stream.send([id,httpProtocolString]);
});

var addr = '127.0.0.1:47080';
stream.bind('tcp://'+addr, function(){
//send non-peer request to zmq, like an http GET method with URI path
http.get('http://'+addr+'/aRandomRequestPath', function (httpMsg){

//it's a readable stream as the good lord intended
httpMsg.socket._readableState.reading.should.be.true

//conventional node streams emit data events to process zmq stream response
httpMsg.on('data',function (msg){
msg.should.be.an.instanceof(Buffer);
String(msg).should.equal('<!DOCTYPE html><head><meta charset="UTF-8"></head>'
+'<body>'
+'<p>derpin over protocols</p>'
+'</body>'
+'</html>');
done();
});
});
});
});
});
1 change: 1 addition & 0 deletions windows/include/zmq.h
Expand Up @@ -212,6 +212,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_PUSH 8
#define ZMQ_XPUB 9
#define ZMQ_XSUB 10
#define ZMQ_STREAM 11

/* Deprecated aliases */
#define ZMQ_XREQ ZMQ_DEALER
Expand Down

0 comments on commit ef5fa1c

Please sign in to comment.