Skip to content

Commit

Permalink
First version of a Saga including internal rollback handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rekhyt committed Nov 25, 2019
1 parent 4c98bb0 commit e1768a5
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 7 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"homepage": "https://github.com/Rekhyt/ddd-js#readme",
"devDependencies": {
"chai": "^4.2.0",
"coveralls": "^3.0.2",
"coveralls": "^3.0.8",
"eslint": "^5.9.0",
"eslint-config-standard": "^12.0.0",
"eslint-plugin-import": "^2.14.0",
Expand All @@ -33,7 +33,8 @@
"proxyquire": "^2.1.0"
},
"dependencies": {
"email-validator": "^2.0.4"
"email-validator": "^2.0.4",
"uuid": "^3.3.3"
},
"nyc": {
"include": [
Expand Down
2 changes: 1 addition & 1 deletion src/CommandDispatcherLocal.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CommandDispatcherLocal {
return
}

await this._eventDispatcher.publishMany(this._subscriptions[command.name].execute(command))
await this._eventDispatcher.publishMany(await this._subscriptions[command.name].execute(command))
}
}

Expand Down
38 changes: 38 additions & 0 deletions src/GenericErrors/SagaError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
class SagaError extends Error {
constructor () {
super()

this._errors = []
}

/**
* @param {string} entityName
* @param {Error} error
*/
addError (entityName, error) {
this._errors.push({ entityName, error })
}

/**
* @returns {string}
*/
get message () {
return `Errors on entit${this._errors.length === 1 ? 'y' : 'ies'} ${this._errors.map(e => e.entityName).join(', ')}`
}

/**
* @returns {{entityName: string, error: Error}[]}
*/
get errors () {
return this._errors
}

/**
* @returns {boolean}
*/
hasErrors () {
return this._errors.length > 0
}
}

module.exports = SagaError
2 changes: 1 addition & 1 deletion src/RootEntity.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class RootEntity {
* @param {Command} command
* @returns {Event[]}
*/
execute (command) {
async execute (command) {
if (!this._commandHandlerFunctions[command.name]) {
/* istanbul ignore next */
this.logger.error(new Error(`Cannot handle incoming command ${command.name || 'no name given'}.`))
Expand Down
113 changes: 111 additions & 2 deletions src/Saga.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
const uuid = require('uuid/v4')
const SagaError = require('./GenericErrors/SagaError')

/**
* @implements CommandHandler
* @abstract
Expand All @@ -10,11 +13,13 @@ class Saga {
constructor (logger, commandDispatcher) {
this.logger = logger
this._commandDispatcher = commandDispatcher

this._commandHandlerFunctions = {}
this._runningSagas = {}
}

/**
* @returns {object} with event names as keys and handler functions as values
* @returns {object} with command names as keys and handler functions as values
*/
get commandHandlerFunctions () {
return this._commandHandlerFunctions
Expand Down Expand Up @@ -46,7 +51,7 @@ class Saga {
* @param {Command} command
* @returns {Event[]}
*/
execute (command) {
async execute (command) {
if (!this._commandHandlerFunctions[command.name]) {
/* istanbul ignore next */
this.logger.error(new Error(`Cannot handle incoming command ${command.name || 'no name given'}.`))
Expand All @@ -64,6 +69,110 @@ class Saga {
)
return this._commandHandlerFunctions[command.name](command)
}

/**
* @param {Command} command
* @returns {Promise<void>}
*/
async _dispatch (command) {
return this._commandDispatcher.dispatch(command)
}

/**
* @param {string} name
* @param {Object} payload
* @returns {{payload: Object, name: string, time: string}|Event}
*/
createEvent (name, payload = {}) {
return {
name,
time: new Date().toISOString(),
payload
}
}

/**
* @returns {string} The unique identifier of the started saga
*/
start () {
const identifier = uuid()
this._runningSagas[identifier] = {}
this.logger.trace('Saga started', { class: this.constructor.name, identifier })

return identifier
}

/**
* @param {string} identifier
* @param {Command} command
* @param {string} entity
* @param {Function} rollbackHandler
* @param {number} timeout
*/
addTask (identifier, command, entity, rollbackHandler, timeout = 1000) {
this._runningSagas[identifier][command.name] = { command, entity, rollbackHandler, timeout, status: 'added' }
this.logger.trace('Task added to saga', { class: this.constructor.name, identifier, command, entity, timeout })
}

/**
* @param {string} identifier
* @returns {Promise<void>}
* @throws {SagaError} if any of the commands fail or time out
*/
async run (identifier) {
this.logger.trace('Running saga', { class: this.constructor.name, identifier })

const tasks = Object.values(this._runningSagas[identifier])
const sagaError = new SagaError()

this.logger.trace('Executing tasks.', { class: this.constructor.name })
await Promise.all(Object.entries(tasks).map(async ([commandName, task]) => {
return new Promise(async resolve => {
const timeout = setTimeout(
() => {
task.status = 'timed out'
sagaError.addError(task.entity, new Error(`Command ${commandName} triggered by saga timed out.`))
resolve()
},
task.timeout
)

try {
this.logger.trace('Executing task.', { class: this.constructor.name, identifier, commandName, task })
await this._dispatch(task.command)
this.logger.trace('Task executed.', { class: this.constructor.name, identifier, commandName, task })
task.status = 'done'
} catch (err) {
this.logger.trace('Task execution failed.', { class: this.constructor.name, identifier, commandName, task })
task.status = 'failed'
sagaError.addError(task.entity, err)
}

clearTimeout(timeout)
resolve()
})
}))

this.logger.trace('Tasks executed.', { class: this.constructor.name, identifier })
if (!sagaError.hasErrors()) return

const rollbackCommands = []
for (const task of tasks) {
this.logger.trace('Checking tasks for required rollback.', { class: this.constructor.name, identifier, task })
if (task.status !== 'done' && task.status !== 'timed out') continue

rollbackCommands.push(task.rollbackHandler())
}

try {
this.logger.trace('Executing rollback tasks.', { class: this.constructor.name, identifier, rollbackCommands })
await Promise.all(rollbackCommands.map(c => this._dispatch(c)))
} catch (err) {
this.logger.fatal(err, 'At least one rollback command failed after at least one command of a saga failed!')
}

throw sagaError
}
}

module.exports = Saga
2 changes: 2 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const EventDispatcherEventEmitter = require('./EventDispatcherEventEmitter')
const EventRepositoryJsonFile = require('./EventRepositoryJsonFile')
const InvalidTypeError = require('./GenericErrors/InvalidTypeError')
const InvalidArgumentError = require('./GenericErrors/InvalidArgumentError')
const SagaError = require('./GenericErrors/SagaError')
const ValidationError = require('./GenericErrors/ValidationError')
const DateTime = require('./ValueObject/DateTime')
const EmailAddress = require('./ValueObject/EmailAddress')
Expand All @@ -27,6 +28,7 @@ module.exports = {

InvalidTypeError,
InvalidArgumentError,
SagaError,
ValidationError,

DateTime,
Expand Down
2 changes: 1 addition & 1 deletion src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ declare interface Command {
}

declare interface CommandHandler {
execute(command: Command): Event[]
execute(command: Command): Promise<Event[]>
}

declare interface CommandDispatcher {
Expand Down

0 comments on commit e1768a5

Please sign in to comment.