Skip to content

Commit

Permalink
timeout part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
orthagonal committed Feb 24, 2018
1 parent 66212b8 commit 40c014a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 3 deletions.
27 changes: 24 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ const fs = require('fs');
const path = require('path');

class Queue {
constructor(mongoUrl, collectionName, waitDelay = 500) {
constructor(mongoUrl, collectionName, waitDelay = 500, maxProcessingTime = 300000) {
this.jobs = {};
this.mongoUrl = mongoUrl;
this.collectionName = collectionName;
this.waitDelay = waitDelay;
this.maxProcessingTime = maxProcessingTime;
this.conn = null;
this.db = null;
this.Joi = Joi;

if (!this.mongoUrl) {
throw new Error('mongoUrl not set');
}
Expand All @@ -36,11 +36,32 @@ class Queue {
await this.conn.close();
}

async stalledJobs() {
const outstandingJobs = await this.db.find({
status: 'processing',
}).toArray();
const cleanJobs = [];
outstandingJobs.forEach(job => {
if (new Date().getTime() - job.startTime.getTime() > this.maxProcessingTime) {
cleanJobs.push(this.db.update({
_id: job._id,
}, {
$set: {
status: 'error',
error: `Job exceeded maximum-allowed processing time of ${this.maxProcessingTime}ms`
}
}));
// todo: should emit 'error' event when merged with EventEmitter branch:
}
});
await Promise.all(cleanJobs);
}

async process() {
if (this.exiting) {
return;
}

await this.stalledJobs();
const job = await this.getJob();

if (!job || !job.value) {
Expand Down
33 changes: 33 additions & 0 deletions test/queueJob.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -564,3 +564,36 @@ tap.test('queue - update job', async (t) => {
await q.stop();
t.end();
});

tap.test('jobs that process long than max expiration time are errored', async (t) => {
const q = new Queue('mongodb://localhost:27017/queue', 'queue', 50, 100);
await q.start();

const job = {
name: 'testJob',
payloadValidation: q.Joi.object().keys({
foo: 'bar'
}),
async process(data) {
await wait(1000000);
}
};

q.createJob(job);

await q.queueJob({
name: 'testJob',
payload: {
foo: 'bar'
}
});

await wait(3000);

const jobs = await q.db.find({}).toArray();
t.isLike(jobs[0], {
status: 'error'
});
await q.stop();
t.end();
});

0 comments on commit 40c014a

Please sign in to comment.