diff --git a/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/RpcBuilder.js b/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/RpcBuilder.js new file mode 100644 index 0000000000..7b10da1584 --- /dev/null +++ b/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/RpcBuilder.js @@ -0,0 +1,747 @@ +/* + * (C) Copyright 2014 Kurento (http://kurento.org/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +var defineProperty_IE8 = false +if (Object.defineProperty) { + try { + Object.defineProperty({}, "x", {}); + } catch (e) { + defineProperty_IE8 = true + } +} + +// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Function/bind +if (!Function.prototype.bind) { + Function.prototype.bind = function (oThis) { + if (typeof this !== 'function') { + // closest thing possible to the ECMAScript 5 + // internal IsCallable function + throw new TypeError('Function.prototype.bind - what is trying to be bound is not callable'); + } + + var aArgs = Array.prototype.slice.call(arguments, 1), + fToBind = this, + fNOP = function () {}, + fBound = function () { + return fToBind.apply(this instanceof fNOP && oThis ? + this : + oThis, + aArgs.concat(Array.prototype.slice.call(arguments))); + }; + + fNOP.prototype = this.prototype; + fBound.prototype = new fNOP(); + + return fBound; + }; +} + + +var EventEmitter = require('events').EventEmitter; + +var inherits = require('inherits'); + +var Mapper = require('./Mapper'); + + +var BASE_TIMEOUT = 5000; + + +function unifyResponseMethods(responseMethods) { + if (!responseMethods) return {}; + + for (var key in responseMethods) { + var value = responseMethods[key]; + + if (typeof value == 'string') + responseMethods[key] = { + response: value + } + }; + + return responseMethods; +}; + +function unifyTransport(transport) { + if (!transport) return; + + // Transport as a function + if (transport instanceof Function) + return { + send: transport + }; + + // WebSocket & DataChannel + if (transport.send instanceof Function) + return transport; + + // Message API (Inter-window & WebWorker) + if (transport.postMessage instanceof Function) { + transport.send = transport.postMessage; + return transport; + } + + // Stream API + if (transport.write instanceof Function) { + transport.send = transport.write; + return transport; + } + + // Transports that only can receive messages, but not send + if (transport.onmessage !== undefined) return; + if (transport.pause instanceof Function) return; + + throw new SyntaxError("Transport is not a function nor a valid object"); +}; + + +/** + * Representation of a RPC notification + * + * @class + * + * @constructor + * + * @param {String} method -method of the notification + * @param params - parameters of the notification + */ +function RpcNotification(method, params) { + if (defineProperty_IE8) { + this.method = method + this.params = params + } else { + Object.defineProperty(this, 'method', { + value: method, + enumerable: true + }); + Object.defineProperty(this, 'params', { + value: params, + enumerable: true + }); + } +}; + + +/** + * @class + * + * @constructor + * + * @param {object} packer + * + * @param {object} [options] + * + * @param {object} [transport] + * + * @param {Function} [onRequest] + */ +function RpcBuilder(packer, options, transport, onRequest) { + var self = this; + + if (!packer) + throw new SyntaxError('Packer is not defined'); + + if (!packer.pack || !packer.unpack) + throw new SyntaxError('Packer is invalid'); + + var responseMethods = unifyResponseMethods(packer.responseMethods); + + + if (options instanceof Function) { + if (transport != undefined) + throw new SyntaxError("There can't be parameters after onRequest"); + + onRequest = options; + transport = undefined; + options = undefined; + }; + + if (options && options.send instanceof Function) { + if (transport && !(transport instanceof Function)) + throw new SyntaxError("Only a function can be after transport"); + + onRequest = transport; + transport = options; + options = undefined; + }; + + if (transport instanceof Function) { + if (onRequest != undefined) + throw new SyntaxError("There can't be parameters after onRequest"); + + onRequest = transport; + transport = undefined; + }; + + if (transport && transport.send instanceof Function) + if (onRequest && !(onRequest instanceof Function)) + throw new SyntaxError("Only a function can be after transport"); + + options = options || {}; + + + EventEmitter.call(this); + + if (onRequest) + this.on('request', onRequest); + + + if (defineProperty_IE8) + this.peerID = options.peerID + else + Object.defineProperty(this, 'peerID', { + value: options.peerID + }); + + var max_retries = options.max_retries || 0; + + + function transportMessage(event) { + self.decode(event.data || event); + }; + + this.getTransport = function () { + return transport; + } + this.setTransport = function (value) { + // Remove listener from old transport + if (transport) { + // W3C transports + if (transport.removeEventListener) + transport.removeEventListener('message', transportMessage); + + // Node.js Streams API + else if (transport.removeListener) + transport.removeListener('data', transportMessage); + }; + + // Set listener on new transport + if (value) { + // W3C transports + if (value.addEventListener) + value.addEventListener('message', transportMessage); + + // Node.js Streams API + else if (value.addListener) + value.addListener('data', transportMessage); + }; + + transport = unifyTransport(value); + } + + if (!defineProperty_IE8) + Object.defineProperty(this, 'transport', { + get: this.getTransport.bind(this), + set: this.setTransport.bind(this) + }) + + this.setTransport(transport); + + + var request_timeout = options.request_timeout || BASE_TIMEOUT; + var ping_request_timeout = options.ping_request_timeout || request_timeout; + var response_timeout = options.response_timeout || BASE_TIMEOUT; + var duplicates_timeout = options.duplicates_timeout || BASE_TIMEOUT; + + + var requestID = 0; + + var requests = new Mapper(); + var responses = new Mapper(); + var processedResponses = new Mapper(); + + var message2Key = {}; + + + /** + * Store the response to prevent to process duplicate request later + */ + function storeResponse(message, id, dest) { + var response = { + message: message, + /** Timeout to auto-clean old responses */ + timeout: setTimeout(function () { + responses.remove(id, dest); + }, + response_timeout) + }; + + responses.set(response, id, dest); + }; + + /** + * Store the response to ignore duplicated messages later + */ + function storeProcessedResponse(ack, from) { + var timeout = setTimeout(function () { + processedResponses.remove(ack, from); + }, + duplicates_timeout); + + processedResponses.set(timeout, ack, from); + }; + + + /** + * Representation of a RPC request + * + * @class + * @extends RpcNotification + * + * @constructor + * + * @param {String} method -method of the notification + * @param params - parameters of the notification + * @param {Integer} id - identifier of the request + * @param [from] - source of the notification + */ + function RpcRequest(method, params, id, from, transport) { + RpcNotification.call(this, method, params); + + this.getTransport = function () { + return transport; + } + this.setTransport = function (value) { + transport = unifyTransport(value); + } + + if (!defineProperty_IE8) + Object.defineProperty(this, 'transport', { + get: this.getTransport.bind(this), + set: this.setTransport.bind(this) + }) + + var response = responses.get(id, from); + + /** + * @constant {Boolean} duplicated + */ + if (!(transport || self.getTransport())) { + if (defineProperty_IE8) + this.duplicated = Boolean(response) + else + Object.defineProperty(this, 'duplicated', { + value: Boolean(response) + }); + } + + var responseMethod = responseMethods[method]; + + this.pack = packer.pack.bind(packer, this, id) + + /** + * Generate a response to this request + * + * @param {Error} [error] + * @param {*} [result] + * + * @returns {string} + */ + this.reply = function (error, result, transport) { + // Fix optional parameters + if (error instanceof Function || error && error.send instanceof Function) { + if (result != undefined) + throw new SyntaxError("There can't be parameters after callback"); + + transport = error; + result = null; + error = undefined; + } else if (result instanceof Function || + result && result.send instanceof Function) { + if (transport != undefined) + throw new SyntaxError("There can't be parameters after callback"); + + transport = result; + result = null; + }; + + transport = unifyTransport(transport); + + // Duplicated request, remove old response timeout + if (response) + clearTimeout(response.timeout); + + if (from != undefined) { + if (error) + error.dest = from; + + if (result) + result.dest = from; + }; + + var message; + + // New request or overriden one, create new response with provided data + if (error || result != undefined) { + if (self.peerID != undefined) { + if (error) + error.from = self.peerID; + else + result.from = self.peerID; + } + + // Protocol indicates that responses has own request methods + if (responseMethod) { + if (responseMethod.error == undefined && error) + message = { + error: error + }; + + else { + var method = error ? + responseMethod.error : + responseMethod.response; + + message = { + method: method, + params: error || result + }; + } + } else + message = { + error: error, + result: result + }; + + message = packer.pack(message, id); + } + + // Duplicate & not-overriden request, re-send old response + else if (response) + message = response.message; + + // New empty reply, response null value + else + message = packer.pack({ + result: null + }, id); + + // Store the response to prevent to process a duplicated request later + storeResponse(message, id, from); + + // Return the stored response so it can be directly send back + transport = transport || this.getTransport() || self.getTransport(); + + if (transport) + return transport.send(message); + + return message; + } + }; + inherits(RpcRequest, RpcNotification); + + + function cancel(message) { + var key = message2Key[message]; + if (!key) return; + + delete message2Key[message]; + + var request = requests.pop(key.id, key.dest); + if (!request) return; + + clearTimeout(request.timeout); + + // Start duplicated responses timeout + storeProcessedResponse(key.id, key.dest); + }; + + /** + * Allow to cancel a request and don't wait for a response + * + * If `message` is not given, cancel all the request + */ + this.cancel = function (message) { + if (message) return cancel(message); + + for (var message in message2Key) + cancel(message); + }; + + + this.close = function () { + // Prevent to receive new messages + var transport = this.getTransport(); + if (transport && transport.close) + transport.close(4003, "Cancel request"); + + // Request & processed responses + this.cancel(); + + processedResponses.forEach(clearTimeout); + + // Responses + responses.forEach(function (response) { + clearTimeout(response.timeout); + }); + }; + + + /** + * Generates and encode a JsonRPC 2.0 message + * + * @param {String} method -method of the notification + * @param params - parameters of the notification + * @param [dest] - destination of the notification + * @param {object} [transport] - transport where to send the message + * @param [callback] - function called when a response to this request is + * received. If not defined, a notification will be send instead + * + * @returns {string} A raw JsonRPC 2.0 request or notification string + */ + this.encode = function (method, params, dest, transport, callback) { + // Fix optional parameters + if (params instanceof Function) { + if (dest != undefined) + throw new SyntaxError("There can't be parameters after callback"); + + callback = params; + transport = undefined; + dest = undefined; + params = undefined; + } else if (dest instanceof Function) { + if (transport != undefined) + throw new SyntaxError("There can't be parameters after callback"); + + callback = dest; + transport = undefined; + dest = undefined; + } else if (transport instanceof Function) { + if (callback != undefined) + throw new SyntaxError("There can't be parameters after callback"); + + callback = transport; + transport = undefined; + }; + + if (self.peerID != undefined) { + params = params || {}; + + params.from = self.peerID; + }; + + if (dest != undefined) { + params = params || {}; + + params.dest = dest; + }; + + // Encode message + var message = { + method: method, + params: params + }; + + if (callback) { + var id = requestID++; + var retried = 0; + + message = packer.pack(message, id); + + function dispatchCallback(error, result) { + self.cancel(message); + + callback(error, result); + }; + + var request = { + message: message, + callback: dispatchCallback, + responseMethods: responseMethods[method] || {} + }; + + var encode_transport = unifyTransport(transport); + + function sendRequest(transport) { + var rt = (method === 'ping' ? ping_request_timeout : request_timeout); + request.timeout = setTimeout(timeout, rt * Math.pow(2, retried++)); + message2Key[message] = { + id: id, + dest: dest + }; + requests.set(request, id, dest); + + transport = transport || encode_transport || self.getTransport(); + if (transport) + return transport.send(message); + + return message; + }; + + function retry(transport) { + transport = unifyTransport(transport); + + console.warn(retried + ' retry for request message:', message); + + var timeout = processedResponses.pop(id, dest); + clearTimeout(timeout); + + return sendRequest(transport); + }; + + function timeout() { + if (retried < max_retries) + return retry(transport); + + var error = new Error('Request has timed out'); + error.request = message; + + error.retry = retry; + + dispatchCallback(error) + }; + + return sendRequest(transport); + }; + + // Return the packed message + message = packer.pack(message); + + transport = transport || this.getTransport(); + if (transport) + return transport.send(message); + + return message; + }; + + /** + * Decode and process a JsonRPC 2.0 message + * + * @param {string} message - string with the content of the message + * + * @returns {RpcNotification|RpcRequest|undefined} - the representation of the + * notification or the request. If a response was processed, it will return + * `undefined` to notify that it was processed + * + * @throws {TypeError} - Message is not defined + */ + this.decode = function (message, transport) { + if (!message) + throw new TypeError("Message is not defined"); + + try { + message = packer.unpack(message); + } catch (e) { + // Ignore invalid messages + return console.debug(e, message); + }; + + var id = message.id; + var ack = message.ack; + var method = message.method; + var params = message.params || {}; + + var from = params.from; + var dest = params.dest; + + // Ignore messages send by us + if (self.peerID != undefined && from == self.peerID) return; + + // Notification + if (id == undefined && ack == undefined) { + var notification = new RpcNotification(method, params); + + if (self.emit('request', notification)) return; + return notification; + }; + + + function processRequest() { + // If we have a transport and it's a duplicated request, reply inmediatly + transport = unifyTransport(transport) || self.getTransport(); + if (transport) { + var response = responses.get(id, from); + if (response) + return transport.send(response.message); + }; + + var idAck = (id != undefined) ? id : ack; + var request = new RpcRequest(method, params, idAck, from, transport); + + if (self.emit('request', request)) return; + return request; + }; + + function processResponse(request, error, result) { + request.callback(error, result); + }; + + function duplicatedResponse(timeout) { + console.warn("Response already processed", message); + + // Update duplicated responses timeout + clearTimeout(timeout); + storeProcessedResponse(ack, from); + }; + + + // Request, or response with own method + if (method) { + // Check if it's a response with own method + if (dest == undefined || dest == self.peerID) { + var request = requests.get(ack, from); + if (request) { + var responseMethods = request.responseMethods; + + if (method == responseMethods.error) + return processResponse(request, params); + + if (method == responseMethods.response) + return processResponse(request, null, params); + + return processRequest(); + } + + var processed = processedResponses.get(ack, from); + if (processed) + return duplicatedResponse(processed); + } + + // Request + return processRequest(); + }; + + var error = message.error; + var result = message.result; + + // Ignore responses not send to us + if (error && error.dest && error.dest != self.peerID) return; + if (result && result.dest && result.dest != self.peerID) return; + + // Response + var request = requests.get(ack, from); + if (!request) { + var processed = processedResponses.get(ack, from); + if (processed) + return duplicatedResponse(processed); + + return console.warn("No callback was defined for this message", message); + }; + + // Process response + processResponse(request, error, result); + }; +}; +inherits(RpcBuilder, EventEmitter); + + +RpcBuilder.RpcNotification = RpcNotification; +RpcBuilder.clients = undefined; +RpcBuilder.packers = undefined; + +module.exports = RpcBuilder; diff --git a/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/clients/jsonrpcclient.js b/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/clients/jsonrpcclient.js index 8f0463ca0b..658d34b78c 100644 --- a/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/clients/jsonrpcclient.js +++ b/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/clients/jsonrpcclient.js @@ -15,7 +15,7 @@ * */ -var RpcBuilder = require('../'); +var RpcBuilder = require('../RpcBuilder'); var WebSocketWithReconnection = require('./transports/webSocketWithReconnection'); var OpenViduLogger = require('../../../Logger/OpenViduLogger').OpenViduLogger; @@ -277,4 +277,4 @@ function JsonRpcClient(configuration) { } -module.exports = JsonRpcClient; \ No newline at end of file +module.exports = JsonRpcClient; diff --git a/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/index.js b/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/index.js index 7eb9c9492a..305ee96c14 100644 --- a/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/index.js +++ b/openvidu-browser/src/OpenViduInternal/KurentoUtils/kurento-jsonrpc/index.js @@ -15,740 +15,13 @@ * */ - -var defineProperty_IE8 = false -if (Object.defineProperty) { - try { - Object.defineProperty({}, "x", {}); - } catch (e) { - defineProperty_IE8 = true - } -} - -// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Function/bind -if (!Function.prototype.bind) { - Function.prototype.bind = function (oThis) { - if (typeof this !== 'function') { - // closest thing possible to the ECMAScript 5 - // internal IsCallable function - throw new TypeError('Function.prototype.bind - what is trying to be bound is not callable'); - } - - var aArgs = Array.prototype.slice.call(arguments, 1), - fToBind = this, - fNOP = function () {}, - fBound = function () { - return fToBind.apply(this instanceof fNOP && oThis ? - this : - oThis, - aArgs.concat(Array.prototype.slice.call(arguments))); - }; - - fNOP.prototype = this.prototype; - fBound.prototype = new fNOP(); - - return fBound; - }; -} - - -var EventEmitter = require('events').EventEmitter; - -var inherits = require('inherits'); - -var packers = require('./packers'); -var Mapper = require('./Mapper'); - - -var BASE_TIMEOUT = 5000; - - -function unifyResponseMethods(responseMethods) { - if (!responseMethods) return {}; - - for (var key in responseMethods) { - var value = responseMethods[key]; - - if (typeof value == 'string') - responseMethods[key] = { - response: value - } - }; - - return responseMethods; -}; - -function unifyTransport(transport) { - if (!transport) return; - - // Transport as a function - if (transport instanceof Function) - return { - send: transport - }; - - // WebSocket & DataChannel - if (transport.send instanceof Function) - return transport; - - // Message API (Inter-window & WebWorker) - if (transport.postMessage instanceof Function) { - transport.send = transport.postMessage; - return transport; - } - - // Stream API - if (transport.write instanceof Function) { - transport.send = transport.write; - return transport; - } - - // Transports that only can receive messages, but not send - if (transport.onmessage !== undefined) return; - if (transport.pause instanceof Function) return; - - throw new SyntaxError("Transport is not a function nor a valid object"); -}; - - -/** - * Representation of a RPC notification - * - * @class - * - * @constructor - * - * @param {String} method -method of the notification - * @param params - parameters of the notification - */ -function RpcNotification(method, params) { - if (defineProperty_IE8) { - this.method = method - this.params = params - } else { - Object.defineProperty(this, 'method', { - value: method, - enumerable: true - }); - Object.defineProperty(this, 'params', { - value: params, - enumerable: true - }); - } -}; - - -/** - * @class - * - * @constructor - * - * @param {object} packer - * - * @param {object} [options] - * - * @param {object} [transport] - * - * @param {Function} [onRequest] - */ -function RpcBuilder(packer, options, transport, onRequest) { - var self = this; - - if (!packer) - throw new SyntaxError('Packer is not defined'); - - if (!packer.pack || !packer.unpack) - throw new SyntaxError('Packer is invalid'); - - var responseMethods = unifyResponseMethods(packer.responseMethods); - - - if (options instanceof Function) { - if (transport != undefined) - throw new SyntaxError("There can't be parameters after onRequest"); - - onRequest = options; - transport = undefined; - options = undefined; - }; - - if (options && options.send instanceof Function) { - if (transport && !(transport instanceof Function)) - throw new SyntaxError("Only a function can be after transport"); - - onRequest = transport; - transport = options; - options = undefined; - }; - - if (transport instanceof Function) { - if (onRequest != undefined) - throw new SyntaxError("There can't be parameters after onRequest"); - - onRequest = transport; - transport = undefined; - }; - - if (transport && transport.send instanceof Function) - if (onRequest && !(onRequest instanceof Function)) - throw new SyntaxError("Only a function can be after transport"); - - options = options || {}; - - - EventEmitter.call(this); - - if (onRequest) - this.on('request', onRequest); - - - if (defineProperty_IE8) - this.peerID = options.peerID - else - Object.defineProperty(this, 'peerID', { - value: options.peerID - }); - - var max_retries = options.max_retries || 0; - - - function transportMessage(event) { - self.decode(event.data || event); - }; - - this.getTransport = function () { - return transport; - } - this.setTransport = function (value) { - // Remove listener from old transport - if (transport) { - // W3C transports - if (transport.removeEventListener) - transport.removeEventListener('message', transportMessage); - - // Node.js Streams API - else if (transport.removeListener) - transport.removeListener('data', transportMessage); - }; - - // Set listener on new transport - if (value) { - // W3C transports - if (value.addEventListener) - value.addEventListener('message', transportMessage); - - // Node.js Streams API - else if (value.addListener) - value.addListener('data', transportMessage); - }; - - transport = unifyTransport(value); - } - - if (!defineProperty_IE8) - Object.defineProperty(this, 'transport', { - get: this.getTransport.bind(this), - set: this.setTransport.bind(this) - }) - - this.setTransport(transport); - - - var request_timeout = options.request_timeout || BASE_TIMEOUT; - var ping_request_timeout = options.ping_request_timeout || request_timeout; - var response_timeout = options.response_timeout || BASE_TIMEOUT; - var duplicates_timeout = options.duplicates_timeout || BASE_TIMEOUT; - - - var requestID = 0; - - var requests = new Mapper(); - var responses = new Mapper(); - var processedResponses = new Mapper(); - - var message2Key = {}; - - - /** - * Store the response to prevent to process duplicate request later - */ - function storeResponse(message, id, dest) { - var response = { - message: message, - /** Timeout to auto-clean old responses */ - timeout: setTimeout(function () { - responses.remove(id, dest); - }, - response_timeout) - }; - - responses.set(response, id, dest); - }; - - /** - * Store the response to ignore duplicated messages later - */ - function storeProcessedResponse(ack, from) { - var timeout = setTimeout(function () { - processedResponses.remove(ack, from); - }, - duplicates_timeout); - - processedResponses.set(timeout, ack, from); - }; - - - /** - * Representation of a RPC request - * - * @class - * @extends RpcNotification - * - * @constructor - * - * @param {String} method -method of the notification - * @param params - parameters of the notification - * @param {Integer} id - identifier of the request - * @param [from] - source of the notification - */ - function RpcRequest(method, params, id, from, transport) { - RpcNotification.call(this, method, params); - - this.getTransport = function () { - return transport; - } - this.setTransport = function (value) { - transport = unifyTransport(value); - } - - if (!defineProperty_IE8) - Object.defineProperty(this, 'transport', { - get: this.getTransport.bind(this), - set: this.setTransport.bind(this) - }) - - var response = responses.get(id, from); - - /** - * @constant {Boolean} duplicated - */ - if (!(transport || self.getTransport())) { - if (defineProperty_IE8) - this.duplicated = Boolean(response) - else - Object.defineProperty(this, 'duplicated', { - value: Boolean(response) - }); - } - - var responseMethod = responseMethods[method]; - - this.pack = packer.pack.bind(packer, this, id) - - /** - * Generate a response to this request - * - * @param {Error} [error] - * @param {*} [result] - * - * @returns {string} - */ - this.reply = function (error, result, transport) { - // Fix optional parameters - if (error instanceof Function || error && error.send instanceof Function) { - if (result != undefined) - throw new SyntaxError("There can't be parameters after callback"); - - transport = error; - result = null; - error = undefined; - } else if (result instanceof Function || - result && result.send instanceof Function) { - if (transport != undefined) - throw new SyntaxError("There can't be parameters after callback"); - - transport = result; - result = null; - }; - - transport = unifyTransport(transport); - - // Duplicated request, remove old response timeout - if (response) - clearTimeout(response.timeout); - - if (from != undefined) { - if (error) - error.dest = from; - - if (result) - result.dest = from; - }; - - var message; - - // New request or overriden one, create new response with provided data - if (error || result != undefined) { - if (self.peerID != undefined) { - if (error) - error.from = self.peerID; - else - result.from = self.peerID; - } - - // Protocol indicates that responses has own request methods - if (responseMethod) { - if (responseMethod.error == undefined && error) - message = { - error: error - }; - - else { - var method = error ? - responseMethod.error : - responseMethod.response; - - message = { - method: method, - params: error || result - }; - } - } else - message = { - error: error, - result: result - }; - - message = packer.pack(message, id); - } - - // Duplicate & not-overriden request, re-send old response - else if (response) - message = response.message; - - // New empty reply, response null value - else - message = packer.pack({ - result: null - }, id); - - // Store the response to prevent to process a duplicated request later - storeResponse(message, id, from); - - // Return the stored response so it can be directly send back - transport = transport || this.getTransport() || self.getTransport(); - - if (transport) - return transport.send(message); - - return message; - } - }; - inherits(RpcRequest, RpcNotification); - - - function cancel(message) { - var key = message2Key[message]; - if (!key) return; - - delete message2Key[message]; - - var request = requests.pop(key.id, key.dest); - if (!request) return; - - clearTimeout(request.timeout); - - // Start duplicated responses timeout - storeProcessedResponse(key.id, key.dest); - }; - - /** - * Allow to cancel a request and don't wait for a response - * - * If `message` is not given, cancel all the request - */ - this.cancel = function (message) { - if (message) return cancel(message); - - for (var message in message2Key) - cancel(message); - }; - - - this.close = function () { - // Prevent to receive new messages - var transport = this.getTransport(); - if (transport && transport.close) - transport.close(4003, "Cancel request"); - - // Request & processed responses - this.cancel(); - - processedResponses.forEach(clearTimeout); - - // Responses - responses.forEach(function (response) { - clearTimeout(response.timeout); - }); - }; - - - /** - * Generates and encode a JsonRPC 2.0 message - * - * @param {String} method -method of the notification - * @param params - parameters of the notification - * @param [dest] - destination of the notification - * @param {object} [transport] - transport where to send the message - * @param [callback] - function called when a response to this request is - * received. If not defined, a notification will be send instead - * - * @returns {string} A raw JsonRPC 2.0 request or notification string - */ - this.encode = function (method, params, dest, transport, callback) { - // Fix optional parameters - if (params instanceof Function) { - if (dest != undefined) - throw new SyntaxError("There can't be parameters after callback"); - - callback = params; - transport = undefined; - dest = undefined; - params = undefined; - } else if (dest instanceof Function) { - if (transport != undefined) - throw new SyntaxError("There can't be parameters after callback"); - - callback = dest; - transport = undefined; - dest = undefined; - } else if (transport instanceof Function) { - if (callback != undefined) - throw new SyntaxError("There can't be parameters after callback"); - - callback = transport; - transport = undefined; - }; - - if (self.peerID != undefined) { - params = params || {}; - - params.from = self.peerID; - }; - - if (dest != undefined) { - params = params || {}; - - params.dest = dest; - }; - - // Encode message - var message = { - method: method, - params: params - }; - - if (callback) { - var id = requestID++; - var retried = 0; - - message = packer.pack(message, id); - - function dispatchCallback(error, result) { - self.cancel(message); - - callback(error, result); - }; - - var request = { - message: message, - callback: dispatchCallback, - responseMethods: responseMethods[method] || {} - }; - - var encode_transport = unifyTransport(transport); - - function sendRequest(transport) { - var rt = (method === 'ping' ? ping_request_timeout : request_timeout); - request.timeout = setTimeout(timeout, rt * Math.pow(2, retried++)); - message2Key[message] = { - id: id, - dest: dest - }; - requests.set(request, id, dest); - - transport = transport || encode_transport || self.getTransport(); - if (transport) - return transport.send(message); - - return message; - }; - - function retry(transport) { - transport = unifyTransport(transport); - - console.warn(retried + ' retry for request message:', message); - - var timeout = processedResponses.pop(id, dest); - clearTimeout(timeout); - - return sendRequest(transport); - }; - - function timeout() { - if (retried < max_retries) - return retry(transport); - - var error = new Error('Request has timed out'); - error.request = message; - - error.retry = retry; - - dispatchCallback(error) - }; - - return sendRequest(transport); - }; - - // Return the packed message - message = packer.pack(message); - - transport = transport || this.getTransport(); - if (transport) - return transport.send(message); - - return message; - }; - - /** - * Decode and process a JsonRPC 2.0 message - * - * @param {string} message - string with the content of the message - * - * @returns {RpcNotification|RpcRequest|undefined} - the representation of the - * notification or the request. If a response was processed, it will return - * `undefined` to notify that it was processed - * - * @throws {TypeError} - Message is not defined - */ - this.decode = function (message, transport) { - if (!message) - throw new TypeError("Message is not defined"); - - try { - message = packer.unpack(message); - } catch (e) { - // Ignore invalid messages - return console.debug(e, message); - }; - - var id = message.id; - var ack = message.ack; - var method = message.method; - var params = message.params || {}; - - var from = params.from; - var dest = params.dest; - - // Ignore messages send by us - if (self.peerID != undefined && from == self.peerID) return; - - // Notification - if (id == undefined && ack == undefined) { - var notification = new RpcNotification(method, params); - - if (self.emit('request', notification)) return; - return notification; - }; - - - function processRequest() { - // If we have a transport and it's a duplicated request, reply inmediatly - transport = unifyTransport(transport) || self.getTransport(); - if (transport) { - var response = responses.get(id, from); - if (response) - return transport.send(response.message); - }; - - var idAck = (id != undefined) ? id : ack; - var request = new RpcRequest(method, params, idAck, from, transport); - - if (self.emit('request', request)) return; - return request; - }; - - function processResponse(request, error, result) { - request.callback(error, result); - }; - - function duplicatedResponse(timeout) { - console.warn("Response already processed", message); - - // Update duplicated responses timeout - clearTimeout(timeout); - storeProcessedResponse(ack, from); - }; - - - // Request, or response with own method - if (method) { - // Check if it's a response with own method - if (dest == undefined || dest == self.peerID) { - var request = requests.get(ack, from); - if (request) { - var responseMethods = request.responseMethods; - - if (method == responseMethods.error) - return processResponse(request, params); - - if (method == responseMethods.response) - return processResponse(request, null, params); - - return processRequest(); - } - - var processed = processedResponses.get(ack, from); - if (processed) - return duplicatedResponse(processed); - } - - // Request - return processRequest(); - }; - - var error = message.error; - var result = message.result; - - // Ignore responses not send to us - if (error && error.dest && error.dest != self.peerID) return; - if (result && result.dest && result.dest != self.peerID) return; - - // Response - var request = requests.get(ack, from); - if (!request) { - var processed = processedResponses.get(ack, from); - if (processed) - return duplicatedResponse(processed); - - return console.warn("No callback was defined for this message", message); - }; - - // Process response - processResponse(request, error, result); - }; -}; -inherits(RpcBuilder, EventEmitter); - - -RpcBuilder.RpcNotification = RpcNotification; - - -module.exports = RpcBuilder; - +var RpcBuilder = require('./RpcBuilder'); var clients = require('./clients'); var transports = require('./clients/transports'); +var packers = require('./packers'); RpcBuilder.clients = clients; RpcBuilder.clients.transports = transports; -RpcBuilder.packers = packers; \ No newline at end of file +RpcBuilder.packers = packers; + +module.exports = RpcBuilder;