Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

introduced commandLock for distributed domain (handling same aggregat…

…e instance on multiple machines)
  • Loading branch information...
commit a0362eae758c46c8461f390436c032786aaa35f9 1 parent b9f7b24
@adrai authored
View
8 README.md
@@ -26,7 +26,9 @@ It can be very useful as domain component if you work with (d)ddd, cqrs, eventde
snapshotThreshold: 10,
forcedQueuing: false,
disableQueuing: false,
- handleUpdispatchedEvents: true
+ handleUpdispatchedEvents: true//,
+ // retryOnConcurrencyTimeout: 800,
+ // commandLock: { type: 'inMemory', collectionName: 'commandlock' }
}, function(err) {
});
@@ -77,6 +79,10 @@ See [tests](https://github.com/adrai/node-cqrs-domain/tree/master/test) for deta
# Release Notes
+## v0.7.0
+
+- introduced commandLock for distributed domain (handling same aggregate instance on multiple machines)
+
## v0.6.1
- buffer commands by aggregate id
View
206 lib/bases/commandHandlerBase.js
@@ -2,7 +2,12 @@ var eventEmitter = require('../eventEmitter'),
async = require('async'),
_ = require('lodash'),
util = require('util'),
- EventEmitter2 = require('eventemitter2').EventEmitter2;
+ EventEmitter2 = require('eventemitter2').EventEmitter2,
+ uuid = require('node-uuid').v4;
+
+function randomBetween(min, max) {
+ return Math.round(min + Math.random() * (max - min));
+}
var CommandHandler = function() {
EventEmitter2.call(this, {
@@ -12,6 +17,7 @@ var CommandHandler = function() {
});
this.buffered = {};
+ this.id = uuid().toString();
};
util.inherits(CommandHandler, EventEmitter2);
@@ -71,52 +77,98 @@ _.extend(CommandHandler.prototype, {
});
},
+ reorderCommandLock: function(id, callback) {
+ var self = this;
+ this.commandLock.find({ aggregateId: id }, function(err, res) {
+ res = res || [];
+ res = _.sortBy(res, function(item) {
+ return item.id === self.id;
+ });
+
+ async.each(res, function(item, callback) {
+ item.destroy();
+ self.commandLock.commit(item, callback);
+ }, callback);
+ });
+ },
+
finish: function(id, cmd, err) {
- if (err) {
- eventEmitter.emit('commandRejected', cmd, err);
+ var self = this;
+
+ function _finish() {
+ if (err) {
+ eventEmitter.emit('commandRejected', cmd, err);
+ }
+ eventEmitter.emit('handled:' + cmd.command, id, cmd);
+ self.emit('handled:' + id + ':' + cmd.id, id, cmd);
+ }
+ if (this.commandLock) {
+ this.reorderCommandLock(id, function() {
+ _finish();
+ });
+ } else {
+ _finish();
}
- eventEmitter.emit('handled:' + cmd.command, id, cmd);
- this.emit('handled:' + id + ':' + cmd.id, id, cmd);
},
commit: function(cmd, aggregate, stream, callback) {
-
var self = this;
- async.concat(aggregate.uncommittedEvents, function(evt, next) {
- evt.commandId = cmd.id;
- if (cmd.head) {
- evt.head = _.extend(_.clone(cmd.head), evt.head);
- }
-
- self.getNewId(function(err, id) {
- evt.id = id;
- stream.addEvent(evt);
- next(err);
- });
- },
- // final
- function(err) {
- if (callback && err) { callback(err); }
- if (!err) {
- stream.commit(function(err, stream) {
- if (err) {
- if (callback) { callback(err); }
- return;
- }
+ function _commit() {
+ async.concat(aggregate.uncommittedEvents, function(evt, next) {
+ evt.commandId = cmd.id;
+ if (cmd.head) {
+ evt.head = _.extend(_.clone(cmd.head), evt.head);
+ }
- async.each(stream.eventsToDispatch,
- function(evtToSetDispatched, clb) {
- self.publisher.publish(evtToSetDispatched.payload);
- self.eventStore.setEventToDispatched(evtToSetDispatched, clb);
- },
- function(err) {
+ self.getNewId(function(err, id) {
+ evt.id = id;
+ stream.addEvent(evt);
+ next(err);
+ });
+ },
+ // final
+ function(err) {
+ if (callback && err) { callback(err); }
+ if (!err) {
+ stream.commit(function(err, stream) {
+ if (err) {
if (callback) { callback(err); }
+ return;
}
- );
- });
- }
- });
+
+ async.each(stream.eventsToDispatch,
+ function(evtToSetDispatched, clb) {
+ self.publisher.publish(evtToSetDispatched.payload);
+ self.eventStore.setEventToDispatched(evtToSetDispatched, clb);
+ },
+ function(err) {
+ if (callback) { callback(err); }
+ }
+ );
+ });
+ }
+ });
+ }
+
+ if (this.commandLock) {
+ this.commandLock.find({ aggregateId: aggregate.id }, function(err, res) {
+ res = res || [];
+ if (res.length !== 1 || res[0].id !== self.id) {
+ // concurrency exception!!!
+ self.reorderCommandLock(aggregate.id, function() {
+ // retry
+ setTimeout(function() {
+ self._handle(aggregate.id, cmd);
+ }, randomBetween(0, self.options.retryOnConcurrencyTimeout));
+ });
+ } else {
+ _commit();
+ }
+ });
+ } else {
+ _commit();
+ }
},
validate: function(ruleName, data, callback) {
@@ -127,60 +179,74 @@ _.extend(CommandHandler.prototype, {
}
},
+ _handle: function(id, cmd) {
+ if (this[cmd.command]) {
+ this[cmd.command](id, cmd);
+ } else {
+ this.defaultHandle(id, cmd);
+ }
+ },
+
handle: function(id, cmd) {
var self = this;
this.buffered[id] = this.buffered[id] || [];
this.buffered[id].push({ id: id, cmd: cmd });
- this.on('handled:' + id + ':' + cmd.id, function(id, cmd) {
+ this.once('handled:' + id + ':' + cmd.id, function(id, cmd) {
self.buffered[id] = _.reject(self.buffered[id], function(entry) {
return entry.id === id && entry.cmd === cmd;
});
if (self.buffered[id].length > 0) {
var nextCmd = self.buffered[id][0];
- if (self[nextCmd.cmd.command]) {
- self[nextCmd.cmd.command](nextCmd.id, nextCmd.cmd);
- } else {
- self.defaultHandle(nextCmd.id, nextCmd.cmd);
- }
+ self._handle(nextCmd.id, nextCmd.cmd);
}
});
if (this.buffered[id].length === 1) {
- if (this[cmd.command]) {
- this[cmd.command](id, cmd);
- } else {
- this.defaultHandle(id, cmd);
- }
+ this._handle(id, cmd);
}
},
loadAggregate: function(id, callback) {
var self = this;
- var aggregate = new this.Aggregate(id);
- this.eventStore.getFromSnapshot(id, function(err, snapshot, stream) {
- async.map(stream.events, function(evt, next) {
- next(null, evt.payload);
- }, function(err, events) {
- aggregate.loadFromHistory(snapshot.data, events);
-
- // Check if snapshotting is needed.
- var snapshotThreshold = aggregate.getSnapshotThreshold() || self.options.snapshotThreshold;
- if (stream.events.length >= snapshotThreshold) {
- var streamId = stream.streamId,
- revision = stream.currentRevision(),
- data = aggregate.toJSON();
-
- process.nextTick(function() {
- self.eventStore.createSnapshot(streamId, revision, data);
- });
- }
- callback(null, aggregate, stream);
+ function _loadAggregate() {
+ var aggregate = new self.Aggregate(id);
+ self.eventStore.getFromSnapshot(id, function(err, snapshot, stream) {
+ async.map(stream.events, function(evt, next) {
+ next(null, evt.payload);
+ }, function(err, events) {
+ aggregate.loadFromHistory(snapshot.data, events);
+
+ // Check if snapshotting is needed.
+ var snapshotThreshold = aggregate.getSnapshotThreshold() || self.options.snapshotThreshold;
+ if (stream.events.length >= snapshotThreshold) {
+ var streamId = stream.streamId,
+ revision = stream.currentRevision(),
+ data = aggregate.toJSON();
+
+ process.nextTick(function() {
+ self.eventStore.createSnapshot(streamId, revision, data);
+ });
+ }
+
+ callback(null, aggregate, stream);
+ });
});
- });
+ }
+
+ if (this.commandLock) {
+ this.commandLock.get(this.id, function(err, res) {
+ res.set('aggregateId', id);
+ self.commandLock.commit(res, function() {
+ _loadAggregate();
+ });
+ });
+ } else {
+ _loadAggregate();
+ }
},
getNewId: function(callback) {
@@ -212,6 +278,10 @@ _.extend(CommandHandler.prototype, {
if (module.publish) {
this.publisher = module;
}
+
+ if (module.commit && module.get && module.find) {
+ this.commandLock = module;
+ }
}
});
View
31 lib/domain.js
@@ -10,6 +10,7 @@ var async = require('async'),
queue = require('node-queue'),
eventEmitter = require('./eventEmitter'),
repository = require('viewmodel').write.create(),
+ commandLock = require('viewmodel').write.create(),
nodeEventedCommand = require('nodeEventedCommand'),
hub = nodeEventedCommand.hub.create(),
Command = nodeEventedCommand.Command.create(hub),
@@ -63,9 +64,12 @@ module.exports = domain = _.extend(new EventEmitter2({
commandQueue: { type: 'inMemory', collectionName: 'commands' },
eventStore: { type: 'inMemory' },
repository: { type: 'inMemory', collectionName: 'sagas' },
+ // commandLock: { type: 'inMemory', collectionName: 'commandlock' },
forcedQueuing: false,
disableQueuing: false,
- handleUpdispatchedEvents: true
+ handleUpdispatchedEvents: true,
+ retryOnConcurrencyTimeout: 800,
+ snapshotThreshold: 10
};
_.defaults(options, defaults);
@@ -73,6 +77,10 @@ module.exports = domain = _.extend(new EventEmitter2({
options.commandQueue.collectionName = options.commandQueue.collectionName || defaults.commandQueue.collectionName;
options.repository.collectionName = options.repository.collectionName || defaults.repository.collectionName;
+ if (options.commandLock) {
+ options.commandLock.collectionName = options.commandLock.collectionName || defaults.commandLock.collectionName;
+ }
+
// initialize the hub by passing the function that gets the command id from the event
hub.init(newGetCommandId || getCommandId);
@@ -118,11 +126,22 @@ module.exports = domain = _.extend(new EventEmitter2({
aggregateLoader.load(options.aggregatesPath, callback);
},
commandHandlers: function(callback) {
- commandHandlerLoader.configure(function() {
- this.use(es);
- this.use({ publish: publish });
- });
- commandHandlerLoader.load(options.commandHandlersPath, options, callback);
+ if (options.commandLock) {
+ commandLock.init(options.commandLock, function() {
+ commandHandlerLoader.configure(function() {
+ this.use(es);
+ this.use({ publish: publish });
+ this.use(commandLock);
+ });
+ commandHandlerLoader.load(options.commandHandlersPath, options, callback);
+ });
+ } else {
+ commandHandlerLoader.configure(function() {
+ this.use(es);
+ this.use({ publish: publish });
+ });
+ commandHandlerLoader.load(options.commandHandlersPath, options, callback);
+ }
},
sagas: function(callback) {
View
12 lib/loaders/commandHandlerLoader.js
@@ -21,6 +21,10 @@ module.exports = commandHandlerLoader = {
if (module.publish) {
commandHandlerLoader.publisher = module;
}
+
+ if (module.commit && module.get && module.find) {
+ commandHandlerLoader.commandLock = module;
+ }
},
load: function(commandHandlersPath, validationRulesPath, options, callback) {
@@ -28,13 +32,16 @@ module.exports = commandHandlerLoader = {
if (arguments.length === 2) {
callback = validationRulesPath;
validationRulesPath = commandHandlersPath + '/../validationRules';
- options = { snapshotThreshold: 10 };
+ options = { snapshotThreshold: 10, retryOnConcurrencyTimeout: 800 };
} else if (arguments.length === 3) {
callback = options;
options = validationRulesPath;
validationRulesPath = commandHandlersPath + '/../validationRules';
}
+ options.snapshotThreshold = options.snapshotThreshold || 10;
+ options.retryOnConcurrencyTimeout = options.retryOnConcurrencyTimeout || 800;
+
var commandHandlers = [];
if (!existsSync(commandHandlersPath)){
@@ -70,6 +77,9 @@ module.exports = commandHandlerLoader = {
commandHandler.configure(function() {
commandHandler.use(commandHandlerLoader.eventStore);
commandHandler.use(commandHandlerLoader.publisher);
+ if (commandHandlerLoader.commandLock) {
+ commandHandler.use(commandHandlerLoader.commandLock);
+ }
});
function action(id, cmd) {
View
2  package.json
@@ -1,7 +1,7 @@
{
"author": "adrai",
"name": "cqrs-domain",
- "version": "0.6.1",
+ "version": "0.7.0",
"private": false,
"main": "index.js",
"engines": {
View
153 test/integration/domainTest.js
@@ -1,30 +1,31 @@
-var expect = require('expect.js')
- , domain = require('../../index').domain;
+var expect = require('expect.js'),
+ domain = require('../../index').domain;
describe('Domain', function() {
- describe('noting a command', function() {
+ var dummyEmitter = new (require('events').EventEmitter)();
- describe('having well-formed data', function() {
+ before(function(done) {
- describe('having any command handlers', function() {
+ domain.on('event', function(evt) {
+ dummyEmitter.emit('published', evt);
+ });
+ domain.initialize({
+ commandHandlersPath: __dirname + '/commandHandlers',
+ aggregatesPath: __dirname + '/aggregates',
+ sagaHandlersPath: __dirname + '/sagaHandlers',
+ sagasPath: __dirname + '/sagas',
+ commandLock: { type: 'inMemory', collectionName: 'commandlock' },
+ disableQueuing: true
+ }, done);
- var dummyEmitter = new (require('events').EventEmitter)();
+ });
- before(function(done) {
+ describe('noting a command', function() {
- domain.on('event', function(evt) {
- dummyEmitter.emit('published', evt);
- });
- domain.initialize({
- commandHandlersPath: __dirname + '/commandHandlers',
- aggregatesPath: __dirname + '/aggregates',
- sagaHandlersPath: __dirname + '/sagaHandlers',
- sagasPath: __dirname + '/sagas',
- disableQueuing: true
- }, done);
+ describe('having well-formed data', function() {
- });
+ describe('having any command handlers', function() {
describe('having bad data', function() {
@@ -177,23 +178,6 @@ describe('Domain', function() {
describe('having a command handler that sends commands to other command handlers', function() {
- var dummyEmitter = new (require('events').EventEmitter)();
-
- before(function(done) {
-
- domain.on('event', function(evt) {
- dummyEmitter.emit('published', evt);
- });
- domain.initialize({
- commandHandlersPath: __dirname + '/commandHandlers',
- aggregatesPath: __dirname + '/aggregates',
- sagaHandlersPath: __dirname + '/sagaHandlers',
- sagasPath: __dirname + '/sagas',
- publishingInterval: 20
- }, done);
-
- });
-
it('it should acknowledge the command', function(done) {
var cmd = {
@@ -203,9 +187,15 @@ describe('Domain', function() {
haha: 'hihi'
}
};
+
+ var called = false;
+ dummyEmitter.once('published', function(evt) {
+ expect(called).to.be.ok();
+ done();
+ });
domain.handle(cmd, function(err) {
+ called = true;
expect(err).not.to.be.ok();
- done();
});
});
@@ -220,8 +210,8 @@ describe('Domain', function() {
}
};
- var fooItedReceived = false
- , fooCretedReceived = false;
+ var fooItedReceived = false,
+ fooCretedReceived = false;
function finish(evt) {
if (fooItedReceived && fooCretedReceived) {
@@ -248,30 +238,74 @@ describe('Domain', function() {
});
- });
-
- });
+ describe('simulating mutliple process handling the same aggregate instance', function() {
- describe('having any saga handlers', function() {
+ var cmdHandle = require('./commandHandlers/dummyCommandHandler'),
+ orgHandle;
+
+ before(function() {
+ orgHandle = cmdHandle.handle;
+ cmdHandle.handle = cmdHandle._handle;
+ });
+
+ after(function() {
+ cmdHandle.handle = orgHandle;
+ });
- var dummyEmitter2 = new (require('events').EventEmitter)();
+ describe('sending multiple commands together', function() {
- before(function(done) {
+ var cmd1 = {
+ command: 'changeDummy',
+ id: '1234552',
+ payload: {
+ id: '123825172'
+ }
+ };
+
+ var cmd2 = {
+ command: 'changeDummy',
+ id: '234557892',
+ payload: {
+ id: '123825172'
+ }
+ };
+
+ var cmd3 = {
+ command: 'changeDummy',
+ id: '23123457892',
+ payload: {
+ id: '123825172'
+ }
+ };
+
+ it('it should set revision correctly', function(done) {
+
+ var count = 0;
+ var handle;
+ dummyEmitter.on('published', handle = function(evt) {
+ count++;
+ if (count === 3) {
+ expect(evt.head.revision).to.eql(3);
+ dummyEmitter.removeListener('published', handle);
+ done();
+ }
+ });
+
+ domain.handle(cmd1, function(err) {});
+ domain.handle(cmd2, function(err) {});
+ domain.handle(cmd3, function(err) {});
+
+ });
+
+ });
- domain.on('event', function(evt) {
- if (evt.event === 'dummyDestroyed') {
- dummyEmitter2.emit('published', evt);
- }
});
- domain.initialize({
- commandHandlersPath: __dirname + '/commandHandlers',
- aggregatesPath: __dirname + '/aggregates',
- sagaHandlersPath: __dirname + '/sagaHandlers',
- sagasPath: __dirname + '/sagas',
- publishingInterval: 20
- }, done);
});
+
+ });
+
+ describe('having any saga handlers', function() {
describe('noting an expected event', function() {
@@ -279,12 +313,13 @@ describe('Domain', function() {
var cmd = {
command: 'cancelDummy',
- id: '82517'
+ id: '825171111'
};
- dummyEmitter2.once('published', function(evt) {
- expect(evt.event).to.eql('dummyDestroyed');
- done();
+ dummyEmitter.on('published', function(evt) {
+ if (evt.event === 'dummyDestroyed') {
+ done();
+ }
});
domain.handle(cmd, function(err) {});
Please sign in to comment.
Something went wrong with that request. Please try again.