Skip to content

Commit

Permalink
Add flow control
Browse files Browse the repository at this point in the history
Not in the "async/fibers/coro" sense of flow control, but in the TCP
backpressure sense.

Pause the stream when a write isn't flushed, and then resume it once the
writable stream drains.
  • Loading branch information
isaacs authored and indexzero committed Sep 8, 2011
1 parent 967884c commit 2b9e09b
Showing 1 changed file with 46 additions and 8 deletions.
54 changes: 46 additions & 8 deletions lib/node-http-proxy.js
Expand Up @@ -539,16 +539,23 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) {
response.on('data', function (chunk) {
if (req.method !== 'HEAD' && res.writable) {
try {
res.write(chunk);
var flushed = res.write(chunk);
} catch (er) {
console.error("res.write error: %s", er.message);
try {
res.end();
} catch (er) {
console.error("res.end error: %s", er.message);
}
return;
}
}
if (!flushed) {
response.pause();
res.once('drain', function () {
response.resume();
});
}
});

// When the `reverseProxy` `response` ends, end the
Expand Down Expand Up @@ -578,7 +585,13 @@ HttpProxy.prototype.proxyRequest = function (req, res, options) {
// `req` write it to the `reverseProxy` request.
req.on('data', function (chunk) {
if (!errState) {
reverseProxy.write(chunk);
var flushed = reverseProxy.write(chunk);
if (!flushed) {
req.pause();
reverseProxy.once('drain', function () {
req.resume();
});
}
}
});

Expand Down Expand Up @@ -646,7 +659,13 @@ HttpProxy.prototype._forwardRequest = function (req) {

// Chunk the client request body as chunks from the proxied request come in
req.on('data', function (chunk) {
forwardProxy.write(chunk);
var flushed = forwardProxy.write(chunk);
if (!flushed) {
req.pause();
forwardProxy.once('drain', function () {
req.resume();
});
}
})

// At the end of the client request, we are going to stop the proxied request
Expand Down Expand Up @@ -741,7 +760,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
if (reverseProxy.incoming.socket.writable) {
try {
self.emit('websocket:outgoing', req, socket, head, data);
reverseProxy.incoming.socket.write(data);
var flushed = reverseProxy.incoming.socket.write(data);
if (!flushed) {
proxySocket.pause();
reverseProxy.incoming.socket.once('drain', function () {
proxySocket.resume();
});
}
}
catch (e) {
detach();
Expand All @@ -758,7 +783,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
reverseProxy.incoming.socket.on('data', listeners.onOutgoing = function(data) {
try {
self.emit('websocket:incoming', reverseProxy, reverseProxy.incoming, head, data);
proxySocket.write(data);
var flushed = proxySocket.write(data);
if (!flushed) {
reverseProxy.incoming.socket.pause();
proxySocket.once('drain', function () {
reverseProxy.incoming.socket.resume();
});
}
}
catch (e) {
detach();
Expand Down Expand Up @@ -918,7 +949,14 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
//
self.emit('websocket:handshake', req, socket, head, sdata, data);
socket.write(sdata);
socket.write(data);
var flushed = socket.write(data);
if (!flushed) {
reverseProxy.socket.pause();
socket.once('drain', function () {
reverseProxy.socket.resume();
});
}

}
catch (ex) {
proxyError(ex)
Expand All @@ -935,9 +973,9 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, options
reverseProxy.on('error', proxyError);

try {
//
// Attempt to write the upgrade-head to the reverseProxy request.
//
// This is small, and there's only ever one of it.
// No need for pause/resume.
reverseProxy.write(head);
}
catch (ex) {
Expand Down

0 comments on commit 2b9e09b

Please sign in to comment.