Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #141 from albertyfwu/bug136

add support for callbacks with socket.send()
  • Loading branch information...
commit 55a089d01cb2be8b2b3c8de8b240b7b78ea6d967 2 parents 6836902 + 63f29da
@rauchg rauchg authored
Showing with 67 additions and 5 deletions.
  1. +5 −0 README.md
  2. +62 −5 lib/socket.js
View
5 README.md
@@ -96,6 +96,10 @@ Exposed as `eio` in the browser standalone build.
- Fired upon disconnection.
- `error`
- Fired when an error occurs.
+- `flush`
+ - Fired upon completing a buffer flush
+- `drain`
+ - Fired after `drain` event of transport if writeBuffer is empty
#### Methods
@@ -122,6 +126,7 @@ Exposed as `eio` in the browser standalone build.
- Sends a message to the server
- **Parameters**
- `String`: data to send
+ - `Function`: optional, callback upon `drain`
- `close`
- Disconnects the client.
View
67 lib/socket.js
@@ -21,6 +21,14 @@ module.exports = Socket;
var global = util.global();
/**
+ * Noop function.
+ *
+ * @api private
+ */
+
+function noop () {};
+
+/**
* Socket constructor.
*
* @param {String|Object} uri or options
@@ -71,6 +79,7 @@ function Socket(uri, opts){
this.transports = opts.transports || ['polling', 'websocket', 'flashsocket'];
this.readyState = '';
this.writeBuffer = [];
+ this.callbackBuffer = [];
this.policyPort = opts.policyPort || 843;
this.open();
@@ -191,7 +200,7 @@ Socket.prototype.setTransport = function (transport) {
// set up transport listeners
transport
.on('drain', function () {
- self.flush();
+ self.onDrain();
})
.on('packet', function (packet) {
self.onPacket(packet);
@@ -414,6 +423,41 @@ Socket.prototype.ping = function () {
};
/**
+ * Called on `drain` event
+ *
+ * @api private
+ */
+
+ Socket.prototype.onDrain = function() {
+ this.callbacks();
+ this.writeBuffer.splice(0, this.prevBufferLen);
+ this.callbackBuffer.splice(0, this.prevBufferLen);
+ // setting prevBufferLen = 0 is very important
+ // for example, when upgrading, upgrade packet is sent over,
+ // and a nonzero prevBufferLen could cause problems on `drain`
+ this.prevBufferLen = 0;
+ if (this.writeBuffer.length == 0) {
+ this.emit('drain');
+ } else {
+ this.flush();
+ }
+ }
+
+/**
+ * Calls all the callback functions associated with sending packets
+ *
+ * @api private
+ */
+
+Socket.prototype.callbacks = function() {
+ for (var i = 0; i < this.prevBufferLen; i++) {
+ if (this.callbackBuffer[i]) {
+ this.callbackBuffer[i]();
+ }
+ }
+}
+
+/**
* Flush write buffers.
*
* @api private
@@ -424,7 +468,10 @@ Socket.prototype.flush = function () {
!this.upgrading && this.writeBuffer.length) {
debug('flushing %d packets in socket', this.writeBuffer.length);
this.transport.send(this.writeBuffer);
- this.writeBuffer = [];
+ // keep track of current length of writeBuffer
+ // splice writeBuffer and callbackBuffer on `drain`
+ this.prevBufferLen = this.writeBuffer.length;
+ this.emit('flush');
}
};
@@ -432,13 +479,14 @@ Socket.prototype.flush = function () {
* Sends a message.
*
* @param {String} message.
+ * @param {Function} callback function.
* @return {Socket} for chaining.
* @api public
*/
Socket.prototype.write =
-Socket.prototype.send = function (msg) {
- this.sendPacket('message', msg);
+Socket.prototype.send = function (msg, fn) {
+ this.sendPacket('message', msg, fn);
return this;
};
@@ -447,13 +495,15 @@ Socket.prototype.send = function (msg) {
*
* @param {String} packet type.
* @param {String} data.
+ * @param {Function} callback function.
* @api private
*/
-Socket.prototype.sendPacket = function (type, data) {
+Socket.prototype.sendPacket = function (type, data, fn) {
var packet = { type: type, data: data };
this.emit('packetCreate', packet);
this.writeBuffer.push(packet);
+ this.callbackBuffer.push(fn);
this.flush();
};
@@ -495,8 +545,15 @@ Socket.prototype.onError = function (err) {
Socket.prototype.onClose = function (reason, desc) {
if ('opening' == this.readyState || 'open' == this.readyState) {
debug('socket close with reason: "%s"', reason);
+ var self = this;
clearTimeout(this.pingIntervalTimer);
clearTimeout(this.pingTimeoutTimer);
+ // clean buffers in next tick, so developers can still
+ // grab the buffers on `close` event
+ setTimeout(function() {
+ self.writeBuffer = [];
+ self.callbackBuffer = [];
+ }, 0);
this.readyState = 'closed';
this.emit('close', reason, desc);
this.onclose && this.onclose.call(this);
Please sign in to comment.
Something went wrong with that request. Please try again.