Skip to content

Commit

Permalink
refactor queue.js so retryCount is per task (#988)
Browse files Browse the repository at this point in the history
* refactor queue.js so retryCount is per task

* Add tests for queue; allow custom backoff

* add sinon types

* add test for two tasks having separate retry count

* added a couple of more tests

* added more asertions

* tests about taskName

* handle comments

* use callCountMap for testing concurrency
  • Loading branch information
fredzqm committed Nov 5, 2018
1 parent 0fddfb3 commit eee904b
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 16 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
"@types/glob": "^7.1.1",
"@types/mocha": "^5.2.5",
"@types/node": "^10.12.0",
"@types/sinon": "^5.0.5",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"coveralls": "^3.0.1",
Expand Down
31 changes: 16 additions & 15 deletions src/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ class Queue {
this.max = 0;
this.avg = 0;
this.retries = options.retries || 0;
this.backoff = 200;
this.backoff = typeof options.backoff == "number" ? options.backoff : 200;
this.retryCounts = {};
this.closed = false;
this.finished = false;
}

taskName(task) {
return typeof task === "string" ? task : "index " + this.tasks.indexOf(task);
taskName(cursorIndex) {
const task = this.tasks[cursorIndex];
return typeof task === "string" ? task : "index " + cursorIndex;
}

wait() {
Expand Down Expand Up @@ -73,13 +74,13 @@ class Queue {
return;
}

const task = this.tasks[this.cursor];
this.cursor++;
this.active++;
this.handle(task);
this.handle(this.cursor - 1);
}

handle(task) {
handle(cursorIndex) {
const task = this.tasks[cursorIndex];
const t0 = Date.now();
const self = this;
this.handler(task)
Expand All @@ -100,29 +101,29 @@ class Queue {
})
.catch(function(err) {
if (self.retries > 0) {
self.retryCounts[task] = self.retryCounts[task] || 0;
if (self.retryCounts[task] < self.retries) {
self.retryCounts[task]++;
self.retryCounts[cursorIndex] = self.retryCounts[cursorIndex] || 0;
if (self.retryCounts[cursorIndex] < self.retries) {
self.retryCounts[cursorIndex]++;
self.retried++;
return _backoff(self.retryCounts[task], self.backoff).then(function() {
logger.debug("[" + self.name + "] Retrying task", self.taskName(task));
return self.handle(task);
return _backoff(self.retryCounts[cursorIndex], self.backoff).then(function() {
logger.debug("[" + self.name + "] Retrying task", self.taskName(cursorIndex));
return self.handle(cursorIndex);
});
}
}

self.errored++;
self.complete++;
self.active--;
if (self.retryCounts[task] > 0) {
if (self.retryCounts[cursorIndex] > 0) {
logger.debug(
"[" + self.name + "] Retries exhausted for task",
self.taskName(task),
self.taskName(cursorIndex),
":",
err
);
} else {
logger.debug("[" + self.name + "] Error on task", self.taskName(task), ":", err);
logger.debug("[" + self.name + "] Error on task", self.taskName(cursorIndex), ":", err);
}
self._finish(err);
});
Expand Down
206 changes: 206 additions & 0 deletions src/test/queue.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import * as chai from "chai";
import * as sinon from "sinon";

const { expect } = chai;

import Queue = require("../queue");

const TEST_ERROR = new Error("foobar");

describe("Queue", () => {
it("should ignore non-number backoff", () => {
const q = new Queue({
backoff: "not a number",
});
expect(q.backoff).to.equal(200);
});

it("should return the task as the task name", () => {
const handler = sinon.stub().resolves();
const q = new Queue({
handler,
});

const stringTask = "test task";
q.add(stringTask);

expect(q.taskName(0)).to.equal(stringTask);
});

it("should return the index as the task name", () => {
const handler = sinon.stub().resolves();
const q = new Queue({
handler,
});

q.add(2);

expect(q.taskName(0)).to.equal("index 0");
});

it("should handle function tasks", () => {
const task = sinon.stub().resolves();
const q = new Queue({});

q.add(task);
q.close();

return q.wait().then(() => {
expect(task.callCount).to.equal(1);
expect(q.complete).to.equal(1);
expect(q.success).to.equal(1);
expect(q.errored).to.equal(0);
expect(q.retried).to.equal(0);
});
});

it("should handle tasks", () => {
const handler = sinon.stub().resolves();
const q = new Queue({
handler,
});

q.add(4);
q.close();

return q.wait().then(() => {
expect(handler.callCount).to.equal(1);
expect(q.complete).to.equal(1);
expect(q.success).to.equal(1);
expect(q.errored).to.equal(0);
expect(q.retried).to.equal(0);
});
});

it("should not retry", () => {
const handler = sinon.stub().rejects(TEST_ERROR);
const q = new Queue({
handler,
retries: 0,
});

q.add(4);
q.close();

return q
.wait()
.then(() => {
throw new Error("handler should have rejected");
})
.catch((err: Error) => {
expect(err).to.equal(TEST_ERROR);
})
.then(() => {
expect(handler.callCount).to.equal(1);
expect(q.complete).to.equal(1);
expect(q.success).to.equal(0);
expect(q.errored).to.equal(1);
expect(q.retried).to.equal(0);
});
});

it("should retry the number of retries, plus one", () => {
const handler = sinon.stub().rejects(TEST_ERROR);
const q = new Queue({
backoff: 0,
handler,
retries: 3,
});

q.add(4);
q.close();

return q
.wait()
.then(() => {
throw new Error("handler should have rejected");
})
.catch((err: Error) => {
expect(err).to.equal(TEST_ERROR);
})
.then(() => {
expect(handler.callCount).to.equal(4);
expect(q.complete).to.equal(1);
expect(q.success).to.equal(0);
expect(q.errored).to.equal(1);
expect(q.retried).to.equal(3);
});
});

it("should handle tasks in concurrency", () => {
const callCountMap = new Map<any, number>();
const handler = (task: any) => {
let count = callCountMap.get(task);
if (!count) {
count = 0;
}
count += 1;
callCountMap.set(task, count);
if (count > 2) {
return Promise.resolve();
}
return Promise.reject();
};

const q = new Queue({
backoff: 0,
concurrency: 2,
handler,
retries: 2,
});

q.add("1");
q.add("2");
q.add("3");
q.close();

return q
.wait()
.catch((err: Error) => {
throw new Error("handler should have passed ");
})
.then(() => {
expect(q.complete).to.equal(3);
expect(q.success).to.equal(3);
expect(q.errored).to.equal(0);
expect(q.retried).to.equal(6);
});
});

it("should retry the number of retries for mutiple identical tasks", () => {
const handler = sinon
.stub()
.rejects(TEST_ERROR)
.onCall(2)
.resolves(0)
.onCall(5)
.resolves(0)
.onCall(8)
.resolves(0);

const q = new Queue({
backoff: 0,
concurrency: 1, // this makes sure only one task is running at a time, so not flaky
handler,
retries: 2,
});

q.add(5);
q.add(5);
q.add(5);
q.close();

return q
.wait()
.catch((err: Error) => {
throw new Error("handler should have passed");
})
.then(() => {
expect(handler.callCount).to.equal(9);
expect(q.complete).to.equal(3);
expect(q.success).to.equal(3);
expect(q.errored).to.equal(0);
expect(q.retried).to.equal(6);
});
});
});
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"outDir": "lib",
"removeComments": true,
"sourceMap": true,
"target": "ES5"
"target": "ES6"
},
"include": [
"src/**/*"
Expand Down

0 comments on commit eee904b

Please sign in to comment.