Permalink
Browse files

initial commit

  • Loading branch information...
0 parents commit feaf80da497394ff090ad71ed63463a7b3b87ed4 @bmeck committed Aug 1, 2011
@@ -0,0 +1,5 @@
+.DS_Store
+build/
+node_modules
+*.log
+
@@ -0,0 +1,15 @@
+#!/usr/local/bin/node
+var Kitsune = require('../');
+function toPort(x) {
+ return (x = Number(x)) >= 0 ? x : false;
+}
+var port = toPort(process.argv[2]) || 8585;
+var kit = new Kitsune({insecure: true});
+var oldEmit = kit.emit;
+kit.emit = function emit(event) {
+ console.log('EVENT',event);
+ console.dir.apply(console,[].slice.call(arguments,1));
+ oldEmit.apply(this, arguments);
+}
+kit.listen(port);
+console.log('Kistune REST API listening on HTTP with PORT ', kit.servers[0].address().port);
@@ -0,0 +1,98 @@
+var EventEmitter = require('events').EventEmitter;
+var util = require('util');
+var http = require('http');
+var https = require('https');
+var journey = require('journey');
+var Balancer = require('./kitsune/balancer');
+function toPort(x) {
+ return (x = Number(x)) >= 0 ? x : false;
+}
+
+function Kitsune(options) {
+ options = options || {};
+ EventEmitter.call(this);
+ var self = this;
+
+ this.insecure = options.insecure || false;
+ this.router = new journey.Router();
+ this.balancer = new Balancer();
+ this.servers = [];
+
+ var oldEmit = this.balancer.emit;
+ this.balancer.emit = function emit() {
+ self.emit.apply(self, arguments);
+ return oldEmit.apply(this, arguments);
+ }
+
+ this.setupRouter();
+}
+util.inherits(Kitsune, EventEmitter);
+module.exports = Kitsune;
+
+Kitsune.prototype.setupRouter = function setupRouter() {
+ var self = this;
+ this.router.map(function () {
+ this.root.bind(function (req, res) { res.send(404) });
+ this.get('mappings').bind(function (req, res) {
+ self.balancer.mappings(function (err, mappings) {
+ if(err) res.send(err.code || 500, {}, err);
+ else res.send(200, {}, mappings);
+ });
+ });
+ this.post('/proxy').bind(function (req, res, data) {
+ console.dir(arguments)
+ self.balancer.provision(data.desiredPort, data.actualPort, data.actualHost, null, function (err) {
+ if(err) res.send(err.code || 500, err);
+ else res.send(201);
+ });
+ });
+ this.del('/proxy').bind(function (req, res, data) {
+ self.balancer.release(data.desiredPort, data.actualPort, data.actualHost, null, function (err) {
+ if(err) res.send(err.code || 500, err);
+ else res.send(200);
+ });
+ });
+ });
+}
+
+Kitsune.prototype.listen = function listen() {
+ var self = this;
+ function onRequest(request, response) {
+ var body = "";
+
+ request.addListener('data', function (chunk) { body += chunk });
+ request.addListener('end', function () {
+ self.router.handle(request, body, function (result) {
+ response.writeHead(result.status, result.headers);
+ response.end(result.body);
+ });
+ });
+ }
+ var server;
+ if(typeof arguments[0] === 'object' && arguments[0] instanceof http.Server) {
+ server = arguments[0];
+ if(!this.insecure && server instanceof htts.Server) {
+ throw new Error('Cannot bind to an insecure server when Kitsune is not set to insecure.');
+ }
+ server.on('request', onRequest);
+ }
+ else if(toPort(arguments[0])) {
+ if(this.insecure) {
+ server = http.createServer(onRequest);
+ }
+ else {
+ server = https.createServer(onRequest);
+ }
+ server.listen.apply(server, arguments);
+ }
+ else {
+ throw new Error('Do not know how listen to these arguments.');
+ }
+ this.emit('server::listen', server);
+ this.servers.push(server);
+
+ server.on('close', function() {
+ this.emit('server::close', server);
+ self.servers.splice(self.servers.indexOf(server), 1);
+ });
+}
@@ -0,0 +1,83 @@
+function BalancerTable() {
+ //this.ips = BalancerTable
+ //this.domains = BalancerTable
+ this.other = [
+ //{port:,host:}
+ ];
+}
+exports.BalancerTable = BalancerTable;
+
+BalancerTable.prototype.balance = function balance(ip, domain, callback) {
+ var balancer = this;
+ if(ip) {
+ balancer = this.ips[ip];
+ if(!balancer) {
+ return callback(true);
+ }
+ else {
+ return balancer.balance(ip, domain, callback)
+ }
+ }
+ if(domain) {
+ balancer = this.domains[domain];
+ if(!balancer) {
+ return callback(true);
+ }
+ else {
+ return balancer.balance(ip, domain, callback)
+ }
+ }
+ if(!balancer.other.length) {
+ return callback(true);
+ }
+ var descriptor = balancer.other.shift();
+ balancer.other.push(descriptor);
+ return callback(false, descriptor);
+}
+BalancerTable.prototype.addForward = function addForward(descriptor) {
+ this.other.push(descriptor);
+}
+Balancer.prototype.dropForward = function dropForward(descriptor) {
+ var index = 0;
+ for(;index < this.other.length; index++) {
+ var forward = this.other[index];
+ if(descriptor.host == forward.host
+ && descriptor.port == forward.port) {
+ break;
+ }
+ }
+ if(index === this.other.length) {
+ return;
+ }
+ this.other.splice(index, 1);
+}
+BalancerTable.prototype.addDomainForward = function addDomainForward(domain, descriptor) {
+ if(!this.domains) this.domains = {};
+ if(!this.domains[domain]) this.domains[domain] = new BalancerTable();
+ this.domains[domain].addForward(descriptor);
+}
+BalancerTable.prototype.dropDomainForward = function dropDomainForward(domain, descriptor) {
+ if(!this.domains) return;
+ if(this.domains[domain]) this.domains[domain].dropForward(descriptor);
+}
+BalancerTable.prototype.addIPForward = function addIPForward(ip, domain, descriptor) {
+ if(!this.ips) this.ips = {};
+ if(!this.ips[ip]) this.ips[ip] = new BalancerTable();
+ if(descriptor) {
+ this.ips[ip].addDomainForward(domain, descriptor);
+ }
+ else {
+ this.ips[ip].addForward(descriptor);
+ }
+}
+BalancerTable.prototype.dropIPForward = function dropIPForward(ip, domain, descriptor) {
+ if(!this.ips) return;
+ if(this.ips[ip]) {
+ if(domain) {
+ this.ips[ip].dropDomainForward(descriptor);
+ }
+ else {
+ this.ips[ips].dropForward(descriptor);
+ }
+ }
+}
@@ -0,0 +1,154 @@
+var EventEmitter = require('events').EventEmitter;
+var net = require('net');
+var dgram = require('dgram');
+var util = require('util');
+var BalancerTable = require('./balancerTable').BalancerTable;
+function toPort(x) {
+ return (x = Number(x)) >= 0 ? x : false;
+}
+
+//
+// TODO : Select more protocols [TLS, HTTP, HTTPS]
+// : Filter protocols per port / server
+// : Options like manual heartbeats
+//
+function Balancer(options) {
+ EventEmitter.call(this);
+ options = options || {};
+ this.defaultHost = options.defaultHost || '127.0.0.1';
+ this.servers = {
+ // port : TieredServer
+ };
+ this.httpProxy = new httpProxy.HttpProxy(options);
+}
+util.inherits(Balancer, EventEmitter);
+module.exports = Balancer;
+
+
+Balancer.prototype.provision = function provision (desiredPort, actualPort, actualHost, options, callback) {
+ desiredPort = toPort(desiredPort);
+ actualPort = toPort(actualPort);
+ if(!desiredPort || !actualPort) {
+ var err = new Error('Data invalid');
+ err.code = 400;
+ callback(err);
+ return;
+ }
+ var self = this;
+ var descriptor = {
+ port: actualPort,
+ host: actualHost || this.defaultHost,
+ options: options
+ };
+
+ if(!self.servers[desiredPort]) {
+ var tieredServer = getTieredServer({
+ http: function(req, res) {
+ var balancers = self.servers[desiredPort]
+ if(balancers) {
+ balancers.http.balance(req.connection.remoteAddress, req.headers.host, function(err, descriptor) {
+ if(err) {
+ res.writeHead(404);
+ return res.end();
+ }
+ return self.httpProxy.proxyRequest(req, res, descriptor.host, descriptor.port);
+ });
+ }
+ },
+ https: function(req, res) {
+ var balancers = self.servers[desiredPort]
+ if(balancers) {
+ balancers.https.balance(req.connection.remoteAddress, req.headers.host, function(err, descriptor) {
+ if(err) {
+ res.writeHead(404);
+ return res.end();
+ }
+ return self.httpProxy.proxyRequest(req, res, descriptor.host, descriptor.port);
+ });
+ }
+ },
+ tcp: function(err) {this.end()},
+ tls: function(clearText, encrypted) {clearText.end()}
+ })
+
+ self.servers[desiredPort] = {
+ http: new BalancerTable(),
+ https: new BalancerTable(),
+ //tcp: new BalancerTable(),
+ //tls: new BalancerTable(),
+ server: server
+ }
+
+ try {
+ server.listen(desiredPort);
+ }
+ catch(e) {
+ e.code = 500;
+ if(callback) callback(e)
+ return;
+ }
+ }
+
+ var balancers = self.servers[desiredPort];
+ var protocols = this.protocolsFromOptions(options);
+ for(var i = 0; i < protocols.length; i++) {
+ if(options.ip) {
+ balancers[protocols[i]].addIPForward(options.ip, options.domain, descriptor);
+ }
+ else if(options.domain) {
+ balancers[protocols[i]].addDomainForward(options.domain, descriptor);
+ }
+ else {
+ balancers[protocols[i]].addForward(descriptor);
+ }
+ }
+
+
+ self.emit('balancer::proxy::provision', desiredPort, descriptor);
+ if(callback) callback(false);
+}
+
+Balancer.prototype.protocolsFromOptions = function protocolsFromOptions(options) {
+ var protocols = [];
+ if(options.domain) {
+ protocols.filter(function(protocol){
+ return ['tcp', 'tls'].indexOf(protocol) === -1;
+ });
+ }
+ if(options.secure) {
+ protocols.filter(function(protocol){
+ return ['tcp', 'http'].indexOf(protocol) === -1;
+ });
+ }
+ return protocols;
+}
+
+
+Balancer.prototype.release = function release (desiredPort, actualPort, actualHost, options, callback) {
+ var self = this;
+ var descriptor = {
+ port: actualPort,
+ host: actualHost || this.defaultHost,
+ options: options
+ };
+ var balancers = self.servers[desiredPort];
+ var protocols = this.protocolsFromOptions(options);
+ for(var i = 0; i < protocols.length; i++) {
+ if(options.ip) {
+ balancers[protocols[i]].dropIPForward(options.ip, options.domain, descriptor);
+ }
+ else if(options.domain) {
+ balancers[protocols[i]].dropDomainForward(options.domain, descriptor);
+ }
+ else {
+ balancers[protocols[i]].dropForward(descriptor);
+ }
+ }
+ self.emit('balancer::proxy::release', desiredPort, descriptor);
+ if(callback) callback(false);
+}
+
+
+Balancer.prototype.mappings = function mappings(callback) {
+ callback(false, this.ports);
+}
@@ -0,0 +1,32 @@
+exports.createBalancerServer = createBalancerServer;
+function createBalancerServer(port, balancer) {
+ getTieredServer({
+ https: function(req, res) {
+ balancer.resolveHttps(port, req, res, function(err) {
+ if(err) {
+ balancer.resolveTls(port, req.connection.pair.cleattext, req.connection.pair.encrypted)
+ }
+ });
+ },
+ http: function() {
+ balancer.resolveHttp(port, req, res, function(err) {
+ if(err) {
+ balancer.resolveTcp(port, req.connection.pair.cleattext, req.connection.pair.encrypted)
+ }
+ });
+ },
+ tcp: function() {
+ balancer.resolveTcp(port, this)
+ },
+ tls: function(clearText, encrypted) {
+ balancer.resolveTls(port, clearText, encrypted, function(err) {
+ if(err) {
+ balancer.resolveTcp(port, req.connection.pair.cleattext, req.connection.pair.encrypted)
+ }
+ });
+ },
+ udp: function() {
+ balancer.resolveUdp(port, this);
+ }
+ }).listen(port);
+}
Oops, something went wrong.

0 comments on commit feaf80d

Please sign in to comment.