Skip to content

Commit

Permalink
Fredzqm/Throttler return promise of handler result (#1049)
Browse files Browse the repository at this point in the history
* make Throttler take second type for handler result

* Add docs to Throttler
  • Loading branch information
fredzqm committed Dec 6, 2018
1 parent 6f3bd73 commit 1a70e45
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 47 deletions.
6 changes: 3 additions & 3 deletions src/database/remove.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export default class DatabaseRemove {
concurrency: number;
retries: number;
remote: RemoveRemote;
private jobStack: Stack<string>;
private jobStack: Stack<string, void>;
private waitingPath: Map<string, number>;

/**
Expand All @@ -35,7 +35,7 @@ export default class DatabaseRemove {
this.retries = options.retries;
this.remote = new RTDBRemoveRemote(options.instance);
this.waitingPath = new Map();
this.jobStack = new Stack({
this.jobStack = new Stack<string, void>({
name: "long delete stack",
concurrency: this.concurrency,
handler: this.chunkedDelete.bind(this),
Expand All @@ -49,7 +49,7 @@ export default class DatabaseRemove {
return prom;
}

private chunkedDelete(path: string): Promise<any> {
private chunkedDelete(path: string): Promise<void> {
return this.remote
.prefetchTest(path)
.then((test: NodeSize) => {
Expand Down
2 changes: 1 addition & 1 deletion src/test/throttler/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe("Queue", () => {

it("should be first-in-first-out", async () => {
const order: string[] = [];
const queue = new Queue<Task>({
const queue = new Queue<Task, void>({
handler: createHandler(order),
concurrency: 1,
});
Expand Down
4 changes: 2 additions & 2 deletions src/test/throttler/stack.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe("Stack", () => {

it("should be first-in-last-out", async () => {
const order: string[] = [];
const queue = new Stack<Task>({
const queue = new Stack<Task, void>({
handler: createHandler(order),
concurrency: 1,
});
Expand All @@ -30,7 +30,7 @@ describe("Stack", () => {

it("should not repeat completed tasks", async () => {
const order: string[] = [];
const queue = new Stack<Task>({
const queue = new Stack<Task, void>({
handler: createHandler(order),
concurrency: 1,
});
Expand Down
15 changes: 14 additions & 1 deletion src/test/throttler/throttler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Throttler, ThrottlerOptions } from "../../throttler/throttler";
const TEST_ERROR = new Error("foobar");

interface ThrottlerConstructor {
new <T>(options: ThrottlerOptions<T>): Throttler<T>;
new <T, R>(options: ThrottlerOptions<T, R>): Throttler<T, R>;
}

const throttlerTest = (throttlerConstructor: ThrottlerConstructor) => {
Expand Down Expand Up @@ -225,6 +225,19 @@ const throttlerTest = (throttlerConstructor: ThrottlerConstructor) => {
expect(q.total).to.equal(3);
});
});

it("should return the result of task", () => {
const handler = (task: number) => {
return Promise.resolve(`result: ${task}`);
};

const q = new throttlerConstructor({
handler,
});

expect(q.run(2)).to.eventually.to.equal("result: 2");
expect(q.run(3)).to.eventually.to.equal("result: 3");
});
};

describe("Throttler", () => {
Expand Down
4 changes: 2 additions & 2 deletions src/throttler/queue.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Throttler, ThrottlerOptions } from "./throttler";

export class Queue<T> extends Throttler<T> {
export class Queue<T, R> extends Throttler<T, R> {
cursor: number = 0;

constructor(options: ThrottlerOptions<T>) {
constructor(options: ThrottlerOptions<T, R>) {
super(options);
this.name = this.name || "queue";
}
Expand Down
4 changes: 2 additions & 2 deletions src/throttler/stack.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Throttler, ThrottlerOptions } from "./throttler";

export class Stack<T> extends Throttler<T> {
export class Stack<T, R> extends Throttler<T, R> {
lastTotal: number = 0;
stack: number[] = [];

constructor(options: ThrottlerOptions<T>) {
constructor(options: ThrottlerOptions<T, R>) {
super(options);
this.name = this.name || "stack";
}
Expand Down
123 changes: 87 additions & 36 deletions src/throttler/throttler.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import * as logger from "../logger";

function _backoff(retryNumber: number, delay: number): Promise<void> {
function backoff(retryNumber: number, delay: number): Promise<void> {
return new Promise((resolve: () => void) => {
setTimeout(resolve, delay * Math.pow(2, retryNumber));
});
}

function DEFAULT_HANDLER(task: any): Promise<any> {
return (task as () => Promise<any>)();
function DEFAULT_HANDLER<R>(task: any): Promise<R> {
return (task as () => Promise<R>)();
}

export interface ThrottlerOptions<T> {
export interface ThrottlerOptions<T, R> {
name?: string;
concurrency?: number;
handler?: (task: T) => Promise<any>;
handler?: (task: T) => Promise<R>;
retries?: number;
backoff?: number;
}
Expand All @@ -31,7 +31,21 @@ export interface ThrottlerStats {
elapsed: number;
}

export abstract class Throttler<T> {
interface TaskData<T, R> {
task: T;
retryCount: number;
wait?: { resolve: (R: any) => void; reject: (err: Error) => void };
}

/**
* Throttler is a task scheduler that throttles the maximum number of tasks running at the same time.
* In the case of failure, it will retry with exponential backoff, until exceeding the retries limit.
* T is the type of task. R is the type of the handler's result.
* You can use throttler in two ways:
* 1. Specify handler that is (T) => R.
* 2. Not specify the handler, but T must be () => R.
*/
export abstract class Throttler<T, R> {
name: string = "";
concurrency: number = 200;
handler: (task: T) => Promise<any> = DEFAULT_HANDLER;
Expand All @@ -41,19 +55,18 @@ export abstract class Throttler<T> {
errored: number = 0;
retried: number = 0;
total: number = 0;
tasks: { [index: number]: T } = {};
taskDataMap = new Map<number, TaskData<T, R>>();
waits: Array<{ resolve: () => void; reject: (err: Error) => void }> = [];
min: number = 9999999999;
max: number = 0;
avg: number = 0;
retries: number = 0;
backoff: number = 200;
retryCounts: { [index: number]: number } = {};
closed: boolean = false;
finished: boolean = false;
startTime: number = 0;

constructor(options: ThrottlerOptions<T>) {
constructor(options: ThrottlerOptions<T, R>) {
if (options.name) {
this.name = options.name;
}
Expand Down Expand Up @@ -84,34 +97,42 @@ export abstract class Throttler<T> {
*/
abstract nextWaitingTaskIndex(): number;

/**
* Return a promise that waits until the Throttler is closed and all tasks finish.
*/
wait(): Promise<void> {
const p = new Promise<void>((resolve, reject) => {
this.waits.push({ resolve, reject });
});
return p;
}

/**
* Add the task to the throttler.
* When the task is completed, resolve will be called with handler's result.
* If this task fails after retries, reject will be called with the error.
*/
add(task: T): void {
if (this.closed) {
throw new Error("Cannot add a task to a closed throttler.");
}

if (!this.startTime) {
this.startTime = Date.now();
}
this.addHelper(task);
}

this.tasks[this.total] = task;
this.total++;
this.process();
/**
* Add the task to the throttler and return a promise of handler's result.
* If the task failed, both the promised returned by throttle and wait will reject.
*/
run(task: T): Promise<R> {
return new Promise((resolve, reject) => {
this.addHelper(task, { resolve, reject });
});
}

close(): boolean {
this.closed = true;
return this._finishIfIdle();
return this.finishIfIdle();
}

process(): void {
if (this._finishIfIdle() || this.active >= this.concurrency || !this.hasWaitingTask()) {
if (this.finishIfIdle() || this.active >= this.concurrency || !this.hasWaitingTask()) {
return;
}

Expand All @@ -120,12 +141,16 @@ export abstract class Throttler<T> {
}

async handle(cursorIndex: number): Promise<void> {
const task = this.tasks[cursorIndex];
const taskData = this.taskDataMap.get(cursorIndex);
if (!taskData) {
throw new Error(`taskData.get(${cursorIndex}) does not exist`);
}
const task = taskData.task;
const tname = this.taskName(cursorIndex);
const t0 = Date.now();

try {
await this.handler(task);
const result = await this.handler(task);
const dt = Date.now() - t0;
if (dt < this.min) {
this.min = dt;
Expand All @@ -138,16 +163,17 @@ export abstract class Throttler<T> {
this.success++;
this.complete++;
this.active--;
delete this.tasks[cursorIndex];
delete this.retryCounts[cursorIndex];
if (taskData.wait) {
taskData.wait.resolve(result);
}
this.taskDataMap.delete(cursorIndex);
this.process();
} catch (err) {
if (this.retries > 0) {
this.retryCounts[cursorIndex] = this.retryCounts[cursorIndex] || 0;
if (this.retryCounts[cursorIndex] < this.retries) {
this.retryCounts[cursorIndex]++;
if (taskData.retryCount < this.retries) {
taskData.retryCount++;
this.retried++;
await _backoff(this.retryCounts[cursorIndex], this.backoff);
await backoff(taskData.retryCount, this.backoff);
logger.debug(`[${this.name}] Retrying task`, tname);
return this.handle(cursorIndex);
}
Expand All @@ -156,12 +182,15 @@ export abstract class Throttler<T> {
this.errored++;
this.complete++;
this.active--;
if (this.retryCounts[cursorIndex] > 0) {
if (taskData.retryCount > 0) {
logger.debug(`[${this.name}] Retries exhausted for task ${tname}:`, err);
} else {
logger.debug(`[${this.name}] Error on task ${tname}:`, err);
}
this._finish(err);
if (taskData.wait) {
taskData.wait.reject(err);
}
this.finish(err);
}
}

Expand All @@ -181,20 +210,42 @@ export abstract class Throttler<T> {
}

taskName(cursorIndex: number): string {
const task = this.tasks[cursorIndex] || "finished task";
return typeof task === "string" ? task : `index ${cursorIndex}`;
const taskData = this.taskDataMap.get(cursorIndex);
if (!taskData) {
return "finished task";
}
return typeof taskData.task === "string" ? taskData.task : `index ${cursorIndex}`;
}

private addHelper(
task: T,
wait?: { resolve: (result: R) => void; reject: (err: Error) => void }
): void {
if (this.closed) {
throw new Error("Cannot add a task to a closed throttler.");
}
if (!this.startTime) {
this.startTime = Date.now();
}
this.taskDataMap.set(this.total, {
task,
wait,
retryCount: 0,
});
this.total++;
this.process();
}

private _finishIfIdle(): boolean {
private finishIfIdle(): boolean {
if (this.closed && !this.hasWaitingTask() && this.active === 0) {
this._finish(null);
this.finish(null);
return true;
}

return false;
}

private _finish(err: Error | null): void {
private finish(err: Error | null): void {
this.waits.forEach((p) => {
if (err) {
return p.reject(err);
Expand Down

0 comments on commit 1a70e45

Please sign in to comment.