Skip to content

Commit

Permalink
Merge bf42587 into 40afb70
Browse files Browse the repository at this point in the history
  • Loading branch information
bkendall committed Dec 5, 2018
2 parents 40afb70 + bf42587 commit e351430
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 48 deletions.
10 changes: 5 additions & 5 deletions src/database/remove.ts
Expand Up @@ -15,10 +15,10 @@ export interface DatabaseRemoveOptions {
}

export default class DatabaseRemove {
public path: string;
public concurrency: number;
public retries: number;
public remote: RemoveRemote;
path: string;
concurrency: number;
retries: number;
remote: RemoveRemote;
private jobStack: Stack<string>;
private waitingPath: Map<string, number>;

Expand All @@ -43,7 +43,7 @@ export default class DatabaseRemove {
});
}

public execute(): Promise<void> {
execute(): Promise<void> {
const prom: Promise<void> = this.jobStack.wait();
this.jobStack.add(this.path);
return prom;
Expand Down
6 changes: 3 additions & 3 deletions src/database/removeRemote.ts
Expand Up @@ -45,7 +45,7 @@ export class RTDBRemoveRemote implements RemoveRemote {
this.instance = instance;
}

public deletePath(path: string): Promise<boolean> {
deletePath(path: string): Promise<boolean> {
return new Promise((resolve, reject) => {
const url =
utils.addSubdomain(api.realtimeOrigin, this.instance) + path + ".json?print=silent";
Expand All @@ -72,7 +72,7 @@ export class RTDBRemoveRemote implements RemoveRemote {
});
}

public prefetchTest(path: string): Promise<NodeSize> {
prefetchTest(path: string): Promise<NodeSize> {
const url =
utils.addSubdomain(api.realtimeOrigin, this.instance) + path + ".json?timeout=100ms";
const reqOptions = {
Expand Down Expand Up @@ -110,7 +110,7 @@ export class RTDBRemoveRemote implements RemoveRemote {
});
}

public listPath(path: string): Promise<string[]> {
listPath(path: string): Promise<string[]> {
const url =
utils.addSubdomain(api.realtimeOrigin, this.instance) +
path +
Expand Down
8 changes: 4 additions & 4 deletions src/test/database/remove.spec.ts
Expand Up @@ -5,13 +5,13 @@ import DatabaseRemove from "../../database/remove";
import { NodeSize, RemoveRemote } from "../../database/removeRemote";

class TestRemoveRemote implements RemoveRemote {
public data: any;
data: any;

constructor(data: any) {
this.data = data;
}

public deletePath(path: string): Promise<boolean> {
deletePath(path: string): Promise<boolean> {
if (path === "/") {
this.data = null;
return Promise.resolve(true);
Expand All @@ -25,7 +25,7 @@ class TestRemoveRemote implements RemoveRemote {
return Promise.resolve(true);
}

public prefetchTest(path: string): Promise<NodeSize> {
prefetchTest(path: string): Promise<NodeSize> {
const d = this._dataAtpath(path);
if (!d) {
return Promise.resolve(NodeSize.EMPTY);
Expand All @@ -39,7 +39,7 @@ class TestRemoveRemote implements RemoveRemote {
}
}

public listPath(path: string): Promise<string[]> {
listPath(path: string): Promise<string[]> {
const d = this._dataAtpath(path);
if (d) {
return Promise.resolve(Object.keys(d));
Expand Down
6 changes: 3 additions & 3 deletions src/throttler/queue.ts
@@ -1,18 +1,18 @@
import { Throttler, ThrottlerOptions } from "./throttler";

export class Queue<T> extends Throttler<T> {
public cursor: number = 0;
cursor: number = 0;

constructor(options: ThrottlerOptions<T>) {
super(options);
this.name = this.name || "queue";
}

public hasWaitingTask(): boolean {
hasWaitingTask(): boolean {
return this.cursor !== this.total;
}

public nextWaitingTaskIndex(): number {
nextWaitingTaskIndex(): number {
if (this.cursor >= this.total) {
throw new Error("There is no more task in queue");
}
Expand Down
8 changes: 4 additions & 4 deletions src/throttler/stack.ts
@@ -1,19 +1,19 @@
import { Throttler, ThrottlerOptions } from "./throttler";

export class Stack<T> extends Throttler<T> {
public lastTotal: number = 0;
public stack: number[] = [];
lastTotal: number = 0;
stack: number[] = [];

constructor(options: ThrottlerOptions<T>) {
super(options);
this.name = this.name || "stack";
}

public hasWaitingTask(): boolean {
hasWaitingTask(): boolean {
return this.lastTotal !== this.total || this.stack.length > 0;
}

public nextWaitingTaskIndex(): number {
nextWaitingTaskIndex(): number {
while (this.lastTotal < this.total) {
this.stack.push(this.lastTotal);
this.lastTotal++;
Expand Down
58 changes: 29 additions & 29 deletions src/throttler/throttler.ts
Expand Up @@ -32,26 +32,26 @@ export interface ThrottlerStats {
}

export abstract class Throttler<T> {
public name: string = "";
public concurrency: number = 200;
public handler: (task: T) => Promise<any> = DEFAULT_HANDLER;
public active: number = 0;
public complete: number = 0;
public success: number = 0;
public errored: number = 0;
public retried: number = 0;
public total: number = 0;
public tasks: { [index: number]: T } = {};
public waits: Array<{ resolve: () => void; reject: (err: Error) => void }> = [];
public min: number = 9999999999;
public max: number = 0;
public avg: number = 0;
public retries: number = 0;
public backoff: number = 200;
public retryCounts: { [index: number]: number } = {};
public closed: boolean = false;
public finished: boolean = false;
public startTime: number = 0;
name: string = "";
concurrency: number = 200;
handler: (task: T) => Promise<any> = DEFAULT_HANDLER;
active: number = 0;
complete: number = 0;
success: number = 0;
errored: number = 0;
retried: number = 0;
total: number = 0;
tasks: { [index: number]: T } = {};
waits: Array<{ resolve: () => void; reject: (err: Error) => void }> = [];
min: number = 9999999999;
max: number = 0;
avg: number = 0;
retries: number = 0;
backoff: number = 200;
retryCounts: { [index: number]: number } = {};
closed: boolean = false;
finished: boolean = false;
startTime: number = 0;

constructor(options: ThrottlerOptions<T>) {
if (options.name) {
Expand All @@ -77,21 +77,21 @@ export abstract class Throttler<T> {
/**
* @return `true` if there are unscheduled task waiting to be scheduled.
*/
public abstract hasWaitingTask(): boolean;
abstract hasWaitingTask(): boolean;

/**
* @return the index of the next task to schedule.
*/
public abstract nextWaitingTaskIndex(): number;
abstract nextWaitingTaskIndex(): number;

public wait(): Promise<void> {
wait(): Promise<void> {
const p = new Promise<void>((resolve, reject) => {
this.waits.push({ resolve, reject });
});
return p;
}

public add(task: T): void {
add(task: T): void {
if (this.closed) {
throw new Error("Cannot add a task to a closed throttler.");
}
Expand All @@ -105,12 +105,12 @@ export abstract class Throttler<T> {
this.process();
}

public close(): boolean {
close(): boolean {
this.closed = true;
return this._finishIfIdle();
}

public process(): void {
process(): void {
if (this._finishIfIdle() || this.active >= this.concurrency || !this.hasWaitingTask()) {
return;
}
Expand All @@ -119,7 +119,7 @@ export abstract class Throttler<T> {
this.handle(this.nextWaitingTaskIndex());
}

public async handle(cursorIndex: number): Promise<void> {
async handle(cursorIndex: number): Promise<void> {
const task = this.tasks[cursorIndex];
const tname = this.taskName(cursorIndex);
const t0 = Date.now();
Expand Down Expand Up @@ -165,7 +165,7 @@ export abstract class Throttler<T> {
}
}

public stats(): ThrottlerStats {
stats(): ThrottlerStats {
return {
max: this.max,
min: this.min,
Expand All @@ -180,7 +180,7 @@ export abstract class Throttler<T> {
};
}

public taskName(cursorIndex: number): string {
taskName(cursorIndex: number): string {
const task = this.tasks[cursorIndex] || "finished task";
return typeof task === "string" ? task : `index ${cursorIndex}`;
}
Expand Down
1 change: 1 addition & 0 deletions tslint.json
Expand Up @@ -5,6 +5,7 @@
"rules": {
"arrow-parens": true,
"interface-name": false,
"member-access": [true, "no-public"],
"object-literal-key-quotes": [true, "as-needed"],
"object-literal-sort-keys": false,
"ordered-imports": [true, { "import-sources-order": "any" }],
Expand Down

0 comments on commit e351430

Please sign in to comment.