Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/add remove with pattern #1725

Merged
merged 3 commits into from May 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 21 additions & 1 deletion REFERENCE.md
Expand Up @@ -8,6 +8,7 @@
- [Queue#resume](#queueresume)
- [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished)
- [Queue#count](#queuecount)
- [Queue#removeJobs](#queueremovejobs)
- [Queue#empty](#queueempty)
- [Queue#clean](#queueclean)
- [Queue#close](#queueclose)
Expand Down Expand Up @@ -308,7 +309,7 @@ pause(isLocal?: boolean, doNotWaitActive?: boolean): Promise

Returns a promise that resolves when the queue is paused. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized. The pause can be either global or local. If global, all workers in all queue instances for a given queue will be paused. If local, just this worker will stop processing new jobs after the current lock expires. This can be useful to stop a worker from taking new jobs prior to shutting down.

If `doNotWaitActive` is `true`, `pause` will *not* wait for any active jobs to finish before resolving. Otherwise, `pause` *will* wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information.
If `doNotWaitActive` is `true`, `pause` will _not_ wait for any active jobs to finish before resolving. Otherwise, `pause` _will_ wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information.

Pausing a queue that is already paused does nothing.

Expand Down Expand Up @@ -346,6 +347,25 @@ Returns a promise that returns the number of jobs in the queue, waiting or delay

---

### Queue#removeJobs

```ts
removeJobs(pattern: string): Promise<void>
```

Removes all the jobs which jobId matches the given pattern. The pattern must follow redis glob-style pattern (syntax)[https://redis.io/commands/keys]

Example:
```js
myQueue.removeJobs('?oo*').then(function() {
console.log('done removing jobs');
});
```

Will remove jobs with ids such as: "boo", "foofighter", etc.

---

### Queue#empty

```ts
Expand Down
52 changes: 52 additions & 0 deletions lib/commands/removeJobs-7.lua
@@ -0,0 +1,52 @@
--[[
Remove all jobs matching a given pattern from all the queues they may be in as well as all its data.
In order to be able to remove any job, they must be unlocked.

Input:
KEYS[1] 'active',
KEYS[2] 'wait',
KEYS[3] 'delayed',
KEYS[4] 'paused',
KEYS[5] 'completed',
KEYS[6] 'failed',
KEYS[7] 'priority',

ARGV[1] prefix
ARGV[2] pattern
ARGV[3] cursor

Events:
'removed'
]]

-- TODO PUBLISH global events 'removed'

local rcall = redis.call
local result = rcall("SCAN", ARGV[3], "MATCH", ARGV[1] .. ARGV[2])
local cursor = result[1];
local jobKeys = result[2];
local removed = {}

local prefixLen = string.len(ARGV[1]) + 1
for i, jobKey in ipairs(jobKeys) do
local keyTypeResp = rcall("TYPE", jobKey)
if keyTypeResp["ok"] == "hash" then
local jobId = string.sub(jobKey, prefixLen)
local lockKey = jobKey .. ':lock'
local lock = redis.call("GET", lockKey)
if not lock then
rcall("LREM", KEYS[1], 0, jobId)
rcall("LREM", KEYS[2], 0, jobId)
rcall("ZREM", KEYS[3], jobId)
rcall("LREM", KEYS[4], 0, jobId)
rcall("ZREM", KEYS[5], jobId)
rcall("ZREM", KEYS[6], jobId)
rcall("ZREM", KEYS[7], jobId)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ':logs')
table.insert(removed, jobId)
end
end
end
return {cursor, removed}

6 changes: 6 additions & 0 deletions lib/job.js
Expand Up @@ -119,6 +119,12 @@ Job.fromId = function(queue, jobId) {
});
};

Job.remove = async function(queue, pattern) {
await queue.isReady();
const removed = await scripts.removeWithPattern(queue, pattern);
removed.forEach(jobId => queue.emit('removed', jobId));
};

Job.prototype.progress = function(progress) {
if (_.isUndefined(progress)) {
return this._progress;
Expand Down
4 changes: 4 additions & 0 deletions lib/queue.js
Expand Up @@ -540,6 +540,10 @@ Queue.prototype.disconnect = function() {
});
};

Queue.prototype.removeJobs = function(pattern) {
return Job.remove(this, pattern);
};

Queue.prototype.close = function(doNotWaitJobs) {
if (this.closing) {
return this.closing;
Expand Down
29 changes: 29 additions & 0 deletions lib/scripts.js
Expand Up @@ -299,6 +299,35 @@ const scripts = {
return queue.client.removeJob(keys.concat([jobId, queue.token]));
},

async removeWithPattern(queue, pattern) {
const keys = _.map(
[
'active',
'wait',
'delayed',
'paused',
'completed',
'failed',
'priority'
],
name => {
return queue.toKey(name);
}
);

const allRemoved = [];
let cursor = '0',
removed;
do {
[cursor, removed] = await queue.client.removeJobs(
keys.concat([queue.toKey(''), pattern, cursor])
);
allRemoved.push.apply(allRemoved, removed);
} while (cursor !== '0');

return allRemoved;
},

extendLock(queue, jobId) {
return queue.client.extendLock([
queue.toKey(jobId) + ':lock',
Expand Down
20 changes: 20 additions & 0 deletions test/test_job.js
Expand Up @@ -312,6 +312,26 @@ describe('Job', () => {
});
});

describe('.removeFromPattern', () => {
it('remove jobs matching pattern', async () => {
const jobIds = ['foo', 'foo1', 'foo2', 'foo3', 'foo4', 'bar', 'baz'];
await Promise.all(
jobIds.map(jobId => Job.create(queue, { foo: 'bar' }, { jobId }))
);

await queue.removeJobs('foo*');

for (let i = 0; i < jobIds.length; i++) {
const storedJob = await Job.fromId(queue, jobIds[i]);
if (jobIds[i].startsWith('foo')) {
expect(storedJob).to.be(null);
} else {
expect(storedJob).to.not.be(null);
}
}
});
});

describe('.remove on priority queues', () => {
it('remove a job with jobID 1 and priority 3 and check the new order in the queue', () => {
return queue
Expand Down