Skip to content

Commit

Permalink
fix revisionGuard when handling duplicate events at the same time
Browse files Browse the repository at this point in the history
  • Loading branch information
adrai committed Apr 10, 2015
1 parent 853bd8f commit add83c7
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 28 deletions.
13 changes: 8 additions & 5 deletions lib/definitions/viewBuilder.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ _.extend(ViewBuilder.prototype, {
*/
extractId: function (evt, callback) {
if (this.id && dotty.exists(evt, this.id)) {
debug('found viewmodel id in event');
debug('[' + this.name + '] found viewmodel id in event');
return callback(null, dotty.get(evt, this.id));
}

debug('not found viewmodel id in event, generate new id');
debug('[' + this.name + '] not found viewmodel id in event, generate new id');
this.collection.getNewId(callback);
},

Expand Down Expand Up @@ -239,7 +239,10 @@ _.extend(ViewBuilder.prototype, {
vm.set(_.cloneDeep(initValues));
}

debug('call denormalizer function');
var evtId = dotty.get(evt, this.definitions.event.id);

var debugOutPut = evtId ? (', [eventId]=' + evtId) : '';
debug('[' + this.name + ']' + debugOutPut + ' call denormalizer function');
this.denormFn(_.cloneDeep(payload), vm, function (err) {
if (err) {
debug(err);
Expand All @@ -248,7 +251,7 @@ _.extend(ViewBuilder.prototype, {

var notification = self.generateNotification(evt, vm);

debug('generate new id for notification');
debug('[' + self.name + ']' + debugOutPut + ' generate new id for notification');
self.getNewId(function (err, newId) {
if (err) {
debug(err);
Expand All @@ -263,7 +266,7 @@ _.extend(ViewBuilder.prototype, {

if (err instanceof ConcurrencyError) {
var retryIn = randomBetween(0, self.options.retryOnConcurrencyTimeout || 800);
debug('retry in ' + retryIn + 'ms');
debug('[' + self.name + ']' + debugOutPut + ' retry in ' + retryIn + 'ms');
setTimeout(function() {
self.loadViewModel(vm.id, function (err, vm) {
if (err) {
Expand Down
1 change: 1 addition & 0 deletions lib/denormalizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ _.extend(Denormalizer.prototype, {
* `function(err){}`
*/
clear: function (callback) {
this.revisionGuard.currentHandlingRevisions = {};
this.replayHandler.clear(callback);
}

Expand Down
24 changes: 24 additions & 0 deletions lib/errors/alreadyDenormalizingError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict';

// Grab the util module that's bundled with Node
var util = require('util');

// Create a new custom Error constructor
function AlreadyDenormalizingError(msg) {
// Pass the constructor to V8's
// captureStackTrace to clean up the output
Error.captureStackTrace(this, AlreadyDenormalizingError);

// If defined, store a custom error message
if (msg) {
this.message = msg;
}
}

// Extend our custom Error from Error
util.inherits(AlreadyDenormalizingError, Error);

// Give our custom error a name property. Helpful for logging the error later.
AlreadyDenormalizingError.prototype.name = AlreadyDenormalizingError.name;

module.exports = AlreadyDenormalizingError;
9 changes: 7 additions & 2 deletions lib/orderQueue.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
'use strict';

var debug = require('debug')('denormalizer:orderQueue'),
_ = require('lodash');
_ = require('lodash'),
AlreadyDenormalizingError = require('./errors/alreadyDenormalizingError');

/**
* Queue constructor
Expand Down Expand Up @@ -32,6 +33,10 @@ Queue.prototype = {
});

if (alreadyInQueue) {
debug('event already denormalizing [concatenatedId]=' + id + ', [evtId]=' + objId);
clb(new AlreadyDenormalizingError('Event: [id]=' + objId + ', [evtId]=' + objId + ' already denormalizing!'), function (done) {
done(null);
});
return;
}

Expand All @@ -44,7 +49,7 @@ Queue.prototype = {
if (fn) {
var self = this;
(function wait () {
debug('wait called');
debug('wait called [concatenatedId]=' + id + ', [evtId]=' + objId);
setTimeout(function () {
var found = _.find(self.queue[id], function (o) {
return o.id === objId;
Expand Down
56 changes: 36 additions & 20 deletions lib/revisionGuard.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ var debug = require('debug')('denormalizer:revisionGuard'),
Queue = require('./orderQueue'),
ConcurrencyError = require('./errors/concurrencyError'),
AlreadyDenormalizedError = require('./errors/alreadyDenormalizedError'),
AlreadyDenormalizingError = require('./errors/alreadyDenormalizingError'),
dotty = require('dotty');

/**
Expand Down Expand Up @@ -49,6 +50,8 @@ function RevisionGuard (store, options) {

this.queue = new Queue({ queueTimeout: this.options.queueTimeout });

this.currentHandlingRevisions = {};

this.onEventMissing(function (info, evt) {
debug('missing events: ', info, evt);
});
Expand Down Expand Up @@ -128,14 +131,14 @@ RevisionGuard.prototype = {

/**
* Queues an event with its callback by aggregateId
* @param {String} aggId The aggregate id.
* @param {Object} evt The event object.
* @param {Function} callback The event callback.
*/
queueEvent: function (aggId, evt, callback) {
queueEvent: function (evt, callback) {
var self = this;
var evtId = dotty.get(evt, this.definition.id);
var revInEvt = dotty.get(evt, this.definition.revision);
var aggId = dotty.get(evt, this.definition.aggregateId);

var concatenatedId = this.getConcatenatedId(evt);

Expand All @@ -148,19 +151,19 @@ RevisionGuard.prototype = {
}

if (revInEvt === revInStore) {
debug('revision match');
debug('revision match [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
callback(null, function (clb) {
self.finishGuard(evt, revInStore, clb);
});
return;
}

if (loopCount < self.options.queueTimeoutMaxLoops) {
debug('revision mismatch, try/wait again...');
debug('revision mismatch => try/wait again... [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
return waitAgain();
}

debug('event timeouted');
debug('event timeouted [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
// try to replay depending from id and evt...
var info = {
aggregateId: aggId,
Expand All @@ -182,7 +185,6 @@ RevisionGuard.prototype = {
* `function(err){}`
*/
finishGuard: function (evt, revInStore, callback) {
var aggId = dotty.get(evt, this.definition.aggregateId);
var evtId = dotty.get(evt, this.definition.id);
var revInEvt = dotty.get(evt, this.definition.revision);

Expand All @@ -195,7 +197,7 @@ RevisionGuard.prototype = {
debug(err);
if (err instanceof ConcurrencyError) {
var retryIn = randomBetween(0, self.options.retryOnConcurrencyTimeout || 800);
debug('retry in ' + retryIn + 'ms');
debug('retry in ' + retryIn + 'ms for [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt);
setTimeout(function() {
self.guard(evt, callback);
}, retryIn);
Expand All @@ -207,17 +209,18 @@ RevisionGuard.prototype = {

self.queue.remove(concatenatedId, evtId);
callback(null);

var pendingEvents = self.queue.get(concatenatedId);
if (!pendingEvents || pendingEvents.length === 0) return debug('no other pending event found');
if (!pendingEvents || pendingEvents.length === 0) return debug('no other pending event found [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt);

var nextEvent = _.find(pendingEvents, function (e) {
var revInEvt = dotty.get(e.payload, self.definition.revision);
return revInEvt === revInStore;
var revInNextEvt = dotty.get(e.payload, self.definition.revision);
return revInNextEvt === revInEvt + 1;
});
if (!nextEvent) return debug('no next pending event found');

debug('found next pending event, guard');
if (!nextEvent) return debug('no next pending event found [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt);

debug('found next pending event => guard [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt);
self.guard(nextEvent.payload, nextEvent.callback);
});
},
Expand All @@ -236,35 +239,45 @@ RevisionGuard.prototype = {
}

var self = this;

var aggId = dotty.get(evt, this.definition.aggregateId);

var revInEvt = dotty.get(evt, this.definition.revision);

var concatenatedId = this.getConcatenatedId(evt);

function proceed (revInStore) {
if (!revInStore) {
debug('first revision to store');
debug('first revision to store [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt);
callback(null, function (clb) {
self.finishGuard(evt, revInStore, clb);
});
return;
}

if (revInEvt < revInStore) {
debug('event already denormalized');
callback(new AlreadyDenormalizedError(), function (clb) {
debug('event already denormalized [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
callback(new AlreadyDenormalizedError('Event: [id]=' + dotty.get(evt, self.definition.id) + ', [revision]=' + revInEvt + ', [concatenatedId]=' + concatenatedId + ' already denormalized!'), function (clb) {
clb(null);
});
return;
}

if (revInEvt > revInStore) {
debug('queue event');
self.queueEvent(aggId, evt, callback);
debug('queue event [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
self.queueEvent(evt, callback);
return;
}

if (self.currentHandlingRevisions[concatenatedId] >= revInEvt) {
debug('event already denormalizing [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
callback(new AlreadyDenormalizingError('Event: [id]=' + dotty.get(evt, self.definition.id) + ', [revision]=' + revInEvt + ', [concatenatedId]=' + concatenatedId + ' already denormalizing!'), function (clb) {
clb(null);
});
return;
}

self.currentHandlingRevisions[concatenatedId] = revInEvt;

debug('event is in correct order => go for it! [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
callback(null, function (clb) {
self.finishGuard(evt, revInStore, clb);
});
Expand All @@ -279,14 +292,17 @@ RevisionGuard.prototype = {
}

if (loop <= 0) {
debug('finished loops for retry => proceed [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
return proceed(revInStore);
}

if (!revInStore && revInEvt !== 1) {
debug('no revision in store => retry [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
retry(max, --loop);
return;
}

debug('revision in store existing => proceed [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
proceed(revInStore);
});
}, randomBetween(max / 5, max));
Expand Down
3 changes: 3 additions & 0 deletions releasenotes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## [v1.3.12](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.3.11...v1.3.12)
- fix revisionGuard when handling duplicate events at the same time

## [v1.3.11](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.3.10...v1.3.11)
- update viewmodel dependency

Expand Down
Loading

0 comments on commit add83c7

Please sign in to comment.