Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

first commit

  • Loading branch information...
commit e1a43b0b0bf4321d068dd37dec8a7495f91d1faa 0 parents
Adriano Raiano authored
Showing with 2,023 additions and 0 deletions.
  1. +13 −0 .gitignore
  2. +13 −0 .travis.yml
  3. +92 −0 README.markdown
  4. +15 −0 index.js
  5. +140 −0 lib/bases/aggregateBase.js
  6. +143 −0 lib/bases/commandHandlerBase.js
  7. +70 −0 lib/bases/sagaBase.js
  8. +138 −0 lib/bases/sagaHandlerBase.js
  9. +97 −0 lib/commandDispatcher.js
  10. +133 −0 lib/domain.js
  11. +7 −0 lib/eventEmitter.js
  12. +84 −0 lib/loaders/aggregateLoader.js
  13. +55 −0 lib/loaders/commandHandlerLoader.js
  14. +49 −0 lib/loaders/sagaHandlerLoader.js
  15. +24 −0 lib/loaders/sagaLoader.js
  16. +4 −0 lib/utils.js
  17. +51 −0 lib/utils/object.js
  18. +88 −0 lib/utils/path.js
  19. +19 −0 licence
  20. +44 −0 package.json
  21. +115 −0 test/aggregateTest.js
  22. +233 −0 test/commandDispatcherTest.js
  23. +72 −0 test/commandHandlerTest.js
  24. +35 −0 test/integration/aggregates/dummyAggregate.js
  25. +9 −0 test/integration/commandHandlers/dummyCommandHandler.js
  26. +179 −0 test/integration/domainTest.js
  27. +9 −0 test/integration/sagaHandlers/dummySagaHandler.js
  28. +9 −0 test/integration/sagas/dummySaga.js
  29. +1 −0  test/mocha.opts
  30. +82 −0 test/sagaHandlerTest.js
13 .gitignore
@@ -0,0 +1,13 @@
+$ cat .gitignore
+
+# Can ignore specific files
+.settings.xml
+.monitor
+
+# Use wildcards as well
+*~
+#*.swp
+
+# Can also ignore all directories and files in a directory.
+node_modules
+node_modules/**/*
13 .travis.yml
@@ -0,0 +1,13 @@
+before_script: "npm install --dev"
+
+language: node_js
+node_js:
+ - 0.6
+
+branches:
+ only:
+ - master
+
+notifications:
+ email:
+ - adriano@raiano.ch
92 README.markdown
@@ -0,0 +1,92 @@
+# Introduction
+
+[![Build Status](https://secure.travis-ci.org/adrai/node-cqrs-domain.png)](http://travis-ci.org/adrai/node-cqrs-domain)
+
+Node-cqrs-domain is a node.js module based on [nodeEventStore](http://jamuhl.github.com/nodeEventStore/) that.
+It can be very useful as domain component if you work with (d)ddd, cqrs, eventdenormalizer, host, etc.
+
+# Installation
+
+ $ npm install cqrs-domain
+
+# Usage
+
+## Initialization
+
+ var domain = require('cqrs-domain').domain;
+
+ domain.on('event', function(evt) {
+ // send to bus
+ });
+ domain.initialize({
+ commandHandlersPath: __dirname + '/commandHandlers',
+ aggregatesPath: __dirname + '/aggregates',
+ sagaHandlersPath: __dirname + '/sagaHandlers',
+ sagasPath: __dirname + '/sagas',
+ publishingInterval: 20,
+ snapshotThreshold: 10
+ }, function(err) {
+
+ });
+
+## Define aggregates...
+
+ var base = require('cqrs-domain').aggregateBase;
+
+ module.exports = base.extend({
+
+ changeDummy: function(data, callback) {
+ this.apply(this.toEvent('dummyChanged', data));
+
+ this.checkBusinessRules(callback);
+ },
+
+ destroyDummy: function(data, callback) {
+ this.apply(this.toEvent('dummyDestroyed', data));
+
+ this.checkBusinessRules(callback);
+ },
+
+ cancelDummy: function(data, callback) {
+ this.apply(this.toEvent('dummyCancelled', data));
+
+ this.checkBusinessRules(callback);
+ },
+
+ dummyChanged: function(data) {
+ this.set(data);
+ },
+
+ dummyCancelled: function(data) {
+ this.set('cancelled', true);
+ },
+
+ dummyDestroyed: function(data) {
+ this.set('destroyed', true);
+ }
+
+ });
+
+See [tests](https://github.com/adrai/node-cqrs-domain/tree/master/test) for detailed information...
+
+# License
+
+Copyright (c) 2012 Adriano Raiano
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
15 index.js
@@ -0,0 +1,15 @@
+var index;
+
+if (typeof module.exports !== 'undefined') {
+ index = module.exports;
+} else {
+ index = root.index = {};
+}
+
+index.VERSION = '0.0.1';
+
+index.domain = require('./lib/domain');
+index.aggregateBase = require('./lib/bases/aggregateBase');
+index.commandHandlerBase = require('./lib/bases/commandHandlerBase');
+index.sagaBase = require('./lib/bases/sagaBase');
+index.sagaHandlerBase = require('./lib/bases/sagaHandlerBase');
140 lib/bases/aggregateBase.js
@@ -0,0 +1,140 @@
+var utils = require('../utils')
+ , async = require('async')
+ , _ = require('underscore');
+
+var Aggregate = function(id) {
+ this.id = id;
+ this.uncommittedEvents = [];
+ this.attributes = { id: id, revision: 0, destroyed: false };
+};
+
+Aggregate.prototype = {
+
+ set: function(data) {
+ if (arguments.length === 2) {
+ this.attributes[arguments[0]] = arguments[1];
+ } else {
+ for(var m in data) {
+ this.attributes[m] = data[m];
+ }
+ }
+ },
+
+ get: function(attr) {
+ return this.attributes[attr];
+ },
+
+ toJSON: function() {
+ return _.clone(this.attributes);
+ },
+
+ toEvent: function(name, data) {
+ var event = {
+ event: name,
+ payload: data || {}
+ };
+
+ if (!event.payload.id) event.payload.id = this.id;
+
+ return event;
+ },
+
+ loadFromHistory: function(data, events) {
+ if (data) {
+ this.set(data);
+ }
+
+ if (events) {
+ this.apply(_.map(events, function(evt) {
+ evt.fromHistory = true;
+ return evt;
+ }));
+ }
+ },
+
+ apply: function(events, callback) {
+ var self = this;
+
+ if (!_.isArray(events)) {
+ events = [events];
+ }
+
+ var historyEvents = [];
+ var newEvents = [];
+ _.each(events, function(evt) {
+ if (evt.fromHistory) {
+ historyEvents.push(evt);
+ } else {
+ newEvents.push(evt);
+ }
+ });
+
+ _.each(historyEvents, function(evt) {
+ self[evt.event](evt.payload);
+ });
+
+ this.previousAttributes = this.toJSON();
+
+ _.each(newEvents, function(evt) {
+ self[evt.event](evt.payload);
+ evt.head = { revision: ++self.attributes.revision };
+ self.uncommittedEvents.push(evt);
+ });
+
+ if (callback) callback(null);
+
+ return;
+ },
+
+ validate: function(ruleName, data, callback) {
+ if(this.validationRules && this.validationRules[ruleName]) {
+ this.validationRules[ruleName].validate(data, function(err) {
+ if (err) {
+ var keys = _.toArray(err);
+ _.each(keys, function(key) {
+ key.type = 'validationRule';
+ });
+ callback(keys);
+ } else {
+ callback(null);
+ }
+ });
+ } else {
+ callback(null);
+ }
+ },
+
+ checkBusinessRules: function(callback) {
+ var self = this;
+ var changedAttributes = this.toJSON();
+ var keys = [];
+
+ if(!this.businessRules) return callback(null);
+
+ this.businessRules.forEach(function(rule, index) {
+ rule.call(self, changedAttributes, self.previousAttributes, function(ruleId, message) {
+ if (ruleId) {
+ if (!message) {
+ message = ruleId;
+ ruleId = arguments.callee.caller.name;
+ }
+ keys.push({ type: 'businessRule', ruleId: ruleId, message: message });
+ }
+ });
+
+ if (index === self.businessRules.length - 1) {
+ if (keys.length > 0) {
+ self.attributes = self.previousAttributes;
+ callback(keys);
+ } else {
+ callback(null);
+ }
+ }
+ });
+ }
+
+};
+
+Aggregate.extend = utils.extend;
+
+module.exports = Aggregate;
143 lib/bases/commandHandlerBase.js
@@ -0,0 +1,143 @@
+var eventEmitter = require('../eventEmitter')
+ , async = require('async')
+ , _ = require('underscore');
+
+var CommandHandler = {};
+CommandHandler.prototype = {
+
+ defaultHandle: function(id, cmd) {
+
+ var self = this;
+
+ async.waterfall([
+
+ // load aggregate
+ function(callback) {
+ self.loadAggregate(id, callback);
+ },
+
+ // reject command if aggregate has already been destroyed
+ function(aggregate, stream, callback) {
+ if(aggregate.get('destroyed')) {
+ return callback('Aggregate has already been destroyed!');
+ }
+
+ callback(null, aggregate, stream);
+ },
+
+ // check revision
+ function(aggregate, stream, callback) {
+ self.checkRevision(cmd, aggregate.get('revision'), function(err) {
+ callback(err, aggregate, stream);
+ });
+ },
+
+ // call validate command
+ function(aggregate, stream, callback) {
+ aggregate.validate(cmd.command, cmd.payload, function(err) {
+ callback(err, aggregate, stream);
+ });
+ },
+
+ // call command function on aggregate
+ function(aggregate, stream, callback) {
+ aggregate[cmd.command](cmd.payload, function(err) {
+ callback(err, aggregate, stream);
+ });
+ },
+
+ // commit the new events
+ function(aggregate, stream, callback) {
+ self.commit(cmd.id, aggregate.uncommittedEvents, stream);
+ callback(null);
+ }
+ ],
+
+ // finally publish commandRejected event on error
+ function(err) {
+ if (err) {
+ eventEmitter.emit('commandRejected', cmd, err);
+ }
+ eventEmitter.emit('handled:' + cmd.command, id, cmd);
+ });
+
+ },
+
+ commit: function(cmdId, uncommittedEvents, stream, callback) {
+
+ var self = this;
+
+ async.concat(uncommittedEvents, function(evt, next) {
+ evt.commandId = cmdId;
+
+ self.eventStore.getNewIdFromStorage(function(err, id) {
+ evt.id = id;
+ stream.addEvent(evt);
+ next(err);
+ });
+ },
+ // final
+ function(err) {
+ if (!err) {
+ stream.commit(function(err) {
+ // Check if snapshotting is needed.
+ if (stream.events.length >= self.options.snapshotThreshold) {
+ self.eventstore.createSnapshot(stream.streamId, stream.currentRevision(), aggregate.toJSON());
+ }
+ });
+ }
+ });
+ },
+
+ handle: function(id, cmd) {
+ if (this[cmd.command]) {
+ this[cmd.command](id, cmd);
+ } else {
+ this.defaultHandle(id, cmd);
+ }
+ },
+
+ loadAggregate: function(id, callback) {
+ var aggregate = new this.Aggregate(id);
+ this.eventStore.getFromSnapshot(id, function(err, snapshot, stream) {
+ async.map(stream.events, function(evt, next) {
+ next(null, evt.payload);
+ }, function(err, events) {
+ aggregate.loadFromHistory(snapshot, events);
+ callback(null, aggregate, stream);
+ });
+ });
+ },
+
+ checkRevision: function(cmd, aggRev, callback) {
+ if(!cmd.head || cmd.head.revision === undefined ||
+ (cmd.head && cmd.head.revision === aggRev)) {
+ return callback(null);
+ }
+
+ callback('Concurrency exception. Actual ' +
+ cmd.head.revision + ' expected ' + aggRev);
+ },
+
+ configure: function(fn) {
+ fn.call(this);
+ return this;
+ },
+
+ use: function(module) {
+ if (!module) return;
+
+ if (module.getFromSnapshot) {
+ this.eventStore = module;
+ }
+ }
+
+};
+
+module.exports = {
+
+ extend: function(obj) {
+ return _.extend(_.clone(CommandHandler.prototype), obj);
+ }
+
+};
70 lib/bases/sagaBase.js
@@ -0,0 +1,70 @@
+var utils = require('../utils')
+ , eventEmitter = require('../eventEmitter')
+ , _ = require('underscore');
+
+var Saga = function(id) {
+ this.id = id;
+ this.destroyed = false;
+ this.isInited = false;
+ this.uncommittedEvents = [];
+};
+
+Saga.prototype = {
+
+ loadFromHistory: function(events) {
+ if (events) {
+ this.transition(_.map(events, function(evt) {
+ evt.fromHistory = true;
+ return evt;
+ }));
+ }
+ },
+
+ destroy: function() {
+ this.destroyed = true;
+ },
+
+ sendCommand: function(cmd) {
+ if (!cmd.payload) cmd.payload = {};
+ if (!cmd.payload.id) cmd.payload.id = this.id;
+
+ if (this.isInited) {
+ //emit...
+ eventEmitter.emit('command:' + cmd.command, cmd);
+ }
+ },
+
+ transition: function(events) {
+ var self = this;
+
+ if (!_.isArray(events)) {
+ events = [events];
+ }
+
+ var historyEvents = [];
+ var newEvents = [];
+ _.each(events, function(evt) {
+ if (evt.fromHistory) {
+ historyEvents.push(evt);
+ } else {
+ newEvents.push(evt);
+ }
+ });
+
+ _.each(historyEvents, function(evt) {
+ self[evt.event](evt.payload);
+ });
+
+ _.each(newEvents, function(evt) {
+ self[evt.event](evt.payload);
+ self.uncommittedEvents.push(evt);
+ });
+
+ return;
+ }
+
+};
+
+Saga.extend = utils.extend;
+
+module.exports = Saga;
138 lib/bases/sagaHandlerBase.js
@@ -0,0 +1,138 @@
+var eventEmitter = require('../eventEmitter')
+ , async = require('async')
+ , _ = require('underscore');
+
+var SagaHandler = {};
+SagaHandler.prototype = {
+
+ sagas: {},
+
+ initialize: function() {
+
+ var self = this;
+
+ function initSaga(id, stream) {
+ var saga = new this.Saga(id);
+ async.map(stream.events, function(evt, next) {
+ next(null, evt.payload);
+ }, function(err, events) {
+ saga.loadFromHistory(events);
+ saga.isInited = true;
+ self.sagas[id] = { saga: saga, stream: stream };
+ });
+ }
+
+ this.eventStore.getEventStreams(this.saga, function(err, sagaStreams) {
+ if (!err) {
+ for(var i = 0, len = sagaStreams.length; i < len; i++) {
+ var stream = sagaStreams[i];
+ initSaga(stream.streamId, stream);
+ }
+ }
+ });
+
+ },
+
+ defaultHandle: function(id, evt) {
+
+ var self = this;
+
+ async.waterfall([
+
+ // load saga
+ function(callback) {
+ self.loadSaga(id, callback);
+ },
+
+ // transition the event
+ function(saga, stream, callback) {
+ saga.transition(evt);
+ callback(null, saga, stream);
+ },
+
+ // commit the uncommittedEvents
+ function(saga, stream, callback) {
+ if (saga.destroyed) {
+ stream.remove(function(err) {
+ callback(err, saga);
+ });
+ } else {
+ self.commit(saga.uncommittedEvents, stream, function(err) {
+ callback(err, saga);
+ });
+ }
+ }
+ ],
+
+ // finally
+ function(err, saga) {
+ if (!err) {
+ }
+ });
+
+ },
+
+ commit: function(uncommittedEvents, stream, callback) {
+ if (uncommittedEvents.length > 0) {
+ for(var i = 0, len = uncommittedEvents.length; i < len; i++) {
+ var evt = uncommittedEvents[i];
+ stream.addEvent(evt);
+ }
+ stream.commit(callback);
+ } else {
+ callback(null);
+ }
+ },
+
+ handle: function(evt) {
+ if (this[evt.event]) {
+ this[evt.event](evt);
+ } else {
+ this.defaultHandle(evt.payload.id, evt);
+ }
+ },
+
+ loadSaga: function(id, callback) {
+ var self = this;
+ var saga = this.sagas[id];
+ if (!saga) {
+ saga = { saga: new this.Saga(id) };
+ this.eventStore.getEventStream(id, function(err, stream) {
+ saga.stream = stream;
+ async.map(stream.events, function(evt, next) {
+ next(null, evt.payload);
+ }, function(err, events) {
+ saga.saga.loadFromHistory(events);
+ saga.saga.unemittedCommands = [];
+ saga.saga.isInited = true;
+ self.sagas[id] = saga;
+ callback(null, saga.saga, saga.stream);
+ });
+ });
+ } else {
+ callback(null, saga.saga, saga.stream);
+ }
+ },
+
+ configure: function(fn) {
+ fn.call(this);
+ return this;
+ },
+
+ use: function(module) {
+ if (!module) return;
+
+ if (module.getEventStreams) {
+ this.eventStore = module;
+ }
+ }
+
+};
+
+module.exports = {
+
+ extend: function(obj) {
+ return _.extend(_.clone(SagaHandler.prototype), obj);
+ }
+
+};
97 lib/commandDispatcher.js
@@ -0,0 +1,97 @@
+var async = require('async')
+ , eventEmitter = require('./eventEmitter');
+
+module.exports = {
+
+ configure: function(fn) {
+ fn.call(this);
+ return this;
+ },
+
+ use: function(module) {
+ if (!module) return;
+
+ if (module.push) {
+ this.commandQueue = module;
+ }
+ },
+
+ initialize: function(options, callback) {
+ var self = this;
+
+ if (!callback) callback = options;
+
+ eventEmitter.on('handled:*', function(id, command) {
+ self.commandQueue.remove(id, function() {});
+ });
+
+ this.reEmitCommands(callback);
+ },
+
+ reEmitCommands: function(callback) {
+ this.commandQueue.getAll(function(err, cmds) {
+ async.forEach(cmds, function(item, cb) {
+ eventEmitter.emit('handle:' + item.data.command, item.id, item.data);
+ cb();
+ }, callback);
+ });
+ },
+
+ dispatch: function(cmd, callback) {
+
+ var commandQueue = this.commandQueue;
+
+ if (!commandQueue) return callback(new Error('No commandQueue provided!'));
+
+ async.waterfall([
+
+ // ĥas no handlers
+ function(callback) {
+ var handlersCount = eventEmitter.listeners('handle:' + cmd.command).length;
+ if (handlersCount === 0) {
+ callback(new Error('no handler registered for this command'));
+ } else {
+ callback(null);
+ }
+ },
+
+ // use provided aggregateId or get one from commandQueue
+ function(callback){
+ if (cmd.payload && cmd.payload.id) {
+ callback(null, cmd.payload.id);
+ } else {
+ commandQueue.getNewId(function(err, newId) {
+ callback(err, newId);
+ });
+ }
+ },
+
+ // check if command can be dispatched
+ function(id, callback) {
+ commandQueue.isQueued(id, function(err) {
+ if (!err) callback(null, id);
+ else callback(new Error('Another command already queued for this aggregate *so richtig bös*'));
+ });
+ },
+
+ // emit command to commandEmitter
+ function(id, callback) {
+ commandQueue.push(id, cmd, function(err) {
+ if (!err) {
+ eventEmitter.emit('handle:' + cmd.command, id, cmd);
+ } else {
+ eventEmitter.emit('commandRejected', cmd, err.message);
+ }
+
+ callback(null);
+ });
+ }
+ ],
+
+ // final callback
+ function (err) {
+ if (callback) callback(err);
+ });
+ }
+
+};
133 lib/domain.js
@@ -0,0 +1,133 @@
+var async = require('async')
+ , EventEmitter2 = require('eventemitter2').EventEmitter2
+ , commandHandlerLoader = require('./loaders/commandHandlerLoader')
+ , aggregateLoader = require('./loaders/aggregateLoader')
+ , sagaHandlerLoader = require('./loaders/sagaHandlerLoader')
+ , sagaLoader = require('./loaders/sagaLoader')
+ , commandDispatcher = require('./commandDispatcher')
+ , eventStore = require('eventstore')
+ , _ = require('underscore')
+ , queue = require('node-queue')
+ , eventEmitter = require('./eventEmitter')
+ , domain;
+
+function createCommandRejectedEvent(cmd, reason) {
+ return {
+ event: 'commandRejected',
+ id: cmd.id + '_reject',
+ commandId: cmd.id,
+ payload: {
+ command: cmd,
+ reason: reason
+ },
+ head: cmd.head
+ };
+}
+
+function publish(msg) {
+ eventEmitter.emit('event:' + msg.event, msg);
+ domain.emit('event', msg);
+}
+
+module.exports = domain = _.extend(new EventEmitter2({
+ wildcard: true,
+ delimiter: ':',
+ maxListeners: 1000 // default would be 10!
+ }), {
+
+ initialize: function(options, callback) {
+
+ if(_.isFunction(options)) {
+ callback = options;
+ }
+
+ var defaults = {
+ publishingInterval: 200,
+ commandQueue: { type: 'inMemory', collectionName: 'commands' },
+ eventStore: { type: 'inMemory' }
+ };
+
+ _.defaults(options, defaults);
+
+ var es = eventStore.createStore({ publishingInterval: options.publishingInterval });
+ es.configure(function() {
+ this.use({ publish: publish });
+ if (options.eventStore.type !== 'inMemory') {
+ this.use(require('eventstore.' + options.eventStore.type).createStorage(options.eventStore));
+ }
+ }).start();
+
+ eventEmitter.on('commandRejected', function(cmd, reason) {
+ publish(createCommandRejectedEvent(cmd, reason));
+ });
+
+ eventEmitter.on('command:*', function(cmd) {
+ if (!cmd.id) {
+ es.getNewIdFromStorage(function(err, id) {
+ cmd.id = id;
+ domain.handle(cmd);
+ });
+ } else {
+ domain.handle(cmd);
+ }
+ });
+
+ async.parallel({
+ aggregates: function(callback) {
+ aggregateLoader.load(options.aggregatesPath, callback);
+ },
+ commandHandlers: function(callback) {
+ commandHandlerLoader.configure(function() {
+ this.use(es);
+ });
+ commandHandlerLoader.load(options.commandHandlersPath, options, callback);
+ },
+
+ sagas: function(callback) {
+ sagaLoader.load(options.sagasPath, callback);
+ },
+ sagaHandlers: function(callback) {
+ sagaHandlerLoader.configure(function() {
+ this.use(es);
+ });
+ sagaHandlerLoader.load(options.sagaHandlersPath, callback);
+ }
+ }, function(err, results) {
+
+ var aggregates = results.aggregates
+ , commandHandlers = results.commandHandlers;
+
+ for(var i = 0, len = commandHandlers.length; i < len; i++) {
+ var commandHandler = commandHandlers[i];
+ commandHandler.Aggregate = aggregates[commandHandler.aggregate];
+ }
+
+ var sagas = results.sagas
+ , sagaHandlers = results.sagaHandlers;
+
+ for(var j = 0, lenj = sagaHandlers.length; j < lenj; j++) {
+ var sagaHandler = sagaHandlers[j];
+ sagaHandler.Saga = sagas[sagaHandler.saga];
+ sagaHandler.initialize();
+ }
+
+ queue.connect(options.commandQueue, function(err, commandQueue) {
+ commandDispatcher.configure(function() {
+ this.use(commandQueue);
+ });
+ commandDispatcher.initialize({}, callback);
+ });
+
+ });
+
+ },
+
+ handle: function(cmd, callback) {
+
+ commandDispatcher.dispatch(cmd, function(err) {
+ if (callback) callback(null);
+ });
+
+ }
+
+});
7 lib/eventEmitter.js
@@ -0,0 +1,7 @@
+var EventEmitter2 = require('eventemitter2').EventEmitter2;
+
+module.exports = new EventEmitter2({
+ wildcard: true,
+ delimiter: ':',
+ maxListeners: 1000 // default would be 10!
+});
84 lib/loaders/aggregateLoader.js
@@ -0,0 +1,84 @@
+var path = require('path')
+ , async = require('async')
+ , utils = require('../utils')
+ , _ = require('underscore');
+
+module.exports = {
+
+ load: function(aggregatesPath, validationRulesPath, businessRulesPath, callback) {
+
+ if (arguments.length === 2) {
+ callback = validationRulesPath;
+ validationRulesPath = aggregatesPath + '/../validationRules';
+ businessRulesPath = aggregatesPath + '/../businessRules';
+ }
+
+ var aggregates = {};
+
+ if (!path.existsSync(aggregatesPath)){
+ return callback(null, aggregates);
+ }
+
+ async.parallel( {
+ cmdValRules: function(callback) {
+ var cmdValRules = {};
+
+ if (path.existsSync(validationRulesPath)) {
+ utils.path.dive(validationRulesPath, function(err, file) {
+ var validation = require(file);
+ var aggrName = validation.aggregate;
+ delete validation.aggregate;
+ cmdValRules[aggrName] = validation;
+ }, function() {
+ callback(null, cmdValRules);
+ });
+ } else {
+ callback(null, cmdValRules);
+ }
+ },
+
+ businessRules: function(callback) {
+ var businessRules = {};
+
+ if (path.existsSync(businessRulesPath)) {
+ utils.path.dive(businessRulesPath, function(err, file) {
+ var businessRule = require(file);
+
+ function iterator(rule) {
+ businessRules[m].push(rule);
+ }
+
+ for(var m in businessRule) {
+ businessRules[m] = businessRules[m] || [];
+ if (_.isArray(businessRule[m])) {
+ _.each(businessRule[m], iterator);
+ } else {
+ businessRules[m].push(businessRule[m]);
+ }
+ }
+ }, function() {
+ callback(null, businessRules);
+ });
+ } else {
+ callback(null, businessRules);
+ }
+ }
+ },
+
+ function(err, results) {
+ if (err) return callback(err);
+
+ utils.path.dive(aggregatesPath, function(err, file) {
+ var aggregate = require(file);
+ var name = path.basename(file, '.js');
+
+ aggregate.prototype.validationRules = results.cmdValRules[name];
+ aggregate.prototype.businessRules = results.businessRules[name];
+
+ aggregates[name] = aggregate;
+ }, function() {
+ callback(null, aggregates);
+ });
+ });
+ }
+};
55 lib/loaders/commandHandlerLoader.js
@@ -0,0 +1,55 @@
+var eventEmitter = require('../eventEmitter')
+ , path = require('path')
+ , utils = require('../utils')
+ , commandHandlerLoader;
+
+module.exports = commandHandlerLoader = {
+
+ configure: function(fn) {
+ fn.call(commandHandlerLoader);
+ return commandHandlerLoader;
+ },
+
+ use: function(module) {
+ if (!module) return;
+
+ if (module.getFromSnapshot) {
+ commandHandlerLoader.eventStore = module;
+ }
+ },
+
+ load: function(p, options, callback) {
+
+ if (!callback) {
+ callback = options;
+ options = { snapshotThreshold: 10 };
+ }
+
+ var commandHandlers = [];
+
+ if (!path.existsSync(p)){
+ return callback(null, commandHandlers);
+ }
+
+ utils.path.dive(p, function(err, file) {
+ var commandHandler = require(file);
+ commandHandler.options = options;
+ commandHandlers.push(commandHandler);
+
+ commandHandler.configure(function() {
+ commandHandler.use(commandHandlerLoader.eventStore);
+ });
+
+ function action(id, cmd) {
+ commandHandler.handle(id, cmd);
+ }
+
+ for(var i = 0, len = commandHandler.commands.length; i < len; i++) {
+ var cmdName = commandHandler.commands[i];
+ eventEmitter.on('handle:' + cmdName, action);
+ }
+ }, function() {
+ callback(null, commandHandlers);
+ });
+ }
+};
49 lib/loaders/sagaHandlerLoader.js
@@ -0,0 +1,49 @@
+var eventEmitter = require('../eventEmitter')
+ , path = require('path')
+ , utils = require('../utils')
+ , sagaHandlerLoader;
+
+module.exports = sagaHandlerLoader = {
+
+ configure: function(fn) {
+ fn.call(sagaHandlerLoader);
+ return sagaHandlerLoader;
+ },
+
+ use: function(module) {
+ if (!module) return;
+
+ if (module.getEventStreams) {
+ sagaHandlerLoader.eventStore = module;
+ }
+ },
+
+ load: function(p, callback) {
+
+ var sagaHandlers = [];
+
+ if (!path.existsSync(p)){
+ return callback(null, sagaHandlers);
+ }
+
+ utils.path.dive(p, function(err, file) {
+ var sagaHandler = require(file);
+ sagaHandlers.push(sagaHandler);
+
+ sagaHandler.configure(function() {
+ sagaHandler.use(sagaHandlerLoader.eventStore);
+ });
+
+ function action(evt) {
+ sagaHandler.handle(evt);
+ }
+
+ for(var i = 0, len = sagaHandler.events.length; i < len; i++) {
+ var evtName = sagaHandler.events[i];
+ eventEmitter.on('event:' + evtName, action);
+ }
+ }, function() {
+ callback(null, sagaHandlers);
+ });
+ }
+};
24 lib/loaders/sagaLoader.js
@@ -0,0 +1,24 @@
+var path = require('path')
+ , utils = require('../utils')
+ , sagaLoader;
+
+module.exports = sagaLoader = {
+
+ load: function(p, callback) {
+
+ var sagas = {};
+
+ if (!path.existsSync(p)){
+ return callback(null, sagas);
+ }
+
+ utils.path.dive(p, function(err, file) {
+ var saga = require(file);
+ var name = path.basename(file, '.js');
+
+ sagas[name] = saga;
+ }, function() {
+ callback(null, sagas);
+ });
+ }
+};
4 lib/utils.js
@@ -0,0 +1,4 @@
+module.exports = {
+ extend: require('./utils/object'),
+ path: require('./utils/path')
+};
51 lib/utils/object.js
@@ -0,0 +1,51 @@
+var _ = require('underscore');
+
+// https://gist.github.com/1714086
+// Shared empty constructor function to aid in prototype-chain creation.
+var ctor = function(){};
+
+// Helper function to correctly set up the prototype chain, for subclasses.
+// Similar to `goog.inherits`, but uses a hash of prototype properties and
+// class properties to be extended.
+var inherits = function(parent, protoProps, staticProps) {
+ var child;
+
+ // The constructor function for the new subclass is either defined by you
+ // (the "constructor" property in your `extend` definition), or defaulted
+ // by us to simply call the parent's constructor.
+ if (protoProps && protoProps.hasOwnProperty('constructor')) {
+ child = protoProps.constructor;
+ } else {
+ child = function(){ parent.apply(this, arguments); };
+ }
+
+ // Inherit class (static) properties from parent.
+ _.extend(child, parent);
+
+ // Set the prototype chain to inherit from `parent`, without calling
+ // `parent`'s constructor function.
+ ctor.prototype = parent.prototype;
+ child.prototype = new ctor();
+
+ // Add prototype properties (instance properties) to the subclass,
+ // if supplied.
+ if (protoProps) _.extend(child.prototype, protoProps);
+
+ // Add static properties to the constructor function, if supplied.
+ if (staticProps) _.extend(child, staticProps);
+
+ // Correctly set child's `prototype.constructor`.
+ child.prototype.constructor = child;
+
+ // Set a convenience property in case the parent's prototype is needed later.
+ child.__super__ = parent.prototype;
+
+ return child;
+};
+
+// The self-propagating extend function that Backbone classes use.
+module.exports = function (protoProps, classProps) {
+ var child = inherits(this, protoProps, classProps);
+ child.extend = this.extend;
+ return child;
+};
88 lib/utils/path.js
@@ -0,0 +1,88 @@
+var fs = require('fs');
+
+module.exports = {
+
+ dive: function(dir, opt, action, complete) {
+
+ // default options
+ var defaultOpt = {
+ all: false,
+ recursive: true,
+ directories: false
+ };
+
+ // check args
+ if (typeof opt == 'function') {
+ if (typeof action == 'undefined')
+ complete = function () {};
+ else
+ complete = action;
+
+ action = opt;
+ opt = { };
+ } else if (typeof complete == 'undefined')
+ complete = function () {};
+
+ // Assert that dir is a string
+ if (typeof dir != 'string')
+ dir = process.cwd();
+
+ opt.all = opt.all || defaultOpt.all;
+ opt.recursive = opt.recursive || defaultOpt.recursive;
+ opt.directories = opt.directories || defaultOpt.directories;
+
+ function dive(dir) {
+ // Read the directory
+ fs.readdir(dir, function(err, list) {
+ todo--;
+ // Return the error if something went wrong
+ if (err) return action(err);
+
+ // For every file in the list
+ list.forEach(function(file) {
+
+ if (opt.all || file[0] != '.') {
+ todo++;
+
+ // Full path of that file
+ var path = dir + '/' + file;
+ // Get the file's stats
+ fs.stat(path, function(err, stat) {
+ if (err) {
+ todo--;
+ return action(err);
+ }
+
+ // If the file is a directory
+ if (stat) {
+ if (stat.isDirectory()) {
+ // Call action if enabled for directories
+ if (opt.directories)
+ action(null, path, stat);
+
+ // Dive into the directory
+ if (opt.recursive) {
+ dive(path);
+ }
+ } else {
+ // Call the action
+ action(null, path, stat);
+
+ if (!--todo)
+ complete();
+ }
+ }
+ });
+ }
+ });
+ //empty directories
+ if(!list.length && !todo) {
+ complete();
+ }
+ });
+ }
+
+ var todo = 1;
+ dive(dir);
+ }
+};
19 licence
@@ -0,0 +1,19 @@
+Copyright (c) 2012 Adriano Raiano
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
44 package.json
@@ -0,0 +1,44 @@
+{
+ "author": "adrai",
+ "name": "cqrs-domain",
+ "version": "0.0.1",
+ "private": false,
+ "main": "index.js",
+ "engines": {
+ "node": ">=0.6.16"
+ },
+ "directories": {
+ "lib": "./lib"
+ },
+ "dependencies": {
+ "async": ">= 0.1.18",
+ "underscore": ">= 1.3.3",
+ "eventemitter2": ">= 0.4.8",
+ "node-queue": ">= 0.1.1",
+ "eventstore": ">= 0.6.0"
+ },
+ "devDependencies": {
+ "mocha": ">= 1.0.1",
+ "expect.js": ">= 0.1.2",
+ "rule-validator": ">= 0.1.0"
+ },
+ "description": "",
+ "keywords" : [
+ "cqrs",
+ "eventsourcing",
+ "ddd",
+ "dddd",
+ "command",
+ "event",
+ "eventdenormalizer",
+ "domain"
+ ],
+ "homepage": "https://github.com/adrai/node-cqrs-domain",
+ "repository": {
+ "type": "git",
+ "url": "git@github.com:adrai/node-cqrs-domain.git"
+ },
+ "scripts" : {
+ "test" : "mocha"
+ }
+}
115 test/aggregateTest.js
@@ -0,0 +1,115 @@
+var expect = require('expect.js')
+ , aggregateBase = require('../index').aggregateBase
+ , ruleBase = require('rule-validator');
+
+var valRules = ruleBase.extend(
+ {
+ doSomethingCommand: {
+ setMePass: {
+ type: 'string',
+ minLength: 1
+ },
+ setMeFails: {
+ type: 'string',
+ minLength: 100
+ }
+ }
+ }
+);
+
+var Aggregate = aggregateBase.extend({
+
+ doSomethingCommand: function(data, callback) {
+ this.apply(this.toEvent('SomethingDoneEvent', data), callback);
+ },
+
+ SomethingDoneEvent: function(data) {
+ this.set(data);
+ },
+
+ validationRules: valRules,
+
+ businessRules: [
+ function(changed, previous, callback) {
+ if (changed.a > changed.b) {
+ callback('b must be bigger than a!');
+ } else {
+ callback(null);
+ }
+ },
+ function(changed, previous, callback) {
+ if (changed.d > changed.c) {
+ callback('c must be bigger than d!');
+ } else {
+ callback(null);
+ }
+ },
+ function(changed, previous, callback) {
+ if (changed.a < previous.a) {
+ callback('a must be bigger than a before!');
+ } else {
+ callback(null);
+ }
+ }
+ ]
+});
+var aggregate = new Aggregate('id_1');
+aggregate.set({revision: 0});
+
+describe('Aggregation Base', function() {
+
+ describe('command validation', function() {
+
+ it('it should pass given valid data', function(done) {
+ aggregate.validate('doSomethingCommand', { setMePass: 'ok' }, function(err) {
+ expect(err).not.to.be.ok();
+ done();
+ });
+ });
+
+ it('it should fail given invalid data', function(done) {
+ aggregate.validate('doSomethingCommand', { setMeFails: 'nok' }, function(err) {
+ expect(err).to.be.ok();
+ done();
+ });
+ });
+
+ });
+
+ describe('business rules validation', function() {
+
+ it('it should pass given valid data', function(done) {
+ aggregate.doSomethingCommand( { a: 2, b: 7} );
+ aggregate.checkBusinessRules(function(err) {
+ expect(err).not.to.be.ok();
+ done();
+ });
+ });
+
+ it('it should fail given invalid data', function(done) {
+ aggregate.doSomethingCommand( { a: 8, b: 2} );
+ aggregate.checkBusinessRules(function(err) {
+ expect(err).to.be.ok();
+ done();
+ });
+ });
+
+ it('it should fail with second rule', function(done) {
+ aggregate.doSomethingCommand( { d: 8, c: 2} );
+ aggregate.checkBusinessRules(function(err) {
+ expect(err).to.be.ok();
+ done();
+ });
+ });
+
+ it('it should fail with rule that uses previous', function(done) {
+ aggregate.doSomethingCommand( { a: 1, b: 2} );
+ aggregate.checkBusinessRules(function(err) {
+ expect(err).to.be.ok();
+ done();
+ });
+ });
+
+ });
+
+});
233 test/commandDispatcherTest.js
@@ -0,0 +1,233 @@
+var expect = require('expect.js')
+ , async = require('async')
+ , commandDispatcher = require('../lib/commandDispatcher')
+ , queue = require('node-queue')
+ , commandQueue
+ , eventEmitter = require('../lib/eventEmitter');
+
+function cleanQueue(done) {
+ commandQueue.getAll(function(err, cmds) {
+ async.forEach(cmds, function(item, callback) {
+ commandQueue.remove(item.id, callback);
+ }, function(err) {
+ if (!err) done();
+ });
+ });
+}
+
+describe('commandDispatcher', function() {
+
+ before(function(done) {
+ queue.connect(function(err, cmdQueue) {
+ commandQueue = cmdQueue;
+ commandDispatcher.configure(function() {
+ this.use(commandQueue);
+ });
+ done();
+ });
+ });
+
+ afterEach(function(done) {
+ cleanQueue(done);
+ });
+
+ describe('calling initialize', function() {
+
+ describe('having zero entries', function() {
+
+ after(function() {
+ // remove listeners as you connect in a second test again
+ // else you would call the dequeue function more than once
+ // as it is bound in initalize function
+ eventEmitter.removeAllListeners('handled:*');
+ });
+
+ it('it should connect', function(done) {
+ commandDispatcher.initialize({}, function(err) {
+ expect(err).not.to.be.ok();
+ done();
+ });
+ });
+
+ });
+
+ describe('having any entries', function() {
+
+ var emitted = false;
+
+ beforeEach(function(done) {
+ eventEmitter.once('handle:changeDummy', function() { emitted = true; });
+
+ // remove listeners as you connect in a second test again
+ // else you would call the dequeue function more than once
+ // as it is bound in initalize function
+ eventEmitter.removeAllListeners('handled:*');
+
+ commandQueue.push('cmdid', { id: 'cmdid', command: 'changeDummy' }, done);
+ });
+
+ it('it should connect', function(done) {
+ commandDispatcher.initialize({}, function(err) {
+ expect(err).not.to.be.ok();
+ done();
+ });
+ });
+
+ it('it should reemit the commands', function(done) {
+ commandDispatcher.initialize({}, function(err) {
+ expect(emitted).to.eql(true);
+ done();
+ });
+ });
+
+ });
+
+ });
+
+ describe('being initialized', function() {
+
+ var command;
+
+ before(function(done) {
+ commandDispatcher.initialize(done);
+ });
+
+ beforeEach(function()
+ {
+ command = {
+ id: 'cmdid',
+ command: 'changeDummy',
+ payload: { id: '1' }
+ };
+ });
+
+ describe('calling dispatch', function() {
+
+ describe('having no commandhandler', function() {
+
+ it('it should callback with error', function(done) {
+ commandDispatcher.dispatch(command, function(err) {
+ expect(err).to.be.ok();
+ done();
+ });
+ });
+
+ it('it should not add a command to commandQueue', function(done) {
+ commandDispatcher.dispatch(command, function(err) {
+ commandQueue.getAll(function(err, items) {
+ expect(items).to.be.an('array');
+ expect(items).to.have.length(0);
+ done();
+ });
+ });
+ });
+
+ });
+
+ describe('having a commandhandler', function() {
+
+ beforeEach(function() {
+ eventEmitter.once('handle:changeDummy', function() {});
+ });
+
+ it('it should callback with success', function(done) {
+ commandDispatcher.dispatch(command, function(err) {
+ expect(err).not.to.be.ok();
+ done();
+ });
+ });
+
+ it('the commandQueueStore should contain an entry', function(done) {
+ commandDispatcher.dispatch(command, function(err) {
+ commandQueue.getAll(function(err, items) {
+ expect(items).to.be.an('array');
+ expect(items).to.have.length(1);
+ done();
+ });
+ });
+ });
+
+ describe('having no payload id', function() {
+
+ beforeEach(function() {
+ delete command.payload.id;
+ });
+
+ it('it should create a new id', function(done) {
+ commandDispatcher.dispatch(command, function(err) {
+ commandQueue.getAll(function(err, items) {
+ expect(items[0]).to.have.property('id');
+ done();
+ });
+ });
+
+ });
+
+ it('the commandQueueStore should contain an entry', function(done) {
+ commandDispatcher.dispatch(command, function(err) {
+ commandQueue.getAll(function(err, items) {
+ expect(items).to.be.an('array');
+ expect(items).to.have.length(1);
+ done();
+ });
+ });
+ });
+
+ });
+
+ describe('having a command with already blocked aggregate', function() {
+
+ it('it should callback with error', function(done) {
+ commandDispatcher.dispatch(command, function(err) {
+ commandDispatcher.dispatch(command, function(err) {
+ expect(err).to.be.ok();
+ done();
+ });
+ });
+ });
+
+ });
+
+ });
+
+ });
+
+ describe('noting handled:* event being raised', function() {
+
+ describe('on existing entry', function() {
+
+ before(function(done) {
+ commandQueue.push('cmdid', { id: 'cmdid', command: 'changeDummy' }, done);
+ });
+
+ it('it should remove the entry', function(done) {
+ eventEmitter.emit('handled:changeDummy', 'cmdid', { id: 'cmdid', command: 'changeDummy'} );
+
+ commandQueue.getAll(function(err, entries) {
+ expect(entries).to.be.an('array');
+ expect(entries).to.have.length(0);
+ done();
+ });
+ });
+
+ });
+
+ describe('on non existing entry', function() {
+
+ it('it should not create an entry', function(done) {
+ eventEmitter.emit('handled:changeDummy', 'cmdid', { id: 'cmdid', command: 'changeDummy'} );
+
+ commandQueue.getAll(function(err, entries) {
+ expect(entries).to.be.an('array');
+ expect(entries).to.have.length(0);
+ done();
+ });
+ });
+
+ });
+
+ });
+
+ });
+
+});
72 test/commandHandlerTest.js
@@ -0,0 +1,72 @@
+var expect = require('expect.js')
+ , EventEmitter = require('events').EventEmitter
+ , commandHandlerBase = require('../index').commandHandlerBase
+ , aggregateBase = require('../index').aggregateBase;
+
+var stream = new EventEmitter();
+
+var Aggregate = aggregateBase.extend({
+
+ doSomethingCommand: function(data, callback) {
+ this.apply(this.toEvent('SomethingDoneEvent', data), callback);
+ },
+
+ SomethingDoneEvent: function(data) {
+ this.set(data);
+ },
+
+ validate: function(ruleName, data, callback) {
+ callback();
+ }
+});
+var aggregate = new Aggregate('id_1');
+aggregate.set({revision: 0});
+
+
+var commandHandler = commandHandlerBase.extend({
+ commands: ['doSomethingCommand'],
+ aggregate: 'overridden load!',
+
+ stream: stream,
+
+ loadAggregate: function(id, callback) {
+ this.aggregate = aggregate;
+
+ callback(null, this.aggregate, this.stream);
+ },
+
+ commit: function(cmdId, uncommittedEvents, stream, callback) {
+ var self = this;
+
+ stream.uncommittedEvents = uncommittedEvents;
+ stream.emit('done', uncommittedEvents);
+ }
+});
+
+
+describe('CommandHandlerBase', function() {
+
+ describe('calling handle function', function() {
+
+ // given
+ var command = {
+ command: 'doSomethingCommand',
+ payload: {
+ setMe: 'should be set on aggregate'
+ }
+ };
+
+ it('it should be passing command to aggregate and given back an event', function(done) {
+ // then
+ commandHandler.stream.on('done', function(uncommittedEvents) {
+ expect(uncommittedEvents[0].payload.setMe).to.eql('should be set on aggregate');
+ done();
+ });
+
+ // when
+ commandHandler.handle('id_1', command);
+ });
+
+ });
+
+});
35 test/integration/aggregates/dummyAggregate.js
@@ -0,0 +1,35 @@
+var base = require('../../../index').aggregateBase;
+
+module.exports = base.extend({
+
+ changeDummy: function(data, callback) {
+ this.apply(this.toEvent('dummyChanged', data));
+
+ this.checkBusinessRules(callback);
+ },
+
+ destroyDummy: function(data, callback) {
+ this.apply(this.toEvent('dummyDestroyed', data));
+
+ this.checkBusinessRules(callback);
+ },
+
+ cancelDummy: function(data, callback) {
+ this.apply(this.toEvent('dummyCancelled', data));
+
+ this.checkBusinessRules(callback);
+ },
+
+ dummyChanged: function(data) {
+ this.set(data);
+ },
+
+ dummyCancelled: function(data) {
+ this.set('cancelled', true);
+ },
+
+ dummyDestroyed: function(data) {
+ this.set('destroyed', true);
+ }
+
+});
9 test/integration/commandHandlers/dummyCommandHandler.js
@@ -0,0 +1,9 @@
+var commandHandlerBase = require('../../../index').commandHandlerBase;
+
+module.exports = commandHandlerBase.extend({
+
+ commands: ['changeDummy', 'destroyDummy', 'cancelDummy' ],
+
+ aggregate: 'dummyAggregate'
+
+});
179 test/integration/domainTest.js
@@ -0,0 +1,179 @@
+var expect = require('expect.js')
+ , domain = require('../../index').domain;
+
+describe('Domain', function() {
+
+ describe('noting a command', function() {
+
+ describe('having bad data', function() {
+
+ it('it should acknowledge the command', function(done) {
+
+ var cmd = 'foobar';
+ domain.handle(cmd, function(err) {
+ expect(err).not.to.be.ok();
+ done();
+ });
+
+ });
+
+ });
+
+ describe('having well-formed data', function() {
+
+ describe('having no command handlers', function() {
+
+ it('it should acknowledge the command', function(done) {
+
+ var cmd = {
+ command: 'changeDummy',
+ id: '82517'
+ };
+ domain.handle(cmd, function(err) {
+ expect(err).not.to.be.ok();
+ done();
+ });
+
+ });
+
+ });
+
+ describe('having any command handlers', function() {
+
+ var dummyEmitter = new (require('events').EventEmitter)();
+
+ before(function(done) {
+
+ domain.on('event', function(evt) {
+ dummyEmitter.emit('published', evt);
+ });
+ domain.initialize({
+ commandHandlersPath: __dirname + '/commandHandlers',
+ aggregatesPath: __dirname + '/aggregates',
+ sagaHandlersPath: __dirname + '/sagaHandlers',
+ sagasPath: __dirname + '/sagas',
+ publishingInterval: 20
+ }, done);
+
+ });
+
+ it('it should acknowledge the command', function(done) {
+
+ var cmd = {
+ command: 'foobar',
+ id: '82517'
+ };
+ domain.handle(cmd, function(err) {
+ expect(err).not.to.be.ok();
+ done();
+ });
+
+ });
+
+ it('it should publish an event', function(done) {
+
+ var cmd = {
+ command: 'changeDummy',
+ id: '82517'
+ };
+
+ dummyEmitter.once('published', function(evt) {
+ expect(evt.event).to.eql('dummyChanged');
+ expect(evt.commandId).to.eql(cmd.id);
+ done();
+ });
+
+ domain.handle(cmd, function(err) {});
+
+ });
+
+ describe('when the underlying aggregate has been destroyed', function() {
+
+ var cmd = {
+ command: 'changeDummy',
+ id: '12345',
+ payload: {
+ id: '82517'
+ }
+ };
+
+ beforeEach(function(done) {
+
+ dummyEmitter.once('published', function(evt) {
+ done();
+ });
+
+ domain.handle({
+ command: 'destroyDummy',
+ payload: {
+ id: '82517'
+ }
+ }, function() {});
+
+ });
+
+ it('it should raise a commandRejected event', function(done) {
+
+ dummyEmitter.once('published', function(evt) {
+ expect(evt.event).to.eql('commandRejected');
+ expect(evt.commandId).to.eql(cmd.id);
+ done();
+ });
+
+ domain.handle(cmd, function(err) {});
+
+ });
+
+ });
+
+ });
+
+ });
+
+ });
+
+ describe('having any saga handlers', function() {
+
+ var dummyEmitter2 = new (require('events').EventEmitter)();
+
+ before(function(done) {
+
+ domain.on('event', function(evt) {
+ if (evt.event === 'dummyDestroyed') {
+ dummyEmitter2.emit('published', evt);
+ }
+ });
+ domain.initialize({
+ commandHandlersPath: __dirname + '/commandHandlers',
+ aggregatesPath: __dirname + '/aggregates',
+ sagaHandlersPath: __dirname + '/sagaHandlers',
+ sagasPath: __dirname + '/sagas',
+ publishingInterval: 20
+ }, done);
+
+ });
+
+ describe('noting an expected event', function() {
+
+ it('it should emit an other event', function(done) {
+
+ var cmd = {
+ command: 'cancelDummy',
+ id: '82517'
+ };
+
+ dummyEmitter2.once('published', function(evt) {
+ expect(evt.event).to.eql('dummyDestroyed');
+ done();
+ });
+
+ domain.handle(cmd, function(err) {});
+
+ });
+
+ });
+
+ });
+
+
+});
9 test/integration/sagaHandlers/dummySagaHandler.js
@@ -0,0 +1,9 @@
+var sagaHandlerBase = require('../../../index').sagaHandlerBase;
+
+module.exports = sagaHandlerBase.extend({
+
+ events: ['dummyCancelled'],
+
+ saga: 'dummySaga'
+
+});
9 test/integration/sagas/dummySaga.js
@@ -0,0 +1,9 @@
+var base = require('../../../index').sagaBase;
+
+module.exports = base.extend({
+
+ dummyCancelled: function(data) {
+ this.sendCommand( { command: 'destroyDummy', payload: { id: data.id } } );
+ }
+
+});
1  test/mocha.opts
@@ -0,0 +1 @@
+-R spec
82 test/sagaHandlerTest.js
@@ -0,0 +1,82 @@
+var expect = require('expect.js')
+ , EventEmitter = require('events').EventEmitter
+ , eventEmitter = require('../lib/eventEmitter')
+ , sagaHandlerBase = require('../index').sagaHandlerBase
+ , sagaBase = require('../index').sagaBase;
+
+var stream = new EventEmitter();
+var commandEmitter = new EventEmitter();
+
+var Saga = sagaBase.extend({
+ somethingDoneEvent: function(evt) {
+ evt.command = 'blaCmd';
+ delete evt.event;
+ delete evt.commandId;
+ this.sendCommand(evt);
+ commandEmitter.emit('done');
+ }
+});
+var saga = new Saga('id_1');
+saga.isInited = true;
+
+var sagaHandler = sagaHandlerBase.extend({
+ events: ['somethingDoneEvent'],
+ saga: 'overridden load!',
+
+ stream: stream,
+
+ loadSaga: function(id, callback) {
+ this.saga = saga;
+
+ callback(null, this.saga, this.stream);
+ },
+
+ commit: function(uncommittedEvents, stream, callback) {
+ stream.uncommittedEvents = uncommittedEvents;
+ stream.emit('done', uncommittedEvents);
+ }
+});
+
+
+describe('SagaHandlerBase', function() {
+
+ describe('calling handle function', function() {
+
+ // given
+ var evt = {
+ event: 'somethingDoneEvent',
+ payload: {
+ id: 'id_1',
+ setMe: 'bimbimbim'
+ }
+ };
+
+ it('it should be passing event to saga', function(done) {
+ // then
+ sagaHandler.stream.once('done', function(uncommittedEvents) {
+ expect(uncommittedEvents[0].payload.setMe).to.eql('bimbimbim');
+ done();
+ });
+
+ // when
+ sagaHandler.handle(evt);
+ });
+
+ it('it should be have unemittedCommands', function(done) {
+ var cmdEmitted = false;
+ eventEmitter.once('command:blaCmd', function() {
+ cmdEmitted = true;
+ });
+ // then
+ commandEmitter.once('done', function() {
+ expect(cmdEmitted).to.be.eql(true);
+ done();
+ });
+
+ // when
+ sagaHandler.handle(evt);
+ });
+
+ });
+
+});
Please sign in to comment.
Something went wrong with that request. Please try again.