Skip to content

Commit

Permalink
introduce versioned messages and snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
adrai committed Jan 17, 2014
1 parent ed3311d commit 43e70b7
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 41 deletions.
50 changes: 50 additions & 0 deletions README.md
Expand Up @@ -43,6 +43,23 @@ It can be very useful as domain component if you work with (d)ddd, cqrs, eventde

module.exports = base.extend({

// snapshotThreshold: 20,
// or
// snapshotThreshold: function() { return 12 + 10; },
//
// used to version the snap shots
// version: 3,
//
// laodSnapshot: function(data, version) {
// if (version === 1) {
// this.set(snap.data);
// } else {
// this.set(snap.data);
// }
// },

// commands

changeDummy: function(data, callback) {
this.apply(this.toEvent('dummyChanged', data));

Expand All @@ -61,6 +78,23 @@ It can be very useful as domain component if you work with (d)ddd, cqrs, eventde
this.checkBusinessRules(callback);
},

fooIt: function(data, callback) {
this.apply(this.toEvent('fooIted', data));

this.checkBusinessRules(callback);
},

versionedCmd: function(data, callback) {
this.apply(this.toEvent('versionedEvt', data), callback);
},

versionedCmd_1: function(data, callback) {
this.apply(this.toEvent('versionedEvt', data, 1), callback);
},


// events

dummyChanged: function(data) {
this.set(data);
},
Expand All @@ -71,6 +105,18 @@ It can be very useful as domain component if you work with (d)ddd, cqrs, eventde

dummyDestroyed: function(data) {
this.set('destroyed', true);
},

fooIted: function(data) {
this.set('foo', true);
},

versionedEvt: function(data) {
this.set(data);
},

versionedEvt_1: function(data) {
this.set(data);
}

});
Expand All @@ -79,6 +125,10 @@ See [tests](https://github.com/adrai/node-cqrs-domain/tree/master/test) for deta

# Release Notes

## v0.7.5

- introduce versioned messages and snapshots

## v0.7.4

- fixed naming of handleUndispatchedEvents option
Expand Down
56 changes: 45 additions & 11 deletions lib/bases/aggregateBase.js
Expand Up @@ -31,20 +31,30 @@ Aggregate.prototype = {
return _.clone(this.attributes);
},

toEvent: function(name, data) {
toEvent: function(name, data, version) {
var event = {
event: name,
payload: data || {}
};

if (!event.payload.id) event.payload.id = this.id;

if (version !== null && version !== undefined) {
event.head = { version: version };
}

return event;
},

loadFromHistory: function(data, events) {
if (data) {
this.set(data);
laodSnapshot: function(data, version) {
this.set(data);
},

loadFromHistory: function(snap, events) {
if (snap && snap.data && snap.version) {
this.laodSnapshot(snap.data, snap.version);
} else if (snap && snap.data) {
this.laodSnapshot(snap.data);
}

if (events) {
Expand All @@ -55,6 +65,21 @@ Aggregate.prototype = {
}
},

applyEvent: function(evt, callback) {
if (evt.head &&
evt.head.version !== null &&
evt.head.version !== undefined &&
this[evt.event + '_' + evt.head.version]) {
this[evt.event + '_' + evt.head.version](evt.payload);
if (callback) callback(null);
return;
}

this[evt.event](evt.payload);

if (callback) callback(null);
},

apply: function(events, callback) {
var self = this;

Expand All @@ -73,24 +98,23 @@ Aggregate.prototype = {
});

_.each(historyEvents, function(evt) {
self[evt.event](evt.payload);
self.applyEvent(evt);

if (self.attributes.revision < evt.head.revision) {
if (evt.head && self.attributes.revision < evt.head.revision) {
self.attributes.revision = evt.head.revision;
}
});

this.previousAttributes = this.toJSON();

_.each(newEvents, function(evt) {
self[evt.event](evt.payload);
evt.head = { revision: ++self.attributes.revision };
self.applyEvent(evt);
evt.head = evt.head || {};
evt.head.revision = ++self.attributes.revision;
self.uncommittedEvents.push(evt);
});

if (callback) callback(null);

return;
},

checkBusinessRules: function(callback) {
Expand All @@ -101,7 +125,15 @@ Aggregate.prototype = {
if(!this.businessRules) return callback(null);

async.each(this.businessRules, function(rule, callback) {
rule.call(self, changedAttributes, self.previousAttributes, self.uncommittedEvents, function(ruleId, message) {
var args = [changedAttributes,
self.previousAttributes,
self.uncommittedEvents];

if (rule.length === 5) {
args.push(self.version);
}

args.push(function(ruleId, message) {
if (ruleId) {
if (!message) {
message = ruleId;
Expand All @@ -111,6 +143,8 @@ Aggregate.prototype = {
}
callback(null);
});

rule.apply(self, args);
}, function() {
if (keys.length > 0) {
self.attributes = self.previousAttributes;
Expand Down
47 changes: 37 additions & 10 deletions lib/bases/commandHandlerBase.js
Expand Up @@ -53,14 +53,14 @@ _.extend(CommandHandler.prototype, {

// call validate command
function(aggregate, stream, callback) {
self.validate(cmd.command, cmd.payload, function(err) {
self.validate(cmd, function(err) {
callback(err, aggregate, stream);
});
},

// call command function on aggregate
function(aggregate, stream, callback) {
aggregate[cmd.command](cmd.payload, function(err) {
self.handleCommand(aggregate, cmd, function(err) {
callback(err, aggregate, stream);
});
},
Expand All @@ -77,6 +77,22 @@ _.extend(CommandHandler.prototype, {
});
},

handleCommand: function(aggregate, cmd, callback) {
if (cmd.head &&
cmd.head.version !== null &&
cmd.head.version !== undefined &&
aggregate[cmd.command + '_' + cmd.head.version]) {
aggregate[cmd.command + '_' + cmd.head.version](cmd.payload, function(err) {
if (callback) callback(err);
});
return;
}

aggregate[cmd.command](cmd.payload, function(err) {
if (callback) callback(err);
});
},

reorderCommandLock: function(id, callback) {
var self = this;
this.commandLock.find({ aggregateId: id }, function(err, res) {
Expand Down Expand Up @@ -171,12 +187,22 @@ _.extend(CommandHandler.prototype, {
}
},

validate: function(ruleName, data, callback) {
if(this.validationRules && this.validationRules[ruleName]) {
this.validationRules[ruleName].validate(data, callback);
} else {
callback(null);
validate: function(cmd, callback) {
if (this.validationRules &&
cmd.head &&
cmd.head.version !== null &&
cmd.head.version !== undefined &&
this.validationRules[cmd.command + '_' + cmd.head.version]) {
this.validationRules[cmd.command + '_' + cmd.head.version].validate(cmd.payload, callback);
return;
}

if (this.validationRules && this.validationRules[cmd.command]) {
this.validationRules[cmd.command].validate(cmd.payload, callback);
return;
}

callback(null);
},

_handle: function(id, cmd) {
Expand Down Expand Up @@ -218,17 +244,18 @@ _.extend(CommandHandler.prototype, {
async.map(stream.events, function(evt, next) {
next(null, evt.payload);
}, function(err, events) {
aggregate.loadFromHistory(snapshot.data, events);
aggregate.loadFromHistory(snapshot, events);

// Check if snapshotting is needed.
var snapshotThreshold = aggregate.getSnapshotThreshold() || self.options.snapshotThreshold;
if (stream.events.length >= snapshotThreshold) {
var streamId = stream.streamId,
revision = stream.currentRevision(),
data = aggregate.toJSON();
data = aggregate.toJSON(),
version = aggregate.version;

process.nextTick(function() {
self.eventStore.createSnapshot(streamId, revision, data);
self.eventStore.createSnapshot(streamId, revision, data, version);
});
}

Expand Down
34 changes: 29 additions & 5 deletions lib/bases/sagaBase.js
Expand Up @@ -26,9 +26,17 @@ Saga.prototype = {
return this.attributes[attr];
},

loadData: function(data, version) {
this.set(data);
},

load: function(data, callback) {
if (data) {
this.set(data);
if (data && data.version) {
var version = data.version;
delete data.version;
this.loadData(data, version);
} else if (data) {
this.loadData(data);
}

var self = this;
Expand All @@ -45,7 +53,11 @@ Saga.prototype = {
},

toJSON: function() {
return _.clone(this.attributes);
var clone = _.clone(this.attributes);
if (this.version !== null && this.version !== undefined) {
clone.version = this.version;
}
return clone;
},

sendCommand: function(cmd) {
Expand All @@ -58,6 +70,18 @@ Saga.prototype = {
}
},

transitionEvent: function(evt, callback) {
if (evt.head &&
evt.head.version !== null &&
evt.head.version !== undefined &&
this[evt.event + '_' + evt.head.version]) {
this[evt.event + '_' + evt.head.version](evt.payload, callback);
return;
}

this[evt.event](evt.payload, callback);
},

transition: function(events, callback) {
var self = this;

Expand All @@ -76,10 +100,10 @@ Saga.prototype = {
});

async.forEach(historyEvents, function(evt, callback) {
self[evt.event](evt.payload, callback);
self.transitionEvent(evt, callback);
}, function(err) {
async.forEach(newEvents, function(evt, callback) {
self[evt.event](evt.payload, function(err) {
self.transitionEvent(evt, function(err) {
self.uncommittedEvents.push(evt);
callback(err);
});
Expand Down
14 changes: 12 additions & 2 deletions lib/bases/sagaHandlerBase.js
Expand Up @@ -78,11 +78,21 @@ SagaHandler.prototype = {
},

handle: function(evt) {
if (this[evt.event] &&
evt.head &&
evt.head.version !== null &&
evt.head.version !== undefined &&
this[evt.event + '_' + evt.head.version]) {
this[evt.event + '_' + evt.head.version](evt);
return;
}

if (this[evt.event]) {
this[evt.event](evt);
} else {
this.defaultHandle(evt.payload.id, evt);
return;
}

this.defaultHandle(evt.payload.id, evt);
},

loadSaga: function(id, callback) {
Expand Down
4 changes: 2 additions & 2 deletions package.json
@@ -1,7 +1,7 @@
{
"author": "adrai",
"name": "cqrs-domain",
"version": "0.7.4",
"version": "0.7.5",
"private": false,
"main": "index.js",
"engines": {
Expand All @@ -15,7 +15,7 @@
"lodash": ">= 2.4.1",
"eventemitter2": ">= 0.4.13",
"node-queue": ">= 0.4.0",
"eventstore": ">= 0.7.2",
"eventstore": ">= 0.7.3",
"viewmodel": ">= 0.5.3",
"nodeEventedCommand": ">= 0.1.2",
"retry": ">= 0.6.0",
Expand Down

0 comments on commit 43e70b7

Please sign in to comment.