Permalink
Browse files

merged

  • Loading branch information...
2 parents 189bcfe + e4ac29e commit b34448a9ccf19e43b0e2a5f50d9a001b7c733e0c @tj tj committed Feb 7, 2012
View
@@ -1,5 +1,5 @@
Copyright (c) 2011 TJ Holowaychuk
-Copyright (c) 2010 Justin Tulloss
+Copyright (c) 2010, 2011 Justin Tulloss
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
View
@@ -1,5 +1,7 @@
TESTS = $(wildcard test/test.*.js)
+DOX = ./node_modules/.bin/dox
+JADE = ./node_modules/.bin/jade
binding.node: build binding.cc
node-waf build
@@ -13,7 +15,14 @@ test:
clean:
node-waf clean
+docs:
+ $(DOX) < lib/index.js > docs/index.json
+ $(JADE) < docs/template.jade -o "{comments:$$(cat docs/index.json)}" > docs/index.html
+
+docclean:
+ rm -fr docs/index.{json,html}
+
distclean:
node-waf distclean
-.PHONY: clean distclean test
+.PHONY: clean distclean test docs docclean
View
@@ -1,21 +1,21 @@
# node-zeromq
- ØMQ bindings for node.js, a fork of [zeromq.node](https://github.com/JustinTulloss/zeromq.node).
+ [ØMQ](http://www.zeromq.org/) bindings for node.js.
## Installation
- $ npm install zeromq
+ $ npm install zmq
## Example
producer.js:
```js
-var zmq = require('zeromq')
+var zmq = require('zmq')
, sock = zmq.socket('push');
-sock.bind('tcp://127.0.0.1:3000');
+sock.bindSync('tcp://127.0.0.1:3000');
console.log('Producer bound to port 3000');
setInterval(function(){
@@ -28,7 +28,7 @@ worker.js:
```js
-var zmq = require('zeromq')
+var zmq = require('zmq')
, sock = zmq.socket('pull');
sock.connect('tcp://127.0.0.1:3000');
@@ -42,14 +42,42 @@ sock.on('message', function(msg){
## Running tests
Install dev deps:
-
+
$ npm install
Build:
$ make
Test:
-
+
$ make test
+## Contributors
+
+ Authored by Justin Tulloss, maintained by Shripad K and TJ Holowaychuk. To contribute please ensure _all_ tests pass, and do your best to maintain the style used within the rest of the library.
+
+ Output of `git summary`:
+
+ project: zeromq.node
+ commits: 260
+ files : 38
+ authors:
+ 114 Justin Tulloss 43.8%
+ 53 Tj Holowaychuk 20.4%
+ 48 Stéphan Kochen 18.5%
+ 12 jeremybarnes 4.6%
+ 10 TJ Holowaychuk 3.8%
+ 9 mike castleman 3.5%
+ 3 Yaroslav Shirokov 1.2%
+ 2 Corey Jewett 0.8%
+ 2 mgc 0.8%
+ 1 rick 0.4%
+ 1 Matt Crocker 0.4%
+ 1 Joshua Gourneau 0.4%
+ 1 Micheil Smith 0.4%
+ 1 Jeremy Barnes 0.4%
+ 1 nponeccop 0.4%
+ 1 Paul Bergeron 0.4%
+
+
View
@@ -23,9 +23,12 @@
*/
#include <v8.h>
-#include <ev.h>
#include <node.h>
+#include <node_version.h>
#include <node_buffer.h>
+#if !NODE_VERSION_AT_LEAST(0, 5, 5)
+#include <ev.h>
+#endif
#include <zmq.h>
#include <assert.h>
#include <stdio.h>
@@ -84,7 +87,13 @@ namespace zmq {
struct BindState;
static Handle<Value> Bind(const Arguments &args);
+
+#if NODE_VERSION_AT_LEAST(0, 5, 4)
+ static void EIO_DoBind(eio_req *req);
+#else
static int EIO_DoBind(eio_req *req);
+#endif
+
static int EIO_BindDone(eio_req *req);
static Handle<Value> BindSync(const Arguments &args);
@@ -442,12 +451,18 @@ namespace zmq {
return Undefined();
}
+#if NODE_VERSION_AT_LEAST(0, 5, 4)
+ void
+#else
int
+#endif
Socket::EIO_DoBind(eio_req *req) {
BindState* state = (BindState*) req->data;
if (zmq_bind(state->sock, *state->addr) < 0)
- state->error = zmq_errno();
+ state->error = zmq_errno();
+#if !NODE_VERSION_AT_LEAST(0, 5, 4)
return 0;
+#endif
}
int
@@ -745,6 +760,19 @@ namespace zmq {
* Module functions.
*/
+ static Handle<Value>
+ ZmqVersion(const Arguments& args) {
+ HandleScope scope;
+
+ int major, minor, patch;
+ zmq_version(&major, &minor, &patch);
+
+ char version_info[16];
+ snprintf(version_info, 16, "%d.%d.%d", major, minor, patch);
+
+ return scope.Close(String::New(version_info));
+ }
+
static void
Initialize(Handle<Object> target) {
HandleScope scope;
@@ -791,6 +819,8 @@ namespace zmq {
NODE_DEFINE_CONSTANT(target, STATE_BUSY);
NODE_DEFINE_CONSTANT(target, STATE_CLOSED);
+ NODE_SET_METHOD(target, "zmqVersion", ZmqVersion);
+
Context::Initialize(target);
Socket::Initialize(target);
}
View
@@ -0,0 +1,17 @@
+$(function(){
+ $('code').each(function(){
+ $(this).html(highlight($(this).text()));
+ });
+});
+
+function highlight(js) {
+ return js
+ .replace(/</g, '&lt;')
+ .replace(/>/g, '&gt;')
+ .replace(/\/\/(.*)/gm, '<span class="comment">//$1</span>')
+ .replace(/('.*')/gm, '<span class="string">$1</span>')
+ .replace(/(\d+\.\d+)/gm, '<span class="number">$1</span>')
+ .replace(/(\d+)/gm, '<span class="number">$1</span>')
+ .replace(/\bnew *(\w+)/gm, '<span class="keyword">new</span> <span class="init">$1</span>')
+ .replace(/\b(function|new|throw|return|var|if|else)\b/gm, '<span class="keyword">$1</span>')
+}
View
@@ -0,0 +1,170 @@
+<!DOCTYPE html><html><head><title>node-zeromq</title><link rel="stylesheet" href="style.css"><script src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.3/jquery.min.js"></script><script src="highlight.js"></script><script src="menu.js"></script></head><body><h1>node-zeromq</h1><p><a href="http://www.zeromq.org/">ZeroMQ </a>bindings for Node.js
+</p><div class="comment"><h2>types</h2><div class="description"><p>Map of socket types.</p></div><a href="#" class="view-source">View source</a><pre><code>var types = exports.types = {
+ pub: zmq.ZMQ_PUB
+ , sub: zmq.ZMQ_SUB
+ , req: zmq.ZMQ_REQ
+ , xreq: zmq.ZMQ_XREQ
+ , rep: zmq.ZMQ_REP
+ , xrep: zmq.ZMQ_XREP
+ , push: zmq.ZMQ_PUSH
+ , pull: zmq.ZMQ_PULL
+ , dealer: zmq.ZMQ_DEALER
+ , router: zmq.ZMQ_ROUTER
+ , pair: zmq.ZMQ_PAIR
+};</code></pre></div><div class="comment"><h2>opts</h2><div class="description"><p>Map of socket options.</p></div><a href="#" class="view-source">View source</a><pre><code>var opts = exports.options = {
+ _fd: zmq.ZMQ_FD
+ , _ioevents: zmq.ZMQ_EVENTS
+ , _receiveMore: zmq.ZMQ_RCVMORE
+ , _subscribe: zmq.ZMQ_SUBSCRIBE
+ , _unsubscribe: zmq.ZMQ_UNSUBSCRIBE
+ , affinity: zmq.ZMQ_AFFINITY
+ , backlog: zmq.ZMQ_BACKLOG
+ , hwm: zmq.ZMQ_HWM
+ , identity: zmq.ZMQ_IDENTITY
+ , linger: zmq.ZMQ_LINGER
+ , mcast_loop: zmq.ZMQ_MCAST_LOOP
+ , rate: zmq.ZMQ_RATE
+ , rcvbuf: zmq.ZMQ_RCVBUF
+ , reconnect_ivl: zmq.ZMQ_RECONNECT_IVL
+ , recovery_ivl: zmq.ZMQ_RECOVERY_IVL
+ , sndbuf: zmq.ZMQ_SNDBUF
+ , swap: zmq.ZMQ_SWAP
+};
+
+// Context management happens here. We lazily initialize a default context,
+// and use that everywhere. Also cleans up on exit.
+var ctx;
+function defaultContext() {
+ if (ctx) return ctx;
+
+ var io_threads = 1;
+ if (process.env.ZMQ_IO_THREADS) {
+ io_threads = parseInt(process.env.ZMQ_IO_THREADS, 10);
+ if (!io_threads || io_threads < 1) {
+ console.warn('Invalid number in ZMQ_IO_THREADS, using 1 IO thread.');
+ io_threads = 1;
+ }
+ }
+
+ ctx = new zmq.Context(io_threads);
+ process.on('exit', function() {
+ // ctx.close();
+ ctx = null;
+ });
+
+ return ctx;
+};</code></pre></div><div class="comment"><h2>Socket()</h2><div class="description"><p>Create a new socket of the given <code>type</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>function Socket(type) {
+ this.type = type;
+ this._zmq = new zmq.Socket(defaultContext(), types[type]);
+ this._outgoing = [];
+ this._watcher = new IOWatcher;
+ this._watcher.callback = this._flush.bind(this);
+ this._watcher.set(this._fd, true, false);
+ this._watcher.start();
+};</code></pre></div><div class="comment"><h2>Socket.prototype.setsockopt()</h2><div class="description"><p>Set <code>opt</code> to <code>val</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.setsockopt = function(opt, val){
+ this._zmq.setsockopt(opts[opt] || opt, val);
+ return this;
+};</code></pre></div><div class="comment"><h2>Socket.prototype.getsockopt()</h2><div class="description"><p>Get socket <code>opt</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.getsockopt = function(opt){
+ return this._zmq.getsockopt(opts[opt] || opt);
+};</code></pre></div><div class="comment"><h2>Socket.prototype.bind()</h2><div class="description"><p>Async bind.</p>
+
+<p>Emits the "bind" event.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.bind = function(addr, cb) {
+ var self = this;
+ self._watcher.stop();
+ self._zmq.bind(addr, function(err) {
+ self._watcher.start();
+ self.emit('bind');
+ cb && cb(err);
+ });
+ return this;
+};</code></pre></div><div class="comment"><h2>Socket.prototype.bindSync()</h2><div class="description"><p>Sync bind.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.bindSync = function(addr) {
+ this._watcher.stop();
+ try {
+ this._zmq.bindSync(addr);
+ } catch (e) {
+ this._watcher.start();
+ throw e;
+ }
+ this._watcher.start();
+ return this;
+};</code></pre></div><div class="comment"><h2>Socket.prototype.connect()</h2><div class="description"><p>Connect to <code>addr</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.connect = function(addr) {
+ this._zmq.connect(addr);
+ return this;
+};</code></pre></div><div class="comment"><h2>Socket.prototype.subscribe()</h2><div class="description"><p>Subscribe with the given <code>filter</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.subscribe = function(filter) {
+ this._subscribe = filter;
+ return this;
+};</code></pre></div><div class="comment"><h2>Socket.prototype.unsubscribe()</h2><div class="description"><p>Unsubscribe with the given <code>filter</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.unsubscribe = function(filter) {
+ this._unsubscribe = filter;
+ return this;
+};</code></pre></div><div class="comment"><h2>Socket.prototype.send()</h2><div class="description"><p>Send the given <code>msg</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.send = function(msg, flags) {
+ // allow strings etc
+ if (!Buffer.isBuffer(msg)) {
+ msg = new Buffer(String(msg), 'utf8');
+ }
+
+ this._outgoing.push([msg, flags || 0]);
+ this._flush();
+
+ return this;
+};
+
+// The workhorse that does actual send and receive operations.
+// This helper is called from `send` above, and in response to
+// the watcher noticing the signaller fd is readable.
+Socket.prototype._flush = function() {
+ var args;
+
+ // Don't allow recursive flush invocation as it can lead to stack
+ // exhaustion and write starvation
+ if (this._flushing) return;
+
+ this._flushing = true;
+ try {
+ while (true) {
+ var emitArgs
+ , flags = this._ioevents;
+
+ if (!this._outgoing.length) {
+ flags &= ~zmq.ZMQ_POLLOUT;
+ }
+
+ if (!flags) break;
+
+ if (flags & zmq.ZMQ_POLLIN) {
+ emitArgs = ['message'];
+
+ do {
+ emitArgs.push(new Buffer(this._zmq.recv()));
+ } while (this._receiveMore);
+
+ this.emit.apply(this, emitArgs);
+ if (this._zmq.state != zmq.STATE_READY) {
+ this._flushing = false;
+ return;
+ }
+ }
+
+ // We send as much as possible in one burst so that we don't
+ // starve sends if we receive more than one message for each
+ // one sent.
+ while (flags & zmq.ZMQ_POLLOUT && this._outgoing.length) {
+ args = this._outgoing.shift();
+ this._zmq.send(args[0], args[1]);
+ flags = this._ioevents;
+ }
+ }
+ } catch (e) {
+ this.emit('error', e);
+ }
+
+ this._flushing = false;
+};</code></pre></div><div class="comment"><h2>Socket.prototype.close()</h2><div class="description"><p>Close the socket.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.close = function() {
+ this._watcher.stop();
+ this._watcher = null;
+ this._zmq.close();
+ return this;
+};</code></pre></div><div class="comment"><h2>exports.socket()</h2><div class="description"><p>Create a <code>type</code> socket with the given <code>options</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>exports.socket = function(type, options) {
+ var sock = new Socket(type);
+ for (var key in options) sock[key] = options[key];
+ return sock;
+};</code></pre></div></body></html>
Oops, something went wrong.

0 comments on commit b34448a

Please sign in to comment.