Permalink
Browse files

Removed old Shuttle classes.

  • Loading branch information...
1 parent d838032 commit 6eecb0090a55f2ce45944518739e8a70daf206a8 @Schoonology committed Feb 22, 2013
Showing with 1 addition and 882 deletions.
  1. +1 −14 .gitignore
  2. +0 −117 lib/bridge.js
  3. +0 −107 lib/common.js
  4. +0 −83 lib/consumer.js
  5. +0 −153 lib/prosumer.js
  6. +0 −179 lib/router.js
  7. +0 −82 lib/service.js
  8. +0 −7 lib/shuttle.js
  9. +0 −15 test/fixtures/bridge.js
  10. +0 −32 test/fixtures/consumer.js
  11. +0 −53 test/fixtures/prosumer.js
  12. +0 −15 test/fixtures/router.js
  13. +0 −25 test/fixtures/service.js
View
15 .gitignore
@@ -1,15 +1,2 @@
-lib-cov
-*.seed
-*.log
-*.csv
-*.dat
-*.out
-*.pid
-*.gz
-
-pids
-logs
-results
-
node_modules
-npm-debug.log
+npm-debug.log
View
117 lib/bridge.js
@@ -1,117 +0,0 @@
-var EventEmitter = require('events').EventEmitter,
- util = require('util'),
- zmq = require('zmq'),
- common = require('./common');
-
-function Bridge(options) {
- if (!(this instanceof Bridge)) {
- return new Bridge(options);
- }
-
- options = options || {};
-
- EventEmitter.call(this);
-
- this.pending = {};
- this.pendingCount = 0;
- this.encoding = options.encoding || 'json';
-
- this._zdealer = null;
- this._zrouter = null;
-}
-util.inherits(Bridge, EventEmitter);
-
-Bridge.prototype.listen = listen;
-Bridge.prototype.listenForConsumers = listen;
-function listen(portOrPath, host) {
- this._initSockets();
- this._zrouter.bindSync(common.generateZmqUrl(portOrPath, host));
-}
-
-Bridge.prototype.connect = connect;
-Bridge.prototype.connectToConsumer = connect;
-function connect(portOrPath, host) {
- this._initSockets();
- this._zrouter.connect(common.generateZmqUrl(portOrPath, host));
-}
-
-Bridge.prototype.listenForServices = listenForServices;
-function listenForServices(portOrPath, host) {
- this._initSockets();
- this._zdealer.bindSync(common.generateZmqUrl(portOrPath, host));
-}
-
-Bridge.prototype.connectToService = connectToService;
-function connectToService(portOrPath, host) {
- this._initSockets();
- this._zdealer.connect(common.generateZmqUrl(portOrPath, host));
-}
-
-Bridge.prototype.close = close;
-function close() {
- this._zdealer.close();
- this._zdealer = null;
-
- this._zrouter.close();
- this._zrouter = null;
-}
-
-Bridge.prototype._initSockets = _initSockets;
-function _initSockets() {
- var self = this;
-
- if (self._zrouter || self._zdealer) {
- return;
- }
-
- self._zrouter = zmq.socket(zmq.types.router);
- self._zrouter.on('message', function handle(identity, name, requestId, data) {
- self._handleRequest(identity, name, requestId, data);
- });
-
- self._zdealer = zmq.socket(zmq.types.dealer);
- self._zdealer.on('message', function handle(requestId, err, data) {
- self._handleResponse(requestId, err, data);
- });
-}
-
-Bridge.prototype._handleRequest = _handleRequest;
-function _handleRequest(identity, name, requestId, data) {
- var internalRequestId = common.getRequestId(this.pending);
-
- if (internalRequestId == null) {
- this._zrouter.send([
- identity,
- requestId,
- common.pack(common.sanitizeError(new Error('Too many requests')), this.encoding),
- common.pack(null, this.encoding)
- ]);
- return;
- }
-
- this.pending[internalRequestId] = {
- identity: identity,
- requestId: requestId
- };
- ++this.pendingCount;
-
- this._zdealer.send([name, internalRequestId, data]);
-}
-
-Bridge.prototype._handleResponse = _handleResponse;
-function _handleResponse(internalRequestId, err, data) {
- internalRequestId = internalRequestId.toString('utf8');
-
- var originalRequest = this.pending[internalRequestId];
-
- this.pending[internalRequestId] = null;
- --this.pendingCount;
- this._zrouter.send([
- originalRequest.identity,
- originalRequest.requestId,
- err,
- data
- ]);
-}
-
-module.exports = Bridge;
View
107 lib/common.js
@@ -1,107 +0,0 @@
-var msgpack = require('msgpack3');
-
-function generateZmqUrl(portOrPath, host) {
- if (typeof portOrPath === 'number') {
- return 'tcp://' + (host || '127.0.0.1') + ':' + portOrPath;
- } else if (typeof portOrPath === 'string') {
- if (portOrPath.charAt(0) === '/') {
- // TODO: Windows IPC support.
- return 'ipc://' + portOrPath;
- } else {
- return portOrPath;
- }
- } else if (typeof portOrPath === 'object') {
- return common.generateZmqUrl(portOrPath.port || portOrPath.path, portOrPath.host);
- }
-}
-
-function sanitizeError(err, includeStack) {
- var safe = {};
-
- if (err == null) {
- return null;
- }
-
- ['name', 'message', 'code']
- .concat(Object.keys(err))
- .forEach(function (key) {
- if (err[key] !== undefined) {
- safe[key] = err[key];
- }
- });
-
- if (err instanceof Error) {
- safe._isError = true;
-
- if (includeStack) {
- safe.stack = err.stack;
- }
- }
-
- return safe;
-}
-
-function desanitizeError(safe) {
- var err;
-
- if (safe == null) {
- return null;
- }
-
- if (safe._isError) {
- err = new Error(safe.message);
- } else {
- err = {
- message: safe.message
- };
- }
-
- Object
- .keys(safe)
- .forEach(function (key) {
- err[key] = safe[key];
- });
-
- return err;
-}
-
-function getRequestId(existing) {
- var id = String(Math.random()).slice(2);
-
- if (typeof existing !== 'object') {
- return id;
- }
-
- while (existing[id]) {
- id = String(Math.random()).slice(2);
- }
-
- return id;
-}
-
-function pack(obj, encoding) {
- if (encoding === 'msgpack') {
- return msgpack.pack(obj);
- } else {
- // 'json', etc.
- return new Buffer(JSON.stringify(obj) || 'null', 'utf8');
- }
-}
-
-function unpack(buf, encoding) {
- if (encoding === 'msgpack') {
- return msgpack.unpack(buf);
- } else {
- // 'json', etc.
- return JSON.parse(buf.toString('utf8'));
- }
-}
-
-var common = module.exports = {
- generateZmqUrl: generateZmqUrl,
- sanitizeError: sanitizeError,
- desanitizeError: desanitizeError,
- getRequestId: getRequestId,
- pack: pack,
- unpack: unpack
-};
View
83 lib/consumer.js
@@ -1,83 +0,0 @@
-var zmq = require('zmq'),
- common = require('./common');
-
-function Consumer(options) {
- if (!(this instanceof Consumer)) {
- return new Consumer(options);
- }
-
- options = options || {};
-
- this.pending = {};
- this.pendingCount = 0;
- this.encoding = options.encoding || 'json';
-
- this._zdealer = null;
- // TODO: Accept a "timeout" option, and generate Stepdown-like timeout events.
-}
-
-Consumer.prototype.listen = listen;
-function listen(portOrPath, host) {
- this._initSocket();
- this._zdealer.bindSync(common.generateZmqUrl(portOrPath, host));
-}
-
-Consumer.prototype.connect = connect;
-function connect(portOrPath, host) {
- this._initSocket();
- this._zdealer.connect(common.generateZmqUrl(portOrPath, host));
-}
-
-Consumer.prototype.close = close;
-function close() {
- this._zdealer.close();
- this._zdealer = null;
-}
-
-Consumer.prototype.emit = emit;
-function emit(name, data, callback) {
- var requestId = common.getRequestId(this.pending);
-
- if (requestId == null) {
- callback(new Error('Too many requests'));
- return;
- }
-
- this.pending[requestId] = callback;
- ++this.pendingCount;
-
- this._zdealer.send([
- name,
- requestId,
- common.pack(data, this.encoding)
- ]);
-}
-
-Consumer.prototype._initSocket = _initSocket;
-function _initSocket() {
- var self = this;
-
- if (self._zdealer) {
- return;
- }
-
- self._zdealer = zmq.socket(zmq.types.dealer);
- self._zdealer.on('message', function handle(requestId, err, data) {
- self._handleResponse(requestId, err, data);
- });
-}
-
-Consumer.prototype._handleResponse = _handleResponse;
-function _handleResponse(requestId, err, data) {
- requestId = requestId.toString('utf8');
- err = common.unpack(err, this.encoding);
- data = common.unpack(data, this.encoding);
-
- var callback = this.pending[requestId];
-
- this.pending[requestId] = null;
- --this.pendingCount;
- callback(common.desanitizeError(err), data);
-}
-
-module.exports = Consumer;
View
153 lib/prosumer.js
@@ -1,153 +0,0 @@
-var EventEmitter = require('events').EventEmitter,
- util = require('util'),
- zmq = require('zmq'),
- common = require('./common');
-
-function Prosumer(options) {
- if (!(this instanceof Prosumer)) {
- return new Prosumer(options);
- }
-
- options = options || {};
-
- EventEmitter.call(this);
-
- this.pending = {};
- this.pendingCount = 0;
- this.encoding = options.encoding || 'json';
-
- this._zdealer = null;
- this._zrouter = null;
-}
-util.inherits(Prosumer, EventEmitter);
-
-Prosumer.prototype.listen = listen;
-Prosumer.prototype.listenForConsumers = listen;
-function listen(portOrPath, host) {
- this._initSockets();
- this._zrouter.bindSync(common.generateZmqUrl(portOrPath, host));
-}
-
-Prosumer.prototype.connect = connect;
-Prosumer.prototype.connectToConsumer = connect;
-function connect(portOrPath, host) {
- this._initSockets();
- this._zrouter.connect(common.generateZmqUrl(portOrPath, host));
-}
-
-Prosumer.prototype.listenForServices = listenForServices;
-function listenForServices(portOrPath, host) {
- this._initSockets();
- this._zdealer.bindSync(common.generateZmqUrl(portOrPath, host));
-}
-
-Prosumer.prototype.connectToService = connectToService;
-function connectToService(portOrPath, host) {
- this._initSockets();
- this._zdealer.connect(common.generateZmqUrl(portOrPath, host));
-}
-
-Prosumer.prototype.close = close;
-function close() {
- this._zdealer.close();
- this._zdealer = null;
-
- this._zrouter.close();
- this._zrouter = null;
-}
-
-Prosumer.prototype.emit = emit;
-function emit(name, data, callback) {
- if (EventEmitter.prototype.emit.call(this, name, data, callback)) {
- return;
- }
-
- if (name === 'newListener') {
- return;
- }
-
- var requestId = common.getRequestId(this.pending);
-
- if (requestId == null) {
- callback(new Error('Too many requests'));
- return;
- }
-
- this.pending[requestId] = callback;
- ++this.pendingCount;
-
- this._zdealer.send([
- name,
- requestId,
- common.pack(data, this.encoding)
- ]);
-}
-
-Prosumer.prototype._initSockets = _initSockets;
-function _initSockets() {
- var self = this;
-
- if (self._zrouter || self._zdealer) {
- return;
- }
-
- self._zrouter = zmq.socket(zmq.types.router);
- self._zrouter.on('message', function handle(identity, name, requestId, data) {
- self._handleRequest(identity, name, requestId, data);
- });
-
- self._zdealer = zmq.socket(zmq.types.dealer);
- self._zdealer.on('message', function handle(requestId, err, data) {
- self._handleResponse(requestId, err, data);
- });
-}
-
-Prosumer.prototype._handleRequest = _handleRequest;
-function _handleRequest(identity, name, requestId, data) {
- var self = this;
-
- name = name.toString('utf8');
- requestId = requestId.toString('utf8');
-
- if (self.listeners(name).length === 0) {
- self._zrouter.send([
- identity,
- requestId,
- common.pack(common.sanitizeError(new Error('No such event')), self.encoding),
- common.pack(null, self.encoding)
- ]);
- return;
- }
-
- data = common.unpack(data, self.encoding);
- EventEmitter.prototype.emit.call(this, name, data, function gotResponse(err, result) {
- // TODO: More than one argument.
- self._zrouter.send([
- identity,
- requestId,
- common.pack(common.sanitizeError(err), self.encoding),
- common.pack(result, self.encoding)
- ]);
- });
-}
-
-Prosumer.prototype._handleResponse = _handleResponse;
-function _handleResponse(requestId, err, data) {
- requestId = requestId.toString('utf8');
- err = common.unpack(err, this.encoding);
- data = common.unpack(data, this.encoding);
-
- var callback = this.pending[requestId];
-
- if (callback == null) {
- // We've received a response for a request we never sent. This can happen if connections are
- // erroneously made front-to-front and back-to-back, which is only possible with Prosumers.
- return;
- }
-
- this.pending[requestId] = null;
- --this.pendingCount;
- callback(common.desanitizeError(err), data);
-}
-
-module.exports = Prosumer;
View
179 lib/router.js
@@ -1,179 +0,0 @@
-var EventEmitter = require('events').EventEmitter,
- util = require('util'),
- zmq = require('zmq'),
- common = require('./common');
-
-function Router(options) {
- if (!(this instanceof Router)) {
- return new Router(options);
- }
-
- options = options || {};
-
- EventEmitter.call(this);
-
- this.pending = {};
- this.pendingCount = 0;
- this.fallbackService = this.fallbackService || 'fallback';
- this.delimiter = '::';
- this.trimServiceName = true;
- this.encoding = options.encoding || 'json';
-
- this._zdealers = null;
- this._zrouter = null;
-}
-util.inherits(Router, EventEmitter);
-
-Router.prototype.listen = listen;
-Router.prototype.listenForConsumers = listen;
-function listen(portOrPath, host) {
- this._initSockets();
- this._zrouter.bindSync(common.generateZmqUrl(portOrPath, host));
-}
-
-Router.prototype.connect = connect;
-Router.prototype.connectToConsumer = connect;
-function connect(portOrPath, host) {
- this._initSockets();
- this._zrouter.connect(common.generateZmqUrl(portOrPath, host));
-}
-
-Router.prototype.listenForServices = listenForServices;
-function listenForServices(name, portOrPath, host) {
- var socket = this._getDealer(name);
-
- socket.bindSync(common.generateZmqUrl(portOrPath, host));
-}
-
-Router.prototype.connectToService = connectToService;
-function connectToService(name, portOrPath, host) {
- var socket = this._getDealer(name);
-
- socket.connect(common.generateZmqUrl(portOrPath, host));
-}
-
-Router.prototype.close = close;
-function close() {
- var self = this;
-
- Object.keys(self._zdealers).forEach(function (key) {
- self._zdealers[key].close();
- });
- self._zdealers = null;
-
- self._zrouter.close();
- self._zrouter = null;
-}
-
-Router.prototype._initSockets = _initSockets;
-function _initSockets() {
- var self = this;
-
- if (self._zrouter) {
- return;
- }
-
- self._zrouter = zmq.socket(zmq.types.router);
- self._zrouter.on('message', function handle(identity, name, requestId, data) {
- self._handleRequest(identity, name, requestId, data);
- });
-
- self._zdealers = {};
-}
-
-Router.prototype._addDealer = _addDealer;
-function _addDealer(name) {
- var self = this;
-
- if (name == null) {
- name = self.fallbackService;
- }
-
- var socket = zmq.socket(zmq.types.dealer);
- self._zdealers[name] = socket;
- socket.on('message', function handle(requestId, err, data) {
- self._handleResponse(requestId, err, data);
- });
-
- return socket;
-}
-
-Router.prototype._getDealer = _getDealer;
-function _getDealer(name) {
- if (name == null) {
- name = this.fallbackService;
- }
-
- this._initSockets();
- return this._zdealers[name] || this._addDealer(name);
-}
-
-Router.prototype._handleRequest = _handleRequest;
-function _handleRequest(identity, name, requestId, data) {
- var internalRequestId = common.getRequestId(this.pending),
- split,
- service,
- socket;
-
- if (internalRequestId == null) {
- this._zrouter.send([
- identity,
- requestId,
- common.pack(common.sanitizeError(new Error('Too many requests')), this.encoding),
- common.pack(null, this.encoding)
- ]);
- return;
- }
-
- split = name.toString().split(this.delimiter);
- if (split.length > 1) {
- service = split.shift();
- if (this.trimServiceName) {
- name = split.join(this.delimiter);
- }
- } else {
- service = this.fallbackService;
- }
-
- // TODO: Ideally, we'd be able to check that we expect a service to exist before
- // lazy-adding it. That way, we could return a "No such service" Error here otherwise.
-
- socket = this._getDealer(service);
-
- this.pending[internalRequestId] = {
- identity: identity,
- requestId: requestId,
- name: name
- };
- ++this.pendingCount;
-
- this.emit('request', internalRequestId, this.pending[internalRequestId], data);
-
- socket.send([name, internalRequestId, data]);
-}
-
-Router.prototype._handleResponse = _handleResponse;
-function _handleResponse(internalRequestId, err, data) {
- internalRequestId = internalRequestId.toString('utf8');
-
- var originalRequest = this.pending[internalRequestId];
-
- if (originalRequest == null) {
- // We've received a response for a request we never sent. This can happen if connections are
- // erroneously made front-to-front and back-to-back, which is only possible with Prosumers.
- return;
- }
-
- this.emit('response', internalRequestId, originalRequest, data);
-
- this.pending[internalRequestId] = null;
- --this.pendingCount;
- this._zrouter.send([
- originalRequest.identity,
- originalRequest.requestId,
- err,
- data
- ]);
-}
-
-module.exports = Router;
View
82 lib/service.js
@@ -1,82 +0,0 @@
-var EventEmitter = require('events').EventEmitter,
- util = require('util'),
- zmq = require('zmq'),
- common = require('./common');
-
-function Service(options) {
- if (!(this instanceof Service)) {
- return new Service(options);
- }
-
- options = options || {};
-
- EventEmitter.call(this);
-
- this.encoding = options.encoding || 'json';
-
- this._zrouter = null;
-}
-util.inherits(Service, EventEmitter);
-
-Service.prototype.listen = listen;
-function listen(portOrPath, host) {
- this._initSocket();
- this._zrouter.bindSync(common.generateZmqUrl(portOrPath, host));
-}
-
-Service.prototype.connect = connect;
-function connect(portOrPath, host) {
- this._initSocket();
- this._zrouter.connect(common.generateZmqUrl(portOrPath, host));
-}
-
-Service.prototype.close = close;
-function close() {
- this._zrouter.close();
- this._zrouter = null;
-}
-
-Service.prototype._initSocket = _initSocket;
-function _initSocket() {
- var self = this;
-
- if (self._zrouter) {
- return;
- }
-
- self._zrouter = zmq.socket(zmq.types.router);
- self._zrouter.on('message', function handle(identity, name, requestId, data) {
- self._handleRequest(identity, name, requestId, data);
- });
-}
-
-Service.prototype._handleRequest = _handleRequest;
-function _handleRequest(identity, name, requestId, data) {
- var self = this;
-
- name = name.toString('utf8');
- requestId = requestId.toString('utf8');
-
- if (self.listeners(name).length === 0) {
- self._zrouter.send([
- identity,
- requestId,
- common.pack(common.sanitizeError(new Error('No such event')), self.encoding),
- common.pack(null, self.encoding)
- ]);
- return;
- }
-
- data = common.unpack(data, self.encoding);
- self.emit(name, data, function gotResponse(err, result) {
- // TODO: More than one argument.
- self._zrouter.send([
- identity,
- requestId,
- common.pack(common.sanitizeError(err), self.encoding),
- common.pack(result, self.encoding)
- ]);
- });
-}
-
-module.exports = Service;
View
7 lib/shuttle.js
@@ -1,7 +0,0 @@
-module.exports = {
- Service: require('./service'),
- Bridge: require('./bridge'),
- Router: require('./router'),
- Consumer: require('./consumer'),
- Prosumer: require('./prosumer')
-};
View
15 test/fixtures/bridge.js
@@ -1,15 +0,0 @@
-var shuttle = require('../..'),
- self = new shuttle.Bridge(),
- frontUrl = process.env.frontUrl,
- backUrl = process.env.backUrl;
-
-if (frontUrl) {
- console.log('B front:', frontUrl);
- self.listenForConsumers(frontUrl);
-}
-if (backUrl) {
- console.log('B back:', backUrl);
- self.listenForServices(backUrl);
-}
-
-process.send('ready');
View
32 test/fixtures/consumer.js
@@ -1,32 +0,0 @@
-var shuttle = require('../..'),
- self = new shuttle.Consumer(),
- listenUrl = process.env.listenUrl,
- connectUrl = process.env.connectUrl;
-
-if (listenUrl) {
- console.log('C listening:', listenUrl);
- self.listen(listenUrl);
-}
-if (connectUrl) {
- console.log('C connecting:', connectUrl);
- self.connect(connectUrl);
-}
-
-process.on('message', function (obj) {
- console.log('C Sending:', obj);
- self.emit(obj.name, obj.data, function (err, result) {
- if (err) {
- process.send({
- id: obj.id,
- err: err
- });
- } else {
- process.send({
- id: obj.id,
- result: result
- });
- }
- });
-});
-
-process.send('ready');
View
53 test/fixtures/prosumer.js
@@ -1,53 +0,0 @@
-var shuttle = require('../..'),
- self = new shuttle.Prosumer(),
- emitter = process.env.emitter === 'true',
- listenUrl = process.env.listenUrl,
- connectUrl = process.env.connectUrl;
-
-console.log('P emitting:', emitter);
-
-if (listenUrl) {
- console.log('P listening:', listenUrl);
- if (emitter) {
- self.listenForServices(listenUrl);
- } else {
- self.listenForConsumers(listenUrl);
- }
-}
-if (connectUrl) {
- console.log('P connecting:', connectUrl);
- if (emitter) {
- self.connectToService(connectUrl);
- } else {
- self.connectToConsumer(connectUrl);
- }
-}
-
-self.on('echo', function (data, callback) {
- console.log('P Echoing:', data);
- callback(null, data);
-});
-
-self.on('broken', function (data, callback) {
- console.log('P Broken:', data);
- callback(new Error('Broken'), null);
-});
-
-process.on('message', function (obj) {
- console.log('P Sending:', obj);
- self.emit(obj.name, obj.data, function (err, result) {
- if (err) {
- process.send({
- id: obj.id,
- err: err
- });
- } else {
- process.send({
- id: obj.id,
- result: result
- });
- }
- });
-});
-
-process.send('ready');
View
15 test/fixtures/router.js
@@ -1,15 +0,0 @@
-var shuttle = require('../..'),
- self = new shuttle.Router(),
- frontUrl = process.env.frontUrl,
- backUrl = process.env.backUrl;
-
-if (frontUrl) {
- console.log('R front:', frontUrl);
- self.listenForConsumers(frontUrl);
-}
-if (backUrl) {
- console.log('R back:', backUrl);
- self.listenForServices('test', backUrl);
-}
-
-process.send('ready');
View
25 test/fixtures/service.js
@@ -1,25 +0,0 @@
-var shuttle = require('../..'),
- self = new shuttle.Service(),
- listenUrl = process.env.listenUrl,
- connectUrl = process.env.connectUrl;
-
-if (listenUrl) {
- console.log('S listening:', listenUrl);
- self.listen(listenUrl);
-}
-if (connectUrl) {
- console.log('S connecting:', connectUrl);
- self.connect(connectUrl);
-}
-
-self.on('echo', function (data, callback) {
- console.log('S Echoing:', data);
- callback(null, data);
-});
-
-self.on('broken', function (data, callback) {
- console.log('S Broken:', data);
- callback(new Error('Broken'), null);
-});
-
-process.send('ready');

0 comments on commit 6eecb00

Please sign in to comment.