Skip to content

Commit

Permalink
add circuit breaker implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
StarpTech committed May 17, 2017
1 parent 3b92b83 commit 4a59134
Show file tree
Hide file tree
Showing 5 changed files with 651 additions and 13 deletions.
189 changes: 189 additions & 0 deletions packages/hemera/lib/circuitBreaker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
'use strict'

/**
* Copyright 2016-present, Dustin Deus (deusdustin@gmail.com)
* All rights reserved.
*
* This source code is licensed under the MIT-style license found in the
* LICENSE file in the root directory of this source tree.
*
*/

const EventEmitter = require('events')

/**
* Based on https://docs.microsoft.com/en-us/azure/architecture/patterns/circuit-breaker
*
* @class CircuitBreaker
* @extends {EventEmitter}
*/
class CircuitBreaker extends EventEmitter {
constructor (options) {
super()

// states
this.CIRCUIT_CLOSE = 'close'
this.CIRCUIT_HALF_OPEN = 'half_open'
this.CIRCUIT_OPEN = 'open'

// intial state
this._state = this.CIRCUIT_CLOSE
// current fauilures
this._failureCount = 0
// successes count
this._successesCount = 0
// max failure threshold
this._maxFailures = options.maxFailures
// min successes to close the circuit breaker
this._minSuccesses = options.minSuccesses
// the timeout when the circuit breaker in in half open state
this._halfOpenTime = options.halfOpenTime
// interval when the circuit breaker will reset
this._resetIntervalTime = options.resetIntervalTime
// half open timer
this._failureTimer = null
}

/**
*
*
* @readonly
*
* @memberof CircuitBreaker
*/
get state () {
return this._state
}
/**
* The failure counter used by the Closed state is time based. It's automatically reset at periodic intervals.
* This helps to prevent the circuit breaker from entering the Open state if it experiences occasional failures
*
*
* @memberof CircuitBreaker
*/
startResetInterval () {
this._resetInterval = setInterval(() => {
this._state = this.CIRCUIT_CLOSE
}, this._resetIntervalTime)
}

/**
*
*
*
* @memberof CircuitBreaker
*/
clearHalfOpenTimer () {
if (this._halfOpenTime) {
clearTimeout(this._halfOpenTime)
}
}

/**
*
*
*
* @memberof CircuitBreaker
*/
startHalfOpenTimer () {
this._halfOpenTimer = setTimeout(() => {
this._successesCount = 0
this._state = this.CIRCUIT_HALF_OPEN
}, this._halfOpenTime)
// unref from event loop
this._halfOpenTimer.unref()
}
/**
*
*
*
* @memberof CircuitBreaker
*/
clearResetInterval () {
if (this._resetInterval) {
clearInterval(this._resetInterval)
}
}

/**
*
*
* @returns
*
* @memberof CircuitBreaker
*/
available () {
return this._state !== this.CIRCUIT_OPEN
}

/**
*
*
*
* @memberof CircuitBreaker
*/
success () {
this.check(true)
}

/**
*
*
*
* @memberof CircuitBreaker
*/
failure () {
this.check(false)
}
/**
*
*
* @param {any} pattern
* @returns
*
* @memberof CircuitBreaker
*/
check (success) {
if (this._state === this.CIRCUIT_HALF_OPEN) {
// The counter used by the Half-Open state records the number of successful attempts to invoke the operation.
// The circuit breaker reverts to the Closed state after a specified number of consecutive operation invocations have been successful.
if (success === true) {
if (this._successesCount >= this._minSuccesses) {
this._state = this.CIRCUIT_CLOSE
this.emit('stateChange', { state: this.CIRCUIT_CLOSE })
}
this._successesCount += 1
this.emit('success', { count: this._successesCount })
} else if (success === false) {
// If any invocation fails, the circuit breaker enters the Open state immediately and
// the success counter will be reset the next time it enters the Half-Open state.
this._state = this.CIRCUIT_OPEN
this._failureCount = 0
this.clearHalfOpenTimer()
this.emit('stateChange', { state: this.CIRCUIT_OPEN })
}
} else if (this._state === this.CIRCUIT_OPEN) {
this._failureCount = 0
// At this point the proxy starts a timeout timer
// and when this timer expires the proxy is placed into the Half-Open state.
this.startHalfOpenTimer()
this.emit('stateChange', { state: this.CIRCUIT_HALF_OPEN })
} else if (this._state === this.CIRCUIT_CLOSE) {
// when request fails we increment the failureCount
if (success === false) {
this._failureCount += 1
this.emit('failure', { count: this._failureCount })
}

// when we reach maximum failure threshold we open the circuit breaker and start the reset timer
if (this._failureCount >= this._maxFailures) {
this._state = this.CIRCUIT_OPEN
this.clearResetInterval(this._resetInterval)
this.startResetInterval()
this.emit('stateChange', { state: this.CIRCUIT_OPEN })
}
}
}
}

module.exports = CircuitBreaker
4 changes: 3 additions & 1 deletion packages/hemera/lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const PatternNotFound = HemeraError.subclass('PatternNotFound')
const MaxRecursionError = HemeraError.subclass('MaxRecursionError')
const PayloadValidationError = HemeraError.subclass('PayloadValidationError')
const ProcessLoadError = HemeraError.subclass('ProcessLoadError')
const CircuitBreakerError = HemeraError.subclass('CircuitBreakerError')

module.exports = {
HemeraError,
Expand All @@ -32,5 +33,6 @@ module.exports = {
FatalError,
PatternNotFound,
PayloadValidationError,
ProcessLoadError
ProcessLoadError,
CircuitBreakerError
}
38 changes: 34 additions & 4 deletions packages/hemera/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const SuperError = require('super-error')
const Co = require('co')

const Errors = require('./errors')
const CircuitBreaker = require('./circuitBreaker')
const Constants = require('./constants')
const Extension = require('./extension')
const Util = require('./util')
Expand Down Expand Up @@ -70,6 +71,13 @@ var defaultConfig = {
maxRssBytes: 0, // Reject requests when process RSS is over size in bytes (zero is no max)
maxEventLoopDelay: 0 // Milliseconds of delay after which requests are rejected (zero is no max)
}
},
circuitBreaker: {
enabled: false,
minSuccesses: 1,
halfOpenTime: 5 * 1000,
resetIntervalTime: 15 * 1000,
maxFailures: 3
}
}

Expand Down Expand Up @@ -600,9 +608,16 @@ class Hemera extends EventEmitter {

// check if an error was already wrapped
if (self._response.error) {
// don't handle circuit breaker error as failure
if (self._config.circuitBreaker.enabled && !(self._response.error instanceof Errors.CircuitBreakerError)) {
self._circuitBreaker.failure()
}
self.emit('serverResponseError', self._response.error)
self.log.error(self._response.error)
} else if (err) { // check for an extension error
if (self._config.circuitBreaker.enabled) {
self._circuitBreaker.failure()
}
if (err instanceof SuperError) {
self._response.error = err.rootCause || err.cause || err
} else {
Expand All @@ -612,6 +627,8 @@ class Hemera extends EventEmitter {
const internalError = new Errors.HemeraError(Constants.EXTENSION_ERROR, self.errorDetails).causedBy(err)
self.log.error(internalError)
self.emit('serverResponseError', self._response.error)
} else if (self._config.circuitBreaker.enabled) {
self._circuitBreaker.success()
}

// reply value from extension
Expand Down Expand Up @@ -792,7 +809,21 @@ class Hemera extends EventEmitter {
function onServerPreRequestHandler (err, value) {
let self = this

// icnoming pattern
self._pattern = self._request.payload.pattern
// find matched route
self._actMeta = self._router.lookup(self._pattern)

if (self._config.circuitBreaker.enabled && self._actMeta) {
// get circuit breaker of server method
self._circuitBreaker = self._actMeta.actMeta.circuitBreaker
if (!self._circuitBreaker.available()) {
// trigger halp open timer
self._circuitBreaker.failure()
self._response.error = new Errors.CircuitBreakerError(`Circuit breaker is ${self._circuitBreaker.state}`, { state: self._circuitBreaker.state })
return self.finish()
}
}

if (err) {
if (err instanceof SuperError) {
Expand All @@ -810,9 +841,6 @@ class Hemera extends EventEmitter {
return self.finish()
}

// find matched route
self._actMeta = self._router.lookup(self._pattern)

// check if a handler is registered with this pattern
if (self._actMeta) {
self._extensions.onServerPreHandler.dispatch(self, onServerPreHandler)
Expand All @@ -839,6 +867,7 @@ class Hemera extends EventEmitter {
ctx._pattern = {}
ctx._actMeta = {}
ctx._isServer = true
ctx._circuitBreaker = null

ctx._extensions.onServerPreRequest.dispatch(ctx, onServerPreRequestHandler)
}
Expand Down Expand Up @@ -912,7 +941,8 @@ class Hemera extends EventEmitter {
let actMeta = new Add({
schema: schema,
pattern: origPattern,
plugin: this.plugin$
plugin: this.plugin$,
circuitBreaker: new CircuitBreaker(this._config.circuitBreaker)
}, { generators: this._config.generators })

// cb is null when we use chaining syntax
Expand Down
Loading

0 comments on commit 4a59134

Please sign in to comment.