Skip to content

Commit

Permalink
feat(middleware-retry): add client-side rate limiter for adaptive mode (#2439)

Browse files Browse the repository at this point in the history

* chore(middleware-retry): add RateLimiter interface

* chore: add scaffolding for default RateLimiter

* feat: add getSendToken implementation

* chore: code to achieve RateLimiter using functions

Discontinuing as it's getting complex. Will use class instead.

* feat: add DefaultRateLimiter

* test: rateLimiter.updateClientSendingRate

* test: rateLimiter.cubicSuccess

* test: rateLimiter.cubicThrottle

* test: rateLimiter.getSendToken
  • Loading branch information
trivikr committed May 29, 2021
1 parent f4c8f09 commit 8ef104d
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 0 deletions.
120 changes: 120 additions & 0 deletions packages/middleware-retry/src/DefaultRateLimiter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import { isThrottlingError } from "@aws-sdk/service-error-classification";

import { DefaultRateLimiter } from "./DefaultRateLimiter";

// Auto-mock the error classifier so each test can dictate whether a
// response counts as a throttling error.
jest.mock("@aws-sdk/service-error-classification");

describe(DefaultRateLimiter.name, () => {
  beforeEach(() => {
    // Default: responses are NOT throttling errors; individual tests override this.
    (isThrottlingError as jest.Mock).mockReturnValue(false);
  });

  afterEach(() => {
    jest.clearAllMocks();
  });

  describe("getSendToken", () => {
    beforeEach(() => {
      // Fake timers so the delay passed to setTimeout can be inspected.
      jest.useFakeTimers();
    });

    afterEach(() => {
      jest.useRealTimers();
    });

    // After a throttling error observed at `timestamp` (seconds), acquiring a
    // send token should sleep `delay` milliseconds while the bucket refills.
    it.each([
      [0.5, 892.8571428571428],
      [1, 1785.7142857142856],
      [2, 2000],
    ])("timestamp: %d, delay: %d", async (timestamp, delay) => {
      // Construct the limiter at t=0 so its initial state is deterministic.
      jest.spyOn(Date, "now").mockImplementation(() => 0);
      const rateLimiter = new DefaultRateLimiter();

      // Simulate a throttling response at `timestamp` to enable client-side throttling.
      (isThrottlingError as jest.Mock).mockReturnValueOnce(true);
      jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000);
      rateLimiter.updateClientSendingRate({});

      rateLimiter.getSendToken();
      jest.runAllTimers();
      expect(setTimeout).toHaveBeenLastCalledWith(expect.any(Function), delay);
    });
  });

  describe("cubicSuccess", () => {
    // With lastMaxRate=10 and lastThrottleTime=5, a successful response at
    // `timestamp` should grow the rate along the cubic curve to `calculatedRate`.
    it.each([
      [5, 7],
      [6, 9.64893601],
      [7, 10.00003085],
      [8, 10.45328452],
      [9, 13.40869703],
      [10, 21.26626836],
      [11, 36.42599853],
    ])("timestamp: %d, calculatedRate: %d", (timestamp, calculatedRate) => {
      jest.spyOn(Date, "now").mockImplementation(() => 0);
      const rateLimiter = new DefaultRateLimiter();
      // Seed private state directly to start the cubic curve from a known point.
      rateLimiter["lastMaxRate"] = 10;
      rateLimiter["lastThrottleTime"] = 5;

      jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000);

      // Spy on the private method to capture its computed rate.
      const cubicSuccessSpy = jest.spyOn(DefaultRateLimiter.prototype as any, "cubicSuccess");
      rateLimiter.updateClientSendingRate({});
      expect(cubicSuccessSpy).toHaveLastReturnedWith(calculatedRate);
    });
  });

  describe("cubicThrottle", () => {
    // A throttling response at `timestamp` should cut the rate
    // multiplicatively to `calculatedRate` (beta * rate-in-use).
    it.each([
      [5, 0.112],
      [6, 0.09333333],
      [7, 0.08],
      [8, 0.07],
      [9, 0.06222222],
    ])("timestamp: %d, calculatedRate: %d", (timestamp, calculatedRate) => {
      jest.spyOn(Date, "now").mockImplementation(() => 0);
      const rateLimiter = new DefaultRateLimiter();
      // Seed private state directly to start from a known point.
      rateLimiter["lastMaxRate"] = 10;
      rateLimiter["lastThrottleTime"] = 5;

      (isThrottlingError as jest.Mock).mockReturnValueOnce(true);
      jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000);
      const cubicThrottleSpy = jest.spyOn(DefaultRateLimiter.prototype as any, "cubicThrottle");
      rateLimiter.updateClientSendingRate({});
      expect(cubicThrottleSpy).toHaveLastReturnedWith(calculatedRate);
    });
  });

  // Feeds a fixed sequence of responses at 0.2s intervals and checks the
  // evolving internal state against precomputed expectations.
  it("updateClientSendingRate", () => {
    jest.spyOn(Date, "now").mockImplementation(() => 0);
    const rateLimiter = new DefaultRateLimiter();

    // [isThrottlingError, timestamp (s), expected measuredTxRate, expected fillRate]
    const testCases: [boolean, number, number, number][] = [
      [false, 0.2, 0, 0.5],
      [false, 0.4, 0, 0.5],
      [false, 0.6, 4.8, 0.5],
      [false, 0.8, 4.8, 0.5],
      [false, 1, 4.16, 0.5],
      [false, 1.2, 4.16, 0.6912],
      [false, 1.4, 4.16, 1.0976],
      [false, 1.6, 5.632, 1.6384],
      [false, 1.8, 5.632, 2.3328],
      [true, 2, 4.3264, 3.02848],
      [false, 2.2, 4.3264, 3.486639],
      [false, 2.4, 4.3264, 3.821874],
      [false, 2.6, 5.66528, 4.053386],
      [false, 2.8, 5.66528, 4.200373],
      [false, 3.0, 4.333056, 4.282037],
      [true, 3.2, 4.333056, 2.997426],
      [false, 3.4, 4.333056, 3.452226],
    ];

    testCases.forEach(([isThrottlingErrorReturn, timestamp, measuredTxRate, fillRate]) => {
      (isThrottlingError as jest.Mock).mockReturnValue(isThrottlingErrorReturn);
      jest.spyOn(Date, "now").mockImplementation(() => timestamp * 1000);

      rateLimiter.updateClientSendingRate({});
      expect(rateLimiter["measuredTxRate"]).toEqual(measuredTxRate);
      // fillRate accumulates float error; compare at 6 decimal places.
      expect(parseFloat(rateLimiter["fillRate"].toFixed(6))).toEqual(fillRate);
    });
  });
});
150 changes: 150 additions & 0 deletions packages/middleware-retry/src/DefaultRateLimiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import { isThrottlingError } from "@aws-sdk/service-error-classification";

import { RateLimiter } from "./types";

/**
 * Tuning constants for {@link DefaultRateLimiter}. All fields are optional;
 * defaults (noted below) are supplied by the DefaultRateLimiter constructor.
 */
export interface DefaultRateLimiterOptions {
  /** Multiplicative decrease factor applied to the rate on a throttling error (default: 0.7). */
  beta?: number;
  /** Lower bound for the token bucket's maximum capacity (default: 1). */
  minCapacity?: number;
  /** Lower bound for the token bucket's fill rate, in tokens/second (default: 0.5). */
  minFillRate?: number;
  /** Scale constant of the cubic rate-growth curve (default: 0.4). */
  scaleConstant?: number;
  /** Exponential smoothing factor used when measuring the client send rate (default: 0.8). */
  smooth?: number;
}

/**
 * Client-side rate limiter used by the "adaptive" retry mode.
 *
 * Maintains a token bucket whose fill rate is re-derived after every
 * response: a throttling error cuts the rate multiplicatively
 * (cubicThrottle) while subsequent successes grow it back along a cubic
 * curve (cubicSuccess) — an approach comparable in spirit to TCP CUBIC
 * congestion control. Until the first throttling error is observed the
 * limiter is disabled and getSendToken resolves immediately.
 */
export class DefaultRateLimiter implements RateLimiter {
  // User configurable constants
  private beta: number; // multiplicative decrease factor on throttle (default 0.7)
  private minCapacity: number; // floor for the bucket capacity (default 1)
  private minFillRate: number; // floor for the fill rate, tokens/sec (default 0.5)
  private scaleConstant: number; // scale of the cubic growth curve (default 0.4)
  private smooth: number; // smoothing factor for the measured tx rate (default 0.8)

  // Pre-set state variables
  private currentCapacity = 0; // tokens currently available in the bucket
  private enabled = false; // flips to true on the first throttling error
  private lastMaxRate = 0; // fill rate in effect when we were last throttled
  private measuredTxRate = 0; // smoothed, client-observed send rate (requests/sec)
  private requestCount = 0; // requests counted in the current half-second bucket

  // Other state variables
  private fillRate: number; // current bucket refill rate (tokens/sec)
  private lastThrottleTime: number; // time (seconds) of the last throttling error
  private lastTimestamp = 0; // time (seconds) of the last refill; 0 means "never refilled"
  private lastTxRateBucket: number; // start of the current rate-measurement bucket (seconds)
  private maxCapacity: number; // bucket capacity ceiling
  private timeWindow = 0; // offset to the cubic curve's inflection point; see calculateTimeWindow

  /**
   * @param options - optional overrides for the algorithm's tuning constants.
   */
  constructor(options?: DefaultRateLimiterOptions) {
    this.beta = options?.beta ?? 0.7;
    this.minCapacity = options?.minCapacity ?? 1;
    this.minFillRate = options?.minFillRate ?? 0.5;
    this.scaleConstant = options?.scaleConstant ?? 0.4;
    this.smooth = options?.smooth ?? 0.8;

    const currentTimeInSeconds = this.getCurrentTimeInSeconds();
    this.lastThrottleTime = currentTimeInSeconds;
    this.lastTxRateBucket = Math.floor(this.getCurrentTimeInSeconds());

    // Start from the most conservative rate/capacity allowed.
    this.fillRate = this.minFillRate;
    this.maxCapacity = this.minCapacity;
  }

  // Wall-clock time in fractional seconds; isolated so tests can mock Date.now.
  private getCurrentTimeInSeconds() {
    return Date.now() / 1000;
  }

  /**
   * Acquires one token before a request is sent. Resolves immediately while
   * the limiter is disabled; otherwise may sleep until the bucket refills.
   */
  public async getSendToken() {
    return this.acquireTokenBucket(1);
  }

  private async acquireTokenBucket(amount: number) {
    // Client side throttling is not enabled until we see a throttling error.
    if (!this.enabled) {
      return;
    }

    this.refillTokenBucket();
    if (amount > this.currentCapacity) {
      // Wait exactly as long as the bucket needs to refill the shortfall.
      const delay = ((amount - this.currentCapacity) / this.fillRate) * 1000;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
    this.currentCapacity = this.currentCapacity - amount;
  }

  // Adds the tokens accrued since the previous refill, capped at maxCapacity.
  private refillTokenBucket() {
    const timestamp = this.getCurrentTimeInSeconds();
    if (!this.lastTimestamp) {
      // First refill (or a clock reading of exactly 0): just record the time.
      this.lastTimestamp = timestamp;
      return;
    }

    const fillAmount = (timestamp - this.lastTimestamp) * this.fillRate;
    this.currentCapacity = Math.min(this.maxCapacity, this.currentCapacity + fillAmount);
    this.lastTimestamp = timestamp;
  }

  /**
   * Re-derives the client sending rate from the latest response: a throttling
   * error shrinks the rate (and enables the limiter); any other response
   * grows it along the cubic curve. The new rate is additionally capped at
   * twice the measured send rate.
   *
   * @param response - the service response (or error) just received.
   */
  public updateClientSendingRate(response: any) {
    let calculatedRate: number;
    this.updateMeasuredRate();

    if (isThrottlingError(response)) {
      // Before the limiter is enabled fillRate is not meaningful, so fall
      // back to the measured rate alone.
      const rateToUse = !this.enabled ? this.measuredTxRate : Math.min(this.measuredTxRate, this.fillRate);
      this.lastMaxRate = rateToUse;
      this.calculateTimeWindow();
      this.lastThrottleTime = this.getCurrentTimeInSeconds();
      calculatedRate = this.cubicThrottle(rateToUse);
      this.enableTokenBucket();
    } else {
      this.calculateTimeWindow();
      calculatedRate = this.cubicSuccess(this.getCurrentTimeInSeconds());
    }

    // Never race more than 2x ahead of what the client is actually sending.
    const newRate = Math.min(calculatedRate, 2 * this.measuredTxRate);
    this.updateTokenBucketRate(newRate);
  }

  // Time (seconds after lastThrottleTime) at which the cubic curve climbs
  // back from beta * lastMaxRate to lastMaxRate — i.e. the curve's inflection
  // offset, obtained by solving cubicSuccess(t) = lastMaxRate.
  private calculateTimeWindow() {
    this.timeWindow = this.getPrecise(Math.pow((this.lastMaxRate * (1 - this.beta)) / this.scaleConstant, 1 / 3));
  }

  // Multiplicative decrease applied when a throttling error is seen.
  private cubicThrottle(rateToUse: number) {
    return this.getPrecise(rateToUse * this.beta);
  }

  // Cubic growth: starts at beta * lastMaxRate at the throttle time, reaches
  // lastMaxRate after timeWindow seconds, then accelerates past it.
  private cubicSuccess(timestamp: number) {
    return this.getPrecise(
      this.scaleConstant * Math.pow(timestamp - this.lastThrottleTime - this.timeWindow, 3) + this.lastMaxRate
    );
  }

  private enableTokenBucket() {
    this.enabled = true;
  }

  private updateTokenBucketRate(newRate: number) {
    // Refill based on our current rate before we update to the new fill rate.
    this.refillTokenBucket();

    this.fillRate = Math.max(newRate, this.minFillRate);
    this.maxCapacity = Math.max(newRate, this.minCapacity);

    // When we scale down we can't have a current capacity that exceeds our maxCapacity.
    this.currentCapacity = Math.min(this.currentCapacity, this.maxCapacity);
  }

  // Tracks the client's actual request rate in half-second buckets,
  // exponentially smoothed with `smooth` when a bucket boundary is crossed.
  private updateMeasuredRate() {
    const t = this.getCurrentTimeInSeconds();
    const timeBucket = Math.floor(t * 2) / 2; // round down to the half second
    this.requestCount++;

    if (timeBucket > this.lastTxRateBucket) {
      const currentRate = this.requestCount / (timeBucket - this.lastTxRateBucket);
      this.measuredTxRate = this.getPrecise(currentRate * this.smooth + this.measuredTxRate * (1 - this.smooth));
      this.requestCount = 0;
      this.lastTxRateBucket = timeBucket;
    }
  }

  // Rounds to 8 decimal places so stored state values stay stable/comparable.
  private getPrecise(num: number) {
    return parseFloat(num.toFixed(8));
  }
}
18 changes: 18 additions & 0 deletions packages/middleware-retry/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,21 @@ export interface RetryQuota {
*/
releaseRetryTokens: (releaseCapacityAmount?: number) => void;
}

/**
 * Controls the rate at which a client sends requests, based on the
 * throttling feedback it receives from the service.
 */
export interface RateLimiter {
  /**
   * If there is sufficient capacity (tokens) available, it immediately returns.
   * If there is not sufficient capacity, it will either sleep a certain amount
   * of time until the rate limiter can retrieve a token from its token bucket
   * or raise an exception indicating there is insufficient capacity.
   */
  getSendToken: () => Promise<void>;

  /**
   * Updates the client sending rate based on response.
   * If the response was successful, the capacity and fill rate are increased.
   * If the response was a throttling response, the capacity and fill rate are
   * decreased. Transient errors do not affect the rate limiter.
   */
  updateClientSendingRate: (response: any) => void;
}

0 comments on commit 8ef104d

Please sign in to comment.