Skip to content

Commit

Permalink
[api] Update WebSocket support to use http.Agent APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
indexzero committed Apr 16, 2011
1 parent 5681fc1 commit b0b0183
Showing 1 changed file with 96 additions and 122 deletions.
218 changes: 96 additions & 122 deletions lib/node-http-proxy.js
Expand Up @@ -136,7 +136,7 @@ exports.createServer = function () {
server.on('upgrade', function(req, socket, head) {
// Tunnel websocket requests too

proxy.proxyWebSocketRequest(port, host);
proxy.proxyWebSocketRequest(req, socket, head, port, host);
});
}

Expand Down Expand Up @@ -444,91 +444,111 @@ HttpProxy.prototype._forwardRequest = function (req) {
});
};

HttpProxy.prototype.proxyWebSocketRequest = function (port, server, host, data) {
var self = this, req = self.req, socket = self.sock, head = self.head,
headers = new _headers(req.headers), CRLF = '\r\n';

// Will generate clone of headers
// To not change original
function _headers(headers) {
var h = {};
for (var i in headers) {
h[i] = headers[i];
}
return h;
}
HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, port, host, buffer) {
var self = this, CRLF = '\r\n';

// WebSocket requests has method = GET
if (req.method !== 'GET' || headers.upgrade.toLowerCase() !== 'websocket') {
if (req.method !== 'GET' || req.headers.upgrade.toLowerCase() !== 'websocket') {
// This request is not WebSocket request
return;
}

// Turn of all bufferings
// For server set KeepAlive
// For client set encoding
function _socket(socket, server) {
function _socket(socket, keepAlive) {
socket.setTimeout(0);
socket.setNoDelay(true);
if (server) {
if (keepAlive) {
socket.setKeepAlive(true, 0);
}
else {
socket.setEncoding('utf8');
}
}

function onUpgrade(reverseProxy) {
var listeners = {};

// We're now connected to the server, so lets change server socket
reverseProxy.on('data', listeners._r_data = function(data) {
// Pass data to client
if (socket.writable) {
try {
socket.write(data);
}
catch (e) {
socket.end();
reverseProxy.end();
}
}
});

socket.on('data', listeners._data = function(data) {
// Pass data from client to server
try {
reverseProxy.write(data);
}
catch (e) {
reverseProxy.end();
socket.end();
}
});

// Detach event listeners from reverseProxy
function detach() {
reverseProxy.removeListener('close', listeners._r_close);
reverseProxy.removeListener('data', listeners._r_data);
socket.removeListener('data', listeners._data);
socket.removeListener('close', listeners._close);
}

// Hook disconnections
reverseProxy.on('end', listeners._r_close = function() {
socket.end();
detach();
});

socket.on('end', listeners._close = function() {
reverseProxy.end();
detach();
});
};

// Client socket
_socket(socket);

// If host is undefined
// Get it from headers
if (!host) {
host = headers.Host;
}

// Remote host address
var remote_host = server + (port - 80 === 0 ? '' : ':' + port);
var remoteHost = host + (port - 80 === 0 ? '' : ':' + port),
agent = _getAgent(host, port);

// Change headers
headers.Host = remote_host;
headers.Origin = 'http://' + remote_host;

// Open request
var p = manager.getPool(port, server);

p.getClient(function(client) {
// Based on 'pool/main.js'
var request = client.request('GET', req.url, headers);

var errorListener = function (error) {
client.removeListener('error', errorListener);

// Remove the client from the pool's available clients since it has errored
p.clients.splice(p.clients.indexOf(client), 1);
socket.end();
}

// Not disconnect on update
client.on('upgrade', function(request, remote_socket, head) {
// Prepare socket
_socket(remote_socket, true);
req.headers.host = remoteHost;
req.headers.origin = 'http://' + host;

var opts = {
host: host,
port: port,
agent: agent,
method: 'GET',
path: req.url,
headers: req.headers
}

// Emit event
onUpgrade(remote_socket);
});
// Make the outgoing WebSocket request
var request = http.request(opts, function () { });

// Not disconnect on update
agent.on('upgrade', function(request, remoteSocket, head) {
// Prepare socket
_socket(remoteSocket, true);

client.on('error', errorListener);
request.on('response', function (response) {
response.on('end', function () {
client.removeListener('error', errorListener);
client.busy = false;
p.onFree(client);
})
})
client.busy = true;

var handshake;
// Emit event
onUpgrade(remoteSocket);
});

var handshake;
if (typeof request.socket !== 'undefined') {
request.socket.on('data', handshake = function(data) {
// Handshaking

Expand All @@ -547,8 +567,8 @@ HttpProxy.prototype.proxyWebSocketRequest = function (port, server, host, data)
data = data.slice(Buffer.byteLength(sdata), data.length);

// Replace host and origin
sdata = sdata.replace(remote_host, host)
.replace(remote_host, host);
sdata = sdata.replace(remoteHost, host)
.replace(remoteHost, host);

try {
// Write printable
Expand All @@ -570,65 +590,19 @@ HttpProxy.prototype.proxyWebSocketRequest = function (port, server, host, data)
// Remove data listener now that the 'handshake' is complete
request.socket.removeListener('data', handshake);
});
}

// Write upgrade-head
try {
request.write(head);
}
catch(e) {
request.end();
socket.end();
}
self.unwatch(socket);
});

// Request

function onUpgrade(reverse_proxy) {
var listeners = {};

// We're now connected to the server, so lets change server socket
reverse_proxy.on('data', listeners._r_data = function(data) {
// Pass data to client
if (socket.writable) {
try {
socket.write(data);
}
catch (e) {
socket.end();
reverse_proxy.end();
}
}
});

socket.on('data', listeners._data = function(data) {
// Pass data from client to server
try {
reverse_proxy.write(data);
}
catch (e) {
reverse_proxy.end();
socket.end();
}
});

// Detach event listeners from reverse_proxy
function detach() {
reverse_proxy.removeListener('close', listeners._r_close);
reverse_proxy.removeListener('data', listeners._r_data);
socket.removeListener('data', listeners._data);
socket.removeListener('close', listeners._close);
}

// Hook disconnections
reverse_proxy.on('end', listeners._r_close = function() {
socket.end();
detach();
});

socket.on('end', listeners._close = function() {
reverse_proxy.end();
detach();
});
};
// Write upgrade-head
try {
request.write(head);
}
catch (ex) {
request.end();
socket.end();
}

// If we have been passed buffered data, resume it.
if (buffer && !errState) {
buffer.resume();
}
};

0 comments on commit b0b0183

Please sign in to comment.