Skip to content

Commit

Permalink
Merge e1ea714 into 5d77cff
Browse files Browse the repository at this point in the history
  • Loading branch information
fredzqm committed Dec 3, 2018
2 parents 5d77cff + e1ea714 commit 7012203
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 40 deletions.
20 changes: 10 additions & 10 deletions src/database/remove.ts
Expand Up @@ -3,7 +3,7 @@ import * as FirebaseError from "../error";
import * as logger from "../logger";

import { NodeSize, RemoveRemote, RTDBRemoveRemote } from "./removeRemote";
import { Queue } from "../queue";
import { Stack } from "../throttler/stack";

export interface DatabaseRemoveOptions {
// RTBD instance ID.
Expand All @@ -19,7 +19,7 @@ export default class DatabaseRemove {
public concurrency: number;
public retries: number;
public remote: RemoveRemote;
private jobQueue: Queue<string>;
private jobStack: Stack<string>;
private waitingPath: Map<string, number>;

/**
Expand All @@ -35,17 +35,17 @@ export default class DatabaseRemove {
this.retries = options.retries;
this.remote = new RTDBRemoveRemote(options.instance);
this.waitingPath = new Map();
this.jobQueue = new Queue({
name: "long delete queue",
this.jobStack = new Stack({
name: "long delete stack",
concurrency: this.concurrency,
handler: this.chunkedDelete.bind(this),
retries: this.retries,
});
}

public execute(): Promise<void> {
const prom: Promise<void> = this.jobQueue.wait();
this.jobQueue.add(this.path);
const prom: Promise<void> = this.jobStack.wait();
this.jobStack.add(this.path);
return prom;
}

Expand All @@ -60,7 +60,7 @@ export default class DatabaseRemove {
return this.remote.listPath(path).then((pathList: string[]) => {
if (pathList) {
for (const p of pathList) {
this.jobQueue.add(pathLib.join(path, p));
this.jobStack.add(pathLib.join(path, p));
}
this.waitingPath.set(path, pathList.length);
}
Expand All @@ -77,8 +77,8 @@ export default class DatabaseRemove {
return;
}
if (path === this.path) {
this.jobQueue.close();
logger.debug("[database][long delete queue][FINAL]", this.jobQueue.stats());
this.jobStack.close();
logger.debug("[database][long delete stack][FINAL]", this.jobStack.stats());
} else {
const parentPath = pathLib.dirname(path);
const prevParentPathReference = this.waitingPath.get(parentPath);
Expand All @@ -90,7 +90,7 @@ export default class DatabaseRemove {
}
this.waitingPath.set(parentPath, prevParentPathReference - 1);
if (this.waitingPath.get(parentPath) === 0) {
this.jobQueue.add(parentPath);
this.jobStack.add(parentPath);
this.waitingPath.delete(parentPath);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/deploy/hosting/uploader.js
Expand Up @@ -12,7 +12,7 @@ const hashcache = require("./hashcache");
const detectProjectRoot = require("../../detectProjectRoot");
const api = require("../../api");
const logger = require("../../logger");
const { Queue } = require("../../queue");
const { Queue } = require("../../throttler/queue");

const MIN_UPLOAD_TIMEOUT = 30000; // 30s
const MAX_UPLOAD_TIMEOUT = 7200000; // 2h
Expand Down
30 changes: 30 additions & 0 deletions src/test/throttler/queue.spec.ts
@@ -0,0 +1,30 @@
import { expect } from "chai";

import Queue from "../../throttler/queue";
import { createHandler, createTask, Task } from "./throttler.spec";

describe("Queue", () => {
it("should have default name of queue", () => {
const queue = new Queue({});
expect(queue.name).to.equal("queue");
});

it("should be first-in-first-out", async () => {
const order: string[] = [];
const queue = new Queue<Task>({
handler: createHandler(order),
concurrency: 1,
});

const blocker = await createTask("blocker", false);
queue.add(blocker);
queue.add(await createTask("1", true));
queue.add(await createTask("2", true));

blocker.resolve();

queue.close();
await queue.wait();
expect(order).to.deep.equal(["blocker", "1", "2"]);
});
});
64 changes: 64 additions & 0 deletions src/test/throttler/stack.spec.ts
@@ -0,0 +1,64 @@
import { expect } from "chai";

import Stack from "../../throttler/stack";
import { createHandler, createTask, Task } from "./throttler.spec";

describe("Stack", () => {
it("should have default name of stack", () => {
const stack = new Stack({});
expect(stack.name).to.equal("stack");
});

it("should be first-in-last-out", async () => {
const order: string[] = [];
const queue = new Stack<Task>({
handler: createHandler(order),
concurrency: 1,
});

const blocker = await createTask("blocker", false);
queue.add(blocker);
queue.add(await createTask("1", true));
queue.add(await createTask("2", true));

blocker.resolve();

queue.close();
await queue.wait();
expect(order).to.deep.equal(["blocker", "2", "1"]);
});

it("should not repeat completed tasks", async () => {
const order: string[] = [];
const queue = new Stack<Task>({
handler: createHandler(order),
concurrency: 1,
});

const t1 = await createTask("t1", false);
queue.add(t1);
const t2 = await createTask("t2", false);
queue.add(t2);

queue.add(await createTask("added before t1 finished a", true));
queue.add(await createTask("added before t1 finished b", true));
t1.resolve();

await t2.startExecutePromise; // wait until t2 starts to execute

queue.add(await createTask("added before t2 finished a", true));
queue.add(await createTask("added before t2 finished b", true));
t2.resolve();

queue.close();
await queue.wait();
expect(order).to.deep.equal([
"t1",
"added before t1 finished b",
"added before t1 finished a",
"t2",
"added before t2 finished b",
"added before t2 finished a",
]);
});
});
123 changes: 109 additions & 14 deletions src/test/queue.spec.ts → src/test/throttler/throttler.spec.ts
@@ -1,16 +1,25 @@
import * as chai from "chai";
import * as sinon from "sinon";
import { expect } from "chai";

const { expect } = chai;

import Queue from "../queue";
import Queue from "../../throttler/queue";
import Stack from "../../throttler/stack";
import { Throttler, ThrottlerOptions } from "../../throttler/throttler";

const TEST_ERROR = new Error("foobar");

describe("Queue", () => {
interface ThrottlerConstructor {
new <T>(options: ThrottlerOptions<T>): Throttler<T>;
}

const throttlerTest = (throttlerConstructor: ThrottlerConstructor) => {
it("should have no waiting task after creation", () => {
const queue = new throttlerConstructor({});
expect(queue.hasWaitingTask()).to.equal(false);
});

it("should return the task as the task name", () => {
const handler = sinon.stub().resolves();
const q = new Queue({
const q = new throttlerConstructor({
handler,
});

Expand All @@ -22,7 +31,7 @@ describe("Queue", () => {

it("should return the index as the task name", () => {
const handler = sinon.stub().resolves();
const q = new Queue({
const q = new throttlerConstructor({
handler,
});

Expand All @@ -33,7 +42,7 @@ describe("Queue", () => {

it("should return 'finished task' as the task name", () => {
const handler = sinon.stub().resolves();
const q = new Queue({
const q = new throttlerConstructor({
handler,
});

Expand All @@ -47,7 +56,7 @@ describe("Queue", () => {

it("should handle function tasks", () => {
const task = sinon.stub().resolves();
const q = new Queue({});
const q = new throttlerConstructor({});

q.add(task);
q.close();
Expand All @@ -64,7 +73,7 @@ describe("Queue", () => {

it("should handle tasks", () => {
const handler = sinon.stub().resolves();
const q = new Queue({
const q = new throttlerConstructor({
handler,
});

Expand All @@ -83,7 +92,7 @@ describe("Queue", () => {

it("should not retry", () => {
const handler = sinon.stub().rejects(TEST_ERROR);
const q = new Queue({
const q = new throttlerConstructor({
handler,
retries: 0,
});
Expand Down Expand Up @@ -111,7 +120,7 @@ describe("Queue", () => {

it("should retry the number of retries, plus one", () => {
const handler = sinon.stub().rejects(TEST_ERROR);
const q = new Queue({
const q = new throttlerConstructor({
backoff: 0,
handler,
retries: 3,
Expand Down Expand Up @@ -153,7 +162,7 @@ describe("Queue", () => {
return Promise.reject();
};

const q = new Queue({
const q = new throttlerConstructor({
backoff: 0,
concurrency: 2,
handler,
Expand Down Expand Up @@ -190,7 +199,7 @@ describe("Queue", () => {
.onCall(8)
.resolves(0);

const q = new Queue({
const q = new throttlerConstructor({
backoff: 0,
concurrency: 1, // this makes sure only one task is running at a time, so not flaky
handler,
Expand All @@ -216,4 +225,90 @@ describe("Queue", () => {
expect(q.total).to.equal(3);
});
});
};

describe("Throttler", () => {
describe("Queue", () => {
throttlerTest(Queue);
});
describe("Stack", () => {
throttlerTest(Stack);
});
});

/**
* Some shared test utility for Queue and Stack.
*/
export interface Task {
/**
* The identifier added to the ordering list.
*/
name: string;

/**
* Gets returned by the handler.
* We can control the timing of this promise in test.
*/
promise: Promise<any>;

/**
* Mark the task as done.
*/
resolve: (value?: any) => void;

/**
* Mark the task as failed.
*/
reject: (reason?: any) => void;

/**
* Mark the task as started.
*/
startExecute: (value?: any) => void;

/**
* A promise that wait until this task starts executing.
*/
startExecutePromise: Promise<any>;
}

export const createTask = (name: string, resolved: boolean) => {
return new Promise<Task>((res) => {
let resolve: (value?: any) => void = () => {
throw new Error("resolve is not set");
};
let reject: (reason?: any) => void = () => {
throw new Error("reject is not set");
};
let startExecute: (value?: any) => void = () => {
throw new Error("startExecute is not set");
};
const promise = new Promise((s, j) => {
resolve = s;
reject = j;
});
const startExecutePromise = new Promise((s, j) => {
startExecute = s;
});
res({
name,
promise,
resolve,
reject,
startExecute,
startExecutePromise,
});
if (resolved) {
resolve();
}
});
};

export const createHandler = (orderList: string[]) => {
return (task: Task) => {
task.startExecute();
return task.promise.then(() => {
orderList.push(task.name);
});
};
};
24 changes: 24 additions & 0 deletions src/throttler/queue.ts
@@ -0,0 +1,24 @@
import { Throttler, ThrottlerOptions } from "./throttler";

export class Queue<T> extends Throttler<T> {
public cursor: number = 0;

constructor(options: ThrottlerOptions<T>) {
super(options);
this.name = this.name || "queue";
}

public hasWaitingTask(): boolean {
return this.cursor !== this.total;
}

public nextWaitingTaskIndex(): number {
if (this.cursor >= this.total) {
throw new Error("There is no more task in queue");
}
this.cursor++;
return this.cursor - 1;
}
}

export default Queue;

0 comments on commit 7012203

Please sign in to comment.