Skip to content

Commit

Permalink
introduced replay stremed
Browse files Browse the repository at this point in the history
  • Loading branch information
adrai committed Jan 16, 2014
1 parent 33427b7 commit a7e8a59
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 49 deletions.
1 change: 1 addition & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ It can be very useful as eventdenormalizer component if you work with (d)ddd, cq
// to replay
eventDenormalizer.replay([] /* array of ordered events */, function(err) {});

// to replay streamed
eventDenormalizer.replayStreamed(function(replay, done) {

replay(evt1);
replay(evt2);
replay(evt3);

done(function(err) { });

});

## Define ViewBuilders...

var base = require('cqrs-eventdenormalizer').viewBuilderBase;
Expand Down
43 changes: 43 additions & 0 deletions lib/bases/viewBuilderBase.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,49 @@ ViewBuilder.prototype = {

});

},

replayStreamed: function(fn, retryTimout) {

var self = this;

retryTimout = retryTimout || 10;

var queue = [];

var replay = function(evt) {
queue.push(evt);

(function handle(e) {
self.handle(e, true, function(err) {
queue.splice(_.indexOf(e), 1);

if (queue.length > 0) {
handle(queue[0]);
}
});
})(evt);
};

var done = function(callback) {
(function retry() {
if (queue.length > 0) {
return setTimeout(retry, retryTimout);
}

var replVms = _.values(self.replayingVms);

async.each(replVms, function(vm, callback) {
self.saveViewModel(vm, callback);
}, function(err) {
self.replayingVms = {};
callback(err);
});
})();
};

fn(replay, done);

}

};
Expand Down
57 changes: 9 additions & 48 deletions lib/eventDenormalizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ var viewBuilderLoader = require('./loaders/viewBuilderLoader'),
_ = require('lodash'),
async = require('async'),
queue = require('node-queue'),
eventQueue,
guardStore,
viewBuilders,
replayHandler = require('./replayHandler'),
repository = require('viewmodel').write.create(),
revisionGuardStore = require('./revisionGuardStore'),
revisionGuard = require('./revisionGuard'),
eventQueue,
guardStore,
viewBuilders,
evtDen;

module.exports = evtDen = _.extend(new EventEmitter2({
Expand Down Expand Up @@ -103,6 +104,9 @@ module.exports = evtDen = _.extend(new EventEmitter2({
this.use(eventDispatcher);
this.use(eventQueue);
});

replayHandler.initialize(viewBuilders, guardStore);

revisionGuard.initialize({
ignoreRevision: options.ignoreRevision,
queueTimeout: options.revisionGuardQueueTimeout,
Expand Down Expand Up @@ -143,51 +147,8 @@ module.exports = evtDen = _.extend(new EventEmitter2({
}
},

replay: function(evts, callback) {
replay: replayHandler.replay,

var revisionMap = {},
groupedEvents = {};

_.each(evts, function(evt) {
if (evt.head && evt.head.revision) {
revisionMap[evt.payload.id] = evt.head.revision;
}

var interested = _.filter(viewBuilders, function(vB) {
return _.contains(vB.registeredEventNames, evt.event);
});

_.each(interested, function(inter) {
groupedEvents[inter.id] = groupedEvents[inter.id] || [];
groupedEvents[inter.id].push(evt);
});
});

async.series([
function(callback) {
async.each(viewBuilders, function(viewBuilder, callback) {
if (!groupedEvents[viewBuilder.id] || groupedEvents[viewBuilder.id].length === 0) {
return callback(null);
}

viewBuilder.replay(groupedEvents[viewBuilder.id], callback);

}, callback);
},
function(callback) {
var ids = _.keys(revisionMap);
async.each(ids, function(id, callback) {
guardStore.getRevision(id, function(err, entry) {
if (err) { return callback(err); }

entry.revision = revisionMap[id];
guardStore.saveRevision(entry, callback);
});
}, callback);
}
], function(err) {
if (callback) callback(err);
});
}
replayStreamed: replayHandler.replayStreamed

});
119 changes: 119 additions & 0 deletions lib/replayHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
var _ = require('lodash'),
async = require('async'),
viewBuilders,
guardStore;

module.exports = {
initialize: function(options, vB, gS) {
if (!gS) {
gS = vB;
vB = options;
options = {};
}

viewBuilders = vB;

guardStore = gS;
},

replay: function(evts, callback) {

var revisionMap = {},
groupedEvents = {};

_.each(evts, function(evt) {
if (evt.head && evt.head.revision) {
revisionMap[evt.payload.id] = evt.head.revision;
}

var interested = _.filter(viewBuilders, function(vB) {
return _.contains(vB.registeredEventNames, evt.event);
});

_.each(interested, function(inter) {
groupedEvents[inter.id] = groupedEvents[inter.id] || [];
groupedEvents[inter.id].push(evt);
});
});

async.series([
function(callback) {
async.each(viewBuilders, function(viewBuilder, callback) {
if (!groupedEvents[viewBuilder.id] || groupedEvents[viewBuilder.id].length === 0) {
return callback(null);
}

viewBuilder.replay(groupedEvents[viewBuilder.id], callback);

}, callback);
},
function(callback) {
var ids = _.keys(revisionMap);
async.each(ids, function(id, callback) {
guardStore.getRevision(id, function(err, entry) {
if (err) { return callback(err); }

entry.revision = revisionMap[id];
guardStore.saveRevision(entry, callback);
});
}, callback);
}
], function(err) {
if (callback) callback(err);
});
},

replayStreamed: function(fn, retryTimout) {

var revisionMap = {},
viewBuilderReplayStreams = {};

var replay = function(evt) {
if (evt.head && evt.head.revision) {
revisionMap[evt.payload.id] = evt.head.revision;
}

var interested = _.filter(viewBuilders, function(vB) {
return _.contains(vB.registeredEventNames, evt.event);
});

_.each(interested, function(inter) {
if (!viewBuilderReplayStreams[inter.id]) {
inter.replayStreamed(function(vbReplay, vbDone) {
viewBuilderReplayStreams[inter.id] = {
replay: vbReplay,
done: vbDone
};
}, retryTimout);
}

viewBuilderReplayStreams[inter.id].replay(evt);
});
};

var done = function(callback) {
async.series([
function(callback) {
async.each(_.values(viewBuilderReplayStreams), function(viewBuilderReplayStream, callback) {
viewBuilderReplayStream.done(callback);
}, callback);
},
function(callback) {
var ids = _.keys(revisionMap);
async.each(ids, function(id, callback) {
guardStore.getRevision(id, function(err, entry) {
if (err) { return callback(err); }

entry.revision = revisionMap[id];
guardStore.saveRevision(entry, callback);
});
}, callback);
}
], function(err) {
if (callback) callback(err);
});
};

fn(replay, done);
}
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"author": "adrai",
"name": "cqrs-eventdenormalizer",
"version": "0.3.2",
"version": "0.3.3",
"private": false,
"main": "index.js",
"engines": {
Expand Down
95 changes: 95 additions & 0 deletions test/integration/eventDenormalizerTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -778,4 +778,99 @@ describe('EventDenormalizer', function() {

});

describe('repalying streamed', function() {

var evts = [
{
id: '222222-111111',
event: 'dummyCreated',
head: {
revision: 1
},
payload: {
id: '222222-1987',
first: 'when created'
}
},
{
id: '222222-222222',
event: 'loaded',
head: {
revision: 1
},
payload: {
id: '222222-2014'
}
},
{
id: '222222-333333',
event: 'dummyChanged',
head: {
revision: 2
},
payload: {
id: '222222-1987',
second: 'when updated'
}
},
{
id: '222222-444444',
event: 'somethingFlushed',
head: {
revision: 2
},
payload: {
id: '222222-2014'
}
}
];

it('it should work as expected', function(done) {

eventDenormalizer.replayStreamed(function(replay, finished) {
for (var i = 0, length = evts.length; i < length; i++) {
replay(evts[i]);
}
finished(function(err) {
async.series([
function(callback) {
dummyRepo.get('222222-1987', function(err, res) {
expect(res.first).to.eql('when created');
expect(res.second).to.eql('when updated');

callback(err);
});
},
function(callback) {
dummyRepo.get('222222-2014', function(err, res) {
expect(res.loadedSet).to.eql(undefined);
expect(res.flushSet).to.eql(undefined);

callback(err);
});
},
function(callback) {
dummy2Repo.get('222222-2014', function(err, res) {
expect(res.loadedSet).to.eql('loaded');
expect(res.flushSet).to.eql('flushed');

callback(err);
});
},
function(callback) {
dummy2Repo.get('222222-1987', function(err, res) {
expect(res.first).to.eql(undefined);
expect(res.second).to.eql(undefined);

callback(err);
});
}
], done);
});
});

});

});

});

0 comments on commit a7e8a59

Please sign in to comment.