Skip to content
Browse files

Initial version of the new Shuttle API.

Leaner, brokerless, and focused entirely on the event semantics.
  • Loading branch information...
1 parent 4ab68be commit 0900268bcd93fd640c09803c0a769e4d6ab88fe3 @Schoonology committed Feb 21, 2013
Showing with 722 additions and 135 deletions.
  1. +31 −122 README.md
  2. +1 −5 index.js
  3. +15 −0 lib/index.js
  4. +241 −0 lib/request/emitter.js
  5. +217 −0 lib/request/handler.js
  6. +9 −0 lib/request/index.js
  7. +94 −0 lib/synchronization/emitter.js
  8. +39 −0 lib/synchronization/handler.js
  9. +9 −0 lib/synchronization/index.js
  10. +3 −3 package.json
  11. +63 −5 test/shuttle.js
View
153 README.md
@@ -4,163 +4,72 @@ A massively-distributable, service-oriented architecture with all the flexibilit
## Installation
-Before you can install shuttle with NPM, you need to have the ZeroMQ sources installed locally. This will be a platform-dependent task, but most platforms have tools to make this easier:
+Before you can install Shuttle with NPM, you need to have the development source for ZeroMQ 3.2.x installed locally. This will be a platform-dependent task, but most platforms have tools to make this easier:
```bash
-brew install zeromq
+brew install zeromq --devel
yum install zeromq-devel
```
-After that's ready, npm can be used as normal:
+After that's ready, npm can be used as normal to install Shuttle:
```bash
npm install shuttle
```
-## Connection
+## Transports
-Before going into what the various classes are, it's important to note an important feature of Shuttle: Either side of any connection can act as the listener, with the other connecting. For example, a Service can connect to a listening Consumer or a Consumer can connect to a listening Service. This is accomplished with two methods, available on each class:
+Shuttle officially supports and has been tested over the following transports:
- * listen(url) - Listen on the interface and port provided as a TCP URL (e.g. tcp://127.0.0.1:3000).
- * listen(path) - Listen on the IPC interface provided as a file path (e.g. /tmp/shuttletest).
- * listen(port, [host]) - Listen on the optional `host` interface (defaults to '*') at `port`.
- * connect(url) - Connect to the endpoint at the specified TCP URL.
- * connect(path) - Connect to the endpoint at the specified IPC path.
- * connect(port, [host]) - Connect to the endpoint at the optional `host` interface (defaults to '*') at `port`.
+ * IPC (`ipc://`)
+ * TCP (`tcp://`)
-## Classes
+While these have not been tested, Shuttle may also work over the following transports:
-The classes used by Shuttle are purposefully simple:
+ * Intra-process (`inproc://`)
+ * PGM (`pgm://`)
+ * EPGM (`epgm://`)
-### Consumer & Service
+In the remainder of the documentation, "port" refers to a single interface on a single transport, even if that interface is not TPC. If documentation mentions "two ports" and you intend to use IPC, for example, you will need two file descriptors, e.g. `/tmp/a` and `/tmp/b`.
-The Consumer emits events round-robin to all Services, which in turn listen on these events, firing the provided callback either as acknowledgement (required) or with the desired additional result.
+## Mesh
-```javascript
-var consumer = new shuttle.Consumer(),
- service = new shuttle.Service();
+The Shuttle "mesh" consists of Services and Consumers connected _directly_ to one another, with Consumers making requests and getting updates and Services sending responses and sending updates. Unlike other service-oriented architectures (and early versions of Shuttle), _Shuttle is brokerless._
-service.listen(5050);
-consumer.connect(5050);
+There are two types of Services, with corresponding Consumers: RequestServices (with RequestConsumers) and SynchronizationServices (with SynchronizationConsumers).
-service.on('test', function (data, callback) {
- console.log('Test:', data);
- callback(null, { ok: true });
-});
+There should be a 1:1 relationship between Consumer instances and Service types. A Consumer can be connected to multiple of the same service, but should not be connected to multiple types of Services. Create another Consumer instance for the second Service type needed (and so forth). To make this design restriction easier, MIXIN_THINGs can be used to consolidate multiple Consumer instances into a single, easier-to-use interface. (If desired, the same can be done of Services, even with the same MIXIN_THING.)
-consumer.emit('test', { answer: 42 }, function (err, response) {
- console.log('Error?', !!err);
- console.log('Response:', response);
-});
+### Static Topography
-// Output:
-//
-// Test: { answer: 42 }
-// Error? false
-// Response: { ok: true }
-```
-
-### Prosumer
-
-The Prosumer is both a Service and Consumer in one. It can emit events like a Consumer and listen on events like a Service.
-
-```javascript
-var prosumer1 = new shuttle.Prosumer(),
- prosumer2 = new shuttle.Prosumer();
-
-prosumer2.listenForConsumers(5050);
-prosumer1.connectToService(5050);
-
-prosumer2.on('test', function (data, callback) {
- console.log('Test:', data);
- callback(null, { ok: true });
-});
-
-prosumer1.emit('test', { answer: 42 }, function (err, response) {
- console.log('Error?', !!err);
- console.log('Response:', response);
-});
-
-// Output:
-//
-// Test: { answer: 42 }
-// Error? false
-// Response: { ok: true }
-```
+The simplest topography is static; port locations are well-known, defined up-front, and never change. Services bind to these ports, and Consumers connect to them. Start-up is as fast as possible, but the rigidity is often undesirable.
-### Bridge
+### Dynamic Topography
-When running a lot of Services with a lot of Consumers, the number of interfaces to track can quickly get out of hand. To make life easier, there's the Bridge. Any requests made of the Bridge are passed along, also round-robin, to all connected Services.
+The most flexible topography is dynamic; port locations for the majority of Services are not well-known, and can change at any time. To facilitate this, N SynchronizationServices (see Discovery) are started at well-known locations, and both Services and Consumers connect to this (via their own SynchronizationConsumer) service to locate one another.
-```javascript
-var consumer = new shuttle.Consumer(),
- bridge = new shuttle.Bridge(),
- service = new shuttle.Service();
+### "Reversed" Topographies
-bridge.listenForConsumers(5050);
-bridge.listenForServices(5051);
+Ordinarily, Services bind and Consumers connect. However, _this is not required!_ In some instances, it may be wiser for Consumers to bind and Services to connect. Some examples:
-consumer.connect(5050);
-service.connect(5051);
+ * Dynamic Slaves with a Static Master - N Consumers bind to ports, constantly making requests. Services connect to the Consumer as they start, processing the requests as fast as possible.
+ * Brokers - Although Shuttle is brokerless, simple Brokers can be built _on top of Shuttle._ In this case, the Broker contains N Consumers and a single Service, all of which bind. The rest of the mesh connects.
-service.on('test', function (data, callback) {
- console.log('Test:', data);
- callback(null, { ok: true });
-});
-consumer.emit('test', { answer: 42 }, function (err, response) {
- console.log('Error?', !!err);
- console.log('Response:', response);
-});
-// Output:
-//
-// Test: { answer: 42 }
-// Error? false
-// Response: { ok: true }
-```
-
-### Router
-
-A Router is just a special bridge that can assign names to the interfaces it exposes to services. Any services that connect to those interfaces can be addressed by clients by name.
-
-```javascript
-var consumer = new shuttle.Consumer(),
- router = new shuttle.Router(),
- service = new shuttle.Service();
-
-router.listenForConsumers(5050);
-router.listenForServices('test', 5051);
+cb = new MIXIN_THING({
+ delimeter: '::' // Looks like EE2!
+})
+cb.emit('service::event', {}, function () {})
-// Optional, additional settings
-router.delimiter = '/';
-router.trimServiceName = true;
+## Thanks
-consumer.connect(5050);
-service.connect(5051);
-
-// This service will be addressed by 'test/*', but we'll only get the '*' part.
-service.on('test', function (data, callback) {
- console.log('Test:', data);
- callback(null, { ok: true });
-});
-
-consumer.emit('test/test', { answer: 42 }, function (err, response) {
- console.log('Error?', !!err);
- console.log('Response:', response);
-});
-
-// As before, our output:
-//
-// Test: { answer: 42 }
-// Error? false
-// Response: { ok: true }
-```
+Kabe & Adam
## License
```
-Copyright (C) 2012 Michael Schoonmaker (michael.r.schoonmaker@gmail.com)
+Copyright (C) 2012-2013 Michael Schoonmaker (michael.r.schoonmaker@gmail.com)
This project is free software released under the MIT/X11 license:
View
6 index.js
@@ -1,5 +1 @@
-module.exports = require('./lib/shuttle');
-
-// HACK - Hitting the HWM can block the process completely. To unblock applications already using Shuttle,
-// I've added this hack to unblock them periodically.
-setInterval(console.log, 100)
+module.exports = require('./lib')
View
15 lib/index.js
@@ -0,0 +1,15 @@
+var request = require('./request')
+ , synchronization = require('./synchronization')
+
+module.exports = {
+ request: request,
+ synchronization: synchronization,
+ RequestEmitter: request.Emitter,
+ createRequestEmitter: request.createEmitter,
+ RequestHandler: request.Handler,
+ createRequestHandler: request.createHandler,
+ SynchronizationEmitter: synchronization.Emitter,
+ createSynchronizationEmitter: synchronization.createEmitter,
+ SynchronizationHandler: synchronization.Handler,
+ createSynchronizationHandler: synchronization.createHandler
+}
View
241 lib/request/emitter.js
@@ -0,0 +1,241 @@
+//
+// # RequestEmitter
+//
+
+//
+// ## Error Handling
+//
+// All of the error handling for RequestEmitters is facilitated by the callback functions its methods accept.
+//
+var url = require('url')
+ , debug = require('debug')('shuttle:RequestEmitter')
+ , zmqstream = require('zmq-stream')
+
+//
+// ## RequestEmitter `RequestEmitter(options)`
+//
+// Creates a new RequestEmitter with the specified options:
+//
+// * `timeout`: A duration, in ms, that the RequestEmitter will wait for a response before retrying or raising an
+// Error. If zero, no timeout mechanism will be used. If requests can not be guaranteed to be idempotent, `timeout` and
+// `retries` should not be used. Defaults to 0 to keep from unduly making this guarantee.
+// * `retries`: A number of times that the RequestEmitter will automatically retry a request that has timed out before
+// raising an Error. If zero, no retries will be made, even if `timeout` is defined. If requests can not be guaranteed
+// to be idempotent, `timeout` and `retries` should not be used. Defaults to 3, which is an arbitrarily-chosen number.
+// * `linger`: A duration, in ms, that the RequestEmitter will wait for outgoing messages to be sent before releasing
+// its resources after `close` is called. Outgoing messages take a non-zero time to be completely sent, and can be
+// dropped by a subsequent call to `close`. A value of -1 indicates an infinite delay. Defaults to 0.
+//
+function RequestEmitter(obj) {
+ if (!(this instanceof RequestEmitter)) {
+ return new RequestEmitter(obj)
+ }
+
+ obj = obj || {}
+
+ this.linger = (typeof obj.linger === 'number') ? obj.linger : -1
+
+ this._nextRequestId = 0
+ this._pending = {}
+ this._zdealer = null
+}
+RequestEmitter.createEmitter = RequestEmitter
+
+//
+// ## emit `emit(name, data, callback)`
+//
+// Emits an event (request) named **name**, along with **data**, over the Shuttle mesh, to be handled by some
+// RequestHandler. If successful, **Callback** will receive the result of the request. Otherwise, **callback** will
+// receive the error.
+//
+// If the request times out (as defined by `RequestEmitter.timeout` and `RequestEmitter.retries`), **callback** will
+// receive a TimeoutError.
+//
+// NOTE: Do not try to call other EventEmitter methods like `on`. It won't work.
+//
+RequestEmitter.prototype.emit = emit
+function emit(name, data, callback) {
+ var self = this
+ , requestId = String(self._nextRequestId++)
+ , payload
+
+ self._pending[requestId] = callback
+ payload = JSON.stringify({
+ name: name,
+ data: data
+ })
+
+ debug('Emitting %s as request %s with %s.', name, requestId, payload)
+
+ if (!self._zdealer.write([
+ new Buffer(0),
+ new Buffer(requestId),
+ new Buffer(payload)
+ ])) {
+ // TODO: Bubble up the internal 'drain' event?
+ // TODO: Put some place safe.
+ callback(new Error('Too many requests'))
+ }
+}
+
+//
+// ## listen `listen(options)`
+//
+// ### Also `listenForRequests`
+//
+// Synchronously listens for RequestHandler connections. If **options.url** is provided, that URL will be used.
+// Otherwise, **options** will be formatted as a URL as defined by the core `url` module.
+//
+RequestEmitter.prototype.listenForRequests = listen
+RequestEmitter.prototype.listen = listen
+function listen(options) {
+ var self = this
+ , opts = options || {}
+ , iface = options.url
+
+ if (!iface) {
+ iface = url.format(opts)
+ }
+
+ debug('Listening to %s.', iface)
+
+ self._initSocket()
+ self._zdealer.bind(iface)
+}
+
+//
+// ## connect `connect(options)`
+//
+// ### Also `connectForRequests`
+//
+// Synchronously connects to a listening RequestHandler. If **options.url** is provided, that URL will be used.
+// Otherwise, **options** will be formatted as a URL as defined by the core `url` module.
+//
+RequestEmitter.prototype.connectForRequests = connect
+RequestEmitter.prototype.connect = connect
+function connect(options) {
+ var self = this
+ , opts = options || {}
+ , iface = options.url
+
+ if (!iface) {
+ iface = url.format(opts)
+ }
+
+ debug('Connecting to %s.', iface)
+
+ self._initSocket()
+ self._zdealer.connect(iface)
+}
+
+//
+// ## close `close()`
+//
+// Synchonously releases the underlying resources, allowing the RequestEmitter to be `connect`ed or `listen`ed again
+// freely.
+//
+// Unless the RequestEmitter was configured with a `linger` period, all pending outgoing messages will be dropped.
+//
+RequestEmitter.prototype.close = close
+function close() {
+ var self = this
+
+ if (!self._zdealer) {
+ return
+ }
+
+ debug('Closing.')
+
+ self._zdealer.close()
+ self._zdealer = null
+}
+
+//
+// ## _initSocket `_initSocket()`
+//
+// Internal use only.
+//
+// Creates the underlying networking resources.
+//
+RequestEmitter.prototype._initSocket = _initSocket
+function _initSocket() {
+ var self = this
+
+ if (self._zdealer) {
+ return
+ }
+
+ self._zdealer = new zmqstream.Socket({
+ type: zmqstream.Type.DEALER
+ })
+
+ self._zdealer.set(zmqstream.Option.LINGER, self.linger)
+
+ self._zdealer.on('readable', function () {
+ self._handle()
+ })
+ self._handle()
+}
+
+//
+// ## _handle `_handle()`
+//
+// Internal use only.
+//
+// Polls the network for requests, re-emitting them locally for handling.
+//
+RequestEmitter.prototype._handle = _handle
+function _handle() {
+ var self = this
+ , messages = null
+
+ // 1. If the dealer socket is currently closed, it cannot be read from. Otherwise, read from it.
+ if (!self._zdealer) {
+ return
+ }
+
+ messages = self._zdealer.read(10)
+
+ // 1. If there are no messages, we'll come back to this later. For now, just leave.
+ if (!messages) {
+ return
+ }
+
+ // 1. For each message, we want to look up the callback and fire it.
+ messages.forEach(function (envelope) {
+ var payload
+ , requestId
+
+ // 1. If we don't have three message frames or the body isn't valid JSON, this didn't come from a
+ // RequestHandler. It's safe to ignore.
+ if (envelope.length !== 3) {
+ return
+ }
+
+ try {
+ payload = JSON.parse(envelope.pop().toString('utf8'))
+ } catch (e) {
+ return
+ }
+
+ requestId = String(envelope.pop())
+
+ // 1. If we don't have a callback function, we assume it's meant to be a fire-and-forget event, so we'll ignore
+ // the response.
+ if (!self._pending[requestId]) {
+ return
+ }
+
+ debug('Handling response to request %s with (%s, %s).', requestId, payload.err, JSON.stringify(payload.data))
+
+ self._pending[requestId](payload.err, payload.data)
+ ;delete self._pending[requestId]
+ })
+
+ // 1. We may have more messages to receive, so try reading again soon.
+ process.nextTick(function () {
+ self._handle()
+ })
+}
+
+module.exports = RequestEmitter
View
217 lib/request/handler.js
@@ -0,0 +1,217 @@
+//
+// # RequestHandler
+//
+
+//
+// ## Error Handling
+//
+// Since none of the RequestHandler's methods take callback functions, all error handling (like request handling) is
+// facilitated by an 'error' event being emitted.
+//
+var EventEmitter = require('events').EventEmitter
+ , url = require('url')
+ , debug = require('debug')('shuttle:RequestHandler')
+ , mi = require('mi')
+ , zmqstream = require('zmq-stream')
+
+//
+// ## RequestHandler `RequestHandler(options)`
+//
+// Creates a new RequestHandler with the specified options:
+//
+// * `linger`: A duration, in ms, that the RequestHandler will wait for outgoing messages to be sent before releasing
+// its resources after `close` is called. Outgoing messages take a non-zero time to be completely sent, and can be
+// dropped by a subsequent call to `close`. A value of -1 indicates an infinite delay. Defaults to -1.
+//
+function RequestHandler(obj) {
+ if (!(this instanceof RequestHandler)) {
+ return new RequestHandler(obj)
+ }
+
+ EventEmitter.call(this)
+
+ obj = obj || {}
+
+ this.linger = (typeof obj.linger === 'number') ? obj.linger : -1
+
+ this._zrouter = null
+}
+RequestHandler.createHandler = RequestHandler
+
+//
+// ## EventEmitter API (`on`, `once`, `removeAllListeners`, etc.)
+//
+// RequestHandler inherits from EventEmitter to facilitate local subscriptions to remote events. See the Node.js
+// Documentation's [Events API](http://nodejs.org/api/events.html) for more information.
+//
+// NOTE: Do not call `emit`. While it should work as expected, it can cause leaks in this abstraction.
+//
+mi.extend(RequestHandler, EventEmitter)
+
+//
+// ## listen `listen(options)`
+//
+// ### Also `listenForRequests`
+//
+// Synchronously listens for RequestEmitter connections. If **options.url** is provided, that URL will be used.
+// Otherwise, **options** will be formatted as a URL as defined by the core `url` module.
+//
+RequestHandler.prototype.listenForRequests = listen
+RequestHandler.prototype.listen = listen
+function listen(options) {
+ var self = this
+ , opts = options || {}
+ , iface = options.url
+
+ if (!iface) {
+ iface = url.format(opts)
+ }
+
+ debug('Listening to %s.', iface)
+
+ self._initSocket()
+ self._zrouter.bind(iface)
+}
+
+//
+// ## connect `connect(options)`
+//
+// ### Also `connectForRequests`
+//
+// Synchronously connects to a listening RequestEmitter. If **options.url** is provided, that URL will be used.
+// Otherwise, **options** will be formatted as a URL as defined by the core `url` module.
+//
+RequestHandler.prototype.connectForRequests = connect
+RequestHandler.prototype.connect = connect
+function connect(options) {
+ var self = this
+ , opts = options || {}
+ , iface = options.url
+
+ if (!iface) {
+ iface = url.format(opts)
+ }
+
+ debug('Connecting to %s.', iface)
+
+ self._initSocket()
+ self._zrouter.connect(iface)
+}
+
+//
+// ## close `close()`
+//
+// Synchonously releases the underlying resources, allowing the RequestHandler to be `connect`ed or `listen`ed again
+// freely.
+//
+// Unless the RequestHandler was configured with a `linger` period, all pending outgoing messages will be dropped.
+//
+RequestHandler.prototype.close = close
+function close() {
+ var self = this
+
+ if (!self._zrouter) {
+ return
+ }
+
+ debug('Closing.')
+
+ self._zrouter.close()
+ self._zrouter = null
+}
+
+//
+// ## _initSocket `_initSocket()`
+//
+// Internal use only.
+//
+// Creates the underlying networking resources.
+//
+RequestHandler.prototype._initSocket = _initSocket
+function _initSocket() {
+ var self = this
+
+ if (self._zrouter) {
+ return
+ }
+
+ self._zrouter = new zmqstream.Socket({
+ type: zmqstream.Type.ROUTER
+ })
+
+ self._zrouter.set(zmqstream.Option.LINGER, self.linger)
+
+ self._zrouter.on('readable', function () {
+ self._handle()
+ })
+ self._handle()
+}
+
+//
+// ## _handle `_handle()`
+//
+// Internal use only.
+//
+// Polls the network for requests, re-emitting them locally for handling.
+//
+RequestHandler.prototype._handle = _handle
+function _handle() {
+ var self = this
+ , messages = null
+
+ // 1. If the router socket is currently closed, it cannot be read from. Otherwise, read from it.
+ if (!self._zrouter) {
+ return
+ }
+
+ messages = self._zrouter.read(100)
+
+ // 1. If there are no messages, we'll come back to this later. For now, just leave.
+ if (!messages) {
+ return
+ }
+
+ // 1. For each message, we want to emit a local event to be handled.
+ messages.forEach(function (envelope) {
+ var payload
+
+ // 1. If we don't have four message frames or the body isn't valid JSON, this didn't come from a
+ // RequestEmitter. It's safe to ignore.
+ if (envelope.length !== 4) {
+ return
+ }
+
+ try {
+ payload = JSON.parse(envelope.pop().toString('utf8'))
+ } catch (e) {
+ return
+ }
+
+ debug('Emitting %s with %s.', payload.name, JSON.stringify(payload.data))
+
+ self.emit(payload.name, payload.data, function (err, data) {
+ // 1. Once we receive a response, write back to the router this response.
+ var response = JSON.stringify({
+ err: err,
+ data: data
+ })
+
+ envelope.push(new Buffer(response))
+
+ debug('Handling response to %s with %s.', payload.name, response)
+
+ // 1. If `write` returns false, we're out of resources to send more messages, and need to throw an error.
+ if (!self._zrouter.write(envelope)) {
+ // TODO: Put some place safe.
+ self.emit('error', new Error('Too many responses'))
+ }
+ })
+ })
+
+ // 1. We may have more messages to receive, so try reading again soon.
+ process.nextTick(function () {
+ self._handle()
+ })
+}
+
+module.exports = RequestHandler
View
9 lib/request/index.js
@@ -0,0 +1,9 @@
+var Emitter = require('./emitter')
+ , Handler = require('./handler')
+
+module.exports = {
+ Emitter: Emitter,
+ Handler: Handler,
+ createEmitter: Emitter.createEmitter,
+ createHandler: Handler.createHandler
+}
View
94 lib/synchronization/emitter.js
@@ -0,0 +1,94 @@
+var zmqstream = require('zmq-stream')
+ , RequestEmitter = require('../request/emitter')
+ , util = require('util')
+
+//
+// # SynchronizationEmitter
+//
+// In addition to the options available to a RequestEmitter, SynchronizationEmitters have the following options:
+//
+// * `autoUpdate`: A value, expressed as a Boolean, that indicates whether or not the SynchronizationEmitter should
+// cache and automatically update any value retrieved via `get`. If true, subsequent calls to `get` will not result in
+// a request, but will respond with the cached value at some point in the future. Defaults to true.
+//
+function SynchronizationEmitter(obj) {
+ if (!(this instanceof SynchronizationEmitter)) {
+ return new SynchronizationEmitter(obj)
+ }
+
+ obj = obj || {}
+
+ RequestEmitter.call(this, obj)
+
+ this._zsub = new zmqstream.Socket({
+ type: zmqstream.Type.SUB
+ })
+}
+SynchronizationEmitter.createEmitter = SynchronizationEmitter
+util.inherits(SynchronizationEmitter, RequestEmitter)
+
+//
+// ## get `get(options, callback)`
+//
+// Asynchronously attempts to retrieve **options.key**. If successful, **callback** receives the current value.
+// Otherwise, **callback** will receive the error. Uses the same timeout/retry/error behaviour as `emit`.
+//
+// The **callback** is guaranteed to be called asynchronously, even if the value is already present as set by the
+// `autoUpdate` option.
+//
+SynchronizationEmitter.prototype.get = get
+function get(options, callback) {
+ var self = this
+}
+
+//
+// ## set `set(options, callback)`
+//
+// Asynchronously attempts to update **options.key** to **options.value**. If successful, **callback** receives the
+// current value. Otherwise, **callback** will receive the error. Uses the same timeout/retry/error behaviour as `emit`.
+//
+SynchronizationEmitter.prototype.set = set
+function set(options, callback) {
+ var self = this
+}
+
+//
+// ## on `on(name, handler)`
+//
+// ### Also: `once`, `addListener`, `removeAllListeners`, etc.
+//
+// Registers **handler** as an event handler to remote (broadcast) events named **name** received from
+// SynchronizationHandlers over the Shuttle mesh.
+//
+// Behaves identically to EventEmitter.on, but only for remote events.
+//
+// Returns the SynchronizationEmitter.
+//
+SynchronizationEmitter.prototype.on = on
+function on(name, handler) {
+ var self = this
+}
+
+//
+// ## listenForBroadcasts `listenForBroadcasts(options)`
+//
+// Synchronously listens for broadcast connections from SynchronizationHandlers. If **options.url** is provided, that
+// URL will be used. Otherwise, **options** will be formatted as a URL as defined by the core `url` module.
+//
+SynchronizationEmitter.prototype.listenForBroadcasts = listenForBroadcasts
+function listenForBroadcasts(options) {
+ var self = this
+}
+
+//
+// ## connectForBroadcasts `connectForBroadcasts(options)`
+//
+// Synchronously connects to a listening SynchronizationHandler's broadcast socket. If **options.url** is provided, that
+// URL will be used. Otherwise, **options** will be formatted as a URL as defined by the core `url` module.
+//
+SynchronizationEmitter.prototype.connectForBroadcasts = connectForBroadcasts
+function connectForBroadcasts(options) {
+ var self = this
+}
+
+module.exports = SynchronizationEmitter
View
39 lib/synchronization/handler.js
@@ -0,0 +1,39 @@
+var zmqstream = require('zmq-stream')
+ , RequestHandler = require('../request/handler')
+ , SynchronizationEmitter = require('./emitter')
+ , util = require('util')
+
+//
+// # SynchronizationHandler
+//
+// In addition to the options available to a SynchronizationEmitter or a RequestHandler, SynchronizationHandlers have
+// the following options:
+//
+function SynchronizationHandler(obj) {
+ if (!(this instanceof SynchronizationHandler)) {
+ return new SynchronizationHandler(obj)
+ }
+
+ obj = obj || {}
+
+ RequestHandler.call(this, obj)
+
+ this._zpub = new zmqstream.Socket({
+ type: zmqstream.Type.PUB
+ })
+}
+SynchronizationHandler.createHandler = SynchronizationHandler
+util.inherits(SynchronizationHandler, RequestHandler)
+
+//
+// ## emit `emit(name, data)`
+//
+// Emits an event (broadcast) named **name**, along with **data**, over the Shuttle mesh to interested
+// SynchronizationEmitters.
+//
+SynchronizationHandler.prototype.emit = emit
+function emit(name, data) {
+ var self = this
+}
+
+module.exports = SynchronizationHandler
View
9 lib/synchronization/index.js
@@ -0,0 +1,9 @@
+var Emitter = require('./emitter')
+ , Handler = require('./handler')
+
+module.exports = {
+ Emitter: Emitter,
+ Handler: Handler,
+ createEmitter: Emitter.createEmitter,
+ createHandler: Handler.createHandler
+}
View
6 package.json
@@ -4,11 +4,11 @@
"description": "A massively-distributable, service-oriented architecture with all the flexibility of Node.",
"main": "index.js",
"dependencies": {
- "msgpack3": "~0.1.2",
- "zmq": "https://github.com/SchoonologyRRL/zeromq.node/tarball/patch-1"
+ "zmq-stream": "~0.2.3",
+ "mi": "~1.0.0",
+ "debug": "~0.7.2"
},
"devDependencies": {
- "stepdown": "~0.2.12",
"mocha": "~1.6.0",
"chai": "~1.3.0"
},
View
68 test/shuttle.js
@@ -1,14 +1,71 @@
/*global describe:true, it:true, before:true, after:true, beforeEach:true, afterEach:true */
+var shuttle = require('../')
+ , expect = require('chai').expect
+
+function generateTestUrl() {
+ return 'ipc:///tmp/' + Math.random().toString().slice(2)
+}
+
+describe('Shuttle', function () {
+ describe('Request', function () {
+ beforeEach(function () {
+ this.emitter = shuttle.createRequestEmitter()
+ this.handler = shuttle.createRequestHandler()
+ this.url = generateTestUrl()
+
+ this.handler.listenForRequests({
+ url: this.url
+ })
+ this.emitter.connectForRequests({
+ url: this.url
+ })
+ })
+
+ afterEach(function () {
+ this.emitter.close()
+ this.handler.close()
+ })
+
+ it('should work', function (done) {
+ this.handler.on('echo', function (data, callback) {
+ callback(null, data)
+ })
+
+ this.emitter.emit('echo', {
+ test: true
+ }, function (err, response) {
+ expect(response).to.have.property('test', true)
+ done(err)
+ })
+ })
+ })
+
+ describe('Synchronization', function () {
+ it('should work')
+ })
+})
+
+/*
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
var fork = require('child_process').fork,
path = require('path'),
- expect = require('chai').expect,
stepdown = require('stepdown'),
shuttle = require('../');
-function generateTestUrl() {
- return '/tmp/' + Math.random().toString().slice(2);
-}
-
function startChild(name, env) {
return fork(path.join(__dirname, 'fixtures', name), [], {
silent: true,
@@ -191,3 +248,4 @@ describe('Shuttle', function () {
it('should support Prosumer-Router-Prosumer', generateEchoTest('prosumer', 'prosumer'));
});
});
+*/

0 comments on commit 0900268

Please sign in to comment.
Something went wrong with that request. Please try again.