Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

refactor server tcp handling

- limit on number of tcp connections
- preliminary support for websockets
  • Loading branch information...
commit 51d91ce0e8ba63ac9943c1ae91462a806e89635b 1 parent ab28444
@defunctzombie authored
Showing with 247 additions and 164 deletions.
  1. +59 −35 client.js
  2. +12 −0 lib/rand_id.js
  3. +176 −129 server.js
View
94 client.js
@@ -10,6 +10,9 @@ var argv = require('optimist')
default: 'http://localtunnel.me',
describe: 'upstream server providing forwarding'
})
+ .options('subdomain', {
+ describe: 'request this subdomain'
+ })
.describe('port', 'internal http server port')
.argv;
@@ -29,67 +32,88 @@ var opt = {
var base_uri = 'http://' + opt.host + ':' + opt.port + opt.path;
-var internal;
-var upstream;
-var prev_id;
+var prev_id = argv.subdomain || '';
(function connect_proxy() {
opt.uri = base_uri + ((prev_id) ? prev_id : '?new');
request(opt, function(err, res, body) {
if (err) {
- console.error('upstream not available: %s', err.message);
- return process.exit(-1);
+ console.error('tunnel server not available: %s, retry 1s', err.message);
+
+ // retry interval for id request
+ return setTimeout(function() {
+ connect_proxy();
+ }, 1000);
}
// our assigned hostname and tcp port
var port = body.port;
var host = opt.host;
+ var max_conn = body.max_conn_count || 1;
// store the id so we can try to get the same one
prev_id = body.id;
console.log('your url is: %s', body.url);
- // connect to remote tcp server
- upstream = net.createConnection(port, host);
+ var count = 0;
+
+ // open 5 connections to the localtunnel server
+ // allows for resources to be served faster
+ for (var count = 0 ; count < max_conn ; ++count) {
+ var upstream = duplex(port, host, local_port, 'localhost');
+ upstream.once('end', function() {
+ // all upstream connections have been closed
+ if (--count <= 0) {
+ connect_proxy();
+ }
+ });
+ }
+ });
+})();
- // reconnect internal
- connect_internal();
+function duplex(port, host, local_port, local_host) {
- upstream.on('end', function() {
- console.log('> upstream connection terminated');
+ // connect to remote tcp server
+ var upstream = net.createConnection(port, host);
+ var internal = net.createConnection(local_port, local_host);
- // sever connection to internal server
- // on reconnect we will re-establish
- internal.end();
+ // when upstream connection is closed, close other associated connections
+ upstream.on('end', function() {
+ console.log('> upstream connection terminated');
- setTimeout(function() {
- connect_proxy();
- }, 1000);
- });
+ // sever connection to internal server
+ // on reconnect we will re-establish
+ internal.end();
});
-})();
-function connect_internal() {
+ upstream.on('error', function(err) {
+ console.error(err);
+ });
- internal = net.createConnection(local_port);
- internal.on('error', function(err) {
- console.log('error connecting to local server. retrying in 1s');
+ (function connect_internal() {
- setTimeout(function() {
- connect_internal();
- }, 1000);
- });
+ //internal = net.createConnection(local_port);
+ internal.on('error', function(err) {
+ console.log('error connecting to local server. retrying in 1s');
+ setTimeout(function() {
+ connect_internal();
+ }, 1000);
+ });
- internal.on('end', function() {
- console.log('disconnected from local server. retrying in 1s');
- setTimeout(function() {
- connect_internal();
- }, 1000);
- });
+ internal.on('end', function() {
+ console.log('disconnected from local server. retrying in 1s');
+ setTimeout(function() {
+ connect_internal();
+ }, 1000);
+ });
- upstream.pipe(internal);
- internal.pipe(upstream);
+ upstream.pipe(internal);
+ internal.pipe(upstream);
+ })();
+
+ return upstream;
}
+
View
12 lib/rand_id.js
@@ -0,0 +1,12 @@
+
+var chars = 'abcdefghiklmnopqrstuvwxyz';
+module.exports = function rand_id() {
+ var randomstring = '';
+ for (var i=0; i<4; ++i) {
+ var rnum = Math.floor(Math.random() * chars.length);
+ randomstring += chars[rnum];
+ }
+
+ return randomstring;
+}
+
View
305 server.js
@@ -3,47 +3,24 @@
var http = require('http');
var net = require('net');
var url = require('url');
-var FreeList = require('freelist').FreeList;
-
-var argv = require('optimist')
- .usage('Usage: $0 --port [num]')
- .options('port', {
- default: '80',
- describe: 'listen on this port for outside requests'
- })
- .argv;
-
-if (argv.help) {
- require('optimist').showHelp();
- process.exit();
-}
// here be dragons
var HTTPParser = process.binding('http_parser').HTTPParser;
var ServerResponse = http.ServerResponse;
var IncomingMessage = http.IncomingMessage;
+// vendor
var log = require('book');
-var chars = 'abcdefghiklmnopqrstuvwxyz';
-function rand_id() {
- var randomstring = '';
- for (var i=0; i<4; ++i) {
- var rnum = Math.floor(Math.random() * chars.length);
- randomstring += chars[rnum];
- }
-
- return randomstring;
-}
+// local
+var rand_id = require('./lib/rand_id');
var server = http.createServer();
// id -> client http server
var clients = {};
-// id -> list of sockets waiting for a valid response
-var wait_list = {};
-
+// available parsers
var parsers = http.parsers;
// data going back to a client (the last client that made a request)
@@ -52,23 +29,26 @@ function socketOnData(d, start, end) {
var socket = this;
var req = this._httpMessage;
- var current = clients[socket.subdomain].current;
-
- if (!current) {
- log.error('no current for http response from backend');
+ var response_socket = socket.respond_socket;
+ if (!response_socket) {
+ log.error('no response socket assigned for http response from backend');
return;
}
- // send the goodies
- current.write(d.slice(start, end));
+ // pass the response from our client back to the requesting socket
+ response_socket.write(d.slice(start, end));
- // invoke parsing so we know when all the goodies have been sent
- var parser = current.out_parser;
+ if (socket.for_websocket) {
+ return;
+ }
+
+ // invoke parsing so we know when the response is complete
+ var parser = response_socket.out_parser;
parser.socket = socket;
var ret = parser.execute(d, start, end - start);
if (ret instanceof Error) {
- debug('parse error');
+ log.error(ret);
freeParser(parser, req);
socket.destroy(ret);
}
@@ -99,83 +79,103 @@ server.on('connection', function(socket) {
var self = this;
- var for_client = false;
- var client_id;
-
- var request;
-
+ // parser handles incoming requests for the socket
+ // the request is what lets us know if we proxy or not
var parser = parsers.alloc();
parser.socket = socket;
parser.reinitialize(HTTPParser.REQUEST);
+ function our_request(req) {
+ var res = new ServerResponse(req);
+ res.assignSocket(socket);
+ self.emit('request', req, res);
+ return;
+ }
+
// a full request is complete
// we wait for the response from the server
parser.onIncoming = function(req, shouldKeepAlive) {
log.trace('request', req.url);
- request = req;
- for_client = false;
+ // default is that the data is not for the client
+ delete parser.sock;
+ delete parser.buffer;
+ delete parser.client;
var hostname = req.headers.host;
-
if (!hostname) {
log.trace('no hostname: %j', req.headers);
- // normal processing if not proxy
- var res = new ServerResponse(req);
-
- // TODO(shtylman) skip favicon for now, it caused problems
- if (req.url === '/favicon.ico') {
- return;
- }
-
- res.assignSocket(parser.socket);
- self.emit('request', req, res);
- return;
+ return our_request(req);
}
var match = hostname.match(/^([a-z]{4})[.].*/);
-
if (!match) {
- // normal processing if not proxy
- var res = new ServerResponse(req);
+ return our_request(req);
+ }
- // TODO(shtylman) skip favicon for now, it caused problems
- if (req.url === '/favicon.ico') {
- return;
- }
+ var client_id = match[1];
+ var client = clients[client_id];
+
+ // requesting a subdomain that doesn't exist
+ if (!client) {
+ return socket.end();
+ }
+
+ parser.client = client;
+
+ // assigned socket for the client
+ var sock = client.sockets.shift();
- res.assignSocket(parser.socket);
- self.emit('request', req, res);
+ // no free sockets, queue
+ if (!sock) {
+ parser.buffer = true;
return;
}
- client_id = match[1];
- for_client = true;
+ // for tcp proxying
+ parser.sock = sock;
+
+ // set who we will respond back to
+ sock.respond_socket = socket;
var out_parser = parsers.alloc();
out_parser.reinitialize(HTTPParser.RESPONSE);
socket.out_parser = out_parser;
- // we have a response
- out_parser.onIncoming = function(res) {
+ // we have completed a response
+ // the tcp socket is free again
+ out_parser.onIncoming = function (res) {
res.on('end', function() {
log.trace('done with response for: %s', req.url);
// done with the parser
parsers.free(out_parser);
- var next = wait_list[client_id].shift();
-
- clients[client_id].current = next;
+ // unset the response
+ delete sock.respond_socket;
+ var next = client.waiting.shift();
if (!next) {
+ // return socket to available
+ client.sockets.push(sock);
return;
}
- // write original bytes that we held cause client was busy
- clients[client_id].write(next.queue);
+ // reuse avail socket for next connection
+ sock.respond_socket = next;
+
+ // needed to know when this response will be done
+ out_parser.reinitialize(HTTPParser.RESPONSE);
+ next.out_parser = out_parser;
+
+ // write original bytes we held cause we were busy
+ sock.write(next.queue);
+
+ // continue with other bytes
next.resume();
+
+ return;
});
};
};
@@ -183,68 +183,87 @@ server.on('connection', function(socket) {
// process new data on the client socket
// we may need to forward this it the backend
socket.ondata = function(d, start, end) {
+
+ // run through request parser to determine if we should pass to tcp
+ // onIncoming will be run before this returns
var ret = parser.execute(d, start, end - start);
// invalid request from the user
if (ret instanceof Error) {
- debug('parse error');
+ log.error(ret);
socket.destroy(ret);
return;
}
- // only write data if previous request to this client is done?
- log.trace('%s %s', parser.incoming && parser.incoming.upgrade, for_client);
+ // websocket stuff
+ if (parser.incoming && parser.incoming.upgrade) {
+ log.trace('upgrade request');
- // what if the subdomains are treated differently
- // as individual channels to the backend if available?
- // how can I do that?
+ parser.finish();
- if (parser.incoming && parser.incoming.upgrade) {
- // websocket shit
- }
+ var hostname = parser.incoming.headers.host;
- // wtf do you do with upgraded connections?
+ var match = hostname.match(/^([a-z]{4})[.].*/);
+ if (!match) {
+ return our_request(req);
+ }
- // forward the data to the backend
- if (for_client) {
+ var client_id = match[1];
var client = clients[client_id];
- // requesting a subdomain that doesn't exist
- if (!client) {
- return;
+ var sock = client.sockets.shift();
+ sock.respond_socket = socket;
+ sock.for_websocket = true;
+
+ socket.ondata = function(d, start, end) {
+ sock.write(d.slice(start, end));
+ };
+
+ socket.end = function() {
+ log.trace('websocket end');
+
+ delete sock.respond_socket;
+ client.sockets.push(sock);
}
- // if the client is already processing something
- // then new connections need to go into pause mode
- // and when they are revived, then they can send data along
- if (client.current && client.current !== socket) {
- log.trace('pausing', request.url);
- // prevent new data from gathering for this connection
- // we are waiting for a response to a previous request
- socket.pause();
+ sock.write(d.slice(start, end));
+
+ return;
+ }
+
+ // if no available socket, buffer the request for later
+ if (parser.buffer) {
- var copy = Buffer(end - start);
- d.copy(copy, 0, start, end);
- socket.queue = copy;
+ // pause any further data on this socket
+ socket.pause();
- wait_list[client_id].push(socket);
+ // copy the current data since we have already received it
+ var copy = Buffer(end - start);
+ d.copy(copy, 0, start, end);
+ socket.queue = copy;
- return;
- }
+ // add socket to queue
+ parser.client.waiting.push(socket);
- // this socket needs to receive responses
- client.current = socket;
+ return;
+ }
- // send through tcp tunnel
- client.write(d.slice(start, end));
+ if (!parser.sock) {
+ return;
}
+
+ // assert, respond socket should be set
+
+ // send through tcp tunnel
+ // responses will go back to the respond_socket
+ parser.sock.write(d.slice(start, end));
};
socket.onend = function() {
var ret = parser.finish();
if (ret instanceof Error) {
- log.trace('parse error');
+ log.error(ret);
socket.destroy(ret);
return;
}
@@ -271,8 +290,12 @@ server.on('request', function(req, res) {
if (req.url === '/' && !parsed.query.new) {
res.writeHead(301, { Location: 'http://shtylman.github.com/localtunnel/' });
res.end();
+ return;
}
+ // at this point, the client is requesting a new tunnel setup
+ // either generate an id or use the one they requested
+
var match = req.url.match(/\/([a-z]{4})?/);
// user can request a particular set of characters
@@ -285,17 +308,17 @@ server.on('request', function(req, res) {
}
var id = requested_id || rand_id();
- if (wait_list[id]) {
- // new id
- id = rand_id();
- }
- // generate new shit for client
- if (wait_list[id]) {
- wait_list[id].forEach(function(waiting) {
- waiting.end();
- });
- }
+ // maximum number of tcp connections the client can setup
+ // each tcp channel allows for more parallel requests
+ var max_tcp_sockets = 4;
+
+ // sockets is a list of available sockets for the connection
+ // waiting is?
+ var client = clients[id] = {
+ sockets: [],
+ waiting: []
+ };
var client_server = net.createServer();
client_server.listen(function() {
@@ -305,7 +328,12 @@ server.on('request', function(req, res) {
var url = 'http://' + id + '.' + req.headers.host;
res.writeHead(200, { 'Content-Type': 'application/json' });
- res.end(JSON.stringify({ url: url, id: id, port: port }));
+ res.end(JSON.stringify({
+ url: url,
+ id: id,
+ port: port,
+ max_conn_count: max_tcp_sockets
+ }));
});
// user has 5 seconds to connect before their slot is given up
@@ -313,31 +341,50 @@ server.on('request', function(req, res) {
client_server.close();
}, 5000);
+ // no longer accepting connections for this id
+ client_server.on('close', function() {
+ delete clients[id];
+ });
+
+ var count = 0;
client_server.on('connection', function(socket) {
- // who the info should route back to
- socket.subdomain = id;
+ // no more socket connections allowed
+ if (count++ >= max_tcp_sockets) {
+ return socket.end();
+ }
+
+ log.trace('new connection for id: %s', id);
// multiplexes socket data out to clients
socket.ondata = socketOnData;
+ // no need to close the client server
clearTimeout(conn_timeout);
- log.trace('new connection for id: %s', id);
- clients[id] = socket;
- wait_list[id] = [];
+ // add socket to pool for this id
+ var idx = client.sockets.push(socket) - 1;
+
+ socket.on('close', function(had_error) {
+ count--;
+ client.sockets.splice(idx, 1);
- socket.on('end', function() {
- delete clients[id];
+ // no more sockets for this ident
+ if (client.sockets.length === 0) {
+ delete clients[id];
+ }
+ });
+
+ // close will be emitted after this
+ socket.on('error', function(err) {
+ log.error(err);
});
});
- client_server.on('err', function(err) {
+ client_server.on('error', function(err) {
log.error(err);
});
});
-server.listen(argv.port, function() {
- log.info('server listening on port: %d', server.address().port);
-});
+module.exports = server;
Please sign in to comment.
Something went wrong with that request. Please try again.