Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

updated eventstore

  • Loading branch information...
commit 21171aca0d70581a4cffb996bfe3159277c73d69 2 parents 82888be + 2c334ac
@adrai authored
View
2  .travis.yml
@@ -2,8 +2,8 @@ before_script: "npm install --dev"
language: node_js
node_js:
- - 0.6
- 0.8
+ - 0.1
branches:
only:
View
23 README.md
@@ -26,7 +26,9 @@ It can be very useful as domain component if you work with (d)ddd, cqrs, eventde
snapshotThreshold: 10,
forcedQueuing: false,
disableQueuing: false,
- handleUpdispatchedEvents: true
+ handleUpdispatchedEvents: true//,
+ // retryOnConcurrencyTimeout: 800,
+ // commandLock: { type: 'inMemory', collectionName: 'commandlock' }
}, function(err) {
});
@@ -77,10 +79,27 @@ See [tests](https://github.com/adrai/node-cqrs-domain/tree/master/test) for deta
# Release Notes
-## v0.6.1
+## v0.7.3
- updated eventstore
+## v0.7.2
+
+- update dependencies
+
+## v0.7.1
+
+- load sagas always from db
+
+## v0.7.0
+
+- introduced commandLock for distributed domain (handling same aggregate instance on multiple machines)
+
+## v0.6.1
+
+- buffer commands by aggregate id
+>>>>>>> 2c334aca7fffdae5fd1ff21fe0044c1a51100356
+
## v0.6.0
- don't publish in eventstore but publish in domain
View
228 lib/bases/commandHandlerBase.js
@@ -1,9 +1,28 @@
var eventEmitter = require('../eventEmitter'),
async = require('async'),
- _ = require('lodash');
+ _ = require('lodash'),
+ util = require('util'),
+ EventEmitter2 = require('eventemitter2').EventEmitter2,
+ uuid = require('node-uuid').v4;
+
+function randomBetween(min, max) {
+ return Math.round(min + Math.random() * (max - min));
+}
+
+var CommandHandler = function() {
+ EventEmitter2.call(this, {
+ wildcard: true,
+ delimiter: ':',
+ maxListeners: 1000 // default would be 10!
+ });
+
+ this.buffered = {};
+ this.id = uuid().toString();
+};
+
+util.inherits(CommandHandler, EventEmitter2);
-var CommandHandler = {};
-CommandHandler.prototype = {
+_.extend(CommandHandler.prototype, {
defaultHandle: function(id, cmd) {
@@ -58,51 +77,98 @@ CommandHandler.prototype = {
});
},
+ reorderCommandLock: function(id, callback) {
+ var self = this;
+ this.commandLock.find({ aggregateId: id }, function(err, res) {
+ res = res || [];
+ res = _.sortBy(res, function(item) {
+ return item.id === self.id;
+ });
+
+ async.each(res, function(item, callback) {
+ item.destroy();
+ self.commandLock.commit(item, callback);
+ }, callback);
+ });
+ },
+
finish: function(id, cmd, err) {
- if (err) {
- eventEmitter.emit('commandRejected', cmd, err);
+ var self = this;
+
+ function _finish() {
+ if (err) {
+ eventEmitter.emit('commandRejected', cmd, err);
+ }
+ eventEmitter.emit('handled:' + cmd.command, id, cmd);
+ self.emit('handled:' + id + ':' + cmd.id, id, cmd);
+ }
+ if (this.commandLock) {
+ this.reorderCommandLock(id, function() {
+ _finish();
+ });
+ } else {
+ _finish();
}
- eventEmitter.emit('handled:' + cmd.command, id, cmd);
},
commit: function(cmd, aggregate, stream, callback) {
-
var self = this;
- async.concat(aggregate.uncommittedEvents, function(evt, next) {
- evt.commandId = cmd.id;
- if (cmd.head) {
- evt.head = _.extend(_.clone(cmd.head), evt.head);
- }
-
- self.getNewId(function(err, id) {
- evt.id = id;
- stream.addEvent(evt);
- next(err);
- });
- },
- // final
- function(err) {
- if (callback && err) { callback(err); }
- if (!err) {
- stream.commit(function(err, stream) {
- if (err) {
- if (callback) { callback(err); }
- return;
- }
+ function _commit() {
+ async.concat(aggregate.uncommittedEvents, function(evt, next) {
+ evt.commandId = cmd.id;
+ if (cmd.head) {
+ evt.head = _.extend(_.clone(cmd.head), evt.head);
+ }
- async.each(stream.eventsToDispatch,
- function(evtToSetDispatched, clb) {
- self.publisher.publish(evtToSetDispatched.payload);
- self.eventStore.setEventToDispatched(evtToSetDispatched, clb);
- },
- function(err) {
+ self.getNewId(function(err, id) {
+ evt.id = id;
+ stream.addEvent(evt);
+ next(err);
+ });
+ },
+ // final
+ function(err) {
+ if (callback && err) { callback(err); }
+ if (!err) {
+ stream.commit(function(err, stream) {
+ if (err) {
if (callback) { callback(err); }
+ return;
}
- );
- });
- }
- });
+
+ async.each(stream.eventsToDispatch,
+ function(evtToSetDispatched, clb) {
+ self.publisher.publish(evtToSetDispatched.payload);
+ self.eventStore.setEventToDispatched(evtToSetDispatched, clb);
+ },
+ function(err) {
+ if (callback) { callback(err); }
+ }
+ );
+ });
+ }
+ });
+ }
+
+ if (this.commandLock) {
+ this.commandLock.find({ aggregateId: aggregate.id }, function(err, res) {
+ res = res || [];
+ if (res.length !== 1 || res[0].id !== self.id) {
+ // concurrency exception!!!
+ self.reorderCommandLock(aggregate.id, function() {
+ // retry
+ setTimeout(function() {
+ self._handle(aggregate.id, cmd);
+ }, randomBetween(0, self.options.retryOnConcurrencyTimeout));
+ });
+ } else {
+ _commit();
+ }
+ });
+ } else {
+ _commit();
+ }
},
validate: function(ruleName, data, callback) {
@@ -113,7 +179,7 @@ CommandHandler.prototype = {
}
},
- handle: function(id, cmd) {
+ _handle: function(id, cmd) {
if (this[cmd.command]) {
this[cmd.command](id, cmd);
} else {
@@ -121,30 +187,66 @@ CommandHandler.prototype = {
}
},
- loadAggregate: function(id, callback) {
+ handle: function(id, cmd) {
var self = this;
- var aggregate = new this.Aggregate(id);
- this.eventStore.getFromSnapshot(id, function(err, snapshot, stream) {
- async.map(stream.events, function(evt, next) {
- next(null, evt.payload);
- }, function(err, events) {
- aggregate.loadFromHistory(snapshot.data, events);
-
- // Check if snapshotting is needed.
- var snapshotThreshold = aggregate.getSnapshotThreshold() || self.options.snapshotThreshold;
- if (stream.events.length >= snapshotThreshold) {
- var streamId = stream.streamId,
- revision = stream.currentRevision(),
- data = aggregate.toJSON();
-
- process.nextTick(function() {
- self.eventStore.createSnapshot(streamId, revision, data);
- });
- }
- callback(null, aggregate, stream);
+ this.buffered[id] = this.buffered[id] || [];
+ this.buffered[id].push({ id: id, cmd: cmd });
+
+ this.once('handled:' + id + ':' + cmd.id, function(id, cmd) {
+ self.buffered[id] = _.reject(self.buffered[id], function(entry) {
+ return entry.id === id && entry.cmd === cmd;
});
+
+ if (self.buffered[id].length > 0) {
+ var nextCmd = self.buffered[id][0];
+ self._handle(nextCmd.id, nextCmd.cmd);
+ }
});
+
+ if (this.buffered[id].length === 1) {
+ this._handle(id, cmd);
+ }
+ },
+
+ loadAggregate: function(id, callback) {
+ var self = this;
+
+ function _loadAggregate() {
+ var aggregate = new self.Aggregate(id);
+ self.eventStore.getFromSnapshot(id, function(err, snapshot, stream) {
+ async.map(stream.events, function(evt, next) {
+ next(null, evt.payload);
+ }, function(err, events) {
+ aggregate.loadFromHistory(snapshot.data, events);
+
+ // Check if snapshotting is needed.
+ var snapshotThreshold = aggregate.getSnapshotThreshold() || self.options.snapshotThreshold;
+ if (stream.events.length >= snapshotThreshold) {
+ var streamId = stream.streamId,
+ revision = stream.currentRevision(),
+ data = aggregate.toJSON();
+
+ process.nextTick(function() {
+ self.eventStore.createSnapshot(streamId, revision, data);
+ });
+ }
+
+ callback(null, aggregate, stream);
+ });
+ });
+ }
+
+ if (this.commandLock) {
+ this.commandLock.get(this.id, function(err, res) {
+ res.set('aggregateId', id);
+ self.commandLock.commit(res, function() {
+ _loadAggregate();
+ });
+ });
+ } else {
+ _loadAggregate();
+ }
},
getNewId: function(callback) {
@@ -176,14 +278,18 @@ CommandHandler.prototype = {
if (module.publish) {
this.publisher = module;
}
+
+ if (module.commit && module.get && module.find) {
+ this.commandLock = module;
+ }
}
-};
+});
module.exports = {
extend: function(obj) {
- return _.extend(_.clone(CommandHandler.prototype), obj);
+ return _.extend(new CommandHandler(), obj);
}
};
View
10 lib/bases/sagaHandlerBase.js
@@ -87,8 +87,8 @@ SagaHandler.prototype = {
loadSaga: function(id, callback) {
var self = this;
- var saga = this.sagas[id];
- if (!saga) {
+ // var saga = this.sagas[id];
+ // if (!saga) {
saga = new this.Saga(id);
saga.commit = function(callback) {
self.commit(this, callback);
@@ -99,9 +99,9 @@ SagaHandler.prototype = {
callback(err, saga);
});
});
- } else {
- callback(null, saga);
- }
+ // } else {
+ // callback(null, saga);
+ // }
},
configure: function(fn) {
View
31 lib/domain.js
@@ -10,6 +10,7 @@ var async = require('async'),
queue = require('node-queue'),
eventEmitter = require('./eventEmitter'),
repository = require('viewmodel').write.create(),
+ commandLock = require('viewmodel').write.create(),
nodeEventedCommand = require('nodeEventedCommand'),
hub = nodeEventedCommand.hub.create(),
Command = nodeEventedCommand.Command.create(hub),
@@ -63,9 +64,12 @@ module.exports = domain = _.extend(new EventEmitter2({
commandQueue: { type: 'inMemory', collectionName: 'commands' },
eventStore: { type: 'inMemory' },
repository: { type: 'inMemory', collectionName: 'sagas' },
+ // commandLock: { type: 'inMemory', collectionName: 'commandlock' },
forcedQueuing: false,
disableQueuing: false,
- handleUpdispatchedEvents: true
+ handleUpdispatchedEvents: true,
+ retryOnConcurrencyTimeout: 800,
+ snapshotThreshold: 10
};
_.defaults(options, defaults);
@@ -73,6 +77,10 @@ module.exports = domain = _.extend(new EventEmitter2({
options.commandQueue.collectionName = options.commandQueue.collectionName || defaults.commandQueue.collectionName;
options.repository.collectionName = options.repository.collectionName || defaults.repository.collectionName;
+ if (options.commandLock) {
+ options.commandLock.collectionName = options.commandLock.collectionName || 'commandlock';
+ }
+
// initialize the hub by passing the function that gets the command id from the event
hub.init(newGetCommandId || getCommandId);
@@ -118,11 +126,22 @@ module.exports = domain = _.extend(new EventEmitter2({
aggregateLoader.load(options.aggregatesPath, callback);
},
commandHandlers: function(callback) {
- commandHandlerLoader.configure(function() {
- this.use(es);
- this.use({ publish: publish });
- });
- commandHandlerLoader.load(options.commandHandlersPath, options, callback);
+ if (options.commandLock) {
+ commandLock.init(options.commandLock, function() {
+ commandHandlerLoader.configure(function() {
+ this.use(es);
+ this.use({ publish: publish });
+ this.use(commandLock);
+ });
+ commandHandlerLoader.load(options.commandHandlersPath, options, callback);
+ });
+ } else {
+ commandHandlerLoader.configure(function() {
+ this.use(es);
+ this.use({ publish: publish });
+ });
+ commandHandlerLoader.load(options.commandHandlersPath, options, callback);
+ }
},
sagas: function(callback) {
View
12 lib/loaders/commandHandlerLoader.js
@@ -21,6 +21,10 @@ module.exports = commandHandlerLoader = {
if (module.publish) {
commandHandlerLoader.publisher = module;
}
+
+ if (module.commit && module.get && module.find) {
+ commandHandlerLoader.commandLock = module;
+ }
},
load: function(commandHandlersPath, validationRulesPath, options, callback) {
@@ -28,13 +32,16 @@ module.exports = commandHandlerLoader = {
if (arguments.length === 2) {
callback = validationRulesPath;
validationRulesPath = commandHandlersPath + '/../validationRules';
- options = { snapshotThreshold: 10 };
+ options = { snapshotThreshold: 10, retryOnConcurrencyTimeout: 800 };
} else if (arguments.length === 3) {
callback = options;
options = validationRulesPath;
validationRulesPath = commandHandlersPath + '/../validationRules';
}
+ options.snapshotThreshold = options.snapshotThreshold || 10;
+ options.retryOnConcurrencyTimeout = options.retryOnConcurrencyTimeout || 800;
+
var commandHandlers = [];
if (!existsSync(commandHandlersPath)){
@@ -70,6 +77,9 @@ module.exports = commandHandlerLoader = {
commandHandler.configure(function() {
commandHandler.use(commandHandlerLoader.eventStore);
commandHandler.use(commandHandlerLoader.publisher);
+ if (commandHandlerLoader.commandLock) {
+ commandHandler.use(commandHandlerLoader.commandLock);
+ }
});
function action(id, cmd) {
View
4 package.json
@@ -1,7 +1,7 @@
{
"author": "adrai",
"name": "cqrs-domain",
- "version": "0.6.1",
+ "version": "0.7.3",
"private": false,
"main": "index.js",
"engines": {
@@ -16,7 +16,7 @@
"eventemitter2": ">= 0.4.13",
"node-queue": ">= 0.4.0",
"eventstore": ">= 0.7.2",
- "viewmodel": ">= 0.5.0",
+ "viewmodel": ">= 0.5.3",
"nodeEventedCommand": ">= 0.1.2",
"retry": ">= 0.6.0",
"node-uuid": ">= 1.4.1"
View
191 test/integration/domainTest.js
@@ -1,29 +1,31 @@
-var expect = require('expect.js')
- , domain = require('../../index').domain;
+var expect = require('expect.js'),
+ domain = require('../../index').domain;
describe('Domain', function() {
- describe('noting a command', function() {
+ var dummyEmitter = new (require('events').EventEmitter)();
- describe('having well-formed data', function() {
+ before(function(done) {
- describe('having any command handlers', function() {
+ domain.on('event', function(evt) {
+ dummyEmitter.emit('published', evt);
+ });
+ domain.initialize({
+ commandHandlersPath: __dirname + '/commandHandlers',
+ aggregatesPath: __dirname + '/aggregates',
+ sagaHandlersPath: __dirname + '/sagaHandlers',
+ sagasPath: __dirname + '/sagas',
+ commandLock: { type: 'inMemory', collectionName: 'commandlock' },
+ disableQueuing: true
+ }, done);
- var dummyEmitter = new (require('events').EventEmitter)();
+ });
- before(function(done) {
+ describe('noting a command', function() {
- domain.on('event', function(evt) {
- dummyEmitter.emit('published', evt);
- });
- domain.initialize({
- commandHandlersPath: __dirname + '/commandHandlers',
- aggregatesPath: __dirname + '/aggregates',
- sagaHandlersPath: __dirname + '/sagaHandlers',
- sagasPath: __dirname + '/sagas'
- }, done);
+ describe('having well-formed data', function() {
- });
+ describe('having any command handlers', function() {
describe('having bad data', function() {
@@ -125,27 +127,57 @@ describe('Domain', function() {
});
- });
+ describe('when sending multiple commands together', function() {
- describe('having a command handler that sends commands to other command handlers', function() {
+ var cmd1 = {
+ command: 'changeDummy',
+ id: '123455',
+ payload: {
+ id: '12382517'
+ }
+ };
+
+ var cmd2 = {
+ command: 'changeDummy',
+ id: '23455789',
+ payload: {
+ id: '12382517'
+ }
+ };
+
+ var cmd3 = {
+ command: 'changeDummy',
+ id: '2312345789',
+ payload: {
+ id: '12382517'
+ }
+ };
+
+ it('it should set revision correctly', function(done) {
- var dummyEmitter = new (require('events').EventEmitter)();
+ var count = 0;
+ var handle;
+ dummyEmitter.on('published', handle = function(evt) {
+ count++;
+ if (count === 3) {
+ expect(evt.head.revision).to.eql(3);
+ dummyEmitter.removeListener('published', handle);
+ done();
+ }
+ });
- before(function(done) {
+ domain.handle(cmd1, function(err) {});
+ domain.handle(cmd2, function(err) {});
+ domain.handle(cmd3, function(err) {});
- domain.on('event', function(evt) {
- dummyEmitter.emit('published', evt);
});
- domain.initialize({
- commandHandlersPath: __dirname + '/commandHandlers',
- aggregatesPath: __dirname + '/aggregates',
- sagaHandlersPath: __dirname + '/sagaHandlers',
- sagasPath: __dirname + '/sagas',
- publishingInterval: 20
- }, done);
});
+ });
+
+ describe('having a command handler that sends commands to other command handlers', function() {
+
it('it should acknowledge the command', function(done) {
var cmd = {
@@ -155,9 +187,15 @@ describe('Domain', function() {
haha: 'hihi'
}
};
+
+ var called = false;
+ dummyEmitter.once('published', function(evt) {
+ expect(called).to.be.ok();
+ done();
+ });
domain.handle(cmd, function(err) {
+ called = true;
expect(err).not.to.be.ok();
- done();
});
});
@@ -172,8 +210,8 @@ describe('Domain', function() {
}
};
- var fooItedReceived = false
- , fooCretedReceived = false;
+ var fooItedReceived = false,
+ fooCretedReceived = false;
function finish(evt) {
if (fooItedReceived && fooCretedReceived) {
@@ -200,30 +238,74 @@ describe('Domain', function() {
});
- });
-
- });
+ describe('simulating mutliple process handling the same aggregate instance', function() {
- describe('having any saga handlers', function() {
+ var cmdHandle = require('./commandHandlers/dummyCommandHandler'),
+ orgHandle;
+
+ before(function() {
+ orgHandle = cmdHandle.handle;
+ cmdHandle.handle = cmdHandle._handle;
+ });
- var dummyEmitter2 = new (require('events').EventEmitter)();
+ after(function() {
+ cmdHandle.handle = orgHandle;
+ });
- before(function(done) {
+ describe('sending multiple commands together', function() {
+
+ var cmd1 = {
+ command: 'changeDummy',
+ id: '1234552',
+ payload: {
+ id: '123825172'
+ }
+ };
+
+ var cmd2 = {
+ command: 'changeDummy',
+ id: '234557892',
+ payload: {
+ id: '123825172'
+ }
+ };
+
+ var cmd3 = {
+ command: 'changeDummy',
+ id: '23123457892',
+ payload: {
+ id: '123825172'
+ }
+ };
+
+ it('it should set revision correctly', function(done) {
+
+ var count = 0;
+ var handle;
+ dummyEmitter.on('published', handle = function(evt) {
+ count++;
+ if (count === 3) {
+ expect(evt.head.revision).to.eql(3);
+ dummyEmitter.removeListener('published', handle);
+ done();
+ }
+ });
+
+ domain.handle(cmd1, function(err) {});
+ domain.handle(cmd2, function(err) {});
+ domain.handle(cmd3, function(err) {});
+
+ });
+
+ });
- domain.on('event', function(evt) {
- if (evt.event === 'dummyDestroyed') {
- dummyEmitter2.emit('published', evt);
- }
});
- domain.initialize({
- commandHandlersPath: __dirname + '/commandHandlers',
- aggregatesPath: __dirname + '/aggregates',
- sagaHandlersPath: __dirname + '/sagaHandlers',
- sagasPath: __dirname + '/sagas',
- publishingInterval: 20
- }, done);
});
+
+ });
+
+ describe('having any saga handlers', function() {
describe('noting an expected event', function() {
@@ -231,12 +313,13 @@ describe('Domain', function() {
var cmd = {
command: 'cancelDummy',
- id: '82517'
+ id: '825171111'
};
- dummyEmitter2.once('published', function(evt) {
- expect(evt.event).to.eql('dummyDestroyed');
- done();
+ dummyEmitter.on('published', function(evt) {
+ if (evt.event === 'dummyDestroyed') {
+ done();
+ }
});
domain.handle(cmd, function(err) {});
Please sign in to comment.
Something went wrong with that request. Please try again.