Permalink
Browse files

asynchronous api for saga

  • Loading branch information...
1 parent 24f85b7 commit 68f8682c64d04cdfe68f030c38f724720dd0b45b @adrai committed Jun 3, 2013
Showing with 89 additions and 76 deletions.
  1. +4 −0 README.md
  2. +21 −13 lib/bases/sagaBase.js
  3. +13 −12 lib/bases/sagaHandlerBase.js
  4. +48 −49 lib/domain.js
  5. +1 −1 package.json
  6. +2 −1 test/sagaHandlerTest.js
View
@@ -76,6 +76,10 @@ See [tests](https://github.com/adrai/node-cqrs-domain/tree/master/test) for deta
# Release Notes
+## v0.4.0
+
+- asynchronous api for saga
+
## v0.3.9
- optimized performance a little
View
@@ -1,5 +1,6 @@
var utils = require('../utils')
, eventEmitter = require('../eventEmitter')
+ , async = require('async')
, _ = require('lodash');
var Saga = function(id) {
@@ -25,15 +26,22 @@ Saga.prototype = {
return this.attributes[attr];
},
- load: function(data) {
+ load: function(data, callback) {
if (data) {
this.set(data);
}
+ var self = this;
+
if (this.initialize) {
- this.initialize();
+ this.initialize(function(err) {
+ self.isInited = true;
+ callback(err);
+ });
+ } else {
+ this.isInited = true;
+ callback(null);
}
- this.isInited = true;
},
toJSON: function() {
@@ -50,7 +58,7 @@ Saga.prototype = {
}
},
- transition: function(events) {
+ transition: function(events, callback) {
var self = this;
if (!_.isArray(events)) {
@@ -67,16 +75,16 @@ Saga.prototype = {
}
});
- _.each(historyEvents, function(evt) {
- self[evt.event](evt.payload);
+ async.forEach(historyEvents, function(evt, callback) {
+ self[evt.event](evt.payload, callback);
+ }, function(err) {
+ async.forEach(newEvents, function(evt, callback) {
+ self[evt.event](evt.payload, function(err) {
+ self.uncommittedEvents.push(evt);
+ callback(err);
+ });
+ }, callback);
});
-
- _.each(newEvents, function(evt) {
- self[evt.event](evt.payload);
- self.uncommittedEvents.push(evt);
- });
-
- return;
}
};
@@ -7,22 +7,21 @@ SagaHandler.prototype = {
sagas: {},
- initialize: function() {
+ initialize: function(callback) {
var self = this;
- function initSaga(id, data) {
+ function initSaga(id, data, callback) {
var saga = new self.Saga(id);
- saga.load(data);
self.sagas[id] = saga;
+ saga.load(data, callback);
}
this.repository.find({ type: this.saga }, function(err, sagas) {
if (!err) {
- for(var i = 0, len = sagas.length; i < len; i++) {
- var saga = sagas[i];
- initSaga(saga.id, saga);
- }
+ async.forEach(sagas, function(saga, callback) {
+ initSaga(saga.id, saga, callback);
+ }, callback);
}
});
@@ -41,8 +40,9 @@ SagaHandler.prototype = {
// transition the event
function(saga, callback) {
- saga.transition(evt);
- callback(null, saga);
+ saga.transition(evt, function(err) {
+ callback(err, saga);
+ });
},
// commit the uncommittedEvents
@@ -88,9 +88,10 @@ SagaHandler.prototype = {
if (!saga) {
saga = new this.Saga(id);
this.repository.get(id, function(err, sagaData) {
- saga.load(sagaData);
self.sagas[id] = saga;
- callback(null, saga);
+ saga.load(sagaData, function(err) {
+ callback(err, saga);
+ });
});
} else {
callback(null, saga);
@@ -104,7 +105,7 @@ SagaHandler.prototype = {
use: function(module) {
if (!module) return;
-
+
if (module.commit) {
this.repository = module;
}
View
@@ -131,7 +131,7 @@ module.exports = domain = _.extend(new EventEmitter2({
var aggregates = results.aggregates
, commandHandlers = results.commandHandlers;
-
+
for(var i = 0, len = commandHandlers.length; i < len; i++) {
var commandHandler = commandHandlers[i];
commandHandler.Aggregate = aggregates[commandHandler.aggregate];
@@ -141,57 +141,56 @@ module.exports = domain = _.extend(new EventEmitter2({
var sagas = results.sagas
, sagaHandlers = results.sagaHandlers;
- for(var j = 0, lenj = sagaHandlers.length; j < lenj; j++) {
- var sagaHandler = sagaHandlers[j];
+ async.forEach(sagaHandlers, function(sagaHandler, callback) {
sagaHandler.Saga = sagas[sagaHandler.saga];
- sagaHandler.initialize();
- }
+ sagaHandler.initialize(callback);
+ }, function(err) {
+ queue.connect(options.commandQueue, function(err, commandQueue) {
+ commandDispatcher.configure(function() {
+ this.use(commandQueue);
+ });
- queue.connect(options.commandQueue, function(err, commandQueue) {
- commandDispatcher.configure(function() {
- this.use(commandQueue);
- });
+ var worker = {};
+
+ // starts the worker by using an interval loop
+ worker.start = function() {
+ worker.process = setInterval(function() {
+
+ // if the last loop is still in progress leave this loop
+ if (worker.isRunning)
+ return;
+
+ worker.isRunning = true;
+
+ (function next(e) {
- var worker = {};
-
- // starts the worker by using an interval loop
- worker.start = function() {
- worker.process = setInterval(function() {
-
- // if the last loop is still in progress leave this loop
- if (worker.isRunning)
- return;
-
- worker.isRunning = true;
-
- (function next(e) {
-
- // dipatch one command in queue and call the _next_ callback, which
- // will call _process_ for the next command in queue.
- var process = function(cmdPointer, next) {
-
- // Publish it now...
- commandDispatcher.dispatch(cmdPointer.command, function(err) {
- if (cmdPointer.callback) cmdPointer.callback(null);
- next();
- });
- };
-
- // serial _process_ all events in queue
- if (!e && commandBuffer.length) {
- process(commandBuffer.shift(), next);
- }
- })();
-
- worker.isRunning = false;
-
- }, 10);
- };
-
- // fire things off
- worker.start();
-
- commandDispatcher.initialize({}, callback);
+ // dipatch one command in queue and call the _next_ callback, which
+ // will call _process_ for the next command in queue.
+ var process = function(cmdPointer, next) {
+
+ // Publish it now...
+ commandDispatcher.dispatch(cmdPointer.command, function(err) {
+ if (cmdPointer.callback) cmdPointer.callback(null);
+ next();
+ });
+ };
+
+ // serial _process_ all events in queue
+ if (!e && commandBuffer.length) {
+ process(commandBuffer.shift(), next);
+ }
+ })();
+
+ worker.isRunning = false;
+
+ }, 10);
+ };
+
+ // fire things off
+ worker.start();
+
+ commandDispatcher.initialize({}, callback);
+ });
});
});
View
@@ -1,7 +1,7 @@
{
"author": "adrai",
"name": "cqrs-domain",
- "version": "0.3.9",
+ "version": "0.4.0",
"private": false,
"main": "index.js",
"engines": {
@@ -8,14 +8,15 @@ var stream = new EventEmitter();
var commandEmitter = new EventEmitter();
var Saga = sagaBase.extend({
- somethingDoneEvent: function(evt) {
+ somethingDoneEvent: function(evt, callback) {
this.set('a', 'b');
this.set({ c: 'd' });
evt.command = 'blaCmd';
delete evt.event;
delete evt.commandId;
this.sendCommand(evt);
commandEmitter.emit('done');
+ callback();
}
});
var saga = new Saga('id_1');

0 comments on commit 68f8682

Please sign in to comment.