Permalink
Browse files

Merge "Merge branch '0.3.x' of ssh://pomelo.163.com:29418/pomelo into…

… 0.3.x"
  • Loading branch information...
Charlie Edward Gerrit Code Review
Charlie Edward authored and Gerrit Code Review committed Jan 22, 2013
2 parents baa5845 + b0c74c6 commit 9f156a327282e53a3d5998c80b1b1b4b2f991dbf
@@ -67,7 +67,7 @@ Component.prototype.stop = function(force, cb) {
process.nextTick(cb);
};
var getConnector = function(app, connector) {
var getConnector = function(app, connector, opts) {
if(!connector) {
return getDefaultConnector(app);
}
@@ -77,7 +77,7 @@ var getConnector = function(app, connector) {
}
var curServer = app.getCurServer();
return connector(curServer.wsPort, curServer.host);
return connector(curServer.wsPort, curServer.host, opts);
};
var getDefaultConnector = function(app) {
@@ -0,0 +1,81 @@
var protocol = require('pomelo-protocol');
var CODE_OK = 200;
var CODE_OLD_CLIENT = 300;
var CODE_USE_ERROR = 500;
var PKG_HANDSHAKE = 1; // handshake package
/**
* Process the handshake request.
*
* @param {Object} opts option parameters
* opts.handshake(msg, cb(err, resp)) handshake callback. msg is the handshake message from client.
* opts.hearbeat heartbeat interval (level?)
* opts.version required client level
*/
var Command = function(opts) {
opts = opts || {};
this.userHandshake = opts.handshake;
this.heartbeat = opts.heartbeat;
this.version = opts.version;
};
module.exports = Command;
Command.prototype.handle = function(socket, msg) {
if(!checkClientVersion(msg)) {
processError(socket, CODE_OLD_CLIENT);
return;
}
var heartbeat = setupHeartbeat(msg);
if(typeof this.userHandshake === 'function') {
this.userHandshake(msg, function(err, resp) {
if(err) {
process.nextTick(function() {
processError(socket, CODE_USE_ERROR);
});
return;
}
process.nextTick(function() {
response(socket, heartbeat, resp);
});
});
}
process.nextTick(function() {
response(socket, heartbeat);
});
};
var checkClientVersion = function(self, msg) {
return true;
};
var setupHeartbeat = function(self, msg) {
return self.heartbeat;
};
var response = function(socket, heartbeat, resp) {
var res = {
sys: {
heartbeat: heartbeat
}
};
if(resp) {
res.user = resp;
}
socket.handshakeResponse(protocol.encode(PKG_HANDSHAKE, JSON.stringify(res)));
};
var processError = function(socket, code) {
var res = {
code: code
};
socket.sendForce(PKG_HANDSHAKE, protocol.encode(JSON.stringify(res)));
};
@@ -0,0 +1,35 @@
var protocol = require('pomelo-protocol');
var PKG_HEARTBEAT = 2; // heartbeat package
/**
* Process hearbeat request.
*
* @param {Object} opts option request
* opts.hearbeat heartbeat interval
*/
var Command = function(opts) {
opts = opts || {};
this.heartbeat = opts.heartbeat;
this.timeouts = {};
};
module.exports = Command;
Command.prototype.handle = function(socket) {
this.clear(socket.id);
socket.send(protocol.encode(PKG_HEARTBEAT, ''));
this.timeouts[socket.id] = setTimeout(function() {
socket.close();
}, this.heartbeat);
};
Command.prototype.clear = function(id) {
var tid = this.timeouts[id];
if(tid) {
clearTimeout(tid);
}
};
@@ -11,7 +11,7 @@ var ST_CLOSED = 2;
*/
var Processor = function() {
EventEmitter.call(this);
this.mockServer = new MockServer();
this.mockServer = new HttpServer();
var self = this;
this.wsServer = new WebSocketServer({server: this.mockServer});
@@ -44,9 +44,3 @@ Processor.prototype.close = function() {
this.wsServer = null;
this.mockServer = null;
};
var MockServer = function() {
HttpServer.call(this);
};
util.inherits(MockServer, HttpServer);
@@ -3,21 +3,28 @@ var EventEmitter = require('events').EventEmitter;
var net = require('net');
var HybiSocket = require('./hybisocket');
var Switcher = require('./hybi/switcher');
var Handshake = require('./commands/handshake');
var Heartbeat = require('./commands/heartbeat');
var curId = 1;
/**
* Connector that manager low level connection and protocol bewteen server and client.
* Develper can provide their own connector to switch the low level prototol, such as tcp or probuf.
*/
var Connector = function(port) {
var Connector = function(port, host, opts) {
if (!(this instanceof Connector)) {
return new Connector(port);
return new Connector(port, host, opts);
}
EventEmitter.call(this);
this.port = port;
opts = opts || {};
this.handshake = new Handshake(opts);
this.heartbeat = new Heartbeat(opts);
this.switcher = null;
};
@@ -34,7 +41,18 @@ Connector.prototype.start = function() {
this.switcher = new Switcher(this.tcpServer);
this.switcher.on('connection', function(socket) {
self.emit('connection', new HybiSocket(curId++, socket));
var hybisocket = new HybiSocket(curId++, socket);
hybisocket.on('handshake',
self.handshake.handle.bind(self.handshake, socket));
hybisocket.on('heartbeat',
self.heartbeat.handle.bind(self.heartbeat, socket));
hybisocket.on('disconnect',
self.hearbeat.clear.bind(self.hearbeat, socket.id));
self.emit('connection', hybisocket);
});
this.tcpServer.listen(this.port);
@@ -3,7 +3,16 @@ var EventEmitter = require('events').EventEmitter;
var protocol = require('pomelo-protocol');
var ST_INITED = 0;
var ST_CLOSED = 1;
var ST_WAIT_ACK = 1;
var ST_WORKING = 2;
var ST_CLOSED = 3;
/**
* Package types
*/
var PKG_HANDSHAKE = 1; // handshake package
var PKG_HEARTBEAT = 2; // heartbeat package
var PKG_DATA = 3; // data package
/**
* Socket class that wraps socket and websocket to provide unified interface for up level.
@@ -25,9 +34,8 @@ var Socket = function(id, socket) {
socket.on('message', function(msg) {
if(msg) {
msg = protocol.decode(msg);
handle(self, msg);
}
self.emit('message', msg);
});
this.state = ST_INITED;
@@ -40,17 +48,72 @@ util.inherits(Socket, EventEmitter);
module.exports = Socket;
Socket.prototype.send = function(msg) {
if(this.state !== ST_INITED) {
if(this.state !== ST_WORKING) {
return;
}
this.socket.send(msg);
};
/**
* Send message to client no matter whether handshake.
*
* @api private
*/
Socket.prototype.sendForce = function(msg) {
if(this.state === ST_CLOSED) {
return;
}
this.socket.send(msg);
};
/**
* Response handshake request
*
* @api private
*/
Socket.prototype.handshakeResponse = function(resp) {
if(this.state !== ST_INITED) {
return;
}
this.socket.send(resp);
this.state = ST_WAIT_ACK;
};
Socket.prototype.disconnect = function() {
if(this.state === ST_CLOSED) {
return;
}
this.state = ST_CLOSED;
this.socket.close();
};
var handle = function(socket, msg) {
var handler = handlers[msg.flag];
if(handler) {
handler(socket, msg);
}
};
var handleHandshake = function(socket, msg) {
socket.emit('handshake', msg.buffer.toString('utf8'));
};
var handleHandshakeAck = function(socket, msg) {
this.state = ST_WORKING;
this.emit('heartbeat');
};
var handleHeartbeat = function(socket, msg) {
socket.emit('heartbeat');
};
var handleData = function(socket, msg) {
socket.emit('message', msg.buffer);
};
var handlers = {
PKG_HANDSHAKE: handleHandshake,
PKG_HEARTBEAT: handleHeartbeat,
PKG_DATA: handleData
};
View
@@ -12,7 +12,7 @@
"mkdirp": "0.3.3",
"pomelo-loader":"0.0.4",
"pomelo-rpc":"0.1.0",
"pomelo-protocol":">=0.0.3",
"pomelo-protocol":"0.0.5",
"pomelo-admin":"0.0.19",
"pomelo-monitor":">=0.3.5",
"pomelo-logger": "0.0.2",

0 comments on commit 9f156a3

Please sign in to comment.