Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

extended eventdispatcher to be forked as child process

  • Loading branch information...
commit 0bff2a5e0574c6fff29d8bebd91c40ad6be3e1b6 1 parent 01c0826
@adrai authored
View
80 lib/eventDispatcher.js
@@ -7,39 +7,63 @@
// implementation.
// __Example:__
//
-// var eventDispatcher = require('./eventDispatcher')
-// var dispatcher = eventDispatcher.create(this);
-//
-// // start the instance
-// dispatcher.start()
+// dispatcher = new EventDispatcher(options);
+// dispatcher.useLogger(logger)
+// .usePublisher(publisher)
+// .useStorage(storage)
+// .start();
-var root = this
- , dispatcher
- , EventDispatcher;
-
-if (typeof exports !== 'undefined') {
- dispatcher = exports;
-} else {
- dispatcher = root.eventDispatcher = {};
-}
-
-dispatcher.VERSION = '0.3.0';
-
-// Create an instance by passing in the eventstore
-dispatcher.create = function(store) {
- return new EventDispatcher(store);
-};
+var EventDispatcher;
// ## EventDispatcher
-EventDispatcher = function(store) {
- this.publishingInterval = store.options.publishingInterval || 100;
- this.storage = store.storage;
- this.publisher = store.publisher;
- this.logger = store.logger;
+EventDispatcher = function(options) {
+ this.options = options;
+ this.publishingInterval = this.options.publishingInterval || 100;
+ this.publisher = null;
+ this.logger = null;
this.undispatchedEventsQueue = [];
};
EventDispatcher.prototype = {
+
+ // __useLogger:__ use this function to to inject the logger.
+ //
+ // `eventDispatcher.useLogger(logger)`
+ //
+ // - __logger:__ the logger that should be injected
+ useLogger: function(logger) {
+
+ this.logger = logger;
+
+ return this;
+
+ },
+
+ // __usePublisher:__ use this function to to inject the publisher.
+ //
+ // `eventDispatcher.usePublisher(publisher)`
+ //
+ // - __publisher:__ the publisher that should be injected
+ usePublisher: function(publisher) {
+
+ this.publisher = publisher;
+
+ return this;
+
+ },
+
+ // __useStorage:__ use this function to to inject the storage.
+ //
+ // `eventDispatcher.useStorage(storage)`
+ //
+ // - __storageModule:__ the storage that should be injected
+ useStorage: function(storage) {
+
+ this.storage = storage;
+
+ return this;
+
+ },
// __log:__ Just a helper function to shorten logging calls
log: function(msg, level) {
@@ -134,4 +158,6 @@ EventDispatcher.prototype = {
worker.start();
});
}
-};
+};
+
+module.exports = EventDispatcher;
View
71 lib/eventDispatcherProcess.js
@@ -0,0 +1,71 @@
+// lib/eventDispatcherProcess.js v0.3.0
+// (c) 2012 Kaba AG, MIC AWM; under MIT License
+// (by) Jan Muehlemann (jamuhl)
+// , Adriano Raiano (adrai)
+
+// This is the event dispatcher process that will be called as separate process...
+// __Example:__
+//
+// var self = this;
+// this.dispatcher = cp.fork(__dirname + '/eventDispatcherProcess.js');
+// this.dispatcher.send({ action: 'use', payload: { options: this.options, storageModulePath: this.storage.filename } });
+// this.dispatcher.on('message', function(m) {
+// if (m.action === 'publish') {
+// self.publisher.publish(JSON.deserialize(m.payload));
+// }
+// });
+// this.dispatcher.send({ action: 'start' });
+//
+// // create a handle function on fork
+// this.dispatcher.addUndispatchedEvents = function(evts) {
+// self.dispatcher.send({ action: 'addUndispatchedEvents', payload: JSON.stringify(evts) });
+// };
+
+var EventDispatcher = require('./eventDispatcher')
+ , util = require('./util')
+ , options = null
+ , logger = null
+ , publisher = null
+ , storageModulePath = null
+ , eventDispatcher = null;
+
+// when receiving a message from parent process...
+process.on('message', function(m) {
+ if (m.action === 'use') {
+
+ if (m.payload.options) {
+ options = m.payload.options;
+ }
+ if (m.payload.storageModulePath) {
+ storageModulePath = m.payload.storageModulePath;
+ }
+
+ } else if (m.action === 'start') {
+
+ var publisher = {
+ publish: function(msg) {
+ process.send({ action: 'publish', payload: JSON.stringify(msg) });
+ }
+ };
+
+ // prepare event dispatcher
+ eventDispatcher = new EventDispatcher(options);
+ eventDispatcher.usePublisher(publisher);
+
+ require(storageModulePath).createStorage(
+ options.repository,
+ function(err, storage) {
+ if (err) {
+ console.log(err);
+ } else {
+ eventDispatcher.useStorage(storage)
+ .start();
+ process.send({ action: 'start' });
+ }
+ }
+ );
+
+ } else if (m.action === 'addUndispatchedEvents') {
+ eventDispatcher.addUndispatchedEvents(JSON.deserialize(m.payload));
+ }
+});
View
41 lib/eventStore.js
@@ -24,10 +24,11 @@
// // start the instance
// es.start()
-var eventDispatcher = require('./eventDispatcher')
+var EventDispatcher = require('./eventDispatcher')
, interfaces = require('./interfaces')
, util = require('./util')
, async = require('async')
+ , cp = require('child_process')
, root = this
, eventStore
, Store;
@@ -74,9 +75,41 @@ Store.prototype = {
if (!this.publisher) {
this.use(require('./publisher/fakePublisher').createPublisher());
}
-
- this.dispatcher = eventDispatcher.create(this);
- this.dispatcher.start();
+
+
+ if (this.storage.filename && this.storage.options) {
+
+ // if fork enabled, start event dispatcher as child process...
+ if (this.logger) {
+ this.logger.info('Start event dispatcher as child process!');
+ }
+ var self = this;
+ this.dispatcher = cp.fork(__dirname + '/eventDispatcherProcess.js');
+ this.dispatcher.send({ action: 'use', payload: { options: this.options, storageModulePath: this.storage.filename } });
+ this.dispatcher.on('message', function(m) {
+ if (m.action === 'publish') {
+ self.publisher.publish(JSON.deserialize(m.payload));
+ }
+ });
+ this.dispatcher.send({ action: 'start' });
+
+ // create a handle function on fork
+ this.dispatcher.addUndispatchedEvents = function(evts) {
+ self.dispatcher.send({ action: 'addUndispatchedEvents', payload: JSON.stringify(evts) });
+ };
+ } else {
+
+ // else, start event dispatcher in same process...
+ if (this.logger) {
+ this.logger.info('Start event dispatcher in same process!');
+ }
+ this.dispatcher = new EventDispatcher(this.options);
+ this.dispatcher.useLogger(this.logger)
+ .usePublisher(this.publisher)
+ .useStorage(this.storage)
+ .start();
+ }
+
},
// __configure:__ configure your eventstore to use wished modules
View
144 lib/json.js
@@ -0,0 +1,144 @@
+// EXTEND JSON
+
+// JSON-Serialize.js 1.1.0
+// (c) 2011 Kevin Malakoff.
+// JSON-Serialize is freely distributable under the MIT license.
+// https://github.com/kmalakoff/json-serialize
+//
+
+(function() {
+
+this.JSON || (this.JSON = {}); // hopefully JSON is defined!
+JSON.SERIALIZE_VERSION = '1.1.0';
+
+////////////////HELPERS - BEGIN//////////////////
+var isEmpty = function(obj) {
+ for(var key in obj) {
+ // a property, not a function
+ if (obj.hasOwnProperty(key)) return false;
+ }
+ return true;
+};
+
+var isArray = function(obj) {
+ return obj.constructor == Array;
+};
+
+var stringHasISO8601DateSignature = function(string) {
+ return (string.length>=19) && (string[4] == '-') && (string[7] == '-') && (string[10] == 'T') && (string[string.length-1] == 'Z');
+};
+
+var keyPath = function(object, keypath) {
+ var keypath_components = keypath.split('.');
+ if (keypath_components.length===1) return ((object instanceof Object) && (object.hasOwnProperty(keypath))) ? object[keypath] : void 0; // optimization
+ var key, current_object = object;
+ for (var i = 0, l = keypath_components.length; i < l;) {
+ key = keypath_components[i];
+ if (!(key in current_object)) break;
+ if (++i === l) return current_object[key];
+ current_object = current_object[key];
+ if (!current_object || !(current_object instanceof Object)) break;
+ }
+ return void 0;
+};
+////////////////HELPERS - END//////////////////
+
+// Convert an array of objects or an object to JSON using the convention that if an
+// object has a toJSON function, it will use it rather than the raw object.
+JSON.serialize = function(obj, options) {
+ // Simple type - exit quickly
+ if (!obj || (typeof(obj)!=='object')) return obj;
+
+ // use toJSON function - Note: Dates have a built in toJSON that converts them to ISO8601 UTC ("Z") strings
+ if(obj.toJSON) return obj.toJSON();
+ else if (isEmpty(obj)) return null;
+
+ // serialize an array
+ var result;
+ if (isArray(obj)) {
+ result = [];
+ for (var i=0, l=obj.length; i<l;i++) { result.push(JSON.serialize(obj[i])); }
+ return result;
+ }
+
+ // serialize the properties
+ else {
+ result = {};
+ for (var key in obj) { result[key] = JSON.serialize(obj[key]); }
+ return result;
+ }
+};
+
+// Deserialized an array of JSON objects or each object individually using the following conventions:
+// 1) if JSON has a recognized type identifier ('\_type' as default), it will try to create an instance.
+// 2) if the class refered to by the type identifier has a fromJSON function, it will try to create an instance.
+// <br/>**Options:**<br/>
+//* `skip_type_field` - skip a type check. Useful for if your model is already deserialized and you want to deserialize your properties. See Backbone.Articulation for an example.
+//* `skip_dates` - skip the automatic Date conversion check from ISO8601 string format. Useful if you want to keep your dates in string format.
+// <br/>**Global settings:**<br/>
+//* `JSON.deserialize.TYPE_FIELD` - the field key in the serialized JSON that is used for constructor lookup.<br/>
+//* `JSON.deserialize.NAMESPACE_ROOTS` - the array of roots that are used to find the constructor. Useful for reducing global namespace pollution<br/>
+JSON.deserialize = function(json, options) {
+ var json_type = typeof(json);
+
+ // special checks for strings
+ if (json_type==='string') {
+ // The object is still a JSON string, convert to JSON
+ if (json.length && ((json[0] === '{') || (json[0] === '['))) {
+ try { var json_as_JSON = JSON.parse(json); if (json_as_JSON) json = json_as_JSON; json_type = typeof(json); }
+ catch (e) {throw new TypeError("Unable to parse JSON: " + json);}
+ }
+ // the object looks like a Date serialized to ISO8601 UTC ("Z") format, try automatically converting
+ else if (!(options && options.skip_dates) && stringHasISO8601DateSignature(json)) {
+ try { var date = new Date(json); if (date) return date; }
+ catch (e) {}
+ }
+ }
+
+ // Simple type - exit quickly
+ if ((json_type!=='object') || isEmpty(json)) return json;
+
+ // Parse an array
+ var result;
+ if (isArray(json)) {
+ result = [];
+ for (var i=0, l=json.length; i<l;i++) { result.push(JSON.deserialize(json[i])); }
+ return result;
+ }
+
+ // Parse the properties individually
+ else if ((options && options.skip_type_field) || !json.hasOwnProperty(JSON.deserialize.TYPE_FIELD)) {
+ result = {};
+ for (var key in json) { result[key] = JSON.deserialize(json[key]); }
+ return result;
+ }
+
+ // Find and use the fromJSON function
+ else
+ {
+ var type = json[JSON.deserialize.TYPE_FIELD];
+ var root, constructor_or_root, instance;
+
+ // Try searching in the available namespaces
+ for (var j=0, k=JSON.deserialize.NAMESPACE_ROOTS.length; j<k;j++) {
+ root = JSON.deserialize.NAMESPACE_ROOTS[j];
+ constructor_or_root = keyPath(root, type);
+ if (!constructor_or_root) continue;
+
+ // class/root parse function
+ if (constructor_or_root.fromJSON) return constructor_or_root.fromJSON(json);
+ // instance parse function (Backbone.Model and Backbone.Collection style)
+ else if (constructor_or_root.prototype && constructor_or_root.prototype.parse) {
+ instance = new constructor_or_root();
+ if (instance.set) return instance.set(instance.parse(json));
+ else return instance.parse(json);
+ }
+ }
+
+ return null;
+ }
+};
+
+JSON.deserialize.TYPE_FIELD = '_type';
+JSON.deserialize.NAMESPACE_ROOTS = [this];
+})();
View
5 lib/storage/inMemory/storage.js
@@ -25,9 +25,14 @@ inMemoryStorage.createStorage = function(options, callback) {
// ## inMemory storage
Storage = function(options, callback) {
+
+ this.filename = __filename;
+
if (typeof options === 'function')
callback = options;
+ this.options = options;
+
this.store = {};
this.snapshots = {};
View
2  lib/util.js
@@ -7,6 +7,8 @@
var root = this;
+require('./json');
+
var util;
if (typeof exports !== 'undefined') {
View
3  storage/couchDb/storage.js
@@ -32,6 +32,9 @@ couchDbStorage.createStorage = function(options, callback) {
// ## CouchDb storage
Storage = function(options, callback) {
+
+ this.filename = __filename;
+
if (typeof options === 'function')
callback = options;
View
3  storage/mongoDb/storage.js
@@ -33,6 +33,9 @@ mongoDbStorage.createStorage = function(options, callback) {
// ## MongoDb storage
Storage = function(options, callback) {
+
+ this.filename = __filename;
+
if (typeof options === 'function')
callback = options;
View
3  storage/redis/storage.js
@@ -31,6 +31,9 @@ redisStorage.createStorage = function(options, callback) {
// ## redis storage
Storage = function(options, callback) {
+
+ this.filename = __filename;
+
if (typeof options === 'function')
callback = options;
Please sign in to comment.
Something went wrong with that request. Please try again.