Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

add support for callbacks with socket.send() #141

Merged
merged 16 commits into from

2 participants

@albertyfwu

No description provided.

lib/socket.js
((10 lines not shown))
var packet = { type: type, data: data };
this.emit('packetCreate', packet);
this.writeBuffer.push(packet);
+ fn = fn || noop; // if no callback function given
+ this.callbackBuffer.push(fn);
@rauchg Owner
rauchg added a note

It seems to me it would be more efficient to just test for the existence of fn to keep the buffer smaller.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
lib/socket.js
((5 lines not shown))
+ *
+ * @api private
+ */
+
+ Socket.prototype.onDrain = function() {
+ this.callbacks();
+ this.writeBuffer.splice(0, this.prevBufferLen);
+ this.callbackBuffer.splice(0, this.prevBufferLen);
+ // setting prevBufferLen = 0 is very important
+ // for example, when upgrading, upgrade packet is sent over,
+ // and a nonzero prevBufferLen could cause problems on `drain`
+ this.prevBufferLen = 0;
+ if (this.writeBuffer.length == 0) {
+ this.emit('drain');
+ }
+ this.flush();
@rauchg Owner
rauchg added a note

Maybe else here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rauchg rauchg merged commit 55a089d into from
@rauchg
Owner

Excellent work @albertyfwu

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 67 additions and 5 deletions.
  1. +5 −0 README.md
  2. +62 −5 lib/socket.js
View
5 README.md
@@ -95,6 +95,10 @@ Exposed as `eio` in the browser standalone build.
- Fired upon disconnection.
- `error`
- Fired when an error occurs.
+- `flush`
+ - Fired upon completing a buffer flush
+- `drain`
+ - Fired after `drain` event of transport if writeBuffer is empty
#### Methods
@@ -121,6 +125,7 @@ Exposed as `eio` in the browser standalone build.
- Sends a message to the server
- **Parameters**
- `String`: data to send
+ - `Function`: optional, callback upon `drain`
- `close`
- Disconnects the client.
View
67 lib/socket.js
@@ -21,6 +21,14 @@ module.exports = Socket;
var global = util.global();
/**
+ * Noop function.
+ *
+ * @api private
+ */
+
+function noop () {};
+
+/**
* Socket constructor.
*
* @param {String|Object} uri or options
@@ -71,6 +79,7 @@ function Socket(uri, opts){
this.transports = opts.transports || ['polling', 'websocket', 'flashsocket'];
this.readyState = '';
this.writeBuffer = [];
+ this.callbackBuffer = [];
this.policyPort = opts.policyPort || 843;
this.open();
@@ -191,7 +200,7 @@ Socket.prototype.setTransport = function (transport) {
// set up transport listeners
transport
.on('drain', function () {
- self.flush();
+ self.onDrain();
})
.on('packet', function (packet) {
self.onPacket(packet);
@@ -414,6 +423,41 @@ Socket.prototype.ping = function () {
};
/**
+ * Called on `drain` event
+ *
+ * @api private
+ */
+
+ Socket.prototype.onDrain = function() {
+ this.callbacks();
+ this.writeBuffer.splice(0, this.prevBufferLen);
+ this.callbackBuffer.splice(0, this.prevBufferLen);
+ // setting prevBufferLen = 0 is very important
+ // for example, when upgrading, upgrade packet is sent over,
+ // and a nonzero prevBufferLen could cause problems on `drain`
+ this.prevBufferLen = 0;
+ if (this.writeBuffer.length == 0) {
+ this.emit('drain');
+ } else {
+ this.flush();
+ }
+ }
+
+/**
+ * Calls all the callback functions associated with sending packets
+ *
+ * @api private
+ */
+
+Socket.prototype.callbacks = function() {
+ for (var i = 0; i < this.prevBufferLen; i++) {
+ if (this.callbackBuffer[i]) {
+ this.callbackBuffer[i]();
+ }
+ }
+}
+
+/**
* Flush write buffers.
*
* @api private
@@ -424,7 +468,10 @@ Socket.prototype.flush = function () {
!this.upgrading && this.writeBuffer.length) {
debug('flushing %d packets in socket', this.writeBuffer.length);
this.transport.send(this.writeBuffer);
- this.writeBuffer = [];
+ // keep track of current length of writeBuffer
+ // splice writeBuffer and callbackBuffer on `drain`
+ this.prevBufferLen = this.writeBuffer.length;
+ this.emit('flush');
}
};
@@ -432,13 +479,14 @@ Socket.prototype.flush = function () {
* Sends a message.
*
* @param {String} message.
+ * @param {Function} callback function.
* @return {Socket} for chaining.
* @api public
*/
Socket.prototype.write =
-Socket.prototype.send = function (msg) {
- this.sendPacket('message', msg);
+Socket.prototype.send = function (msg, fn) {
+ this.sendPacket('message', msg, fn);
return this;
};
@@ -447,13 +495,15 @@ Socket.prototype.send = function (msg) {
*
* @param {String} packet type.
* @param {String} data.
+ * @param {Function} callback function.
* @api private
*/
-Socket.prototype.sendPacket = function (type, data) {
+Socket.prototype.sendPacket = function (type, data, fn) {
var packet = { type: type, data: data };
this.emit('packetCreate', packet);
this.writeBuffer.push(packet);
+ this.callbackBuffer.push(fn);
this.flush();
};
@@ -495,8 +545,15 @@ Socket.prototype.onError = function (err) {
Socket.prototype.onClose = function (reason, desc) {
if ('opening' == this.readyState || 'open' == this.readyState) {
debug('socket close with reason: "%s"', reason);
+ var self = this;
clearTimeout(this.pingIntervalTimer);
clearTimeout(this.pingTimeoutTimer);
+ // clean buffers in next tick, so developers can still
+ // grab the buffers on `close` event
+ setTimeout(function() {
+ self.writeBuffer = [];
+ self.callbackBuffer = [];
+ }, 0);
this.readyState = 'closed';
this.emit('close', reason, desc);
this.onclose && this.onclose.call(this);
Something went wrong with that request. Please try again.