diff --git a/REFERENCE.md b/REFERENCE.md index 49c52c0d1..d0e8e46a9 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -8,6 +8,7 @@ - [Queue#resume](#queueresume) - [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) - [Queue#count](#queuecount) + - [Queue#removeJobs](#queueremovejobs) - [Queue#empty](#queueempty) - [Queue#clean](#queueclean) - [Queue#close](#queueclose) @@ -308,7 +309,7 @@ pause(isLocal?: boolean, doNotWaitActive?: boolean): Promise Returns a promise that resolves when the queue is paused. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized. The pause can be either global or local. If global, all workers in all queue instances for a given queue will be paused. If local, just this worker will stop processing new jobs after the current lock expires. This can be useful to stop a worker from taking new jobs prior to shutting down. -If `doNotWaitActive` is `true`, `pause` will *not* wait for any active jobs to finish before resolving. Otherwise, `pause` *will* wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information. +If `doNotWaitActive` is `true`, `pause` will _not_ wait for any active jobs to finish before resolving. Otherwise, `pause` _will_ wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information. Pausing a queue that is already paused does nothing. @@ -346,6 +347,25 @@ Returns a promise that returns the number of jobs in the queue, waiting or delay --- +### Queue#removeJobs + +```ts +removeJobs(pattern: string): Promise +``` + +Removes all the jobs which jobId matches the given pattern. The pattern must follow redis glob-style pattern (syntax)[https://redis.io/commands/keys] + +Example: +```js +myQueue.removeJobs('?oo*').then(function() { + console.log('done removing jobs'); +}); +``` + +Will remove jobs with ids such as: "boo", "foofighter", etc. + +--- + ### Queue#empty ```ts diff --git a/lib/commands/removeJobs-7.lua b/lib/commands/removeJobs-7.lua new file mode 100644 index 000000000..89dc0fe0a --- /dev/null +++ b/lib/commands/removeJobs-7.lua @@ -0,0 +1,52 @@ +--[[ + Remove all jobs matching a given pattern from all the queues they may be in as well as all its data. + In order to be able to remove any job, they must be unlocked. + + Input: + KEYS[1] 'active', + KEYS[2] 'wait', + KEYS[3] 'delayed', + KEYS[4] 'paused', + KEYS[5] 'completed', + KEYS[6] 'failed', + KEYS[7] 'priority', + + ARGV[1] prefix + ARGV[2] pattern + ARGV[3] cursor + + Events: + 'removed' +]] + +-- TODO PUBLISH global events 'removed' + +local rcall = redis.call +local result = rcall("SCAN", ARGV[3], "MATCH", ARGV[1] .. ARGV[2]) +local cursor = result[1]; +local jobKeys = result[2]; +local removed = {} + +local prefixLen = string.len(ARGV[1]) + 1 +for i, jobKey in ipairs(jobKeys) do + local keyTypeResp = rcall("TYPE", jobKey) + if keyTypeResp["ok"] == "hash" then + local jobId = string.sub(jobKey, prefixLen) + local lockKey = jobKey .. ':lock' + local lock = redis.call("GET", lockKey) + if not lock then + rcall("LREM", KEYS[1], 0, jobId) + rcall("LREM", KEYS[2], 0, jobId) + rcall("ZREM", KEYS[3], jobId) + rcall("LREM", KEYS[4], 0, jobId) + rcall("ZREM", KEYS[5], jobId) + rcall("ZREM", KEYS[6], jobId) + rcall("ZREM", KEYS[7], jobId) + rcall("DEL", jobKey) + rcall("DEL", jobKey .. ':logs') + table.insert(removed, jobId) + end + end +end +return {cursor, removed} + diff --git a/lib/job.js b/lib/job.js index 19347b0c7..7970d987c 100644 --- a/lib/job.js +++ b/lib/job.js @@ -119,6 +119,12 @@ Job.fromId = function(queue, jobId) { }); }; +Job.remove = async function(queue, pattern) { + await queue.isReady(); + const removed = await scripts.removeWithPattern(queue, pattern); + removed.forEach(jobId => queue.emit('removed', jobId)); +}; + Job.prototype.progress = function(progress) { if (_.isUndefined(progress)) { return this._progress; diff --git a/lib/queue.js b/lib/queue.js index 2f1d31bf9..50e0b7017 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -540,6 +540,10 @@ Queue.prototype.disconnect = function() { }); }; +Queue.prototype.removeJobs = function(pattern) { + return Job.remove(this, pattern); +}; + Queue.prototype.close = function(doNotWaitJobs) { if (this.closing) { return this.closing; diff --git a/lib/scripts.js b/lib/scripts.js index 561e3cb81..2370b4b10 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -299,6 +299,35 @@ const scripts = { return queue.client.removeJob(keys.concat([jobId, queue.token])); }, + async removeWithPattern(queue, pattern) { + const keys = _.map( + [ + 'active', + 'wait', + 'delayed', + 'paused', + 'completed', + 'failed', + 'priority' + ], + name => { + return queue.toKey(name); + } + ); + + const allRemoved = []; + let cursor = '0', + removed; + do { + [cursor, removed] = await queue.client.removeJobs( + keys.concat([queue.toKey(''), pattern, cursor]) + ); + allRemoved.push.apply(allRemoved, removed); + } while (cursor !== '0'); + + return allRemoved; + }, + extendLock(queue, jobId) { return queue.client.extendLock([ queue.toKey(jobId) + ':lock', diff --git a/test/test_job.js b/test/test_job.js index 782a47e37..69d07ea1c 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -312,6 +312,26 @@ describe('Job', () => { }); }); + describe('.removeFromPattern', () => { + it('remove jobs matching pattern', async () => { + const jobIds = ['foo', 'foo1', 'foo2', 'foo3', 'foo4', 'bar', 'baz']; + await Promise.all( + jobIds.map(jobId => Job.create(queue, { foo: 'bar' }, { jobId })) + ); + + await queue.removeJobs('foo*'); + + for (let i = 0; i < jobIds.length; i++) { + const storedJob = await Job.fromId(queue, jobIds[i]); + if (jobIds[i].startsWith('foo')) { + expect(storedJob).to.be(null); + } else { + expect(storedJob).to.not.be(null); + } + } + }); + }); + describe('.remove on priority queues', () => { it('remove a job with jobID 1 and priority 3 and check the new order in the queue', () => { return queue