From da72beee1f835abe41372cac5d1b3f1fd4615a75 Mon Sep 17 00:00:00 2001 From: Adriano Raiano Date: Mon, 3 Jun 2013 11:08:05 +0200 Subject: [PATCH] asynchronous api for saga --- README.md | 4 ++ lib/domain/bases/sagaBase.js | 34 ++++++---- lib/domain/bases/sagaHandlerBase.js | 25 ++++---- lib/domain/domain.js | 98 +++++++++++++++-------------- package.json | 2 +- test/sagaHandlerTest.js | 3 +- 6 files changed, 92 insertions(+), 74 deletions(-) diff --git a/README.md b/README.md index babc51a..53b2eb7 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,10 @@ See [tests](https://github.com/adrai/node-cqs/tree/master/test) for detailed inf # Release Notes +## v0.4.0 + +- asynchronous api for saga + ## v0.3.7 - optimized performance a little diff --git a/lib/domain/bases/sagaBase.js b/lib/domain/bases/sagaBase.js index 5f65882..40d989b 100644 --- a/lib/domain/bases/sagaBase.js +++ b/lib/domain/bases/sagaBase.js @@ -1,5 +1,6 @@ var utils = require('../../utils') , eventEmitter = require('../eventEmitter') + , async = require('async') , _ = require('lodash'); var Saga = function(id) { @@ -25,15 +26,22 @@ Saga.prototype = { return this.attributes[attr]; }, - load: function(data) { + load: function(data, callback) { if (data) { this.set(data); } + var self = this; + if (this.initialize) { - this.initialize(); + this.initialize(function(err) { + self.isInited = true; + callback(err); + }); + } else { + this.isInited = true; + callback(null); } - this.isInited = true; }, toJSON: function() { @@ -50,7 +58,7 @@ Saga.prototype = { } }, - transition: function(events) { + transition: function(events, callback) { var self = this; if (!_.isArray(events)) { @@ -67,16 +75,16 @@ Saga.prototype = { } }); - _.each(historyEvents, function(evt) { - self[evt.event](evt.payload); + async.forEach(historyEvents, function(evt, callback) { + self[evt.event](evt.payload, callback); + }, function(err) { + async.forEach(newEvents, function(evt, callback) { + self[evt.event](evt.payload, function(err) { + self.uncommittedEvents.push(evt); + callback(err); + }); + }, callback); }); - - _.each(newEvents, function(evt) { - self[evt.event](evt.payload); - self.uncommittedEvents.push(evt); - }); - - return; } }; diff --git a/lib/domain/bases/sagaHandlerBase.js b/lib/domain/bases/sagaHandlerBase.js index 09e7bf8..858897f 100644 --- a/lib/domain/bases/sagaHandlerBase.js +++ b/lib/domain/bases/sagaHandlerBase.js @@ -7,22 +7,21 @@ SagaHandler.prototype = { sagas: {}, - initialize: function() { + initialize: function(callback) { var self = this; - function initSaga(id, data) { + function initSaga(id, data, callback) { var saga = new self.Saga(id); - saga.load(data); self.sagas[id] = saga; + saga.load(data, callback); } this.repository.find({ type: this.saga }, function(err, sagas) { if (!err) { - for(var i = 0, len = sagas.length; i < len; i++) { - var saga = sagas[i]; - initSaga(saga.id, saga); - } + async.forEach(sagas, function(saga, callback) { + initSaga(saga.id, saga, callback); + }, callback); } }); @@ -41,8 +40,9 @@ SagaHandler.prototype = { // transition the event function(saga, callback) { - saga.transition(evt); - callback(null, saga); + saga.transition(evt, function(err) { + callback(err, saga); + }); }, // commit the uncommittedEvents @@ -88,9 +88,10 @@ SagaHandler.prototype = { if (!saga) { saga = new this.Saga(id); this.repository.get(id, function(err, sagaData) { - saga.load(sagaData); self.sagas[id] = saga; - callback(null, saga); + saga.load(sagaData, function(err) { + callback(err, saga); + }); }); } else { callback(null, saga); @@ -104,7 +105,7 @@ SagaHandler.prototype = { use: function(module) { if (!module) return; - + if (module.commit) { this.repository = module; } diff --git a/lib/domain/domain.js b/lib/domain/domain.js index b15b9c4..0936f21 100644 --- a/lib/domain/domain.js +++ b/lib/domain/domain.js @@ -60,11 +60,14 @@ module.exports = domain = _.extend(new EventEmitter2({ var defaults = { commandQueue: { type: 'inMemory', collectionName: 'commands' }, - repository: { type: 'inMemory' } + repository: { type: 'inMemory', collectionName: 'sagas' } }; _.defaults(options, defaults); + options.commandQueue.collectionName = options.commandQueue.collectionName || defaults.commandQueue.collectionName; + options.repository.collectionName = options.repository.collectionName || defaults.repository.collectionName; + // initialize the hub by passing the function that gets the command id from the event hub.init(newGetCommandId || getCommandId); @@ -133,57 +136,58 @@ module.exports = domain = _.extend(new EventEmitter2({ var sagas = results.sagas , sagaHandlers = results.sagaHandlers; - for(var j = 0, lenj = sagaHandlers.length; j < lenj; j++) { - var sagaHandler = sagaHandlers[j]; + async.forEach(sagaHandlers, function(sagaHandler, callback) { sagaHandler.Saga = sagas[sagaHandler.saga]; - sagaHandler.initialize(); - } + sagaHandler.initialize(callback); + }, function(err) { - queue.connect(options.commandQueue, function(err, commandQueue) { - commandDispatcher.configure(function() { - this.use(commandQueue); - }); - - var worker = {}; - - // starts the worker by using an interval loop - worker.start = function() { - worker.process = setInterval(function() { + queue.connect(options.commandQueue, function(err, commandQueue) { + commandDispatcher.configure(function() { + this.use(commandQueue); + }); - // if the last loop is still in progress leave this loop - if (worker.isRunning) - return; + var worker = {}; + + // starts the worker by using an interval loop + worker.start = function() { + worker.process = setInterval(function() { + + // if the last loop is still in progress leave this loop + if (worker.isRunning) + return; + + worker.isRunning = true; - worker.isRunning = true; - - (function next(e) { - - // dipatch one command in queue and call the _next_ callback, which - // will call _process_ for the next command in queue. - var process = function(cmdPointer, next) { + (function next(e) { - // Publish it now... - commandDispatcher.dispatch(cmdPointer.command, function(err) { - if (cmdPointer.callback) cmdPointer.callback(null); - next(); - }); - }; - - // serial _process_ all events in queue - if (!e && commandBuffer.length) { - process(commandBuffer.shift(), next); - } - })(); - - worker.isRunning = false; - - }, 10); - }; - - // fire things off - worker.start(); - - commandDispatcher.initialize({}, callback); + // dipatch one command in queue and call the _next_ callback, which + // will call _process_ for the next command in queue. + var process = function(cmdPointer, next) { + + // Publish it now... + commandDispatcher.dispatch(cmdPointer.command, function(err) { + if (cmdPointer.callback) cmdPointer.callback(null); + next(); + }); + }; + + // serial _process_ all events in queue + if (!e && commandBuffer.length) { + process(commandBuffer.shift(), next); + } + })(); + + worker.isRunning = false; + + }, 10); + }; + + // fire things off + worker.start(); + + commandDispatcher.initialize({}, callback); + }); + }); }); diff --git a/package.json b/package.json index 9890845..0310095 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "author": "adrai", "name": "node-cqs", - "version": "0.3.7", + "version": "0.4.0", "private": false, "main": "index.js", "engines": { diff --git a/test/sagaHandlerTest.js b/test/sagaHandlerTest.js index 08608e8..3b26db6 100644 --- a/test/sagaHandlerTest.js +++ b/test/sagaHandlerTest.js @@ -8,7 +8,7 @@ var stream = new EventEmitter(); var commandEmitter = new EventEmitter(); var Saga = sagaBase.extend({ - somethingDoneEvent: function(evt) { + somethingDoneEvent: function(evt, callback) { this.set('a', 'b'); this.set({ c: 'd' }); evt.command = 'blaCmd'; @@ -16,6 +16,7 @@ var Saga = sagaBase.extend({ delete evt.commandId; this.sendCommand(evt); commandEmitter.emit('done'); + callback(null); } }); var saga = new Saga('id_1');