Permalink
Browse files

fixed timing issue on eventstorage initialization

  • Loading branch information...
1 parent 1833208 commit 23451a8c9150ca59f315e28eba6a2d17df5b32d3 Adriano Raiano committed Feb 24, 2013
Showing with 119 additions and 121 deletions.
  1. +9 −1 README.md
  2. +108 −118 lib/domain.js
  3. +2 −2 package.json
View
@@ -74,9 +74,17 @@ It can be very useful as domain component if you work with (d)ddd, cqrs, eventde
See [tests](https://github.com/adrai/node-cqrs-domain/tree/master/test) for detailed information...
+# Release Notes
+
+## v0.3.8
+
+- updated eventstore package
+- optimized initialization
+
+
# License
-Copyright (c) 2012 Adriano Raiano
+Copyright (c) 2013 Adriano Raiano
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
View
@@ -75,135 +75,125 @@ module.exports = domain = _.extend(new EventEmitter2({
// initialize the hub by passing the function that gets the command id from the event
hub.init(newGetCommandId || getCommandId);
- var es = eventStore.createStore({ publishingInterval: options.publishingInterval, forkDispatching: options.forkEventDispatching }),
- storage;
-
- var initEventStore = function() {
- es.configure(function() {
- this.use({ publish: publish });
- if (storage) {
- this.use(storage);
- }
- }).start();
- };
-
- if (options.eventStore.type !== 'inMemory') {
- storage = require('eventstore.' + options.eventStore.type).createStorage(options.eventStore, function(err) {
- initEventStore();
- });
- } else {
- initEventStore();
- }
+ var es = eventStore.createStore({ publishingInterval: options.publishingInterval, forkDispatching: options.forkEventDispatching });
+ es.configure(function() {
+ this.use({ publish: publish });
+ if (options.eventStore.type !== 'inMemory') {
+ this.use(require('eventstore.' + options.eventStore.type).createStorage(options.eventStore));
+ }
+ }).start(function(err) {
- eventEmitter.on('commandRejected', function(cmd, reason) {
- publish(createCommandRejectedEvent(cmd, reason));
- });
+ eventEmitter.on('commandRejected', function(cmd, reason) {
+ publish(createCommandRejectedEvent(cmd, reason));
+ });
- eventEmitter.on('command:*', function(cmd) {
- if (!cmd.id) {
- es.getNewIdFromStorage(function(err, id) {
- cmd.id = id;
+ eventEmitter.on('command:*', function(cmd) {
+ if (!cmd.id) {
+ es.getNewIdFromStorage(function(err, id) {
+ cmd.id = id;
+ domain.handle(cmd);
+ });
+ } else {
domain.handle(cmd);
- });
- } else {
- domain.handle(cmd);
- }
- });
-
- async.parallel({
- aggregates: function(callback) {
- aggregateLoader.load(options.aggregatesPath, callback);
- },
- commandHandlers: function(callback) {
- commandHandlerLoader.configure(function() {
- this.use(es);
- });
- commandHandlerLoader.load(options.commandHandlersPath, options, callback);
- },
-
- sagas: function(callback) {
- sagaLoader.load(options.sagasPath, callback);
- },
- sagaHandlers: function(callback) {
- async.series([
- function(callback) {
- repository.init(options.repository, callback);
- },
- function(callback) {
- sagaHandlerLoader.configure(function() {
- this.use(repository);
- });
- callback(null);
- }
- ], function(err) {
- sagaHandlerLoader.load(options.sagaHandlersPath, callback);
- });
- }
- }, function(err, results) {
-
- var aggregates = results.aggregates
- , commandHandlers = results.commandHandlers;
-
- for(var i = 0, len = commandHandlers.length; i < len; i++) {
- var commandHandler = commandHandlers[i];
- commandHandler.Aggregate = aggregates[commandHandler.aggregate];
- commandHandler.Command = Command;
- }
+ }
+ });
- var sagas = results.sagas
- , sagaHandlers = results.sagaHandlers;
+ async.parallel({
+ aggregates: function(callback) {
+ aggregateLoader.load(options.aggregatesPath, callback);
+ },
+ commandHandlers: function(callback) {
+ commandHandlerLoader.configure(function() {
+ this.use(es);
+ });
+ commandHandlerLoader.load(options.commandHandlersPath, options, callback);
+ },
+
+ sagas: function(callback) {
+ sagaLoader.load(options.sagasPath, callback);
+ },
+ sagaHandlers: function(callback) {
+ async.series([
+ function(callback) {
+ repository.init(options.repository, callback);
+ },
+ function(callback) {
+ sagaHandlerLoader.configure(function() {
+ this.use(repository);
+ });
+ callback(null);
+ }
+ ], function(err) {
+ sagaHandlerLoader.load(options.sagaHandlersPath, callback);
+ });
+ }
+ }, function(err, results) {
+
+ var aggregates = results.aggregates
+ , commandHandlers = results.commandHandlers;
+
+ for(var i = 0, len = commandHandlers.length; i < len; i++) {
+ var commandHandler = commandHandlers[i];
+ commandHandler.Aggregate = aggregates[commandHandler.aggregate];
+ commandHandler.Command = Command;
+ }
- for(var j = 0, lenj = sagaHandlers.length; j < lenj; j++) {
- var sagaHandler = sagaHandlers[j];
- sagaHandler.Saga = sagas[sagaHandler.saga];
- sagaHandler.initialize();
- }
+ var sagas = results.sagas
+ , sagaHandlers = results.sagaHandlers;
- queue.connect(options.commandQueue, function(err, commandQueue) {
- commandDispatcher.configure(function() {
- this.use(commandQueue);
- });
+ for(var j = 0, lenj = sagaHandlers.length; j < lenj; j++) {
+ var sagaHandler = sagaHandlers[j];
+ sagaHandler.Saga = sagas[sagaHandler.saga];
+ sagaHandler.initialize();
+ }
- var worker = {};
-
- // starts the worker by using an interval loop
- worker.start = function() {
- worker.process = setInterval(function() {
+ queue.connect(options.commandQueue, function(err, commandQueue) {
+ commandDispatcher.configure(function() {
+ this.use(commandQueue);
+ });
- // if the last loop is still in progress leave this loop
- if (worker.isRunning)
- return;
+ var worker = {};
+
+ // starts the worker by using an interval loop
+ worker.start = function() {
+ worker.process = setInterval(function() {
+
+ // if the last loop is still in progress leave this loop
+ if (worker.isRunning)
+ return;
+
+ worker.isRunning = true;
- worker.isRunning = true;
-
- (function next(e) {
-
- // dipatch one command in queue and call the _next_ callback, which
- // will call _process_ for the next command in queue.
- var process = function(cmdPointer, next) {
+ (function next(e) {
- // Publish it now...
- commandDispatcher.dispatch(cmdPointer.command, function(err) {
- if (cmdPointer.callback) cmdPointer.callback(null);
- next();
- });
- };
-
- // serial _process_ all events in queue
- if (!e && commandBuffer.length) {
- process(commandBuffer.shift(), next);
- }
- })();
-
- worker.isRunning = false;
-
- }, 10);
- };
-
- // fire things off
- worker.start();
+ // dipatch one command in queue and call the _next_ callback, which
+ // will call _process_ for the next command in queue.
+ var process = function(cmdPointer, next) {
+
+ // Publish it now...
+ commandDispatcher.dispatch(cmdPointer.command, function(err) {
+ if (cmdPointer.callback) cmdPointer.callback(null);
+ next();
+ });
+ };
+
+ // serial _process_ all events in queue
+ if (!e && commandBuffer.length) {
+ process(commandBuffer.shift(), next);
+ }
+ })();
+
+ worker.isRunning = false;
+
+ }, 10);
+ };
+
+ // fire things off
+ worker.start();
+
+ commandDispatcher.initialize({}, callback);
+ });
- commandDispatcher.initialize({}, callback);
});
});
View
@@ -1,7 +1,7 @@
{
"author": "adrai",
"name": "cqrs-domain",
- "version": "0.3.7",
+ "version": "0.3.8",
"private": false,
"main": "index.js",
"engines": {
@@ -15,7 +15,7 @@
"lodash": ">= 0.10.x",
"eventemitter2": ">= 0.4.8",
"node-queue": ">= 0.2.3",
- "eventstore": ">= 0.6.1",
+ "eventstore": ">= 0.6.2",
"viewmodel": ">= 0.4.10",
"nodeEventedCommand": ">= 0.1.2",
"retry": ">= 0.6.0"

0 comments on commit 23451a8

Please sign in to comment.