Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

clear writeBuffer in each `drain` + clear writeBuffer after nextTick #95

Closed
wants to merge 18 commits into from

2 participants

@afshinm

This pull request contains new changes for issue #89 and issue #59.

test/server.js
((6 lines not shown))
var engine = listen({ allowUpgrades: false }, function (port) {
- // hack to access the sockets created by node-xmlhttprequest
@rauchg Owner
rauchg added a note

Why did you undo this test?

@rauchg Owner
rauchg added a note

This change was needed when we upgraded node-xmlhttprequest to 1.5.0

@afshinm
afshinm added a note

Sorry. I think I have some conflicts in my github repository, let me check it again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@afshinm

Ok, conflicts resolved. Please check them again.

@afshinm afshinm commented on the diff
lib/socket.js
@@ -24,6 +24,7 @@ function Socket (id, server, transport) {
this.readyState = 'opening';
this.writeBuffer = [];
this.packetsFn = [];
+ this.packetLengths = [];
@afshinm
afshinm added a note

In order to keep packets length

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@afshinm afshinm commented on the diff
lib/socket.js
((13 lines not shown))
var self = this;
- //the message was sent successfully, execute the callback
- this.transport.on('drain', function() {
+ //the message was sent successfully...
+ this.transport.on('drain', function() {
+ //Remove the object from writeBuffer and packetLengths
+ for (var i in self.packetLengths) {
+ self.writeBuffer.splice(0, self.packetLengths[i]);
+ self.packetLengths.splice(0, 1);
@afshinm
afshinm added a note

Remove received packets from writeBuffer

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@afshinm afshinm commented on the diff
lib/socket.js
@@ -286,12 +299,21 @@ Socket.prototype.sendPacket = function (type, data, callback) {
Socket.prototype.flush = function () {
if ('closed' != this.readyState && this.transport.writable
- && this.writeBuffer.length) {
+ && this.writeBuffer.length && this.writeBuffer.length > this.packetLengths.length) {
@afshinm
afshinm added a note

If there's a new packet that we didn't send it to client. This condition occurs when transport is lock and writeBuffer is growing in memory.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rauchg rauchg closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Sep 28, 2012
  1. @afshinm
  2. @afshinm
  3. @afshinm
Commits on Oct 1, 2012
  1. @afshinm

    more code readability

    afshinm authored
Commits on Oct 2, 2012
  1. @afshinm

    fix bad indentation

    afshinm authored
  2. @afshinm

    fix indentation

    afshinm authored
Commits on Oct 12, 2012
  1. @afshinm
  2. @afshinm

    more code readability

    afshinm authored
  3. @afshinm

    fix bad indentation

    afshinm authored
  4. @afshinm

    fix indentation

    afshinm authored
  5. @afshinm

    clean writeBuffer in nextTick

    afshinm authored
  6. @afshinm
  7. @afshinm

    rebase with master + add new test

    afshinm authored
    add new test for clean writeBuffer in nextTick.
Commits on Oct 13, 2012
  1. @afshinm

    more code readability

    afshinm authored
  2. @afshinm

    fix indentation

    afshinm authored
  3. @afshinm

    fix conflicts

    afshinm authored
Commits on Oct 14, 2012
  1. @afshinm

    fix conflicts

    afshinm authored
  2. @afshinm

    add two tests again

    afshinm authored
This page is out of date. Refresh to see the latest.
Showing with 77 additions and 11 deletions.
  1. +33 −11 lib/socket.js
  2. +44 −0 test/server.js
View
44 lib/socket.js
@@ -24,6 +24,7 @@ function Socket (id, server, transport) {
this.readyState = 'opening';
this.writeBuffer = [];
this.packetsFn = [];
+ this.packetLengths = [];
@afshinm
afshinm added a note

In order to keep packets length

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
this.setTransport(transport);
this.onOpen();
@@ -137,8 +138,8 @@ Socket.prototype.setTransport = function (transport) {
this.transport.on('packet', this.onPacket.bind(this));
this.transport.on('drain', this.flush.bind(this));
this.transport.once('close', this.onClose.bind(this, 'transport close'));
- //this function will manage packet events (also message callbacks)
- this.setupSendCallback();
+ //This function will manage packet events (Message callbacks etc.)
+ this.setupPacketEventManager();
};
/**
@@ -217,6 +218,11 @@ Socket.prototype.clearTransport = function () {
Socket.prototype.onClose = function (reason, description) {
if ('closed' != this.readyState) {
+ var self = this;
+ //to clean the writeBuffer in next tick (loop), so developers can still grab the writeBuffer in onClose
+ process.nextTick(function() {
+ self.writeBuffer = [];
+ });
this.packetsFn = [];
this.clearTransport();
this.readyState = 'closed';
@@ -225,15 +231,22 @@ Socket.prototype.onClose = function (reason, description) {
};
/**
- * Setup and manage send callback
+ * Setup and manage message events.
+ * We use this method for manage the packets events.
*
* @api private
*/
-Socket.prototype.setupSendCallback = function () {
+Socket.prototype.setupPacketEventManager = function () {
var self = this;
- //the message was sent successfully, execute the callback
- this.transport.on('drain', function() {
+ //the message was sent successfully...
+ this.transport.on('drain', function() {
+ //Remove the object from writeBuffer and packetLengths
+ for (var i in self.packetLengths) {
+ self.writeBuffer.splice(0, self.packetLengths[i]);
+ self.packetLengths.splice(0, 1);
@afshinm
afshinm added a note

Remove received packets from writeBuffer

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ }
+ //Executing the message callback
if (self.packetsFn.length > 0) {
var seqFn = self.packetsFn.splice(0,1)[0];
if ('function' == typeof seqFn) {
@@ -286,12 +299,21 @@ Socket.prototype.sendPacket = function (type, data, callback) {
Socket.prototype.flush = function () {
if ('closed' != this.readyState && this.transport.writable
- && this.writeBuffer.length) {
+ && this.writeBuffer.length && this.writeBuffer.length > this.packetLengths.length) {
@afshinm
afshinm added a note

If there's a new packet that we didn't send it to client. This condition occurs when transport is lock and writeBuffer is growing in memory.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
debug('flushing buffer to transport');
- this.emit('flush', this.writeBuffer);
- this.server.emit('flush', this, this.writeBuffer);
- this.transport.send(this.writeBuffer);
- this.writeBuffer = [];
+ //Sum all not drained packets length
+ var notDrainedLength = 0;
+ for (var i = 0; i < this.packetLengths.length; ++i) {
+ //So, we have some packets that still are available in writeBuffer. It means that client didn't send any ack about this packets.
+ notDrainedLength += this.packetLengths[i];
+ }
+ //Slice the correct writeBuffer object with the packetLengths and current writeBuffer length
+ var packet = this.writeBuffer.slice(notDrainedLength, this.writeBuffer.length - this.packetLengths.length + notDrainedLength);
+ //we keep writeBuffer length(s) in order to slice them when transport drains
+ this.packetLengths.push(packet.length);
+ this.emit('flush', packet);
+ this.server.emit('flush', this, packet);
+ this.transport.send(packet);
this.emit('drain');
this.server.emit('drain', this);
}
View
44 test/server.js 100644 → 100755
@@ -722,6 +722,50 @@ describe('server', function () {
new eioc.Socket('ws://localhost:%d'.s(port));
});
});
+
+ it('should not clear writeBuffer in every flush', function(done) {
+ var engine = listen({ allowUpgrades: false }, function(port) {
+ engine.on('connection', function(socket) {
+ var msg = "boo";
+
+ socket.on('drain', function() {
+ expect(socket.writeBuffer.length).not.to.be(0);
+ expect(socket.writeBuffer[0].data).to.be(msg);
+
+ //waiting for client to receive message
+ setTimeout(function() {
+ expect(socket.writeBuffer.length).to.be(0);
+ done();
+ }, 100);
+ });
+
+ socket.send(msg);
+ });
+ new eioc.Socket('ws://localhost:%d'.s(port));
+ });
+ });
+
+ it('should still available when connection closed', function (done) {
+ var opts = { allowUpgrades: false, transports: ['websocket'] };
+ var engine = listen(opts, function (port) {
+ var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] });
+ var msg = "boo";
+
+ engine.on('connection', function (conn) {
+ socket.on('open', function () {
+ //send message
+ conn.send(msg);
+ //force close
+ conn.close();
+ });
+ conn.on('close', function () {
+ expect(conn.writeBuffer.length).to.be(1);
+ expect(conn.writeBuffer[0].data).to.be(msg);
+ done();
+ });
+ });
+ });
+ });
});
describe('send', function() {
Something went wrong with that request. Please try again.