Skip to content
This repository

Windows Port #81

Closed
wants to merge 2 commits into from

4 participants

Seth Fitzsimmons TJ Holowaychuk Li Cai matthiasg
Seth Fitzsimmons

Moved IOWatcher functionality into C++ and adapted it for use on Windows (Vista+ due to use of WSAPoll). Everything appears to work as it should on both my Mac and in a Windows 7 VM, but I still won't fully trust my changes until others can observe the same.

I've included rough Windows build instructions, but it needs to be better packaged up to be a) usable and b) distributable. If you know of resources for creating cross-platform NPM packages containing binary add-ons, please post them here.

Addresses #74, maybe.

binding.cc
... ... @@ -248,10 +276,59 @@ enum {
248 276 return args.This();
249 277 }
250 278
  279 + bool
  280 + Socket::IsReady() {
  281 + size_t len = sizeof(int);
  282 + struct pollfd pfd = { 0 };
  283 + pfd.events = POLLIN;
  284 +
  285 + if (socket_ && zmq_getsockopt(socket_, ZMQ_FD, &pfd.fd, &len) >= 0) {
  286 + return sockpoll(&pfd, 1, 0);
  287 + } else {
  288 + return 0;
  289 + }
6
TJ Holowaychuk Collaborator

will this work on windows for you?:

  bool
  Socket::IsReady() {
    zmq_pollitem_t items[1];
    items[0].socket = socket_;
    items[0].events = ZMQ_POLLIN;
    return zmq_poll(items, 1, 0);
  }
Seth Fitzsimmons
mojodna added a note

Hard to tell. It looked like it worked once (based on some debug output). Windows doesn't seem to be doing the right thing with return codes (under bash or cmd.exe), so the tests aren't especially helpful at the moment (they always report success unless something throws). I guess I know what I'm looking at next.

I agree that this would be better; not only does it remove the Vista+ restriction, it's more likely to work on other systems with different poll implementations.

TJ Holowaychuk Collaborator

yup zmq already implements kqueue and friends so we should utilize that. as for the test suite yeah we can ditch bash and go with a little node script that loops through and does the same thing

Seth Fitzsimmons
mojodna added a note

How 'bout a proper test runner? Say...mocha or something.

TJ Holowaychuk Collaborator

nah we wouldn't gain much from a test framework for this sort of thing, the tests are so large it's a lot nicer to separate these ones into separate files. We could if we have a decent reason to but other than the small set of unit tests I dont think we really need it

Seth Fitzsimmons
mojodna added a note

They were finishing immediately because I thought I was being clever and uv_unref'ing after registering the uv_check. With that fixed, the tests legitimately pass on my Mac (with your simplified IsReady() implementation).

However, uv_check_start / uv_check_initdon't appear to work in any meaningful way on Windows. I'm very unfamiliar with libuv and node internals, so I could be doing something wrong...

Anyway, the upshot is that this works fine on OS X (not sure about performance differential), but isn't yet ready for Windows.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
binding.vcxproj
... ... @@ -0,0 +1,89 @@
  1 +<?xml version="1.0" encoding="utf-8"?>
3
TJ Holowaychuk Collaborator

.gitignore :D

Seth Fitzsimmons
mojodna added a note

Until we can get gyp (or something) to generate these, this and binding.sln (the "solution" file) stay. If it makes you feel better, think of them as Makefiles for Windows.

TJ Holowaychuk Collaborator

ah ok sounds good then for now

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
TJ Holowaychuk
Collaborator

this also takes care of #79

Seth Fitzsimmons

Doesn't work on Windows yet, possibly due to uv_check_start and uv_check_init.

TJ Holowaychuk
Collaborator

added make benchmark: #82 so we have something to reference at least for now.

Seth Fitzsimmons

A relative comparison:

IOWatcher:

  pub/sub:
    100000 msgs
    16202 ops/s
    6172 ms

native:

  pub/sub:
    100000 msgs
    16374 ops/s
    6107 ms
TJ Holowaychuk
Collaborator

cool, well at least there's no regression, on my air im getting:

  pub/sub:
    100000 msgs
    22426 ops/s
    4459 ms

not a great benchmark though we should spawn a few subprocs so we're not grinding the one loop

Seth Fitzsimmons

joyent/node#2738 has details on why uv_check_* isn't the right fit for this (but they're not clear on what is yet). Their explanation could also be the reason why perf is substantially lower than czmq (I seem to remember IOWatcher using ev_check_*, but I could be wrong).

Li Cai
caili commented

Following the advise to use uv_timer_* (https://github.com/caili/zeromq.node/tree/windows), though tests run fine but it hangs occasionally in one of my projects. I tried with polling in js (https://github.com/caili/zeromq.node/tree/polling). Hardly a solution but it works so far for me in a prototype.

Seth Fitzsimmons

See also https://github.com/matthiasg/zeromq-node-windows

Matthias says that he's getting 14-15k messages/second on Windows with his changes.

Li Cai
caili commented

With Matthias's changes the node hangs in my environment (XP / VS 2010 / Node 0.6.10, a couple of REQ/REP and PUB/SUB sockets running in parallel). Basic REQ/REP or PUB/SUB tests run fine though.

matthiasg

@caili did you create new tests that I could use to recreate the hangs on my machine ? Also I thought @mojodna made changes that would require vista and up ?

matthiasg

btw i used node 0.6.12 and 0.6.13 is even better, since creating the csproj works correctly now

Seth Fitzsimmons

Also I thought @mojodna made changes that would require vista and up ?

By switching to zmq_poll, that requirement went away (it was only when using WSAPoll).

Li Cai
caili commented

@matthiasg all tests I made pass. I guess this is something else in the application I have to figure out.

Li Cai
caili commented

turned out to be an application bug. The uv_timer_* approach works pretty well :-)

Seth Fitzsimmons mojodna commented on the diff
binding.cc
@@ -248,10 +260,57 @@ enum {
248 260 return args.This();
249 261 }
250 262
  263 + bool
  264 + Socket::IsReady() {
  265 + zmq_pollitem_t items[1];
  266 + items[0].socket = socket_;
  267 + items[0].events = ZMQ_POLLIN;
  268 + return zmq_poll(items, 1, 0);
  269 + }
  270 +
  271 + void
  272 + Socket::CallbackIfReady() {
  273 + if (this->IsReady()) {
1
Seth Fitzsimmons
mojodna added a note

This may be unnecessary since the edge transition on the socket already told us that it's ready.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Seth Fitzsimmons mojodna commented on the diff
lib/index.js
@@ -4,8 +4,7 @@
4 4 */
5 5
6 6 var EventEmitter = require('events').EventEmitter
7   - , IOWatcher = process.binding('io_watcher').IOWatcher
8   - , zmq = require('../build/Release/binding');
  7 + , zmq = require('../binding');
1
Seth Fitzsimmons
mojodna added a note

If you see "Symbol not found" errors, either change this path back or copy the built version over ./binding.node.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Seth Fitzsimmons

Closing in favor of #110 (uv_poll--this isn't just a Windows thing), which contains the commits I just pushed.

Seth Fitzsimmons mojodna closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 2 unique commits by 1 author.

Jun 25, 2012
Seth Fitzsimmons mojodna Use libuv d7f0fa3
Seth Fitzsimmons mojodna Use uv_poll in place of IOWatcher 9bbddda
This page is out of date. Refresh to see the latest.

Showing 2 changed files with 83 additions and 39 deletions. Show diff stats Hide diff stats

  1. +80 26 binding.cc
  2. +3 13 lib/index.js
106 binding.cc
@@ -26,9 +26,6 @@
26 26 #include <node.h>
27 27 #include <node_version.h>
28 28 #include <node_buffer.h>
29   -#if !NODE_VERSION_AT_LEAST(0, 5, 5)
30   -#include <ev.h>
31   -#endif
32 29 #include <zmq.h>
33 30 #include <assert.h>
34 31 #include <stdio.h>
@@ -37,6 +34,10 @@
37 34 #include <errno.h>
38 35 #include <stdexcept>
39 36
  37 +#ifdef _WIN32
  38 +#define snprintf _snprintf_s
  39 +#endif
  40 +
40 41 using namespace v8;
41 42 using namespace node;
42 43
@@ -70,6 +71,7 @@ namespace zmq {
70 71 public:
71 72 static void Initialize(v8::Handle<v8::Object> target);
72 73 virtual ~Socket();
  74 + void CallbackIfReady();
73 75
74 76 private:
75 77 static Handle<Value> New(const Arguments &args);
@@ -88,13 +90,9 @@ namespace zmq {
88 90 struct BindState;
89 91 static Handle<Value> Bind(const Arguments &args);
90 92
91   -#if NODE_VERSION_AT_LEAST(0, 5, 4)
92   - static void EIO_DoBind(eio_req *req);
93   -#else
94   - static int EIO_DoBind(eio_req *req);
95   -#endif
  93 + static void UV_BindAsync(uv_work_t* req);
  94 + static void UV_BindAsyncAfter(uv_work_t* req);
96 95
97   - static int EIO_BindDone(eio_req *req);
98 96 static Handle<Value> BindSync(const Arguments &args);
99 97
100 98 static Handle<Value> Connect(const Arguments &args);
@@ -111,8 +109,14 @@ namespace zmq {
111 109 Persistent<Object> context_;
112 110 void *socket_;
113 111 uint8_t state_;
  112 +
  113 + bool IsReady();
  114 + uv_poll_t *poll_handle_;
  115 + static void UV_PollCallback(uv_poll_t* handle, int status, int events);
114 116 };
115 117
  118 + Persistent<String> callback_symbol;
  119 +
116 120 static void
117 121 Initialize(Handle<Object> target);
118 122
@@ -154,6 +158,9 @@ namespace zmq {
154 158 Handle<Value>
155 159 Context::New(const Arguments& args) {
156 160 HandleScope scope;
  161 +
  162 + assert(args.IsConstructCall());
  163 +
157 164 int io_threads = 1;
158 165 if (args.Length() == 1) {
159 166 if (!args[0]->IsNumber()) {
@@ -222,6 +229,8 @@ namespace zmq {
222 229 NODE_SET_PROTOTYPE_METHOD(t, "close", Close);
223 230
224 231 target->Set(String::NewSymbol("Socket"), t->GetFunction());
  232 +
  233 + callback_symbol = NODE_PSYMBOL("onReady");
225 234 }
226 235
227 236 Socket::~Socket() {
@@ -231,6 +240,9 @@ namespace zmq {
231 240 Handle<Value>
232 241 Socket::New(const Arguments &args) {
233 242 HandleScope scope;
  243 +
  244 + assert(args.IsConstructCall());
  245 +
234 246 if (args.Length() != 2) {
235 247 return ThrowException(Exception::Error(
236 248 String::New("Must pass a context and a type to constructor")));
@@ -248,10 +260,57 @@ namespace zmq {
248 260 return args.This();
249 261 }
250 262
  263 + bool
  264 + Socket::IsReady() {
  265 + zmq_pollitem_t items[1];
  266 + items[0].socket = socket_;
  267 + items[0].events = ZMQ_POLLIN;
  268 + return zmq_poll(items, 1, 0);
  269 + }
  270 +
  271 + void
  272 + Socket::CallbackIfReady() {
  273 + if (this->IsReady()) {
  274 + HandleScope scope;
  275 +
  276 + Local<Value> callback_v = this->handle_->Get(callback_symbol);
  277 + if (!callback_v->IsFunction()) {
  278 + return;
  279 + }
  280 +
  281 + TryCatch try_catch;
  282 +
  283 + callback_v.As<Function>()->Call(this->handle_, 0, NULL);
  284 +
  285 + if (try_catch.HasCaught()) {
  286 + FatalException(try_catch);
  287 + }
  288 + }
  289 + }
  290 +
  291 + void
  292 + Socket::UV_PollCallback(uv_poll_t* handle, int status, int events) {
  293 + assert(status == 0);
  294 +
  295 + Socket* s = static_cast<Socket*>(handle->data);
  296 + s->CallbackIfReady();
  297 + }
  298 +
251 299 Socket::Socket(Context *context, int type) : ObjectWrap() {
252 300 context_ = Persistent<Object>::New(context->handle_);
253 301 socket_ = zmq_socket(context->context_, type);
254 302 state_ = STATE_READY;
  303 +
  304 + poll_handle_ = new uv_poll_t;
  305 +
  306 + poll_handle_->data = this;
  307 +
  308 + uv_os_sock_t socket;
  309 + size_t len = sizeof(uv_os_sock_t);
  310 + // TODO error handling
  311 + zmq_getsockopt(socket_, ZMQ_FD, &socket, &len);
  312 + uv_poll_init_socket(uv_default_loop(), poll_handle_, socket);
  313 + uv_poll_start(poll_handle_, UV_READABLE, Socket::UV_PollCallback);
255 314 }
256 315
257 316 Socket *
@@ -444,30 +503,22 @@ namespace zmq {
444 503 GET_SOCKET(args);
445 504
446 505 BindState* state = new BindState(socket, cb, addr);
447   - eio_custom(EIO_DoBind, EIO_PRI_DEFAULT, EIO_BindDone, state);
448   - ev_ref(EV_DEFAULT_UC);
  506 + uv_work_t* req = new uv_work_t;
  507 + req->data = state;
  508 + uv_queue_work(uv_default_loop(), req, UV_BindAsync, UV_BindAsyncAfter);
449 509 socket->state_ = STATE_BUSY;
450 510
451 511 return Undefined();
452 512 }
453 513
454   -#if NODE_VERSION_AT_LEAST(0, 5, 4)
455   - void
456   -#else
457   - int
458   -#endif
459   - Socket::EIO_DoBind(eio_req *req) {
460   - BindState* state = (BindState*) req->data;
  514 + void Socket::UV_BindAsync(uv_work_t* req) {
  515 + BindState* state = static_cast<BindState*>(req->data);
461 516 if (zmq_bind(state->sock, *state->addr) < 0)
462 517 state->error = zmq_errno();
463   -#if !NODE_VERSION_AT_LEAST(0, 5, 4)
464   - return 0;
465   -#endif
466 518 }
467 519
468   - int
469   - Socket::EIO_BindDone(eio_req *req) {
470   - BindState* state = (BindState*) req->data;
  520 + void Socket::UV_BindAsyncAfter(uv_work_t* req) {
  521 + BindState* state = static_cast<BindState*>(req->data);
471 522 HandleScope scope;
472 523
473 524 Local<Value> argv[1];
@@ -482,8 +533,7 @@ namespace zmq {
482 533 cb->Call(v8::Context::GetCurrent()->Global(), 1, argv);
483 534 if (try_catch.HasCaught()) FatalException(try_catch);
484 535
485   - ev_unref(EV_DEFAULT_UC);
486   - return 0;
  536 + delete req;
487 537 }
488 538
489 539 Handle<Value>
@@ -738,6 +788,8 @@ namespace zmq {
738 788 state_ = STATE_CLOSED;
739 789 context_.Dispose();
740 790 context_.Clear();
  791 +
  792 + uv_poll_stop(poll_handle_);
741 793 }
742 794 }
743 795
@@ -832,3 +884,5 @@ extern "C" void
832 884 init(Handle<Object> target) {
833 885 zmq::Initialize(target);
834 886 }
  887 +
  888 +NODE_MODULE(binding, init)
16 lib/index.js
@@ -4,8 +4,7 @@
4 4 */
5 5
6 6 var EventEmitter = require('events').EventEmitter
7   - , IOWatcher = process.binding('io_watcher').IOWatcher
8   - , zmq = require('../build/Release/binding');
  7 + , zmq = require('../binding');
9 8
10 9 /**
11 10 * Expose bindings as the module.
@@ -96,12 +95,9 @@ function defaultContext() {
96 95 function Socket(type) {
97 96 this.type = type;
98 97 this._zmq = new zmq.Socket(defaultContext(), types[type]);
  98 + this._zmq.onReady = this._flush.bind(this);
99 99 this._outgoing = [];
100 100 this._shouldFlush = true;
101   - this._watcher = new IOWatcher;
102   - this._watcher.callback = this._flush.bind(this);
103   - this._watcher.set(this._fd, true, false);
104   - this._watcher.start();
105 101 };
106 102
107 103 /**
@@ -165,9 +161,7 @@ Object.keys(opts).forEach(function(name){
165 161
166 162 Socket.prototype.bind = function(addr, cb) {
167 163 var self = this;
168   - self._watcher.stop();
169 164 self._zmq.bind(addr, function(err) {
170   - self._watcher.start();
171 165 self.emit('bind');
172 166 cb && cb(err);
173 167 });
@@ -183,14 +177,12 @@ Socket.prototype.bind = function(addr, cb) {
183 177 */
184 178
185 179 Socket.prototype.bindSync = function(addr) {
186   - this._watcher.stop();
187 180 try {
188 181 this._zmq.bindSync(addr);
189 182 } catch (err) {
190   - this._watcher.start();
191 183 throw err;
192 184 }
193   - this._watcher.start();
  185 +
194 186 return this;
195 187 };
196 188
@@ -323,8 +315,6 @@ Socket.prototype._flush = function() {
323 315 */
324 316
325 317 Socket.prototype.close = function() {
326   - this._watcher.stop();
327   - this._watcher = null;
328 318 this._zmq.close();
329 319 return this;
330 320 };

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.