Skip to content

Commit

Permalink
feat: improve job failure events (#227)
Browse files Browse the repository at this point in the history
Adds the documented but missing `retrying` event, and adds a `failed:fatal` event that signals that
a job has failed permanently. The `failed` event remains as it was, signaling job execution
failures without signaling the outcome of the job.

Co-authored-by: Eli Skeggs <eli@mixmax.com>
Co-authored-by: Hugh Secker-Walker <hsw@hodain.net>
  • Loading branch information
3 people committed Nov 6, 2023
1 parent 0e10264 commit 15d02c2
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 71 deletions.
17 changes: 17 additions & 0 deletions lib/job.js
Expand Up @@ -222,6 +222,23 @@ class Job extends Emitter {
if (cb) helpers.asCallback(promise, cb);
return promise;
}

/**
* Compute the delay for rescheduling the job after it fails.
*
* @return {number} The number of milliseconds into the future to schedule it.
* Negative if no defined strategy or no remaining retries.
*/
computeDelay() {
const strategyName = this.options.backoff
? this.options.backoff.strategy
: 'immediate';
const strategy =
this.options.retries > 0
? this.queue.backoffStrategies.get(strategyName)
: null;
return strategy ? strategy(this) : -1;
}
}

module.exports = Job;
69 changes: 38 additions & 31 deletions lib/queue.js
Expand Up @@ -591,9 +591,8 @@ class Queue extends Emitter {
}

_finishJob(err, data, job) {
const status = err ? 'failed' : 'succeeded';

if (this._isClosed) {
const status = err ? 'failed' : 'succeeded';
throw new Error(`unable to update the status of ${status} job ${job.id}`);
}

Expand All @@ -602,36 +601,26 @@ class Queue extends Emitter {
.lrem(this.toKey('active'), 0, job.id)
.srem(this.toKey('stalling'), job.id);

const jobEvent = {
id: job.id,
event: status,
data: err ? err.message : data,
};
const delay = err ? job.computeDelay() : -1;
const status = err ? (delay >= 0 ? 'retrying' : 'failed') : 'succeeded';

job.status = status;
if (err) {
const errInfo = err.stack || err.message || err;
job.options.stacktraces.unshift(errInfo);
}

const strategyName = job.options.backoff
? job.options.backoff.strategy
: 'immediate';
const strategy =
job.options.retries > 0
? this.backoffStrategies.get(strategyName)
: null;
const delay = strategy ? strategy(job) : -1;
if (delay < 0) {
job.status = 'failed';
switch (status) {
case 'failed':
if (this.settings.removeOnFailure) {
multi.hdel(this.toKey('jobs'), job.id);
} else {
multi.hset(this.toKey('jobs'), job.id, job.toData());
multi.sadd(this.toKey('failed'), job.id);
}
} else {
job.options.retries -= 1;
job.status = 'retrying';
jobEvent.event = 'retrying';
break;
case 'retrying':
--job.options.retries;
multi.hset(this.toKey('jobs'), job.id, job.toData());
if (delay === 0) {
multi.lpush(this.toKey('waiting'), job.id);
Expand All @@ -641,19 +630,26 @@ class Queue extends Emitter {
.zadd(this.toKey('delayed'), time, job.id)
.publish(this.toKey('earlierDelayed'), time);
}
}
} else {
job.status = 'succeeded';
if (this.settings.removeOnSuccess) {
multi.hdel(this.toKey('jobs'), job.id);
} else {
multi.hset(this.toKey('jobs'), job.id, job.toData());
multi.sadd(this.toKey('succeeded'), job.id);
}
break;
case 'succeeded':
if (this.settings.removeOnSuccess) {
multi.hdel(this.toKey('jobs'), job.id);
} else {
multi.hset(this.toKey('jobs'), job.id, job.toData());
multi.sadd(this.toKey('succeeded'), job.id);
}
break;
}

if (this.settings.sendEvents) {
multi.publish(this.toKey('events'), JSON.stringify(jobEvent));
multi.publish(
this.toKey('events'),
JSON.stringify({
id: job.id,
event: status,
data: err ? err.message : data,
})
);
}

const result = err || data;
Expand Down Expand Up @@ -733,6 +729,17 @@ class Queue extends Emitter {
const status = results[0],
result = results[1];
this.emit(status, job, result);

// Workaround for #184: emit failed event for backwards
// compatibility while affording for a separate event that
// identifies the final failure.
const emitExtra =
status === 'retrying'
? 'failed'
: status === 'failed'
? 'failed:fatal'
: null;
if (emitExtra) this.emit(emitExtra, job, result);
}
}, this._emitErrorAfterTick);
}),
Expand Down
140 changes: 100 additions & 40 deletions test/queue-test.js
Expand Up @@ -463,25 +463,36 @@ describe('Queue', (it) => {
const queue = t.context.makeQueue();

const jobs = spitter();
queue.process((job) => jobs.pushSuspend(job));
queue.process(2, (job) => jobs.pushSuspend(job));

await queue.createJob({}).save();
const [, finishJob] = await jobs.shift();
await Promise.all([
queue.createJob({}).save(),
queue.createJob({}).save(),
]);
const [[, finishJob1], [, finishJob2]] = await Promise.all([
jobs.shift(),
jobs.shift(),
]);

await t.throwsAsync(() => queue.close(10), {message: /timed out/});

await t.throwsAsync(() => queue.close(10));
finishJob(null);
finishJob1();
const fail = Promise.reject(new Error('test error'));
fail.catch(() => {}); // Prevent unhandled rejections.
finishJob2(fail);

await helpers.delay(5);

const errors = t.context.queueErrors,
count = errors.length;
t.context.queueErrors = errors.filter((err) => {
return (
err.message !== 'unable to update the status of succeeded job 1'
);
});
t.is(t.context.queueErrors.length, count - 1);
t.context.handleErrors(t);
t.is(t.context.queueErrors.length, 2);
t.true(
t.context.queueErrors.every((err) =>
/^unable to update the status of (?:succeeded job 1|failed job 2)$/.test(
err.message
)
)
);
t.not(...t.context.queueErrors.slice(0, 2));
t.context.queueErrors.length = 0;
});

it('should not error on close', async (t) => {
Expand Down Expand Up @@ -875,24 +886,61 @@ describe('Queue', (it) => {
t.is(counts.succeeded, 1);
});

it('reports a failed job', async (t) => {
it('should report a failed job', async (t) => {
const queue = t.context.makeQueue();

queue.process(async (job) => {
t.is(job.data.foo, 'bar');
throw new Error('failed!');
});

const events = [
helpers.waitOn(queue, 'failed'),
helpers.waitOn(queue, 'failed:fatal'),
];

const job = await queue.createJob({foo: 'bar'}).save();
t.truthy(job.id);

const failedJob = await helpers.waitOn(queue, 'failed');
const [failedJob, fatalJob] = await Promise.all(events);
t.is(failedJob.id, job.id);
t.is(fatalJob.id, job.id);

const counts = await queue.checkHealth();
t.is(counts.failed, 1);
});

it('should report a retried job', async (t) => {
const queue = t.context.makeQueue({getEvents: false, storeJobs: false});

queue.process(async (job) => {
t.is(job.data.foo, 'bar');
if (job.options.retries) throw new Error('failed for retry!');
// job succeeds on the retry
});

const emits = Promise.all([
helpers.waitOn(queue, 'failed'),
helpers.waitOn(queue, 'retrying'),
helpers.waitOn(queue, 'succeeded'),
]);

const job = await queue.createJob({foo: 'bar'}).retries(1).save();
t.truthy(job.id);

const [failedJob, retriedJob, succeededJob] = await emits;
t.is(failedJob.id, job.id);
t.is(failedJob.status, 'retrying');
t.is(retriedJob.id, job.id);
t.is(retriedJob.status, 'retrying');
t.is(succeededJob.id, job.id);
t.is(succeededJob.status, 'succeeded');

const counts = await queue.checkHealth();
t.is(counts.failed, 0);
t.is(counts.succeeded, 1);
});

it('should not report the latest job for custom job ids', async (t) => {
const queue = t.context.makeQueue();

Expand Down Expand Up @@ -1536,32 +1584,38 @@ describe('Queue', (it) => {
t.is(err.message, `Job ${job.id} timed out (10 ms)`);
});

it('processes a job that auto-retries', async (t) => {
it('should process a job that auto-retries', async (t) => {
const queue = t.context.makeQueue();
const retries = 1;
const failMsg = 'failing to auto-retry...';
const retries = 2;
const failMsg = 'failing for auto-retry...';

const end = helpers.deferred(),
finish = end.defer();

let failCount = 0;
function validateEvent(job, err) {
t.truthy(job);
t.is(job.data.foo, 'bar');
t.is(err.message, failMsg);
}

const retryingStub = sinon.stub().callsFake(validateEvent),
failedStub = sinon.stub().callsFake(validateEvent);

queue
.on('retrying', retryingStub)
.on('failed', failedStub)
.on('failed:fatal', () => t.fail('unexpected fatal failure'));

queue.process(async (job) => {
t.is(job.data.foo, 'bar');
if (job.options.retries) {
throw new Error(failMsg);
}
t.is(failCount, retries);
t.is(retryingStub.callCount, retries);
t.is(failedStub.callCount, retries);
finish();
});

queue.on('failed', (job, err) => {
++failCount;
t.truthy(job);
t.is(job.data.foo, 'bar');
t.is(err.message, failMsg);
});

const job = await queue.createJob({foo: 'bar'}).retries(retries).save();
t.truthy(job.id);
t.is(job.data.foo, 'bar');
Expand Down Expand Up @@ -1590,30 +1644,36 @@ describe('Queue', (it) => {
t.true(called);
});

it('processes a job that times out and auto-retries', async (t) => {
it('should process a job that times out and auto-retries', async (t) => {
const queue = t.context.makeQueue();
const retries = 1;

const end = helpers.deferred(),
finish = end.defer();

let failCount = 0;
function validateEvent(job, err) {
t.regex(err.message, /timed out/);
t.truthy(job);
t.is(job.data.foo, 'bar');
}

const retryingStub = sinon.stub().callsFake(validateEvent),
failedStub = sinon.stub().callsFake(validateEvent);
queue
.on('retrying', retryingStub)
.on('failed', failedStub)
.on('failed:fatal', () => t.fail('unexpected fatal failure'));

queue.process(async (job) => {
t.is(job.data.foo, 'bar');
if (job.options.retries) {
return helpers.defer(20);
return helpers.delay(20);
}
t.is(failCount, retries);
t.is(retryingStub.callCount, retries);
t.is(failedStub.callCount, retries);
finish();
});

queue.on('failed', (job) => {
failCount += 1;
t.truthy(job);
t.is(job.data.foo, 'bar');
});

const job = await queue
.createJob({foo: 'bar'})
.timeout(10)
Expand Down Expand Up @@ -2379,8 +2439,8 @@ describe('Queue', (it) => {
});
await job.save();

// Wait for the event to show up in both, but only bind the value from the event on the job
// object.
// Wait for the event to show up in both, but only bind the value from the
// event on the job object.
const [jobEvents, queueEvents] = await record;

const jobErr = jobEvents[0][1];
Expand Down

0 comments on commit 15d02c2

Please sign in to comment.