diff --git a/packages/hemera/lib/circuitBreaker.js b/packages/hemera/lib/circuitBreaker.js
new file mode 100644
index 000000000..bec1a23ef
--- /dev/null
+++ b/packages/hemera/lib/circuitBreaker.js
@@ -0,0 +1,230 @@
+'use strict'
+
+/**
+ * Copyright 2016-present, Dustin Deus (deusdustin@gmail.com)
+ * All rights reserved.
+ *
+ * This source code is licensed under the MIT-style license found in the
+ * LICENSE file in the root directory of this source tree.
+ *
+ */
+
+const EventEmitter = require('events')
+
+/**
+ * Based on https://docs.microsoft.com/en-us/azure/architecture/patterns/circuit-breaker
+ *
+ * Emits `stateChange` ({ state }) on every state transition, `failure` ({ count })
+ * for each failure counted in the closed state and `success` ({ count }) for each
+ * success counted in the half open state.
+ *
+ * @class CircuitBreaker
+ * @extends {EventEmitter}
+ */
+class CircuitBreaker extends EventEmitter {
+  /**
+   * Creates an instance of CircuitBreaker.
+   *
+   * @param {Object} [options]
+   * @param {number} [options.maxFailures=3] failures counted in the closed state before the circuit opens
+   * @param {number} [options.minSuccesses=1] successes that must be counted in the half open state before a further success closes the circuit
+   * @param {number} [options.halfOpenTime=5000] delay in ms before an open circuit becomes half open
+   * @param {number} [options.resetIntervalTime=15000] period in ms of the automatic reset
+   *
+   * @memberof CircuitBreaker
+   */
+  constructor (options) {
+    super()
+
+    // fall back to defaults so a missing option never becomes an undefined timer delay or threshold
+    const config = Object.assign({
+      maxFailures: 3,
+      minSuccesses: 1,
+      halfOpenTime: 5 * 1000,
+      resetIntervalTime: 15 * 1000
+    }, options)
+
+    // states
+    this.CIRCUIT_CLOSE = 'close'
+    this.CIRCUIT_HALF_OPEN = 'half_open'
+    this.CIRCUIT_OPEN = 'open'
+
+    // initial state
+    this._state = this.CIRCUIT_CLOSE
+    // current failures
+    this._failureCount = 0
+    // successes count
+    this._successesCount = 0
+    // max failure threshold
+    this._maxFailures = config.maxFailures
+    // min successes to close the circuit breaker
+    this._minSuccesses = config.minSuccesses
+    // delay before an open circuit breaker switches to half open state
+    this._halfOpenTime = config.halfOpenTime
+    // interval when the circuit breaker will reset
+    this._resetIntervalTime = config.resetIntervalTime
+    // half open timer handle
+    this._halfOpenTimer = null
+    // reset interval handle
+    this._resetInterval = null
+  }
+
+  /**
+   * Current state: 'close', 'half_open' or 'open'
+   *
+   * @readonly
+   *
+   * @memberof CircuitBreaker
+   */
+  get state () {
+    return this._state
+  }
+
+  /**
+   * The failure counter used by the Closed state is time based. It's automatically reset at periodic intervals.
+   * This helps to prevent the circuit breaker from entering the Open state if it experiences occasional failures
+   *
+   * Every tick resets the failure counter, cancels a pending half open timer and closes the circuit.
+   *
+   * @memberof CircuitBreaker
+   */
+  startResetInterval () {
+    // never keep more than one interval alive
+    this.clearResetInterval()
+    this._resetInterval = setInterval(() => {
+      // without resetting the counter the next check would reopen the circuit immediately
+      this._failureCount = 0
+      this.clearHalfOpenTimer()
+      if (this._state !== this.CIRCUIT_CLOSE) {
+        this._state = this.CIRCUIT_CLOSE
+        this.emit('stateChange', { state: this.CIRCUIT_CLOSE })
+      }
+    }, this._resetIntervalTime)
+    // unref from event loop
+    this._resetInterval.unref()
+  }
+
+  /**
+   * Cancels a pending half open timer
+   *
+   * @memberof CircuitBreaker
+   */
+  clearHalfOpenTimer () {
+    if (this._halfOpenTimer) {
+      clearTimeout(this._halfOpenTimer)
+      this._halfOpenTimer = null
+    }
+  }
+
+  /**
+   * Schedules the switch to the Half-Open state after `halfOpenTime` ms
+   *
+   * @memberof CircuitBreaker
+   */
+  startHalfOpenTimer () {
+    this.clearHalfOpenTimer()
+    this._halfOpenTimer = setTimeout(() => {
+      this._halfOpenTimer = null
+      this._successesCount = 0
+      this._state = this.CIRCUIT_HALF_OPEN
+      // announce the transition when it actually happens
+      this.emit('stateChange', { state: this.CIRCUIT_HALF_OPEN })
+    }, this._halfOpenTime)
+    // unref from event loop
+    this._halfOpenTimer.unref()
+  }
+
+  /**
+   * Stops the periodic reset
+   *
+   * @memberof CircuitBreaker
+   */
+  clearResetInterval () {
+    if (this._resetInterval) {
+      clearInterval(this._resetInterval)
+      this._resetInterval = null
+    }
+  }
+
+  /**
+   * Tells whether requests may pass
+   *
+   * @returns {boolean} false only while the circuit is open
+   *
+   * @memberof CircuitBreaker
+   */
+  available () {
+    return this._state !== this.CIRCUIT_OPEN
+  }
+
+  /**
+   * Records a successful invocation
+   *
+   * @memberof CircuitBreaker
+   */
+  success () {
+    this.check(true)
+  }
+
+  /**
+   * Records a failed invocation
+   *
+   * @memberof CircuitBreaker
+   */
+  failure () {
+    this.check(false)
+  }
+
+  /**
+   * Advances the state machine with the outcome of an invocation
+   *
+   * @param {boolean} success true for a successful invocation, false for a failed one
+   *
+   * @memberof CircuitBreaker
+   */
+  check (success) {
+    if (this._state === this.CIRCUIT_HALF_OPEN) {
+      // The counter used by the Half-Open state records the number of successful attempts to invoke the operation.
+      // The circuit breaker reverts to the Closed state after a specified number of consecutive operation invocations have been successful.
+      if (success === true) {
+        // the counter is compared before it is incremented: the circuit closes on the first success after `minSuccesses` were counted
+        if (this._successesCount >= this._minSuccesses) {
+          this._state = this.CIRCUIT_CLOSE
+          this.emit('stateChange', { state: this.CIRCUIT_CLOSE })
+        }
+        this._successesCount += 1
+        this.emit('success', { count: this._successesCount })
+      } else if (success === false) {
+        // If any invocation fails, the circuit breaker enters the Open state immediately and
+        // the success counter will be reset the next time it enters the Half-Open state.
+        this._state = this.CIRCUIT_OPEN
+        this._failureCount = 0
+        this.clearHalfOpenTimer()
+        this.emit('stateChange', { state: this.CIRCUIT_OPEN })
+      }
+    } else if (this._state === this.CIRCUIT_OPEN) {
+      this._failureCount = 0
+      // At this point the proxy starts a timeout timer
+      // and when this timer expires the proxy is placed into the Half-Open state.
+      // Requests rejected while the timer is pending must not stack further timers.
+      if (!this._halfOpenTimer) {
+        this.startHalfOpenTimer()
+      }
+    } else if (this._state === this.CIRCUIT_CLOSE) {
+      // when request fails we increment the failureCount
+      if (success === false) {
+        this._failureCount += 1
+        this.emit('failure', { count: this._failureCount })
+      }
+
+      // when we reach maximum failure threshold we open the circuit breaker and start the reset timer
+      if (this._failureCount >= this._maxFailures) {
+        this._state = this.CIRCUIT_OPEN
+        this.startResetInterval()
+        this.emit('stateChange', { state: this.CIRCUIT_OPEN })
+      }
+    }
+  }
+}
+
+module.exports = CircuitBreaker
diff --git a/packages/hemera/lib/errors.js b/packages/hemera/lib/errors.js
index e21d514eb..3b9195d4a 100644
--- a/packages/hemera/lib/errors.js
+++ b/packages/hemera/lib/errors.js
@@ -21,6 +21,7 @@ const PatternNotFound = HemeraError.subclass('PatternNotFound')
 const MaxRecursionError = HemeraError.subclass('MaxRecursionError')
 const PayloadValidationError = HemeraError.subclass('PayloadValidationError')
 const ProcessLoadError = HemeraError.subclass('ProcessLoadError')
+const CircuitBreakerError = 
HemeraError.subclass('CircuitBreakerError') module.exports = { HemeraError, @@ -32,5 +33,6 @@ module.exports = { FatalError, PatternNotFound, PayloadValidationError, - ProcessLoadError + ProcessLoadError, + CircuitBreakerError } diff --git a/packages/hemera/lib/index.js b/packages/hemera/lib/index.js index a31b09462..d80fd27d1 100644 --- a/packages/hemera/lib/index.js +++ b/packages/hemera/lib/index.js @@ -27,6 +27,7 @@ const SuperError = require('super-error') const Co = require('co') const Errors = require('./errors') +const CircuitBreaker = require('./circuitBreaker') const Constants = require('./constants') const Extension = require('./extension') const Util = require('./util') @@ -70,6 +71,13 @@ var defaultConfig = { maxRssBytes: 0, // Reject requests when process RSS is over size in bytes (zero is no max) maxEventLoopDelay: 0 // Milliseconds of delay after which requests are rejected (zero is no max) } + }, + circuitBreaker: { + enabled: false, + minSuccesses: 1, + halfOpenTime: 5 * 1000, + resetIntervalTime: 15 * 1000, + maxFailures: 3 } } @@ -600,9 +608,16 @@ class Hemera extends EventEmitter { // check if an error was already wrapped if (self._response.error) { + // don't handle circuit breaker error as failure + if (self._config.circuitBreaker.enabled && !(self._response.error instanceof Errors.CircuitBreakerError)) { + self._circuitBreaker.failure() + } self.emit('serverResponseError', self._response.error) self.log.error(self._response.error) } else if (err) { // check for an extension error + if (self._config.circuitBreaker.enabled) { + self._circuitBreaker.failure() + } if (err instanceof SuperError) { self._response.error = err.rootCause || err.cause || err } else { @@ -612,6 +627,8 @@ class Hemera extends EventEmitter { const internalError = new Errors.HemeraError(Constants.EXTENSION_ERROR, self.errorDetails).causedBy(err) self.log.error(internalError) self.emit('serverResponseError', self._response.error) + } else if 
(self._config.circuitBreaker.enabled) { + self._circuitBreaker.success() } // reply value from extension @@ -792,7 +809,21 @@ class Hemera extends EventEmitter { function onServerPreRequestHandler (err, value) { let self = this + // icnoming pattern self._pattern = self._request.payload.pattern + // find matched route + self._actMeta = self._router.lookup(self._pattern) + + if (self._config.circuitBreaker.enabled && self._actMeta) { + // get circuit breaker of server method + self._circuitBreaker = self._actMeta.actMeta.circuitBreaker + if (!self._circuitBreaker.available()) { + // trigger halp open timer + self._circuitBreaker.failure() + self._response.error = new Errors.CircuitBreakerError(`Circuit breaker is ${self._circuitBreaker.state}`, { state: self._circuitBreaker.state }) + return self.finish() + } + } if (err) { if (err instanceof SuperError) { @@ -810,9 +841,6 @@ class Hemera extends EventEmitter { return self.finish() } - // find matched route - self._actMeta = self._router.lookup(self._pattern) - // check if a handler is registered with this pattern if (self._actMeta) { self._extensions.onServerPreHandler.dispatch(self, onServerPreHandler) @@ -839,6 +867,7 @@ class Hemera extends EventEmitter { ctx._pattern = {} ctx._actMeta = {} ctx._isServer = true + ctx._circuitBreaker = null ctx._extensions.onServerPreRequest.dispatch(ctx, onServerPreRequestHandler) } @@ -912,7 +941,8 @@ class Hemera extends EventEmitter { let actMeta = new Add({ schema: schema, pattern: origPattern, - plugin: this.plugin$ + plugin: this.plugin$, + circuitBreaker: new CircuitBreaker(this._config.circuitBreaker) }, { generators: this._config.generators }) // cb is null when we use chaining syntax diff --git a/test/hemera/circuit-breaker.spec.js b/test/hemera/circuit-breaker.spec.js new file mode 100644 index 000000000..d9228442e --- /dev/null +++ b/test/hemera/circuit-breaker.spec.js @@ -0,0 +1,410 @@ +'use strict' + +describe('Circuit breaker', function () { + var PORT = 6242 + 
var flags = ['--user', 'derek', '--pass', 'foobar'] + var authUrl = 'nats://derek:foobar@localhost:' + PORT + var server + + // Start up our own nats-server + before(function (done) { + server = HemeraTestsuite.start_server(PORT, flags, done) + }) + + // Shutdown our server after we are done + after(function () { + server.kill() + }) + + it('Should be able to call without error', function (done) { + const nats = require('nats').connect(authUrl) + + const hemera = new Hemera(nats, { + logLevel: 'silent', + circuitBreaker: { + enabled: true, + maxFailures: 2 + } + }) + + hemera.ready(() => { + hemera.add({ + cmd: 'add', + topic: 'math' + }, (resp, cb) => { + cb() + }) + + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2 + }, (err) => { + expect(err).to.be.not.exists() + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2 + }, (err) => { + expect(err).to.be.not.exists() + hemera.close() + done() + }) + }) + }) + }) + + it('Should return error because circuit breaker is OPEN', function (done) { + const nats = require('nats').connect(authUrl) + + const hemera = new Hemera(nats, { + logLevel: 'silent', + circuitBreaker: { + enabled: true, + maxFailures: 2 + } + }) + + let stateOpenEvent = Sinon.spy() + + hemera.ready(() => { + hemera.add({ + cmd: 'add', + topic: 'math' + }, (resp, cb) => { + cb(new Error('test')) + }) + + const cb = hemera.router.lookup({ + cmd: 'add', + topic: 'math' + }).actMeta.circuitBreaker + + cb.on('stateChange', (event) => { + if (event.state === 'open') { + stateOpenEvent() + } + }) + + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2 + }, (err) => { + expect(err).to.be.exists() + expect(err.name).to.be.equals('Error') + expect(err.message).to.be.equals('test') + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2 + }, (err) => { + expect(err).to.be.exists() + expect(err.name).to.be.equals('Error') + expect(err.message).to.be.equals('test') + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2 + }, (err) 
=> { + expect(stateOpenEvent.calledOnce).to.be.equals(true) + expect(err.name).to.be.equals('CircuitBreakerError') + expect(err.message).to.be.equals('Circuit breaker is open') + hemera.close() + done() + }) + }) + }) + }) + }) + + it('Should auto close the circuit breaker after certain amount of time', function (done) { + const nats = require('nats').connect(authUrl) + + const hemera = new Hemera(nats, { + circuitBreaker: { + enabled: true, + maxFailures: 2, + halfOpenTime: 100, + resetIntervalTime: 100 + } + }) + + let stateOpenEvent = Sinon.spy() + + hemera.ready(() => { + hemera.add({ + cmd: 'add', + topic: 'math' + }, (resp, cb) => { + if (resp.error) { + cb(new Error('test')) + } else { + cb() + } + }) + + const cb = hemera.router.lookup({ + cmd: 'add', + topic: 'math' + }).actMeta.circuitBreaker + + cb.on('stateChange', (event) => { + if (event.state === 'open') { + stateOpenEvent() + } + }) + + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: true + }, (err) => { + expect(err).to.be.exists() + expect(err.name).to.be.equals('Error') + expect(err.message).to.be.equals('test') + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: true + }, (err) => { + expect(err).to.be.exists() + expect(err.name).to.be.equals('Error') + expect(err.message).to.be.equals('test') + expect(stateOpenEvent.calledOnce).to.be.equals(true) + setTimeout(() => { + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: false + }, (err) => { + expect(err).to.be.not.exists() + hemera.close() + done() + }) + }, 300) + }) + }) + }) + }) + + it('Should be able to close the circuit breaker when the next call is successfully', function (done) { + const nats = require('nats').connect(authUrl) + + const hemera = new Hemera(nats, { + circuitBreaker: { + enabled: true, + maxFailures: 2, + halfOpenTime: 100 + } + }) + + let stateOpenEvent = Sinon.spy() + let stateHalfOpenEvent = Sinon.spy() + let stateCloseEvent = Sinon.spy() + + hemera.ready(() => 
{ + hemera.add({ + cmd: 'add', + topic: 'math' + }, (resp, cb) => { + if (resp.error) { + cb(new Error('test')) + } else { + cb() + } + }) + + const cb = hemera.router.lookup({ + cmd: 'add', + topic: 'math' + }).actMeta.circuitBreaker + + cb.on('stateChange', (event) => { + if (event.state === 'open') { + stateOpenEvent() + } else if (event.state === 'half_open') { + stateHalfOpenEvent() + } else if (event.state === 'close') { + stateCloseEvent() + } + }) + + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: true + }, (err) => { + expect(err).to.be.exists() + expect(err.name).to.be.equals('Error') + expect(err.message).to.be.equals('test') + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: true + }, (err) => { + expect(err).to.be.exists() + expect(err.name).to.be.equals('Error') + expect(err.message).to.be.equals('test') + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: true + }, (err) => { + expect(err).to.be.exists() + expect(err.name).to.be.equals('CircuitBreakerError') + expect(err.message).to.be.equals('Circuit breaker is open') + expect(stateOpenEvent.calledOnce).to.be.equals(true) + // wait until half open timer is finished + setTimeout(() => { + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: false + }, (err) => { + expect(err).to.be.not.exists() + expect(stateHalfOpenEvent.calledOnce).to.be.equals(true) + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: false + }, (err) => { + expect(err).to.be.not.exists() + expect(stateCloseEvent.calledOnce).to.be.equals(true) + hemera.close() + done() + }) + }) + }, 150) + }) + }) + }) + }) + }) + + it('Should be able to open the circuit breaker after an error in halp open state', function (done) { + const nats = require('nats').connect(authUrl) + + const hemera = new Hemera(nats, { + circuitBreaker: { + enabled: true, + maxFailures: 2, + halfOpenTime: 100 + } + }) + + let stateOpenEvent = Sinon.spy() + let 
stateHalfOpenEvent = Sinon.spy() + let stateCloseEvent = Sinon.spy() + + hemera.ready(() => { + hemera.add({ + cmd: 'add', + topic: 'math' + }, (resp, cb) => { + if (resp.error) { + cb(new Error('test')) + } else { + cb() + } + }) + + const cb = hemera.router.lookup({ + cmd: 'add', + topic: 'math' + }).actMeta.circuitBreaker + + cb.on('stateChange', (event) => { + if (event.state === 'open') { + stateOpenEvent() + } else if (event.state === 'half_open') { + stateHalfOpenEvent() + } else if (event.state === 'close') { + stateCloseEvent() + } + }) + + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: true + }, (err) => { + expect(err).to.be.exists() + expect(err.name).to.be.equals('Error') + expect(err.message).to.be.equals('test') + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: true + }, (err) => { + expect(err).to.be.exists() + expect(err.name).to.be.equals('Error') + expect(err.message).to.be.equals('test') + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: true + }, (err) => { + expect(err).to.be.exists() + expect(err.name).to.be.equals('CircuitBreakerError') + expect(err.message).to.be.equals('Circuit breaker is open') + expect(stateOpenEvent.calledOnce).to.be.equals(true) + // wait until half open timer is finished + setTimeout(() => { + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: false + }, (err) => { + expect(err).to.be.not.exists() + expect(stateHalfOpenEvent.calledOnce).to.be.equals(true) + hemera.act({ + topic: 'math', + cmd: 'add', + a: 1, + b: 2, + error: true + }, (err) => { + // return business error instead circuit breaker + expect(err).to.be.exists() + expect(stateOpenEvent.calledTwice).to.be.equals(true) + hemera.close() + done() + }) + }) + }, 150) + }) + }) + }) + }) + }) +}) diff --git a/test/hemera/default-config.spec.js b/test/hemera/default-config.spec.js index b8f742c89..22460b69a 100644 --- a/test/hemera/default-config.spec.js +++ 
b/test/hemera/default-config.spec.js @@ -31,10 +31,10 @@ describe('Hemera default config', function () { errio: { recursive: true, // Recursively serialize and deserialize nested errors inherited: true, // Include inherited properties - stack: true, // Include stack property - private: false, // Include properties with leading or trailing underscores - exclude: [], // Property names to exclude (low priority) - include: [] // Property names to include (high priority) + stack: true, // Include stack property + private: false, // Include properties with leading or trailing underscores + exclude: [], // Property names to exclude (low priority) + include: [] // Property names to include (high priority) }, bloomrun: { indexing: 'inserting', // Pattern indexing method "inserting" or "depth" @@ -43,13 +43,20 @@ describe('Hemera default config', function () { load: { checkPolicy: true, process: { - sampleInterval: 0 // Frequency of load sampling in milliseconds (zero is no sampling) + sampleInterval: 0 // Frequency of load sampling in milliseconds (zero is no sampling) }, policy: { - maxHeapUsedBytes: 0, // Reject requests when V8 heap is over size in bytes (zero is no max) - maxRssBytes: 0, // Reject requests when process RSS is over size in bytes (zero is no max) - maxEventLoopDelay: 0 // Milliseconds of delay after which requests are rejected (zero is no max) + maxHeapUsedBytes: 0, // Reject requests when V8 heap is over size in bytes (zero is no max) + maxRssBytes: 0, // Reject requests when process RSS is over size in bytes (zero is no max) + maxEventLoopDelay: 0 // Milliseconds of delay after which requests are rejected (zero is no max) } + }, + circuitBreaker: { + enabled: false, + minSuccesses: 1, + halfOpenTime: 5 * 1000, + resetIntervalTime: 15 * 1000, + maxFailures: 3 } }