Skip to content
This repository has been archived by the owner on Sep 1, 2020. It is now read-only.

Commit

Permalink
asynchronous api for saga
Browse files Browse the repository at this point in the history
  • Loading branch information
adrai committed Jun 3, 2013
1 parent ea41d37 commit da72bee
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 74 deletions.
4 changes: 4 additions & 0 deletions README.md
Expand Up @@ -90,6 +90,10 @@ See [tests](https://github.com/adrai/node-cqs/tree/master/test) for detailed inf

# Release Notes

## v0.4.0

- asynchronous api for saga

## v0.3.7

- optimized performance a little
Expand Down
34 changes: 21 additions & 13 deletions lib/domain/bases/sagaBase.js
@@ -1,5 +1,6 @@
var utils = require('../../utils')
, eventEmitter = require('../eventEmitter')
, async = require('async')
, _ = require('lodash');

var Saga = function(id) {
Expand All @@ -25,15 +26,22 @@ Saga.prototype = {
return this.attributes[attr];
},

load: function(data) {
load: function(data, callback) {
if (data) {
this.set(data);
}

var self = this;

if (this.initialize) {
this.initialize();
this.initialize(function(err) {
self.isInited = true;
callback(err);
});
} else {
this.isInited = true;
callback(null);
}
this.isInited = true;
},

toJSON: function() {
Expand All @@ -50,7 +58,7 @@ Saga.prototype = {
}
},

transition: function(events) {
transition: function(events, callback) {
var self = this;

if (!_.isArray(events)) {
Expand All @@ -67,16 +75,16 @@ Saga.prototype = {
}
});

_.each(historyEvents, function(evt) {
self[evt.event](evt.payload);
async.forEach(historyEvents, function(evt, callback) {
self[evt.event](evt.payload, callback);
}, function(err) {
async.forEach(newEvents, function(evt, callback) {
self[evt.event](evt.payload, function(err) {
self.uncommittedEvents.push(evt);
callback(err);
});
}, callback);
});

_.each(newEvents, function(evt) {
self[evt.event](evt.payload);
self.uncommittedEvents.push(evt);
});

return;
}

};
Expand Down
25 changes: 13 additions & 12 deletions lib/domain/bases/sagaHandlerBase.js
Expand Up @@ -7,22 +7,21 @@ SagaHandler.prototype = {

sagas: {},

initialize: function() {
initialize: function(callback) {

var self = this;

function initSaga(id, data) {
function initSaga(id, data, callback) {
var saga = new self.Saga(id);
saga.load(data);
self.sagas[id] = saga;
saga.load(data, callback);
}

this.repository.find({ type: this.saga }, function(err, sagas) {
if (!err) {
for(var i = 0, len = sagas.length; i < len; i++) {
var saga = sagas[i];
initSaga(saga.id, saga);
}
async.forEach(sagas, function(saga, callback) {
initSaga(saga.id, saga, callback);
}, callback);
}
});

Expand All @@ -41,8 +40,9 @@ SagaHandler.prototype = {

// transition the event
function(saga, callback) {
saga.transition(evt);
callback(null, saga);
saga.transition(evt, function(err) {
callback(err, saga);
});
},

// commit the uncommittedEvents
Expand Down Expand Up @@ -88,9 +88,10 @@ SagaHandler.prototype = {
if (!saga) {
saga = new this.Saga(id);
this.repository.get(id, function(err, sagaData) {
saga.load(sagaData);
self.sagas[id] = saga;
callback(null, saga);
saga.load(sagaData, function(err) {
callback(err, saga);
});
});
} else {
callback(null, saga);
Expand All @@ -104,7 +105,7 @@ SagaHandler.prototype = {

use: function(module) {
if (!module) return;

if (module.commit) {
this.repository = module;
}
Expand Down
98 changes: 51 additions & 47 deletions lib/domain/domain.js
Expand Up @@ -60,11 +60,14 @@ module.exports = domain = _.extend(new EventEmitter2({

var defaults = {
commandQueue: { type: 'inMemory', collectionName: 'commands' },
repository: { type: 'inMemory' }
repository: { type: 'inMemory', collectionName: 'sagas' }
};

_.defaults(options, defaults);

options.commandQueue.collectionName = options.commandQueue.collectionName || defaults.commandQueue.collectionName;
options.repository.collectionName = options.repository.collectionName || defaults.repository.collectionName;

// initialize the hub by passing the function that gets the command id from the event
hub.init(newGetCommandId || getCommandId);

Expand Down Expand Up @@ -133,57 +136,58 @@ module.exports = domain = _.extend(new EventEmitter2({
var sagas = results.sagas
, sagaHandlers = results.sagaHandlers;

for(var j = 0, lenj = sagaHandlers.length; j < lenj; j++) {
var sagaHandler = sagaHandlers[j];
async.forEach(sagaHandlers, function(sagaHandler, callback) {
sagaHandler.Saga = sagas[sagaHandler.saga];
sagaHandler.initialize();
}
sagaHandler.initialize(callback);
}, function(err) {

queue.connect(options.commandQueue, function(err, commandQueue) {
commandDispatcher.configure(function() {
this.use(commandQueue);
});

var worker = {};

// starts the worker by using an interval loop
worker.start = function() {
worker.process = setInterval(function() {
queue.connect(options.commandQueue, function(err, commandQueue) {
commandDispatcher.configure(function() {
this.use(commandQueue);
});

// if the last loop is still in progress leave this loop
if (worker.isRunning)
return;
var worker = {};

// starts the worker by using an interval loop
worker.start = function() {
worker.process = setInterval(function() {

// if the last loop is still in progress leave this loop
if (worker.isRunning)
return;

worker.isRunning = true;

worker.isRunning = true;

(function next(e) {

// dipatch one command in queue and call the _next_ callback, which
// will call _process_ for the next command in queue.
var process = function(cmdPointer, next) {
(function next(e) {

// Publish it now...
commandDispatcher.dispatch(cmdPointer.command, function(err) {
if (cmdPointer.callback) cmdPointer.callback(null);
next();
});
};

// serial _process_ all events in queue
if (!e && commandBuffer.length) {
process(commandBuffer.shift(), next);
}
})();

worker.isRunning = false;

}, 10);
};

// fire things off
worker.start();

commandDispatcher.initialize({}, callback);
// dipatch one command in queue and call the _next_ callback, which
// will call _process_ for the next command in queue.
var process = function(cmdPointer, next) {

// Publish it now...
commandDispatcher.dispatch(cmdPointer.command, function(err) {
if (cmdPointer.callback) cmdPointer.callback(null);
next();
});
};

// serial _process_ all events in queue
if (!e && commandBuffer.length) {
process(commandBuffer.shift(), next);
}
})();

worker.isRunning = false;

}, 10);
};

// fire things off
worker.start();

commandDispatcher.initialize({}, callback);
});

});

});
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,7 +1,7 @@
{
"author": "adrai",
"name": "node-cqs",
"version": "0.3.7",
"version": "0.4.0",
"private": false,
"main": "index.js",
"engines": {
Expand Down
3 changes: 2 additions & 1 deletion test/sagaHandlerTest.js
Expand Up @@ -8,14 +8,15 @@ var stream = new EventEmitter();
var commandEmitter = new EventEmitter();

var Saga = sagaBase.extend({
somethingDoneEvent: function(evt) {
somethingDoneEvent: function(evt, callback) {
this.set('a', 'b');
this.set({ c: 'd' });
evt.command = 'blaCmd';
delete evt.event;
delete evt.commandId;
this.sendCommand(evt);
commandEmitter.emit('done');
callback(null);
}
});
var saga = new Saga('id_1');
Expand Down

0 comments on commit da72bee

Please sign in to comment.