From e1768a5b7780b405112702d172c00fc74ce4c691 Mon Sep 17 00:00:00 2001 From: Timo Ebel Date: Tue, 26 Nov 2019 00:27:44 +0100 Subject: [PATCH] First version of a Saga including internal rollback handling. --- package.json | 5 +- src/CommandDispatcherLocal.js | 2 +- src/GenericErrors/SagaError.js | 38 +++++++++++ src/RootEntity.js | 2 +- src/Saga.js | 113 ++++++++++++++++++++++++++++++++- src/index.js | 2 + src/types.d.ts | 2 +- 7 files changed, 157 insertions(+), 7 deletions(-) create mode 100644 src/GenericErrors/SagaError.js diff --git a/package.json b/package.json index a1590ea..3ae8463 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,7 @@ "homepage": "https://github.com/Rekhyt/ddd-js#readme", "devDependencies": { "chai": "^4.2.0", - "coveralls": "^3.0.2", + "coveralls": "^3.0.8", "eslint": "^5.9.0", "eslint-config-standard": "^12.0.0", "eslint-plugin-import": "^2.14.0", @@ -33,7 +33,8 @@ "proxyquire": "^2.1.0" }, "dependencies": { - "email-validator": "^2.0.4" + "email-validator": "^2.0.4", + "uuid": "^3.3.3" }, "nyc": { "include": [ diff --git a/src/CommandDispatcherLocal.js b/src/CommandDispatcherLocal.js index 2837452..bdd082d 100644 --- a/src/CommandDispatcherLocal.js +++ b/src/CommandDispatcherLocal.js @@ -37,7 +37,7 @@ class CommandDispatcherLocal { return } - await this._eventDispatcher.publishMany(this._subscriptions[command.name].execute(command)) + await this._eventDispatcher.publishMany(await this._subscriptions[command.name].execute(command)) } } diff --git a/src/GenericErrors/SagaError.js b/src/GenericErrors/SagaError.js new file mode 100644 index 0000000..6920182 --- /dev/null +++ b/src/GenericErrors/SagaError.js @@ -0,0 +1,38 @@ +class SagaError extends Error { + constructor () { + super() + + this._errors = [] + } + + /** + * @param {string} entityName + * @param {Error} error + */ + addError (entityName, error) { + this._errors.push({ entityName, error }) + } + + /** + * @returns {string} + */ + get message () { + return `Errors on entit${this._errors.length === 1 ? 'y' : 'ies'} ${this._errors.map(e => e.entityName).join(', ')}` + } + + /** + * @returns {{entityName: string, error: Error}[]} + */ + get errors () { + return this._errors + } + + /** + * @returns {boolean} + */ + hasErrors () { + return this._errors.length > 0 + } +} + +module.exports = SagaError diff --git a/src/RootEntity.js b/src/RootEntity.js index f76e3b2..2b56739 100644 --- a/src/RootEntity.js +++ b/src/RootEntity.js @@ -77,7 +77,7 @@ class RootEntity { * @param {Command} command * @returns {Event[]} */ - execute (command) { + async execute (command) { if (!this._commandHandlerFunctions[command.name]) { /* istanbul ignore next */ this.logger.error(new Error(`Cannot handle incoming command ${command.name || 'no name given'}.`)) diff --git a/src/Saga.js b/src/Saga.js index ca9e681..dc4fbf5 100644 --- a/src/Saga.js +++ b/src/Saga.js @@ -1,3 +1,6 @@ +const uuid = require('uuid/v4') +const SagaError = require('./GenericErrors/SagaError') + /** * @implements CommandHandler * @abstract @@ -10,11 +13,13 @@ class Saga { constructor (logger, commandDispatcher) { this.logger = logger this._commandDispatcher = commandDispatcher + this._commandHandlerFunctions = {} + this._runningSagas = {} } /** - * @returns {object} with event names as keys and handler functions as values + * @returns {object} with command names as keys and handler functions as values */ get commandHandlerFunctions () { return this._commandHandlerFunctions @@ -46,7 +51,7 @@ class Saga { * @param {Command} command * @returns {Event[]} */ - execute (command) { + async execute (command) { if (!this._commandHandlerFunctions[command.name]) { /* istanbul ignore next */ this.logger.error(new Error(`Cannot handle incoming command ${command.name || 'no name given'}.`)) @@ -64,6 +69,110 @@ class Saga { ) return this._commandHandlerFunctions[command.name](command) } + + /** + * @param {Command} command + * @returns {Promise} + */ + async _dispatch (command) { + return this._commandDispatcher.dispatch(command) + } + + /** + * @param {string} name + * @param {Object} payload + * @returns {{payload: Object, name: string, time: string}|Event} + */ + createEvent (name, payload = {}) { + return { + name, + time: new Date().toISOString(), + payload + } + } + + /** + * @returns {string} The unique identifier of the started saga + */ + start () { + const identifier = uuid() + this._runningSagas[identifier] = {} + this.logger.trace('Saga started', { class: this.constructor.name, identifier }) + + return identifier + } + + /** + * @param {string} identifier + * @param {Command} command + * @param {string} entity + * @param {Function} rollbackHandler + * @param {number} timeout + */ + addTask (identifier, command, entity, rollbackHandler, timeout = 1000) { + this._runningSagas[identifier][command.name] = { command, entity, rollbackHandler, timeout, status: 'added' } + this.logger.trace('Task added to saga', { class: this.constructor.name, identifier, command, entity, timeout }) + } + + /** + * @param {string} identifier + * @returns {Promise} + * @throws {SagaError} if any of the commands fail or time out + */ + async run (identifier) { + this.logger.trace('Running saga', { class: this.constructor.name, identifier }) + + const tasks = Object.values(this._runningSagas[identifier]) + const sagaError = new SagaError() + + this.logger.trace('Executing tasks.', { class: this.constructor.name }) + await Promise.all(Object.entries(tasks).map(async ([commandName, task]) => { + return new Promise(async resolve => { + const timeout = setTimeout( + () => { + task.status = 'timed out' + sagaError.addError(task.entity, new Error(`Command ${commandName} triggered by saga timed out.`)) + resolve() + }, + task.timeout + ) + + try { + this.logger.trace('Executing task.', { class: this.constructor.name, identifier, commandName, task }) + await this._dispatch(task.command) + this.logger.trace('Task executed.', { class: this.constructor.name, identifier, commandName, task }) + task.status = 'done' + } catch (err) { + this.logger.trace('Task execution failed.', { class: this.constructor.name, identifier, commandName, task }) + task.status = 'failed' + sagaError.addError(task.entity, err) + } + + clearTimeout(timeout) + resolve() + }) + })) + + this.logger.trace('Tasks executed.', { class: this.constructor.name, identifier }) + if (!sagaError.hasErrors()) return + + const rollbackCommands = [] + for (const task of tasks) { + this.logger.trace('Checking tasks for required rollback.', { class: this.constructor.name, identifier, task }) + if (task.status !== 'done' && task.status !== 'timed out') continue + + rollbackCommands.push(task.rollbackHandler()) + } + + try { + this.logger.trace('Executing rollback tasks.', { class: this.constructor.name, identifier, rollbackCommands }) + await Promise.all(rollbackCommands.map(c => this._dispatch(c))) + } catch (err) { + this.logger.fatal(err, 'At least one rollback command failed after at least one command of a saga failed!') + } + + throw sagaError + } } module.exports = Saga diff --git a/src/index.js b/src/index.js index e60fc9b..b9bc24f 100644 --- a/src/index.js +++ b/src/index.js @@ -7,6 +7,7 @@ const EventDispatcherEventEmitter = require('./EventDispatcherEventEmitter') const EventRepositoryJsonFile = require('./EventRepositoryJsonFile') const InvalidTypeError = require('./GenericErrors/InvalidTypeError') const InvalidArgumentError = require('./GenericErrors/InvalidArgumentError') +const SagaError = require('./GenericErrors/SagaError') const ValidationError = require('./GenericErrors/ValidationError') const DateTime = require('./ValueObject/DateTime') const EmailAddress = require('./ValueObject/EmailAddress') @@ -27,6 +28,7 @@ module.exports = { InvalidTypeError, InvalidArgumentError, + SagaError, ValidationError, DateTime, diff --git a/src/types.d.ts b/src/types.d.ts index c977b27..d3e01db 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -22,7 +22,7 @@ declare interface Command { } declare interface CommandHandler { - execute(command: Command): Event[] + execute(command: Command): Promise } declare interface CommandDispatcher {