Skip to content

Commit

Permalink
Merge 17ea79e into 95bc627
Browse files Browse the repository at this point in the history
  • Loading branch information
nassiharel committed Feb 17, 2020
2 parents 95bc627 + 17ea79e commit 4c3a2de
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 2 deletions.
14 changes: 14 additions & 0 deletions REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- [Queue#getDelayed](#queuegetdelayed)
- [Queue#getCompleted](#queuegetcompleted)
- [Queue#getFailed](#queuegetfailed)
- [Queue#getStalled](#queuegetstalled)

- [Job](#job)

Expand Down Expand Up @@ -94,6 +95,7 @@ interface AdvancedSettings {
lockDuration: number = 30000; // Key expiration time for job locks.
lockRenewTime: number = 15000; // Interval on which to acquire the job lock
stalledInterval: number = 30000; // How often check for stalled jobs (use 0 for never checking).
stalledCheck: boolean = true; // Enable or disable the stalled job check on the consumer process function.
maxStalledCount: number = 1; // Max amount of times a stalled job will be re-processed.
guardInterval: number = 5000; // Poll interval for delayed jobs and added jobs.
retryProcessDelay: number = 5000; // delay before processing next job in case of internal error.
Expand Down Expand Up @@ -607,6 +609,18 @@ Returns a promise that will return an array with the failed jobs between start a

---

### Queue#getStalled

```ts
getStalledJobs() : Promise<Array<Job>>
```

Returns a promise that will return an array of stalled jobs.

> This works only if the consumer stalled check is disabled by setting `stalledCheck` to `false`.

---


### Queue#clean

```ts
Expand Down
65 changes: 65 additions & 0 deletions lib/commands/getStalledJobs-7.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
--[[
  Get stalled jobs.

  Works in two phases on every invocation:
    1. Every job id currently in the 'stalled' set that has no lock key and is
       still present in the 'active' list is removed from 'active' and reported
       back to the caller (it is NOT pushed back to 'wait').
    2. Every job id still in the 'active' list is then written to the 'stalled'
       set, so that the next invocation can examine it.

  Input:
    KEYS[1] 'stalled' (SET)
    KEYS[2] 'wait', (LIST)          -- not referenced by this script
    KEYS[3] 'active', (LIST)
    KEYS[4] 'failed', (ZSET)
    KEYS[5] 'stalled-check', (KEY)  -- not referenced by this script
    KEYS[6] 'meta-paused', (KEY)    -- not referenced by this script
    KEYS[7] 'paused', (LIST)        -- not referenced by this script

    ARGV[1]  Max stalled job count
    ARGV[2]  queue.toKey('')
    ARGV[3]  timestamp
    ARGV[4]  max check time         -- not referenced by this script

  Output:
    {failed, stalled}
      failed  - ids whose stalledCounter exceeded ARGV[1]; they were added to
                the 'failed' zset and given a failedReason.
      stalled - ids that stalled but are still within the allowed limit.
]]

local rcall = redis.call

-- unpack() raises "too many results to unpack" when asked to push more values
-- than the Lua stack can hold, so ids are handed to SADD in bounded batches.
local SADD_BATCH_SIZE = 7000

-- Find all stalled jobs
local stalling = rcall('SMEMBERS', KEYS[1])
local stalled = {}
local failed = {}
if (#stalling > 0) then

  -- The set is rebuilt from the 'active' list at the end of the script.
  rcall('DEL', KEYS[1])

  local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])

  -- Remove from active list
  for _, jobId in ipairs(stalling) do
    local jobKey = ARGV[2] .. jobId

    -- Check that the lock is also missing, then we can handle this job as really stalled.
    if (rcall("EXISTS", jobKey .. ":lock") == 0) then
      -- Remove from the active queue.
      local removed = rcall("LREM", KEYS[3], 1, jobId)

      -- removed == 0 means the job already left 'active', so there is nothing to report.
      if (removed > 0) then
        -- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
        local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1)

        if (stalledCount > MAX_STALLED_JOB_COUNT) then
          rcall("ZADD", KEYS[4], ARGV[3], jobId)
          rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit")
          table.insert(failed, jobId)
        else
          -- Do not Move the job back to the wait queue, so it wouldn't be picked up by a waiting worker.
          table.insert(stalled, jobId)
        end
      end
    end
  end
end

-- Mark potentially stalled jobs
local active = rcall('LRANGE', KEYS[3], 0, -1)
local activeCount = #active
-- The loop body never runs for an empty list, so SADD is never called without members.
for first = 1, activeCount, SADD_BATCH_SIZE do
  local last = math.min(first + SADD_BATCH_SIZE - 1, activeCount)
  rcall('SADD', KEYS[1], unpack(active, first, last))
end

return {failed, stalled}
38 changes: 36 additions & 2 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const MINIMUM_REDIS_VERSION = '2.8.18';
lockDuration?: number = 30000,
lockRenewTime?: number = lockDuration / 2,
stalledInterval?: number = 30000,
stalledCheck?: boolean = true, // enable or disable stalled check on the consumer process function
maxStalledCount?: number = 1, // The maximum number of times a job can be recovered from the 'stalled' state
guardInterval?: number = 5000,
retryProcessDelay?: number = 5000,
Expand Down Expand Up @@ -208,6 +209,7 @@ const Queue = function Queue(name, url, opts) {
this.settings = _.defaults(opts.settings, {
lockDuration: 30000,
stalledInterval: 30000,
stalledCheck: true,
maxStalledCount: 1,
guardInterval: 5000,
retryProcessDelay: 5000,
Expand Down Expand Up @@ -896,7 +898,7 @@ Queue.prototype.updateDelayTimer = function() {
* of processing a job, leaving it in 'active' but without a job lock.
*/
Queue.prototype.moveUnlockedJobsToWait = function() {
if (this.closing) {
if (this.closing || !this.settings.stalledCheck) {
return Promise.resolve();
}

Expand Down Expand Up @@ -927,9 +929,41 @@ Queue.prototype.moveUnlockedJobsToWait = function() {
});
};

/**
 * Counterpart of <moveUnlockedJobsToWait> that reports stalled jobs to the
 * caller instead of moving them back to wait.
 *
 * Resolves with an array of <Jobs>: the jobs reported as failed come first,
 * followed by the jobs reported as stalled.
 * A Job carrying a failedReason has stalled more than the allowable limit;
 * any other Job in the result is simply stalled.
 *
 * Resolves with an empty array while the queue is closing. On any failure an
 * 'error' event is emitted and the returned promise rejects with an Error
 * whose message is prefixed with 'Failed to get stalled jobs, '.
 */
Queue.prototype.getStalledJobs = function() {
  if (this.closing) {
    return Promise.resolve([]);
  }

  const loadJob = jobId => this.getJobFromId(jobId);

  const reportFailure = err => {
    const error = new Error('Failed to get stalled jobs, ' + err.message);
    this.emit('error', err, error.message);
    return Promise.reject(error);
  };

  return scripts
    .getStalledJobs(this)
    .then(result => {
      const [failedIds, stalledIds] = result;
      const allIds = failedIds.concat(stalledIds);
      return Promise.all(allIds.map(loadJob));
    })
    .catch(reportFailure);
};

Queue.prototype.startMoveUnlockedJobsToWait = function() {
clearInterval(this.moveUnlockedJobsToWaitInterval);
if (this.settings.stalledInterval > 0 && !this.closing) {
if (
this.settings.stalledInterval > 0 &&
!this.closing &&
this.settings.stalledCheck
) {
this.moveUnlockedJobsToWaitInterval = setInterval(
this.moveUnlockedJobsToWait,
this.settings.stalledInterval
Expand Down
24 changes: 24 additions & 0 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,30 @@ const scripts = {
return queue.client.moveStalledJobsToWait(keys.concat(args));
},

/**
* This function is exactly like <moveUnlockedJobsToWait> function.
* except that it will not move unlocked jobs to wait, just mark as stalled, failed
*/

getStalledJobs(queue) {
const keys = [
queue.keys.stalled,
queue.keys.wait,
queue.keys.active,
queue.keys.failed,
queue.keys['stalled-check'],
queue.keys['meta-paused'],
queue.keys.paused
];
const args = [
queue.settings.maxStalledCount,
queue.toKey(''),
Date.now(),
queue.settings.stalledInterval
];
return queue.client.getStalledJobs(keys.concat(args));
},

cleanJobsInSet(queue, set, ts, limit) {
return queue.client.cleanJobsInSet([
queue.toKey(set),
Expand Down
75 changes: 75 additions & 0 deletions test/test_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,81 @@ describe('Queue', () => {
.catch(done);
});

it('should get empty stalled jobs', function(done) {
  this.timeout(3000);
  const type = 'empty-stalled-queue-job';
  const producer = new Queue(type);

  // getStalledJobs short-circuits to [] while the queue is closing
  // (presumably close() raises that flag synchronously - confirm).
  producer.close();
  producer
    .getStalledJobs()
    .then(jobs => {
      expect(jobs).to.eql([]);
      done();
    })
    // Report assertion failures and rejections instead of timing out.
    .catch(done);
});

it('should failed to get stalled jobs', function(done) {
  this.timeout(3000);
  const type = 'failed-stalled-queue-job';
  const data = { foo: 'bar', stalled: 'ok' };
  const jobError = new Error(
    'Failed to get stalled jobs, Connection is closed.'
  );
  const producer = new Queue(type);

  // Disconnect first so that the stalled-jobs command has no connection to use.
  const stalledJobs = function() {
    producer.disconnect();
    producer
      .getStalledJobs()
      .then(
        () => {
          // Resolving here means the failure path was never exercised.
          throw new Error('Expected getStalledJobs to be rejected');
        },
        error => {
          expect(error.message).to.be.eql(jobError.message);
        }
      )
      // Either outcome ends the test right away instead of waiting for the timeout.
      .then(() => done(), done);
  };

  producer
    .add(type, data)
    .then(() => {
      stalledJobs();
    })
    // A failing add() must fail the test rather than leave it hanging.
    .catch(done);
});

it('should get stalled jobs', function(done) {
  this.timeout(5000);

  const type = 'stalled-queue-job';
  const data = { foo: 'bar', stalled: 'ok' };
  // The lock duration (500ms) is much shorter than the renew time (5000ms),
  // and the stalled interval is far longer than the test timeout.
  const setting = {
    settings: {
      lockRenewTime: 5000,
      lockDuration: 500,
      stalledInterval: 100000
    }
  };
  const producer = new Queue(type);
  const consumer = new Queue(type, setting);

  // getStalledJobs is called twice: the first call only marks the active jobs
  // as potentially stalled, the second one reports those left without a lock.
  const stalledJobs = function() {
    producer
      .getStalledJobs()
      .then(() => producer.getStalledJobs())
      .then(jobs => {
        const job = jobs[0];
        expect(job.id).to.eql('1');
        expect(job.data).to.eql(data);
        done();
      })
      // Report assertion failures and rejections instead of timing out.
      .catch(done);
  };

  consumer.process(type, () => {
    // Drop the consumer in the middle of the job.
    consumer.close();
    consumer.disconnect();

    // 2000ms is well past the 500ms lock duration configured above.
    setTimeout(() => {
      stalledJobs();
    }, 2000);
  });

  producer
    .add(type, data)
    .then(job => {
      expect(job.id).to.be.ok;
      expect(job.data.foo).to.be.eql('bar');
    })
    // A failing add() or assertion must fail the test rather than be swallowed.
    .catch(done);
});

it('process a job that fails', done => {
const jobError = new Error('Job Failed');

Expand Down

0 comments on commit 4c3a2de

Please sign in to comment.