Skip to content

Commit

Permalink
Implemented 'drain' and 'readable' events.
Browse files Browse the repository at this point in the history
  • Loading branch information
Schoonology committed Feb 15, 2013
1 parent de0e139 commit c38ee4b
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 58 deletions.
11 changes: 10 additions & 1 deletion lib/zmqstream.js
@@ -1 +1,10 @@
module.exports = require('bindings')('zmqstream.node')
/*jshint proto:true*/
var zmqstream = require('bindings')('zmqstream.node')
, EventEmitter = require('events').EventEmitter

// We're using `__proto__` deliberately here to inherit a JS class to a C++ class.
// While we'd prefer to use `util.inherits`, that wipes out the already-completed prototype.
zmqstream.Socket.super_ = EventEmitter
zmqstream.Socket.prototype.__proto__ = new EventEmitter()

module.exports = zmqstream
202 changes: 149 additions & 53 deletions src/zmqstream.cc
Expand Up @@ -12,7 +12,7 @@ namespace zmqstream {
// TODO: Is there any reason to have more than one Context?
// Would that make managing blocking sockets (REQ, DEALER, PUSH) easier?
ScopedContext gContext;
Persistent<FunctionTemplate> Socket::constructor;
Persistent<Function> Socket::constructor;

//
// ## ScopedContext
Expand Down Expand Up @@ -49,6 +49,13 @@ namespace zmqstream {
//
#define PUSH(a, v) a->Set(a->Length(), v);

//
// Internal versions of Unwrap.
//
#define SOCKET_TO_THIS(obj) obj->handle_
; // For Sublime's syntax highlighter. Ignore.
#define THIS_TO_SOCKET(obj) ObjectWrap::Unwrap<Socket>(obj)

//
// Because we use ZMQ in a non-blocking way, EAGAIN holds a special place in our hearts.
//
Expand All @@ -70,9 +77,12 @@ namespace zmqstream {
// Much like the native `net` module, a ZMQStream socket (perhaps obviously) is really just a Duplex stream that
// you can `connect`, `bind`, etc. just like a native ZMQ socket.
//
Socket::Socket(int type) : ObjectWrap() {
Socket::Socket(int type) : ObjectWrap(), drain(true), readable(false) {
this->socket = zmq_socket(gContext.context, type);
assert(this->socket != 0);

assert(uv_check_init(uv_default_loop(), &handle) == 0);
handle.data = this;
}

Socket::~Socket() {
Expand All @@ -81,28 +91,6 @@ namespace zmqstream {
}
}


//
// ## Close `Close()`
//
// Closes the underlying ZMQ socket. _The stream should no longer be used!_
//
Handle<Value> Socket::Close(const Arguments& args) {
HandleScope scope;
Socket *self = ObjectWrap::Unwrap<Socket>(args.This());
void* socket = self->socket;

if (socket == NULL) {
return scope.Close(Undefined());
}

self->socket = NULL;

ZMQ_CHECK(zmq_close(socket));

return scope.Close(Undefined());
}

//
// ## Socket(options)
//
Expand All @@ -116,7 +104,7 @@ namespace zmqstream {

if (!args.IsConstructCall()) {
Handle<Value> argv[1] = { args[0] };
return constructor->GetFunction()->NewInstance(1, argv);
return constructor->NewInstance(1, argv);
}

Handle<Object> options;
Expand All @@ -136,16 +124,53 @@ namespace zmqstream {
// Establishes initial property values.
args.This()->Set(String::NewSymbol("type"), type);

// If we've had a super"class" provided, be sure to call its constructor.
Handle<Value> super = constructor->Get(String::NewSymbol("super_"));
if (super->IsFunction()) {
super->ToObject()->CallAsFunction(args.This(), 0, NULL);
}

// Establish our libuv handle and callback.
uv_check_start(&obj->handle, Check);

// TODO
ZMQ_CHECK(zmq_setsockopt(obj->socket, ZMQ_IDENTITY, "TestClient", 10));

return args.This();
}

//
// ## Close `Close()`
//
// Closes the underlying ZMQ socket. _The stream should no longer be used!_
//
Handle<Value> Socket::Close(const Arguments& args) {
HandleScope scope;
Socket *self = THIS_TO_SOCKET(args.This());
void* socket = self->socket;

uv_check_stop(&self->handle);

if (socket == NULL) {
return scope.Close(Undefined());
}

self->socket = NULL;

ZMQ_CHECK(zmq_close(socket));

// TODO: We could destroy the entire Socket with `delete`, but that would change the contract, making the object
// "near death", rather than accessible but invalid.
//
// delete self;

return scope.Close(Undefined());
}

//
// ## Read `Read(size)`
//
// Consumes a minimum of **size** messages of data from the ZMQ socket. If **size** is undefined, the entire
// Consumes a maximum of **size** messages of data from the ZMQ socket. If **size** is undefined, the entire
// queue will be read and returned.
//
// If there is no data to consume, or if there are fewer bytes in the internal buffer than the size argument,
Expand All @@ -155,12 +180,12 @@ namespace zmqstream {
//
// Returns an Array of Messages, which are in turn Arrays of Frames as Node Buffers.
//
// NOTE: To reiterate, this Read returns a different format than the builtin Duplex, which is a single Buffer or
// String. Additionally, there is no encoding support.
// NOTE: To reiterate, this Read returns a different amount and different format than the builtin Duplex, which
// is a single Buffer or String of <= `size`. Because of this, there is no encoding support.
//
Handle<Value> Socket::Read(const Arguments& args) {
HandleScope scope;
Socket *self = ObjectWrap::Unwrap<Socket>(args.This());
Socket *self = THIS_TO_SOCKET(args.This());

if (self->socket == NULL) {
THROW_REF("Socket is closed, and cannot be read from.");
Expand Down Expand Up @@ -194,13 +219,16 @@ namespace zmqstream {
message->Set(message->Length(), Buffer::New(String::New((char*)zmq_msg_data(&part), zmq_msg_size(&part))));

if (!zmq_msg_more(&part)) {
size--;
messages->Set(messages->Length(), message);
message = Array::New();
}
} else {
self->readable = false;
}

ZMQ_CHECK(zmq_msg_close(&part));
} while (rc == 0);
} while (rc == 0 && size != 0);

if (messages->Length() == 0) {
return scope.Close(Null());
Expand All @@ -222,9 +250,11 @@ namespace zmqstream {
// NOTE: Unlike the builtin Duplex class, a return value of `false` indicates the write was _unsuccessful_, and
// will need to be tried again.
//
// TODO: Investigate caching in JS and sending one event loop's worth every tick.
//
Handle<Value> Socket::Write(const Arguments& args) {
HandleScope scope;
Socket *self = ObjectWrap::Unwrap<Socket>(args.This());
Socket *self = THIS_TO_SOCKET(args.This());

if (self->socket == NULL) {
THROW_REF("Socket is closed, and cannot be written to.");
Expand All @@ -248,14 +278,11 @@ namespace zmqstream {
for (int i = 0; i < length; i++) {
Handle<Object> buffer = frames->Get(i)->ToObject();

// TODO:
// * Error handling
// * 'drain' event
// * Investigate caching in JS and sending one event loop's worth every tick.
rc = zmq_send(self->socket, Buffer::Data(buffer), Buffer::Length(buffer), i < length - 1 ? ZMQ_SNDMORE | flags : flags);
ZMQ_CHECK(rc);

if (isEAGAIN(rc)) {
self->drain = false;
return scope.Close(Boolean::New(0));
}
}
Expand All @@ -272,7 +299,7 @@ namespace zmqstream {
//
Handle<Value> Socket::Connect(const Arguments& args) {
HandleScope scope;
Socket *self = ObjectWrap::Unwrap<Socket>(args.This());
Socket *self = THIS_TO_SOCKET(args.This());

if (self->socket == NULL) {
THROW_REF("Socket is closed, and cannot be connected.");
Expand All @@ -298,7 +325,7 @@ namespace zmqstream {
//
Handle<Value> Socket::Disconnect(const Arguments& args) {
HandleScope scope;
Socket *self = ObjectWrap::Unwrap<Socket>(args.This());
Socket *self = THIS_TO_SOCKET(args.This());

if (self->socket == NULL) {
THROW_REF("Socket is closed, and cannot be disconnected.");
Expand All @@ -324,7 +351,7 @@ namespace zmqstream {
//
Handle<Value> Socket::Bind(const Arguments& args) {
HandleScope scope;
Socket *self = ObjectWrap::Unwrap<Socket>(args.This());
Socket *self = THIS_TO_SOCKET(args.This());

if (self->socket == NULL) {
THROW_REF("Socket is closed, and cannot be bound.");
Expand All @@ -350,7 +377,7 @@ namespace zmqstream {
//
Handle<Value> Socket::Unbind(const Arguments& args) {
HandleScope scope;
Socket *self = ObjectWrap::Unwrap<Socket>(args.This());
Socket *self = THIS_TO_SOCKET(args.This());

if (self->socket == NULL) {
THROW_REF("Socket is closed, and cannot be unbound.");
Expand All @@ -368,25 +395,94 @@ namespace zmqstream {
}

//
// Exports the Socket class within the module `target`.
// ## Check
//
void Socket::InstallExports(Handle<Object> target) {
HandleScope scope;
// To generate `'readable'` and `'drain'` events, we need to be polling our socket handles periodically. We
// define that period to be once per event loop tick, and this is our libuv callback to handle that.
//
void Socket::Check(uv_check_t* handle, int status) {
assert(handle);

Socket* self = (Socket*)handle->data;
assert(self->socket);

Handle<Value> jsObj = SOCKET_TO_THIS(self);
if (!jsObj->IsObject()) {
return;
}

Handle<Value> emit = jsObj->ToObject()->Get(String::NewSymbol("emit"));

constructor = Persistent<FunctionTemplate>::New(FunctionTemplate::New(New));
if (!emit->IsFunction()) {
return;
}

if (!self->readable) {
zmq_pollitem_t item;

item.socket = self->socket;
item.events = ZMQ_POLLIN;

int rc = zmq_poll(&item, 1, 0);
assert(rc != -1);

if (rc > 0) {
self->readable = true;
Handle<Value> args[1] = { String::New("readable") };
emit->ToObject()->CallAsFunction(jsObj->ToObject(), 1, args);
}
}

if (!self->drain) {
zmq_pollitem_t item;

item.socket = self->socket;
item.events = ZMQ_POLLOUT;

int rc = zmq_poll(&item, 1, 0);
assert(rc != -1);

if (rc > 0) {
self->drain = true;
Handle<Value> args[1] = { String::New("drain") };
emit->ToObject()->CallAsFunction(jsObj->ToObject(), 1, args);
}
}
}

//
// ## Initialize
//
// Creates and populates the constructor Function and its prototype.
//
void Socket::Initialize() {
Local<FunctionTemplate> constructorTemplate(FunctionTemplate::New(New));

// ObjectWrap uses the first internal field to store the wrapped pointer.
constructor->InstanceTemplate()->SetInternalFieldCount(1);
constructor->SetClassName(String::NewSymbol("Socket"));
constructorTemplate->InstanceTemplate()->SetInternalFieldCount(1);
constructorTemplate->SetClassName(String::NewSymbol("Socket"));

// Add all prototype methods, getters and setters here.
NODE_SET_PROTOTYPE_METHOD(constructor, "read", Read);
NODE_SET_PROTOTYPE_METHOD(constructor, "write", Write);
NODE_SET_PROTOTYPE_METHOD(constructor, "close", Close);
NODE_SET_PROTOTYPE_METHOD(constructor, "connect", Connect);
NODE_SET_PROTOTYPE_METHOD(constructor, "disconnect", Disconnect);
NODE_SET_PROTOTYPE_METHOD(constructor, "bind", Bind);
NODE_SET_PROTOTYPE_METHOD(constructor, "unbind", Unbind);
NODE_SET_PROTOTYPE_METHOD(constructorTemplate, "read", Read);
NODE_SET_PROTOTYPE_METHOD(constructorTemplate, "write", Write);
NODE_SET_PROTOTYPE_METHOD(constructorTemplate, "close", Close);
NODE_SET_PROTOTYPE_METHOD(constructorTemplate, "connect", Connect);
NODE_SET_PROTOTYPE_METHOD(constructorTemplate, "disconnect", Disconnect);
NODE_SET_PROTOTYPE_METHOD(constructorTemplate, "bind", Bind);
NODE_SET_PROTOTYPE_METHOD(constructorTemplate, "unbind", Unbind);

constructor = Persistent<Function>::New(constructorTemplate->GetFunction());
}

//
// ## InstallExports
//
// Exports the Socket class within the module `target`.
//
void Socket::InstallExports(Handle<Object> target) {
HandleScope scope;

Initialize();

Local<Object> Type = Object::New();
ZMQ_DEFINE_CONSTANT(Type, "REQ", ZMQ_REQ);
Expand All @@ -403,7 +499,7 @@ namespace zmqstream {

target->Set(String::NewSymbol("Type"), Type, static_cast<v8::PropertyAttribute>(v8::ReadOnly | v8::DontDelete));
// This has to be last, otherwise the properties won't show up on the object in JavaScript.
target->Set(String::NewSymbol("Socket"), constructor->GetFunction());
target->Set(String::NewSymbol("Socket"), constructor);

// TODO: Ensure cleanup like so:
// AtExit(Cleanup, NULL);
Expand Down

0 comments on commit c38ee4b

Please sign in to comment.