Skip to content

Commit

Permalink
Merge 59838a6 into 635210f
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed May 8, 2020
2 parents 635210f + 59838a6 commit 43bb4a5
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 1 deletion.
22 changes: 21 additions & 1 deletion REFERENCE.md
Expand Up @@ -8,6 +8,7 @@
- [Queue#resume](#queueresume)
- [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished)
- [Queue#count](#queuecount)
- [Queue#removeJobs](#queueremovejobs)
- [Queue#empty](#queueempty)
- [Queue#clean](#queueclean)
- [Queue#close](#queueclose)
Expand Down Expand Up @@ -308,7 +309,7 @@ pause(isLocal?: boolean, doNotWaitActive?: boolean): Promise

Returns a promise that resolves when the queue is paused. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized. The pause can be either global or local. If global, all workers in all queue instances for a given queue will be paused. If local, just this worker will stop processing new jobs after the current lock expires. This can be useful to stop a worker from taking new jobs prior to shutting down.

If `doNotWaitActive` is `true`, `pause` will *not* wait for any active jobs to finish before resolving. Otherwise, `pause` *will* wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information.
If `doNotWaitActive` is `true`, `pause` will _not_ wait for any active jobs to finish before resolving. Otherwise, `pause` _will_ wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information.

Pausing a queue that is already paused does nothing.

Expand Down Expand Up @@ -346,6 +347,25 @@ Returns a promise that returns the number of jobs in the queue, waiting or delay

---

### Queue#removeJobs

```ts
removeJobs(pattern: string): Promise<void>
```

Removes all the jobs which jobId matches the given pattern. The pattern must follow redis glob-style pattern (syntax)[https://redis.io/commands/keys]

Example:
```js
myQueue.removeJobs('?oo*').then(function() {
console.log('done removing jobs');
});
```

Will remove jobs with ids such as: "boo", "foofighter", etc.

---

### Queue#empty

```ts
Expand Down
52 changes: 52 additions & 0 deletions lib/commands/removeJobs-7.lua
@@ -0,0 +1,52 @@
--[[
Remove all jobs matching a given pattern from all the queues they may be in as well as all its data.
In order to be able to remove any job, they must be unlocked.
Input:
KEYS[1] 'active',
KEYS[2] 'wait',
KEYS[3] 'delayed',
KEYS[4] 'paused',
KEYS[5] 'completed',
KEYS[6] 'failed',
KEYS[7] 'priority',
ARGV[1] prefix
ARGV[2] pattern
ARGV[3] cursor
Events:
'removed'
]]

-- TODO PUBLISH global events 'removed'

local rcall = redis.call
local result = rcall("SCAN", ARGV[3], "MATCH", ARGV[1] .. ARGV[2])
local cursor = result[1];
local jobKeys = result[2];
local removed = {}

local prefixLen = string.len(ARGV[1]) + 1
for i, jobKey in ipairs(jobKeys) do
local keyTypeResp = rcall("TYPE", jobKey)
if keyTypeResp["ok"] == "hash" then
local jobId = string.sub(jobKey, prefixLen)
local lockKey = jobKey .. ':lock'
local lock = redis.call("GET", lockKey)
if not lock then
rcall("LREM", KEYS[1], 0, jobId)
rcall("LREM", KEYS[2], 0, jobId)
rcall("ZREM", KEYS[3], jobId)
rcall("LREM", KEYS[4], 0, jobId)
rcall("ZREM", KEYS[5], jobId)
rcall("ZREM", KEYS[6], jobId)
rcall("ZREM", KEYS[7], jobId)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ':logs')
table.insert(removed, jobId)
end
end
end
return {cursor, removed}

6 changes: 6 additions & 0 deletions lib/job.js
Expand Up @@ -119,6 +119,12 @@ Job.fromId = function(queue, jobId) {
});
};

Job.remove = async function(queue, pattern) {
await queue.isReady();
const removed = await scripts.removeWithPattern(queue, pattern);
removed.forEach(jobId => queue.emit('removed', jobId));
};

Job.prototype.progress = function(progress) {
if (_.isUndefined(progress)) {
return this._progress;
Expand Down
4 changes: 4 additions & 0 deletions lib/queue.js
Expand Up @@ -540,6 +540,10 @@ Queue.prototype.disconnect = function() {
});
};

Queue.prototype.removeJobs = function(pattern) {
return Job.remove(this, pattern);
};

Queue.prototype.close = function(doNotWaitJobs) {
if (this.closing) {
return this.closing;
Expand Down
29 changes: 29 additions & 0 deletions lib/scripts.js
Expand Up @@ -299,6 +299,35 @@ const scripts = {
return queue.client.removeJob(keys.concat([jobId, queue.token]));
},

async removeWithPattern(queue, pattern) {
const keys = _.map(
[
'active',
'wait',
'delayed',
'paused',
'completed',
'failed',
'priority'
],
name => {
return queue.toKey(name);
}
);

const allRemoved = [];
let cursor = '0',
removed;
do {
[cursor, removed] = await queue.client.removeJobs(
keys.concat([queue.toKey(''), pattern, cursor])
);
allRemoved.push.apply(allRemoved, removed);
} while (cursor !== '0');

return allRemoved;
},

extendLock(queue, jobId) {
return queue.client.extendLock([
queue.toKey(jobId) + ':lock',
Expand Down
20 changes: 20 additions & 0 deletions test/test_job.js
Expand Up @@ -312,6 +312,26 @@ describe('Job', () => {
});
});

describe('.removeFromPattern', () => {
it('remove jobs matching pattern', async () => {
const jobIds = ['foo', 'foo1', 'foo2', 'foo3', 'foo4', 'bar', 'baz'];
await Promise.all(
jobIds.map(jobId => Job.create(queue, { foo: 'bar' }, { jobId }))
);

await queue.removeJobs('foo*');

for (let i = 0; i < jobIds.length; i++) {
const storedJob = await Job.fromId(queue, jobIds[i]);
if (jobIds[i].startsWith('foo')) {
expect(storedJob).to.be(null);
} else {
expect(storedJob).to.not.be(null);
}
}
});
});

describe('.remove on priority queues', () => {
it('remove a job with jobID 1 and priority 3 and check the new order in the queue', () => {
return queue
Expand Down

0 comments on commit 43bb4a5

Please sign in to comment.