Permalink
Browse files

added disableQueuing flag

  • Loading branch information...
1 parent 2b1ece0 commit acd28ea20011ba6facd103f9e7d0b8a3e4865035 @adrai committed Oct 12, 2013
Showing with 271 additions and 188 deletions.
  1. +6 −1 README.md
  2. +39 −23 lib/commandDispatcher.js
  3. +15 −6 lib/domain.js
  4. +8 −7 package.json
  5. +203 −151 test/commandDispatcherTest.js
View
@@ -26,7 +26,8 @@ It can be very useful as domain component if you work with (d)ddd, cqrs, eventde
publishingInterval: 20,
forkEventDispatching: true,
snapshotThreshold: 10,
- forcedQueuing: false
+ forcedQueuing: false,
+ disableQueuing: false
}, function(err) {
});
@@ -77,6 +78,10 @@ See [tests](https://github.com/adrai/node-cqrs-domain/tree/master/test) for deta
# Release Notes
+## v0.4.4
+
+- added disableQueuing flag
+
## v0.4.3
- strip .js file extensions to enable loading of .coffee scripts too
View
@@ -1,6 +1,7 @@
var async = require('async')
, retry = require('retry')
- , eventEmitter = require('./eventEmitter');
+ , eventEmitter = require('./eventEmitter')
+ , uuid = require('node-uuid').v4;
module.exports = {
@@ -27,29 +28,35 @@ module.exports = {
this.options = options;
}
- eventEmitter.on('handled:*', function(id, command) {
- self.commandQueue.remove(id, function() {});
- });
+ if (this.commandQueue) {
+ eventEmitter.on('handled:*', function(id, command) {
+ self.commandQueue.remove(id, function() {});
+ });
- this.reEmitCommands(callback);
+ this.reEmitCommands(callback);
+ } else {
+ callback(null);
+ }
},
reEmitCommands: function(callback) {
- this.commandQueue.getAll(function(err, cmds) {
- async.forEach(cmds, function(item, cb) {
- eventEmitter.emit('handle:' + item.data.command, item.id, item.data);
- cb();
- }, callback);
- });
+ if (this.commandQueue) {
+ this.commandQueue.getAll(function(err, cmds) {
+ async.forEach(cmds, function(item, cb) {
+ eventEmitter.emit('handle:' + item.data.command, item.id, item.data);
+ cb();
+ }, callback);
+ });
+ } else {
+ callback(null);
+ }
},
dispatch: function(cmd, callback) {
var options = this.options
, commandQueue = this.commandQueue;
- if (!commandQueue) return callback(new Error('No commandQueue provided!'));
-
async.waterfall([
// ĥas no handlers
@@ -63,7 +70,11 @@ module.exports = {
},
// use provided aggregateId or get one from commandQueue
- function(callback){
+ function(callback) {
+ if (!commandQueue) {
+ return callback(null, uuid().toString());
+ }
+
if (options.forcedQueuing) {
commandQueue.getNewId(function(err, newId) {
callback(err, newId);
@@ -83,7 +94,7 @@ module.exports = {
// check if command can be dispatched
function(id, callback) {
- if (options.forcedQueuing) {
+ if (options.forcedQueuing || !commandQueue) {
callback(null, id);
return;
}
@@ -114,15 +125,20 @@ module.exports = {
// emit command to commandEmitter
function(id, callback) {
- commandQueue.push(id, cmd, function(err) {
- if (!err) {
- eventEmitter.emit('handle:' + cmd.command, id, cmd);
- } else {
- eventEmitter.emit('commandRejected', cmd, err.message);
- }
-
+ if (!commandQueue) {
+ eventEmitter.emit('handle:' + cmd.command, id, cmd);
callback(null);
- });
+ } else {
+ commandQueue.push(id, cmd, function(err) {
+ if (!err) {
+ eventEmitter.emit('handle:' + cmd.command, id, cmd);
+ } else {
+ eventEmitter.emit('commandRejected', cmd, err.message);
+ }
+
+ callback(null);
+ });
+ }
}
],
View
@@ -65,7 +65,8 @@ module.exports = domain = _.extend(new EventEmitter2({
commandQueue: { type: 'inMemory', collectionName: 'commands' },
eventStore: { type: 'inMemory' },
repository: { type: 'inMemory', collectionName: 'sagas' },
- forcedQueuing: false
+ forcedQueuing: false,
+ disableQueuing: false
};
_.defaults(options, defaults);
@@ -146,11 +147,8 @@ module.exports = domain = _.extend(new EventEmitter2({
sagaHandler.Saga = sagas[sagaHandler.saga];
sagaHandler.initialize(callback);
}, function(err) {
- queue.connect(options.commandQueue, function(err, commandQueue) {
- commandDispatcher.configure(function() {
- this.use(commandQueue);
- });
+ function initCommandDispatcher() {
var worker = {};
// starts the worker by using an interval loop
@@ -191,7 +189,18 @@ module.exports = domain = _.extend(new EventEmitter2({
worker.start();
commandDispatcher.initialize({ forcedQueuing: options.forcedQueuing}, callback);
- });
+ }
+
+ if (options.disableQueuing) {
+ initCommandDispatcher();
+ } else {
+ queue.connect(options.commandQueue, function(err, commandQueue) {
+ commandDispatcher.configure(function() {
+ this.use(commandQueue);
+ });
+ initCommandDispatcher();
+ });
+ }
});
});
View
@@ -1,7 +1,7 @@
{
"author": "adrai",
"name": "cqrs-domain",
- "version": "0.4.3",
+ "version": "0.4.4",
"private": false,
"main": "index.js",
"engines": {
@@ -18,15 +18,16 @@
"eventstore": ">= 0.6.2",
"viewmodel": ">= 0.4.10",
"nodeEventedCommand": ">= 0.1.2",
- "retry": ">= 0.6.0"
+ "retry": ">= 0.6.0",
+ "node-uuid": ">= 1.4.1"
},
"devDependencies": {
"mocha": ">= 1.0.1",
"expect.js": ">= 0.1.2",
- "rule-validator": ">= 0.1.1"
+ "rule-validator": ">= 0.2.0"
},
"description": "Node-cqrs-domain is a node.js module based on nodeEventStore. It can be very useful as domain component if you work with (d)ddd, cqrs, eventdenormalizer, host, etc.",
- "keywords" : [
+ "keywords": [
"cqrs",
"eventsourcing",
"ddd",
@@ -50,7 +51,7 @@
"url": "https://raw.github.com/adrai/node-cqrs-domain/master/licence"
}
],
- "scripts" : {
- "test" : "mocha && mocha test/integration/domainTest.js"
- }
+ "scripts": {
+ "test": "mocha && mocha test/integration/domainTest.js"
+ }
}
Oops, something went wrong.

0 comments on commit acd28ea

Please sign in to comment.