Skip to content

Commit

Permalink
fix(scripts): throw error when moving non-active job to delayed (#2740)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed May 21, 2024
1 parent 728f524 commit 63636b1
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 15 deletions.
19 changes: 19 additions & 0 deletions lib/commands/includes/removeLock.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local function removeLock(jobKey, stalledKey, token, jobId)
if token ~= "0" then
local lockKey = jobKey .. ':lock'
local lockToken = rcall("GET", lockKey)
if lockToken == token then
rcall("DEL", lockKey)
rcall("SREM", stalledKey, jobId)
else
if lockToken then
-- Lock exists but token does not match
return -6
else
-- Lock is missing completely
return -2
end
end
end
return 0
end
19 changes: 9 additions & 10 deletions lib/commands/moveToDelayed-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,21 @@
]]
local rcall = redis.call

-- Includes
--- @include "includes/removeLock"

if rcall("EXISTS", KEYS[3]) == 1 then
-- Check for job lock
if ARGV[3] ~= "0" then
local lockKey = KEYS[3] .. ':lock'
if rcall("GET", lockKey) == ARGV[3] then
rcall("DEL", lockKey)
rcall("SREM", KEYS[4], ARGV[2])
else
return -2
end
local errorCode = removeLock(KEYS[3], KEYS[4], ARGV[3], ARGV[2])
if errorCode < 0 then
return errorCode
end

local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[2])
if numRemovedElements < 1 then return -3 end

local score = tonumber(ARGV[1])
rcall("ZADD", KEYS[2], score, ARGV[2])
rcall("PUBLISH", KEYS[2], (score / 0x1000))
rcall("LREM", KEYS[1], 0, ARGV[2])

return 0
else
Expand Down
2 changes: 1 addition & 1 deletion lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ Job.prototype.moveToFailed = async function(err, ignoreLock) {
const results = await multi.exec();
const code = _.last(results)[1];
if (code < 0) {
throw scripts.finishedErrors(code, this.id, command);
throw scripts.finishedErrors(code, this.id, command, 'active');
}
};

Expand Down
8 changes: 6 additions & 2 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,20 +238,24 @@ const scripts = {
);
return job.queue.client.moveToFinished(args).then(result => {
if (result < 0) {
throw scripts.finishedErrors(result, job.id, 'finished');
throw scripts.finishedErrors(result, job.id, 'finished', 'active');
} else if (result) {
return raw2jobData(result);
}
return 0;
});
},

finishedErrors(code, jobId, command) {
finishedErrors(code, jobId, command, state) {
switch (code) {
case -1:
return new Error('Missing key for job ' + jobId + ' ' + command);
case -2:
return new Error('Missing lock for job ' + jobId + ' ' + command);
case -3:
return new Error(`Job ${jobId} is not in the ${state} state. ${command}`);
case -6:
return new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`);
}
},

Expand Down
11 changes: 9 additions & 2 deletions test/test_job.js
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,9 @@ describe('Job', () => {
.then(isFailed => {
expect(isFailed).to.be(false);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true);
})
Expand Down Expand Up @@ -893,7 +896,9 @@ describe('Job', () => {
})
.then(state => {
expect(state).to.be('completed');
return client.zrem(queue.toKey('completed'), job.id);
return client.zrem(queue.toKey('completed'), job.id).then(()=>{
return client.lpush(queue.toKey('active'), job.id)
});
})
.then(() => {
return job.moveToDelayed(Date.now() + 10000, true);
Expand All @@ -907,7 +912,9 @@ describe('Job', () => {
})
.then(state => {
expect(state).to.be('delayed');
return client.zrem(queue.toKey('delayed'), job.id);
return client.zrem(queue.toKey('delayed'), job.id).then(()=>{
return client.lpush(queue.toKey('active'), job.id)
});
})
.then(() => {
return job.moveToFailed(new Error('test'), true);
Expand Down

0 comments on commit 63636b1

Please sign in to comment.