Fixed high water mark #277

Merged
merged 8 commits into from Feb 14, 2014
View
@@ -102,8 +102,13 @@ namespace zmq {
private:
static NAN_METHOD(New);
Socket(Context *context, int type);
+
static Socket* GetSocket(_NAN_METHOD_ARGS);
static NAN_GETTER(GetState);
+
+ static NAN_GETTER(GetPending);
+ static NAN_SETTER(SetPending);
+
template<typename T>
Handle<Value> GetSockOpt(int option);
template<typename T>
@@ -140,6 +145,7 @@ namespace zmq {
Persistent<Object> context_;
void *socket_;
+ int32_t pending_;
uint8_t state_;
int32_t endpoints;
#if ZMQ_CAN_MONITOR
@@ -253,6 +259,8 @@ namespace zmq {
t->InstanceTemplate()->SetInternalFieldCount(1);
t->InstanceTemplate()->SetAccessor(
String::NewSymbol("state"), Socket::GetState);
+ t->InstanceTemplate()->SetAccessor(
+ String::NewSymbol("pending"), GetPending, SetPending);
NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);
NODE_SET_PROTOTYPE_METHOD(t, "bindSync", BindSync);
@@ -310,11 +318,14 @@ namespace zmq {
bool
Socket::IsReady() {
zmq_pollitem_t item = {socket_, 0, ZMQ_POLLIN, 0};
+ if (pending_ > 0)
+ item.events |= ZMQ_POLLOUT;
+
int rc = zmq_poll(&item, 1, 0);
if (rc < 0) {
throw std::runtime_error(ErrorMessage());
}
- return item.revents & (ZMQ_POLLIN);
+ return item.revents & item.events;
}
void
@@ -429,6 +440,7 @@ namespace zmq {
Socket::Socket(Context *context, int type) : ObjectWrap() {
NanAssignPersistent(Object, context_, NanObjectWrapHandle(context));
socket_ = zmq_socket(context->context_, type);
+ pending_ = 0;
state_ = STATE_READY;
endpoints = 0;
@@ -470,6 +482,22 @@ namespace zmq {
NanReturnValue(Integer::New(socket->state_));
}
+ NAN_GETTER(Socket::GetPending) {
+ NanScope();
+ Socket* socket = ObjectWrap::Unwrap<Socket>(args.Holder());
+ NanReturnValue(Integer::New(socket->pending_));
+ }
+
+ NAN_SETTER(Socket::SetPending) {
+ NanScope();
+ if (!value->IsNumber()) {
+ NanThrowTypeError("Pending must be an integer");
+ }
+
+ Socket* socket = ObjectWrap::Unwrap<Socket>(args.Holder());
+ socket->pending_ = value->Int32Value();
+ }
+
template<typename T>
Handle<Value> Socket::GetSockOpt(int option) {
T value = 0;
View
@@ -459,8 +459,7 @@ Socket.prototype._flush = function() {
}
if (zmq.STATE_READY != this._zmq.state) {
- this._flushing = false;
- return;
+ break;
}
}
@@ -476,6 +475,9 @@ Socket.prototype._flush = function() {
} catch (err) {
this.emit('error', err);
}
+
+ this._zmq.pending = this._outgoing.length;
+
this._flushing = false;
};
View
@@ -66,6 +66,10 @@ if (semver.gte(zmq.version, '3.0.0')) {
'RCVHWM',
'MAXMSGSIZE',
'ZMQ_MULTICAST_HOPS',
+ 'TCP_KEEPALIVE',
+ 'TCP_KEEPALIVE_CNT',
+ 'TCP_KEEPALIVE_IDLE',
+ 'TCP_KEEPALIVE_INTVL'
]);
}
@@ -0,0 +1,44 @@
+var zmq = require('../')
+ , should = require('should')
+ , semver = require('semver');
+
+var push = zmq.socket('push')
+ , pull = zmq.socket('pull');
+
+var n = 0;
+
+pull.on('message', function(msg){
+ msg = msg.toString();
+ switch (n++) {
+ case 0:
+ msg.should.equal('string');
+ break;
+ case 1:
+ msg.should.equal('15.99');
+ break;
+ case 2:
+ msg.should.equal('buffer');
+ push.close();
+ pull.close();
+ break;
+ }
+});
+
+setTimeout(function () {
+ n.should.equal(3);
+}, 1*1000);
+
+if (semver.satisfies(zmq.version, '>=3.x')) {
+ push.setsockopt(zmq.ZMQ_SNDHWM, 1);
+ pull.setsockopt(zmq.ZMQ_RCVHWM, 1);
+} else if (semver.satisfies(zmq.version, '2.x')) {
+ push.setsockopt(zmq.ZMQ_HWM, 1);
+ pull.setsockopt(zmq.ZMQ_HWM, 1);
+}
+
+push.bind('tcp://127.0.0.1:12345', function () {
+ push.send('string');
+ push.send(15.99);
+ push.send(new Buffer('buffer'));
+ pull.connect('tcp://127.0.0.1:12345');
+});