Permalink
Browse files

version control

  • Loading branch information...
0 parents commit a0254f7e22fdcfa502e838d7b1c2a9eec315e1bf @bpot committed Sep 10, 2010
@@ -0,0 +1,100 @@
+node-msgpack-rpc
+================
+
+node-msgpack-rpc is an implementation of the [Msgpack-RPC](http://redmine.msgpack.org/projects/msgpack/wiki/RPCDesign) protocol specification for node.js. Msgpack-RPC is built ontop of the very fast [MessagePack](http://msgpack.org) serialization format. This implementation supports tcp and unix socket transports (it may one day support UDP).
+
+Simple Usage
+------------
+
+The easiest way to create a server is with a handler object. All incoming calls will be invoked on the handler object:
+
+ var handler = {
+ 'add' : function(a, b, response) {
+ response.result( a + b );
+ }
+ }
+
+ var rpc = require('msgpack-rpc');
+ rpc.createServer();
+ rpc.setHandler(handler);
+ rpc.listen(8000);
+
+a corresponding client might look like:
+
+ var c = rpc.createClient(8000, '127.0.0.1', function() {
+ c.invoke('add', 5, 4, function(err, response) {
+ assert.equal(9, response);
+ c.close();
+ }
+ });
+
+
+Without a handler
+-----------------
+
+ rpc.createServer(function(rpc_stream) {
+ rpc_stream.on('request', function(method, params, response) {
+ if(method == 'add') {
+ response.result( params[0] + params[1] );
+ } else {
+ response.error("unknown method!");
+ }
+ }
+
+ rpc_stream.on('notify', function(method, params) {
+ console.log("recieved notification: " + method);
+ });
+ });
+ rpc.listen(8000);
+
+Session Pool
+------------
+
+This module also provides a session pool which allows you to re-use client connections:
+
+ var sp = new SesssionPool();
+ sp.getClient(8000, '127.0.0.1').invoke('hello','world', function(err, response) { ... });;
+ sp.getClient(8001, '127.0.0.1').invoke('hello','world', function(err, response) { ... });;
+
+ // Uses same tcp connection as above
+ sp.getClient(8000, '127.0.0.1').invoke('goodbye','world', function(err, response) { ... });;
+
+ sp.closeClients();
+
+Installation
+------------
+
+First you will need to install the [node-msgpack](http://github.com/pgriess/node-msgpack) add-on
+
+To install node-msgpack-rpc with npm:
+
+ git clone http://github.com/bpot/node-msgpack-rpc/
+ cd node-msgpack-rpc
+ npm link .
+
+
+RPC Stream API
+--------------
+
+Clients and the streams passed to servers for incoming connections are both instances of MsgpackRPCStream.
+
+Methods
+
+ c.createClient(port, [hostname], [ready_cb]);
+ c.invoke(method, [param1, param2, ...], cb);
+ c.notify(method, [param1, param2, ...]);
+ c.setTimeout(milliseconds); // Setting this will cause requests to fail with err "timeout" if they don't recieve a response for the specified period
+ c.close(); // Close the socket for this client
+ c.stream // underlying net.Stream object
+
+Events
+
+ 'ready' // emitted when we've connected to the server
+ 'request' // recieved request
+ 'notify' // recieved notification
+
+
+TODO
+----
+* UDP Support?
+
@@ -0,0 +1,18 @@
+var rpc = require('msgpack-rpc');
+
+var handler = {
+ 'n' : 0,
+ 'echo' : function(data, response) {
+ response.result(data);
+ },
+ 'hello' : function(data) {
+ this.n += 1;
+ if(this.n % 100000 == 0) {
+ console.log(this.n)
+ }
+ }
+}
+
+var server = rpc.createServer();
+server.setHandler(handler);
+server.listen(8000);
@@ -0,0 +1,21 @@
+var rpc = require('msgpack-rpc');
+var assert = require('assert');
+
+var client = rpc.createClient(8000);
+client.on('ready', function() {
+ var count = 0;
+ setInterval(function() {
+ var b = "asdkjfhksjadfhskdjflksdjlfewiurowieurowieuroi";
+ for(var i = 0;i < 10000;i++) {
+ client.invoke('echo', b, function(err, response) {
+ if(!err) {
+ count += 1;
+ var the_buf = response;
+ assert.equal(b.length, the_buf.length);
+ } else {
+ console.log("call failed");
+ }
+ });
+ }
+ }, 500);
+});
@@ -0,0 +1,12 @@
+var rpc = require('msgpack-rpc');
+var assert = require('assert');
+
+var client = rpc.createClient(8000);
+client.on('ready', function() {
+ var count = 0;
+ setInterval(function() {
+ for(var i = 0;i < 10000;i++) {
+ client.notify('hello', "world")
+ }
+ }, 500);
+});
@@ -0,0 +1,195 @@
+var net = require('net'),
+ msgpack = require('msgpack'),
+ events = require('events'),
+ sys = require('sys');
+
+var REQUEST = 0;
+var RESPONSE = 1;
+var NOTIFY = 2;
+var MAX_SEQID = Math.pow(2,32)-1;
+
+function RPCResponse (stream, seqid) {
+ this.stream = stream;
+ this.seqid = seqid;
+}
+
+RPCResponse.prototype.result = function(args) {
+ this.stream.respond(this.seqid, null, args);
+}
+
+RPCResponse.prototype.error = function(error) {
+ this.stream.respond(this.seqid, error, null);
+}
+
+// The heart of the beast, used for both server and client
+var MsgpackRPCStream = function(stream, handler) {
+ events.EventEmitter.call(this);
+ var self = this;
+ this.last_seqid = undefined;
+ this.stream = stream;
+ this.handler = handler;
+ this.cbs = [];
+ this.timeout = undefined;
+
+ this.msgpack_stream = new msgpack.Stream(this.stream);
+ this.msgpack_stream.on('msg', function(msg) {
+ var type = msg.shift();
+ switch(type) {
+ case REQUEST:
+ var seqid = msg[0];
+ var method = msg[1];
+ var params = msg[2];
+ var response = new RPCResponse(self, seqid);
+
+ self.invokeHandler(method, params.concat(response));
+ self.emit('request', method, params, response);
+ break;
+ case RESPONSE:
+ var seqid = msg[0];
+ var error = msg[1];
+ var result = msg[2];
+
+ if(self.cbs[seqid]) {
+ self.triggerCb(seqid, [error, result]);
+ } else {
+ self.emit('error', new Error("unexpected response with unrecognized seqid (" + seqid + ")"))
+ }
+ break;
+ case NOTIFY:
+ var method = msg[0];
+ var params = msg[1];
+
+ self.invokeHandler(method, params);
+ self.emit('notify', method, params);
+ break;
+ }
+ });
+ this.stream.on('connect', function() { self.emit('ready'); });
+
+ // Failures
+ this.stream.on('end', function() { self.stream.end(); self.failCbs(new Error("connection closed by peer")); });
+ this.stream.on('timeout', function() { self.failCbs(new Error("connection timeout")); });
+ this.stream.on('error', function(error) { self.failCbs(error); });
+ this.stream.on('close', function(had_error) {
+ if(had_error) return;
+ self.failCbs(new Error("connection closed locally"));
+ });
+}
+
+sys.inherits(MsgpackRPCStream, events.EventEmitter);
+
+MsgpackRPCStream.prototype.triggerCb = function(seqid, args) {
+ this.cbs[seqid].apply(this, args);
+ delete this.cbs[seqid];
+}
+
+MsgpackRPCStream.prototype.failCbs = function(error) {
+ for(var seqid in this.cbs) {
+ this.triggerCb(seqid, [error])
+ }
+}
+
+MsgpackRPCStream.prototype.invokeHandler = function(method, params) {
+ if(this.handler) {
+ if(this.handler[method]) {
+ this.handler[method].apply(this.handler, params);
+ } else {
+ response.error(new Error("unknown method"));
+ }
+ }
+}
+
+MsgpackRPCStream.prototype.nextSeqId = function() {
+ if(this.last_seqid == undefined) {
+ return this.last_seqid = 0;
+ } else if(this.last_seqid > MAX_SEQID ) {
+ return this.last_seqid = 0;
+ } else {
+ return this.last_seqid += 1;
+ }
+}
+
+MsgpackRPCStream.prototype.invoke = function() {
+ var self = this;
+ var seqid = this.nextSeqId();
+ var method = arguments[0];
+ var cb = arguments[arguments.length - 1];
+ var args = [];
+ for(var i = 1;i < arguments.length - 1;i++) {
+ args.push(arguments[i]);
+ }
+
+ this.cbs[seqid] = cb;
+ if(this.timeout) {
+ setTimeout(function() { if(self.cbs[seqid]) self.triggerCb(seqid, ["timeout"]); }, this.timeout);
+ }
+ if(this.stream.writable) { return this.msgpack_stream.send([REQUEST, seqid, method, args]) };
+}
+
+MsgpackRPCStream.prototype.respond = function(seqid, error, result) {
+ if(this.stream.writable) { return this.msgpack_stream.send([RESPONSE, seqid, error, result]) };
+}
+
+MsgpackRPCStream.prototype.notify = function(method, params) {
+ var method = arguments[0];
+ var args = [];
+ for(var i = 1;i < arguments.length;i++) {
+ args.push(arguments[i]);
+ }
+
+ if(this.stream.writable) { return this.msgpack_stream.send([NOTIFY, method, args]) };
+}
+
+MsgpackRPCStream.prototype.setTimeout = function(timeout) {
+ this.timeout = timeout;
+}
+
+MsgpackRPCStream.prototype.close = function() {
+ this.stream.end();
+}
+
+exports.createClient = function(port, hostname,cb) {
+ var s = new MsgpackRPCStream(new net.createConnection(port, hostname));
+ if(typeof hostname == 'function') s.on('ready', hostname);
+ if(cb) s.on('ready', cb);
+
+ return s;
+}
+
+var Server = function(listener) {
+ net.Server.call(this);
+ var self = this;
+ this.handler = undefined;
+
+ this.on('connection', function(stream) {
+ stream.on('end', function() { stream.end(); });
+ var rpc_stream = new MsgpackRPCStream(stream, self.handler);
+ if(listener) listener(rpc_stream);
+ });
+}
+sys.inherits(Server, net.Server);
+
+Server.prototype.setHandler = function(handler) {
+ this.handler = handler;
+}
+
+exports.createServer = function(handler) {
+ return new Server(handler);
+}
+
+var SessionPool = exports.SessionPool = function() {
+ this.clients = {};
+};
+
+SessionPool.prototype.getClient = function(port, hostname) {
+ var address = hostname + ":" + port;
+ if(this.clients[address]) {
+ return this.clients[address];
+ } else {
+ return this.clients[address] = exports.createClient(port, hostname);
+ }
+};
+
+SessionPool.prototype.closeClients = function() {
+ for(var i in this.clients) this.clients[i].stream.end();
+};
@@ -0,0 +1,11 @@
+{
+ 'name' : 'msgpack-rpc',
+ 'version' : '0.0.1',
+ 'description' : 'describe stuff',
+ 'homepage' : 'http://github.com/bpot/node-msgpack-rpc',
+ 'main' : './msgpack-rpc',
+ 'repository' :
+ { "type" : "git"
+ , "url" : "http://github.com/bpot/node-msgpack-rpc.git"
+ }
+}
Oops, something went wrong. Retry.

0 comments on commit a0254f7

Please sign in to comment.