diff --git a/lib/index.js b/lib/index.js index 701e668..650ebd4 100644 --- a/lib/index.js +++ b/lib/index.js @@ -195,7 +195,6 @@ exports.Socket = function (type) { self.emit('error', err); } finally { self._zmq.pending = self._outgoing.length; - self._flushing = false; } } @@ -272,7 +271,6 @@ Socket.prototype.bind = function(addr, cb) { self.emit('error', err); } finally { self._zmq.pending = self._outgoing.length; - self._flushing = false; } }); @@ -320,7 +318,6 @@ Socket.prototype.unbind = function(addr, cb) { self.emit('error', err); } finally { self._zmq.pending = self._outgoing.length; - self._flushing = false; } }); } @@ -476,7 +473,6 @@ Socket.prototype.send = function(msg, flags) { this.emit('error', err); } finally { this._zmq.pending = this._outgoing.length; - this._flushing = false; } } @@ -498,27 +494,32 @@ Socket.prototype._flush = function() { // If an Async thread that may call a zmq function is in flight // do not call any zmq functions. if (this._zmq.state !== zmq.STATE_READY) { + this._flushing = false; return; } flags = this._zmq.getsockopt(zmq.ZMQ_EVENTS); if (!this._outgoing.length) flags &= ~zmq.ZMQ_POLLOUT; - if (!flags) return; + if (!flags) { + this._flushing = false; + return + }; if (flags & zmq.ZMQ_POLLIN) { - emitArgs = ['message']; + emitArgs = ['message']; - do { - emitArgs.push(this._zmq.recv()); - } while (this._zmq.getsockopt(zmq.ZMQ_RCVMORE)); + do { + emitArgs.push(this._zmq.recv()); + } while (this._zmq.getsockopt(zmq.ZMQ_RCVMORE)); - // Handle received message immediately to prevent memory leak in driver - this.emit.apply(this, emitArgs); + // Handle received message immediately to prevent memory leak in driver + this.emit.apply(this, emitArgs); - if (this._zmq.state !== zmq.STATE_READY) { - return; - } + if (this._zmq.state !== zmq.STATE_READY) { + this._flushing = false; + return; + } } // We send as much as possible in one burst so that we don't @@ -528,6 +529,7 @@ Socket.prototype._flush = function() { flags = this._zmq.getsockopt(zmq.ZMQ_EVENTS); if (!(flags & zmq.ZMQ_POLLOUT)) { + this._flushing = false; return; } @@ -543,11 +545,12 @@ Socket.prototype._flush = function() { while (args && (args[1] & zmq.ZMQ_SNDMORE)) { args = this._outgoing.shift(); } - + this._flushing = false; throw sendError; } } } + this._flushing = false; }; /**