Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
Make sure Socket _flush is not getting called recursively
Browse files Browse the repository at this point in the history
  • Loading branch information
ValYouW committed Mar 7, 2015
1 parent 22ec9de commit c06cfe0
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions lib/index.js
Expand Up @@ -195,7 +195,6 @@ exports.Socket = function (type) {
self.emit('error', err);
} finally {
self._zmq.pending = self._outgoing.length;
self._flushing = false;
}
}

Expand Down Expand Up @@ -272,7 +271,6 @@ Socket.prototype.bind = function(addr, cb) {
self.emit('error', err);
} finally {
self._zmq.pending = self._outgoing.length;
self._flushing = false;
}

});
Expand Down Expand Up @@ -320,7 +318,6 @@ Socket.prototype.unbind = function(addr, cb) {
self.emit('error', err);
} finally {
self._zmq.pending = self._outgoing.length;
self._flushing = false;
}
});
}
Expand Down Expand Up @@ -476,7 +473,6 @@ Socket.prototype.send = function(msg, flags) {
this.emit('error', err);
} finally {
this._zmq.pending = this._outgoing.length;
this._flushing = false;
}
}

Expand All @@ -498,27 +494,32 @@ Socket.prototype._flush = function() {
// If an Async thread that may call a zmq function is in flight
// do not call any zmq functions.
if (this._zmq.state !== zmq.STATE_READY) {
this._flushing = false;
return;
}

flags = this._zmq.getsockopt(zmq.ZMQ_EVENTS);

if (!this._outgoing.length) flags &= ~zmq.ZMQ_POLLOUT;
if (!flags) return;
if (!flags) {
this._flushing = false;
return
};

if (flags & zmq.ZMQ_POLLIN) {
emitArgs = ['message'];
emitArgs = ['message'];

do {
emitArgs.push(this._zmq.recv());
} while (this._zmq.getsockopt(zmq.ZMQ_RCVMORE));
do {
emitArgs.push(this._zmq.recv());
} while (this._zmq.getsockopt(zmq.ZMQ_RCVMORE));

// Handle received message immediately to prevent memory leak in driver
this.emit.apply(this, emitArgs);
// Handle received message immediately to prevent memory leak in driver
this.emit.apply(this, emitArgs);

if (this._zmq.state !== zmq.STATE_READY) {
return;
}
if (this._zmq.state !== zmq.STATE_READY) {
this._flushing = false;
return;
}
}

// We send as much as possible in one burst so that we don't
Expand All @@ -528,6 +529,7 @@ Socket.prototype._flush = function() {
flags = this._zmq.getsockopt(zmq.ZMQ_EVENTS);

if (!(flags & zmq.ZMQ_POLLOUT)) {
this._flushing = false;
return;
}

Expand All @@ -543,11 +545,12 @@ Socket.prototype._flush = function() {
while (args && (args[1] & zmq.ZMQ_SNDMORE)) {
args = this._outgoing.shift();
}

this._flushing = false;
throw sendError;
}
}
}
this._flushing = false;
};

/**
Expand Down

0 comments on commit c06cfe0

Please sign in to comment.