Skip to content

Commit

Permalink
Add Sliding breaker (Count and Time)
Browse files Browse the repository at this point in the history
  • Loading branch information
tichon29 committed Oct 21, 2020
1 parent 7bdd075 commit 59e67c0
Show file tree
Hide file tree
Showing 7 changed files with 580 additions and 19 deletions.
9 changes: 7 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import { Timeout, TimeoutError } from './module/timeout';
import { Retry } from './module/retry';
import { RateLimit, RateLimitError } from './module/rate-limit';
import { Fallback } from './module/fallback';
import { BreakerError } from './module/breaker';
import { BreakerError, BreakerState } from './module/breaker';
import { ConsecutiveBreaker } from './module/breaker/consecutive-breaker';
import { SlidingCountBreaker } from './module/breaker/sliding/count-breaker';
import { SlidingTimeBreaker } from './module/breaker/sliding/time-breaker';

// Default Export
export {
Expand All @@ -30,6 +32,9 @@ export {
NoFuncError,
TimeoutError,
BreakerError,
BreakerState,
RateLimit,
RateLimitError
RateLimitError,
SlidingCountBreaker,
SlidingTimeBreaker
};
83 changes: 71 additions & 12 deletions src/module/breaker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { Circuit } from '../../circuit';
// TODO
export interface BreakerOptions extends ModuleOptions {
state?: BreakerState;
resetDelay?: number;
openStateDelay?: number;
halfOpenStateMaxDelay?: number;
}

export class BreakerError extends Error {
Expand All @@ -20,36 +21,94 @@ export enum BreakerState {
OPENED = 'opened'
}

export class Breaker extends Module {
export interface BreakerNotification {
onOpened? (): void;
onHalfOpened? (): void;
onClosed? (): void;
}

export class Breaker extends Module implements BreakerNotification {
// Public Attributes
public state: BreakerState;
public resetDelay: number;
private previousState: BreakerState;
public openStateDelay: number;
public halfOpenStateMaxDelay: number;
private halfOpenMaxDelayTimeout = 0;

// Constructor
constructor (options?: BreakerOptions) {
super(options);
this.state = options?.state || BreakerState.CLOSED;
this.resetDelay = options?.resetDelay || 60 * 1000;
this.previousState = this.state;
this.openStateDelay = options?.openStateDelay || 60 * 1000;
this.halfOpenStateMaxDelay = options?.halfOpenStateMaxDelay || 0;
if (this.state === BreakerState.OPENED) {
this.setHalfDelay();
} else if (this.state === BreakerState.HALF_OPENED) {
this.setOpenDelay();
}
}

public onOpened(): void {
//Implementation on classes extending Breaker
}

public onClosed(): void {
//Implementation on classes extending Breaker
}

public onHalfOpened(): void {
//Implementation on classes extending Breaker
}

// Public Methods
public async execute<T> (circuit: Circuit, promise: any, ...params: any[]): Promise<T> {
this.emit('execute', circuit);
return promise(promise, ...params);
}
public open (): void {
this.state = BreakerState.OPENED;
this.logger?.debug('Breaker: Open');
this.setHalfDelay();
if (this.state !== BreakerState.OPENED) {
this.clearHalfOpenTimeout();
this.state = BreakerState.OPENED;
this.logger?.debug('Breaker: Open');
this.setHalfDelay();
this.onOpened();
}
}
public halfOpen (): void {
this.state = BreakerState.HALF_OPENED;
if (this.state !== BreakerState.HALF_OPENED) {
this.clearHalfOpenTimeout();
this.state = BreakerState.HALF_OPENED;
this.setOpenDelay();
this.onHalfOpened();
}
}
public close (): void {
this.state = BreakerState.CLOSED;
if (this.state !== BreakerState.CLOSED) {
this.clearHalfOpenTimeout();
this.logger?.debug('Breaker: Close');
this.state = BreakerState.CLOSED;
this.onClosed();
}
}
public setHalfDelay (): void {
private setHalfDelay (): void {
setTimeout(() => {
this.state = BreakerState.HALF_OPENED;
this.logger?.debug('Breaker: Half Open');
}, this.resetDelay);
this.halfOpen();
}, this.openStateDelay);
}
private setOpenDelay (): void {
if (this.halfOpenStateMaxDelay) {
this.halfOpenMaxDelayTimeout = <unknown>setTimeout(() => {
this.halfOpenMaxDelayTimeout = 0;
this.open();
}, this.halfOpenStateMaxDelay) as number;
}
}
private clearHalfOpenTimeout (): void {
if (this.halfOpenMaxDelayTimeout) {
clearTimeout(this.halfOpenMaxDelayTimeout);
this.halfOpenMaxDelayTimeout = 0;
}
}
}
26 changes: 26 additions & 0 deletions src/module/breaker/sliding/count-breaker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { SlidingWindowBreaker, SlidingWindowRequestResult } from './index';

export class SlidingCountBreaker extends SlidingWindowBreaker<SlidingWindowRequestResult> {

public async executeInClosed<T> (promise: any, ...params: any[]): Promise<T> {
const {requestResult, response } = await this.executePromise(promise, ...params);
this.callsInClosedState.push(requestResult);
const nbCalls = this.callsInClosedState.length;
if (nbCalls >= this.minimumNumberOfCalls) {
if (nbCalls > this.slidingWindowSize) {
this.callsInClosedState.shift();
}
this.checkCallRatesClosed(this.open.bind(this));
}
if (requestResult === SlidingWindowRequestResult.FAILURE) {
return Promise.reject(response);
} else {
return Promise.resolve(response);
}
}

private checkCallRatesClosed(callbackFailure: (() => void)): void {
const {nbSlow, nbFailure} = this.callsInClosedState.reduce(this.getNbSlowAndFailure, {nbSlow: 0, nbFailure: 0});
this.checkResult(nbSlow, nbFailure, this.callsInClosedState.length, callbackFailure);
}
}
141 changes: 141 additions & 0 deletions src/module/breaker/sliding/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { Breaker, BreakerOptions, BreakerState, BreakerError } from '../index';
import { Circuit } from '../../../circuit';

export interface SlidingWindowBreakerOptions extends BreakerOptions {
slidingWindowSize?: number;
minimumNumberOfCalls?: number;
failureRateThreshold?: number;
slowCallRateThreshold?: number;
slowCallDurationThreshold?: number;
permittedNumberOfCallsInHalfOpenSate?: number;
}

export enum SlidingWindowRequestResult {
SUCCESS = 0,
FAILURE = 1,
TIMEOUT = 2
}

export abstract class SlidingWindowBreaker<T> extends Breaker {
public slidingWindowSize: number;
public minimumNumberOfCalls: number;
public failureRateThreshold: number;
public slowCallRateThreshold: number;
public slowCallDurationThreshold: number;
public permittedNumberOfCallsInHalfOpenSate: number;
public callsInClosedState: T[];
private nbCallsInHalfOpenedState: number;
private callsInHalfOpenedState: SlidingWindowRequestResult[];

constructor (options?: SlidingWindowBreakerOptions) {
super(options);
this.slidingWindowSize = options?.slidingWindowSize || 10;
this.minimumNumberOfCalls = options?.minimumNumberOfCalls || 10;
if (this.slidingWindowSize < this.minimumNumberOfCalls) {
this.slidingWindowSize = this.minimumNumberOfCalls;
}
this.failureRateThreshold = (options?.failureRateThreshold || 50) / 100;
this.slowCallDurationThreshold = options?.slowCallDurationThreshold || 60000;
this.slowCallRateThreshold = (options?.slowCallRateThreshold || 100) / 100;
this.permittedNumberOfCallsInHalfOpenSate = options?.permittedNumberOfCallsInHalfOpenSate || 10;
this.nbCallsInHalfOpenedState = 0;
this.callsInHalfOpenedState = [];
this.callsInClosedState = [];
}

private reinitializeCounters (): void {
this.nbCallsInHalfOpenedState = 0;
this.callsInClosedState = [];
this.callsInHalfOpenedState = [];
}
public onOpened(): void {
this.reinitializeCounters();
}

public onClosed(): void {
this.reinitializeCounters();
}

public onHalfOpened(): void {
this.reinitializeCounters();
}

public async execute<T1> (circuit: Circuit, promise: any, ...params: any[]): Promise<T1> {
this.emit('execute', circuit);
switch (this.state) {
case BreakerState.OPENED:
return Promise.reject(new BreakerError('Circuit is opened'));
case BreakerState.HALF_OPENED:
return await this.executeInHalfOpened(promise, ...params);
case BreakerState.CLOSED:
return await this.executeInClosed(promise, ...params);
}
}

abstract executeInClosed<T1> (promise: any, ...params: any[]): Promise<T1>;

protected async executeInHalfOpened<T1> (promise: any, ...params: any[]): Promise<T1> {
if (this.nbCallsInHalfOpenedState < this.permittedNumberOfCallsInHalfOpenSate) {
this.nbCallsInHalfOpenedState++;
const {requestResult, response } = await this.executePromise(promise, ...params);
this.callsInHalfOpenedState.push(requestResult);
if (this.callsInHalfOpenedState.length === this.permittedNumberOfCallsInHalfOpenSate) {
this.checkCallRatesHalfOpen(this.open.bind(this), this.close.bind(this));
}
if (requestResult === SlidingWindowRequestResult.FAILURE) {
return Promise.reject(response);
} else {
return Promise.resolve(response);
}
} else {
return Promise.reject(new BreakerError('Circuit is half opened and max allowed request in this state has been reached'));
}
}

protected executePromise(promise: any, ...params: any[]): Promise<{requestResult: SlidingWindowRequestResult, response: any}> {
const beforeRequest = (new Date()).getTime();
return promise(...params)
.then((res: any) => {
const afterRequest = (new Date()).getTime();
let requestResp = SlidingWindowRequestResult.SUCCESS;
if (this.slowCallDurationThreshold !== 0 && this.slowCallDurationThreshold !== Infinity) {
if ((afterRequest - beforeRequest) > this.slowCallDurationThreshold) {
requestResp = SlidingWindowRequestResult.TIMEOUT;
}
}
return {requestResult: requestResp, response: res};
})
.catch((err: any) => {
return {requestResult: SlidingWindowRequestResult.FAILURE, response: err};
});
}

protected checkCallRatesHalfOpen(callbackFailure: (() => void), callbackSuccess?: (() => void)): void {
const {nbSlow, nbFailure} = this.callsInHalfOpenedState.reduce(this.getNbSlowAndFailure, {nbSlow: 0, nbFailure: 0});
this.checkResult(nbSlow, nbFailure, this.callsInHalfOpenedState.length, callbackFailure, callbackSuccess);
}

protected checkResult(nbSlow: number, nbFailure: number, nbCalls: number, callbackFailure: (() => void), callbackSuccess?: (() => void)): void {
if (
(this.slowCallRateThreshold < 100 && ((nbSlow / nbCalls) >= this.slowCallRateThreshold)) ||
(this.failureRateThreshold < 100 && ((nbFailure / nbCalls) >= this.failureRateThreshold))
) {
callbackFailure();
} else {
if (callbackSuccess) {
callbackSuccess();
}
}
}

protected getNbSlowAndFailure(acc: {nbSlow: number, nbFailure: number}, current: SlidingWindowRequestResult): {nbSlow: number, nbFailure: number} {
switch(current) {
case SlidingWindowRequestResult.FAILURE:
acc.nbFailure++;
break;
case SlidingWindowRequestResult.TIMEOUT:
acc.nbSlow++;
}
return acc;
}
}
72 changes: 72 additions & 0 deletions src/module/breaker/sliding/time-breaker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { BreakerState } from '../index';
import { SlidingWindowBreaker, SlidingWindowBreakerOptions, SlidingWindowRequestResult } from './index';

export interface SlidingTimeElem {
result: SlidingWindowRequestResult,
timestamp: number
}

interface SlidingTimeBreakerOptions extends SlidingWindowBreakerOptions {
slidingWindowSizeInMs?: boolean
}

export class SlidingTimeBreaker extends SlidingWindowBreaker<SlidingTimeElem> {
private maxSize: number;
constructor(options?: SlidingTimeBreakerOptions) {
super(options);
if (options?.slidingWindowSizeInMs) {
//Sliding window is in ms, no need to multiply by 1000
} else {
this.slidingWindowSize = this.slidingWindowSize * 1000;
}
this.maxSize = 1000;
}

private filterCalls(): void {
let nbCalls = this.callsInClosedState.length;
if (nbCalls >= this.maxSize) {
this.callsInClosedState.shift;
nbCalls--;
}
let stillOk = true;
const now = (new Date()).getTime();
for (let i=0; i<nbCalls && stillOk;i++) {
if ((now - this.callsInClosedState[0].timestamp) > this.slidingWindowSize) {
this.callsInClosedState.shift();
} else {
stillOk = false;
}
}
}

public async executeInClosed<T> (promise: any, ...params: any[]): Promise<T> {
const {requestResult, response } = await this.executePromise(promise, ...params);
//this.callsInClosedState = this.callsInClosedState.filter((elem) => (now - elem.timestamp) <= this.slidingWindowSize)
this.filterCalls();
this.callsInClosedState.push({result: requestResult, timestamp: (new Date()).getTime()});
if (this.callsInClosedState.length >= this.minimumNumberOfCalls) {
this.checkCallRatesClosed(this.open.bind(this));
}
if (requestResult === SlidingWindowRequestResult.FAILURE) {
return Promise.reject(response);
} else {
return Promise.resolve(response);
}
}

private checkCallRatesClosed(callbackFailure: (() => void)): void {
const {nbSlow, nbFailure} = this.callsInClosedState.reduce(this.getNbSlowAndFailureTimeElem, {nbSlow: 0, nbFailure: 0});
this.checkResult(nbSlow, nbFailure, this.callsInClosedState.length, callbackFailure);
}

public getNbSlowAndFailureTimeElem (acc: {nbSlow: number, nbFailure: number}, current: SlidingTimeElem): {nbSlow: number, nbFailure: number} {
switch(current.result) {
case SlidingWindowRequestResult.FAILURE:
acc.nbFailure++;
break;
case SlidingWindowRequestResult.TIMEOUT:
acc.nbSlow++;
}
return acc;
}
}
2 changes: 1 addition & 1 deletion src/module/rate-limit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class RateLimit extends Module {
return promise(...params);
} else {
if (now - this.requestsTime[0] > this.limitPeriod) {
this.requestsTime.splice(0,1);
this.requestsTime.shift();
this.requestsTime.push(now);
return promise(...params);
} else {
Expand Down
Loading

0 comments on commit 59e67c0

Please sign in to comment.