Skip to content

Commit

Permalink
Merge 3beef7f into 5cfaff2
Browse files Browse the repository at this point in the history
  • Loading branch information
dfellis committed Sep 19, 2018
2 parents 5cfaff2 + 3beef7f commit 7b2e83d
Show file tree
Hide file tree
Showing 19 changed files with 826 additions and 921 deletions.
46 changes: 23 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,46 +30,46 @@ If you want to use the ``jsonrpc-repl`` binary, also
## Library Usage

```js
var jsonrpc = require('multitransport-jsonrpc'); // Get the multitransport JSON-RPC suite
const jsonrpc = require('multitransport-jsonrpc'); // Get the multitransport JSON-RPC suite

var Server = jsonrpc.server; // The server constructor function
var Client = jsonrpc.client; // The client constructor function
const Server = jsonrpc.server; // The server constructor function
const Client = jsonrpc.client; // The client constructor function

var ServerHttp = jsonrpc.transports.server.http; // The server HTTP transport constructor function
var ServerTcp = jsonrpc.transports.server.tcp; // The server TCP transport constructor function
var ServerMiddleware = jsonrpc.transports.server.middleware; // The server Middleware transport constructor function (for Express/Connect)
var Loopback = jsonrpc.transports.shared.loopback; // The Loopback transport for mocking clients/servers in tests
const ServerHttp = jsonrpc.transports.server.http; // The server HTTP transport constructor function
const ServerTcp = jsonrpc.transports.server.tcp; // The server TCP transport constructor function
const ServerMiddleware = jsonrpc.transports.server.middleware; // The server Middleware transport constructor function (for Express/Connect)
const Loopback = jsonrpc.transports.shared.loopback; // The Loopback transport for mocking clients/servers in tests

var ClientHttp = jsonrpc.transports.client.http;
var ClientTcp = jsonrpc.transports.client.tcp;
const ClientHttp = jsonrpc.transports.client.http;
const ClientTcp = jsonrpc.transports.client.tcp;

// Setting up servers
var jsonRpcHttpServer = new Server(new ServerHttp(8000), {
const jsonRpcHttpServer = new Server(new ServerHttp(8000), {
loopback: function(obj, callback) { callback(undefined, obj); }
});

var jsonRpcTcpServer = new Server(new ServerTcp(8001), {
const jsonRpcTcpServer = new Server(new ServerTcp(8001), {
loopback: function(obj, callback) { callback(undefined, obj); }
});

var express = require('express');
var app = express();
const express = require('express');
const app = express();
app.use(express.bodyParser());
var jsonRpcMiddlewareServer = new Server(new ServerMiddleware(), {
const jsonRpcMiddlewareServer = new Server(new ServerMiddleware(), {
loopback: function(obj, callback) { callback(undefined, obj); }
});
app.use('/rpc', jsonRpcMiddlewareServer.transport.middleware);
app.listen(8002);

var loopback = new Loopback();
var jsonRpcLoopbackServer = new Server(loopback, {
const loopback = new Loopback();
const jsonRpcLoopbackServer = new Server(loopback, {
loopback: function(obj, callback) { callback(undefined, obj); }
});

// Setting up and using the clients

// Either explicitly register the remote methods
var jsonRpcHttpClient = new Client(new ClientHttp('localhost', 8000));
const jsonRpcHttpClient = new Client(new ClientHttp('localhost', 8000));
jsonRpcHttpClient.register('loopback');
jsonRpcHttpClient.loopback('foo', function(err, val) {
console.log(val); // Prints 'foo'
Expand All @@ -82,7 +82,7 @@ new Client(new ClientTcp('localhost', 8001), {}, function(jsonRpcTcpClient) {
});
});

var jsonRpcExpressClient = new Client(new ClientHttp('localhost', 8002, { path: '/rpc' }));
const jsonRpcExpressClient = new Client(new ClientHttp('localhost', 8002, { path: '/rpc' }));
jsonRpcExpressClient.register('loopback');
jsonRpcExpressClient.loopback('foo', function(err, val) {
console.log(val); // Prints 'foo'
Expand All @@ -95,11 +95,11 @@ new Client(loopback, {}, function(jsonRpcLoopbackClient) {
});

// The server can run multiple transports simultaneously, too
var jsonRpcMultitransportServer = new Server([new ServerTcp(8000), new ServerHttp(8080)], {
const jsonRpcMultitransportServer = new Server([new ServerTcp(8000), new ServerHttp(8080)], {
loopback: function(obj, callback) { callback(undefined, obj); }
});
var client1 = new Client(new ClientTcp('localhost', 8000));
var client2 = new Client(new ClientHttp('localhost', 8080));
const client1 = new Client(new ClientTcp('localhost', 8000));
const client2 = new Client(new ClientHttp('localhost', 8080));
```

### Constructor Function Parameters
Expand Down Expand Up @@ -297,8 +297,8 @@ function foo(bar, baz, callback) {
Alternately, the JSON-RPC server provides a ``blocking`` method that can be used to mark a function as a blocking function that takes no callback. Then the result is returned and errors are thrown.

```js
var blocking = jsonrpc.server.blocking;
var blockingFoo = blocking(function(bar, baz) {
const blocking = jsonrpc.server.blocking;
const blockingFoo = blocking(function(bar, baz) {
if(!baz) {
throw new Error('no baz!');
} else {
Expand Down
6 changes: 3 additions & 3 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ function JSONRPC(transport, options, done) {
// remote method.
JSONRPC.prototype.request = function(method, args, callback) {
// The *contents* variable contains the JSON-RPC 1.0 POST string.
var requestId = this.idGenerator()
const requestId = this.idGenerator()
if(!requestId || requestId === null) {
if(callback instanceof Function) {
callback(new Error('Request id generator function should return an id'))
return
}
}

var contents = {
const contents = {
method: method,
params: args,
id: requestId
Expand All @@ -70,7 +70,7 @@ JSONRPC.prototype.request = function(method, args, callback) {
callback(response)
} else if(response.error) {
if(response.error.message) {
var err = new Error(response.error.message)
const err = new Error(response.error.message)
Object.keys(response.error).forEach(function(key) {
if(key !== 'message') err[key] = response.error[key]
})
Expand Down
197 changes: 96 additions & 101 deletions lib/transports/client/childProcess.js
Original file line number Diff line number Diff line change
@@ -1,123 +1,118 @@
var util = require('util')
var EventEmitter = require('events').EventEmitter
var zlib = require('zlib')
const EventEmitter = require('events').EventEmitter
const zlib = require('zlib')

// The Client ChildProcessTransport constructor function
function ChildProcessTransport(child, config) {
// Initialize the Node EventEmitter on this
EventEmitter.call(this)

config = config || {}
this.requests = {}
this.killChildOnShutdown = typeof(config.killChildOnShutdown) === 'boolean' ? config.killChildOnShutdown : true
this.timeout = config.timeout || 30*1000
this.sweepTime = config.sweepTime || 1*1000
this.sweepInterval = setInterval(this.sweep.bind(this), this.sweepTime)
this.compressed = config.compressed || false
this.compressLength = config.compressLength || 0
this.child = child

function uncompressedMessageHandler(message) {
if (message && this.requests[message.id]) {
this.requests[message.id].callback(message)
delete this.requests[message.id]
}
function uncompressedMessageHandler(message) {
if (message && this.requests[message.id]) {
this.requests[message.id].callback(message)
delete this.requests[message.id]
}
}

function compressedMessageHandler(message) {
if (message && message.charAt(0) === 'z') {
var buf = new Buffer(message.substring(1), 'base64')
zlib.gunzip(buf, function(err, uncompressedJSON) {
if (err) return this.emit('error', err.message)
var obj = JSON.parse(uncompressedJSON.toString('utf8'))
if (obj && this.requests[obj.id]) {
this.requests[obj.id].callback(obj)
delete this.requests[obj.id]
}
}.bind(this))
} else {
var json = JSON.parse(message)
if (this.requests[json.id]) {
this.requests[json.id].callback(json)
delete this.requests[json.id]
function compressedMessageHandler(message) {
if (message && message.charAt(0) === 'z') {
const buf = new Buffer(message.substring(1), 'base64')
zlib.gunzip(buf, (err, uncompressedJSON) => {
if (err) return this.emit('error', err.message)
const obj = JSON.parse(uncompressedJSON.toString('utf8'))
if (obj && this.requests[obj.id]) {
this.requests[obj.id].callback(obj)
delete this.requests[obj.id]
}
})
} else {
const json = JSON.parse(message)
if (this.requests[json.id]) {
this.requests[json.id].callback(json)
delete this.requests[json.id]
}
}
this.child.on('message', this.compressed ? compressedMessageHandler.bind(this) : uncompressedMessageHandler.bind(this))
this.child.on('exit', function(code, signal) {
this.emit('exit', code, signal)
this.shutdown()
}.bind(this))
this.child.on('error', function(e) {
this.emit('error', e)
this.shutdown()
}.bind(this))

return this
}

// Attach the EventEmitter prototype as the ChildProcessTransport's prototype's prototype
util.inherits(ChildProcessTransport, EventEmitter)
class ChildProcessTransport extends EventEmitter {
constructor(child, config) {
super()

// The request logic is relatively straightforward, given the request
// body and callback function, register the request with the requests
// object, then if there is a valid connection at the moment, send the
// request to the server with a null terminator attached. This ordering
// guarantees that requests called during a connection issue won't be
// lost while a connection is re-established.
ChildProcessTransport.prototype.request = function request(body, callback) {
this.requests[body.id] = {
callback: callback,
body: body,
timestamp: Date.now()
config = config || {}
this.requests = {}
this.killChildOnShutdown = typeof(config.killChildOnShutdown) === 'boolean' ? config.killChildOnShutdown : true
this.timeout = config.timeout || 30*1000
this.sweepTime = config.sweepTime || 1*1000
this.sweepInterval = setInterval(this.sweep.bind(this), this.sweepTime)
this.compressed = config.compressed || false
this.compressLength = config.compressLength || 0
this.child = child

this.child.on('message', this.compressed ? compressedMessageHandler.bind(this) : uncompressedMessageHandler.bind(this))
this.child.on('exit', function(code, signal) {
this.emit('exit', code, signal)
this.shutdown()
}.bind(this))
this.child.on('error', function(e) {
this.emit('error', e)
this.shutdown()
}.bind(this))
}
if (this.child) {
if (this.compressed) {
var jsonStr = JSON.stringify(body)
if (!this.compressLength || jsonStr.length > this.compressLength) {
zlib.gzip(new Buffer(JSON.stringify(body)), function(err, compressedJSON) {
if (err) return this.emit('error', err.message)
this.child.send('z' + compressedJSON.toString('base64'))
}.bind(this))

// The request logic is relatively straightforward, given the request
// body and callback function, register the request with the requests
// object, then if there is a valid connection at the moment, send the
// request to the server with a null terminator attached. This ordering
// guarantees that requests called during a connection issue won't be
// lost while a connection is re-established.
request(body, callback) {
this.requests[body.id] = {
callback: callback,
body: body,
timestamp: Date.now()
}
if (this.child) {
if (this.compressed) {
const jsonStr = JSON.stringify(body)
if (!this.compressLength || jsonStr.length > this.compressLength) {
zlib.gzip(new Buffer(JSON.stringify(body)), (err, compressedJSON) => {
if (err) return this.emit('error', err.message)
this.child.send('z' + compressedJSON.toString('base64'))
})
} else {
this.child.send(jsonStr)
}
} else {
this.child.send(jsonStr)
this.child.send(body)
}
} else {
this.child.send(body)
}
}
}

// The sweep function looks at the timestamps for each request, and any
// request that is longer lived than the timeout (default 2 min) will be
// culled and assumed lost.
ChildProcessTransport.prototype.sweep = function sweep() {
var now = Date.now()
var cannedRequests = {}
for(var key in this.requests) {
if(this.requests[key].timestamp && this.requests[key].timestamp + this.timeout < now) {
this.requests[key].callback({ error: 'Request Timed Out' })
cannedRequests[key] = this.requests[key]
delete this.requests[key]
// The sweep method looks at the timestamps for each request, and any
// request that is longer lived than the timeout (default 2 min) will be
// culled and assumed lost.
sweep() {
const now = Date.now()
const cannedRequests = {}
for (const key in this.requests) {
if (this.requests[key].timestamp && this.requests[key].timestamp + this.timeout < now) {
this.requests[key].callback({ error: 'Request Timed Out' })
cannedRequests[key] = this.requests[key]
delete this.requests[key]
}
}
this.emit('sweep', cannedRequests)
}
this.emit('sweep', cannedRequests)
}

// When shutting down the client connection, the sweep is turned off, the
// requests are removed, the number of allowed retries is set to zero, the
// connection is ended, and a callback, if any, is called.
ChildProcessTransport.prototype.shutdown = function shutdown(done) {
clearInterval(this.sweepInterval)
this.requests = {}
if(this.killChildOnShutdown) {
if(this.child) this.child.kill()
delete this.child
} else {
this.child.disconnect()
// When shutting down the client connection, the sweep is turned off, the
// requests are removed, the number of allowed retries is set to zero, the
// connection is ended, and a callback, if any, is called.
shutdown(done) {
clearInterval(this.sweepInterval)
this.requests = {}
if (this.killChildOnShutdown) {
if (this.child) this.child.kill()
delete this.child
} else {
this.child.disconnect()
}
this.emit('shutdown')
if (done instanceof Function) done()
}
this.emit('shutdown')
if(done instanceof Function) done()
}

// Export the client ChildProcessTransport
Expand Down

0 comments on commit 7b2e83d

Please sign in to comment.