Skip to content

Commit

Permalink
fix(retry-job): consider priority (#2737) fixes #1755
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed May 15, 2024
1 parent 8c3bf1f commit 09ce146
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 114 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ jobs:
node-version: 12
- name: Install dependencies
run: yarn install --frozen-lockfile --non-interactive
- name: Generate scripts
run: yarn pretest
- name: Release
env:
GITHUB_TOKEN: ${{ secrets.GH_TOKEN }}
Expand Down
6 changes: 3 additions & 3 deletions generateRawScripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ class RawScriptLoader extends ScriptLoader {
for (const command of scripts) {
const {
name,
options: { numberOfKeys, lua },
options: { numberOfKeys, lua }
} = command;
await writeFile(
path.join(writeFilenamePath, `${name}-${numberOfKeys}.lua`),
lua,
lua
);
}
}
Expand All @@ -41,5 +41,5 @@ const scriptLoader = new RawScriptLoader();

scriptLoader.transpileScripts(
path.join(__dirname, './lib/commands'),
path.join(__dirname, './rawScripts'),
path.join(__dirname, './rawScripts')
);
16 changes: 16 additions & 0 deletions lib/commands/includes/addJobWithPriority.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
--[[
Function to add job considering priority.
]]

local function addJobWithPriority(priorityKey, priority, jobId, targetKey)
rcall("ZADD", priorityKey, priority, jobId)
local count = rcall("ZCOUNT", priorityKey, 0, priority)

local len = rcall("LLEN", targetKey)
local id = rcall("LINDEX", targetKey, len - (count - 1))
if id then
rcall("LINSERT", targetKey, "BEFORE", id, jobId)
else
rcall("RPUSH", targetKey, jobId)
end
end
12 changes: 12 additions & 0 deletions lib/commands/includes/getTargetQueueList.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
--[[
Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same).
]]

local function getTargetQueueList(queueMetaKey, waitKey, pausedKey)
if rcall("EXISTS", queueMetaKey) ~= 1 then
return waitKey, false
else
return pausedKey, true
end
end
5 changes: 3 additions & 2 deletions lib/commands/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ const { ScriptLoader } = require('./script-loader');
const scriptLoader = new ScriptLoader();

module.exports = {
ScriptLoader, scriptLoader
}
ScriptLoader,
scriptLoader
};
27 changes: 9 additions & 18 deletions lib/commands/promote-4.lua → lib/commands/promote-5.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
KEYS[1] 'delayed'
KEYS[2] 'wait'
KEYS[3] 'paused'
KEYS[4] 'priority'
KEYS[4] 'meta-paused'
KEYS[5] 'priority'
ARGV[1] queue.toKey('')
ARGV[2] jobId
Expand All @@ -17,30 +18,20 @@
local rcall = redis.call;
local jobId = ARGV[2]

if redis.call("ZREM", KEYS[1], jobId) == 1 then
local priority = tonumber(rcall("HGET", ARGV[1] .. jobId, "priority")) or 0
-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"

local target = KEYS[2];
if rcall("ZREM", KEYS[1], jobId) == 1 then
local priority = tonumber(rcall("HGET", ARGV[1] .. jobId, "priority")) or 0

if rcall("EXISTS", KEYS[3]) == 1 then
target = KEYS[3]
end
local target = getTargetQueueList(KEYS[4], KEYS[2], KEYS[3])

if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", target, jobId)
else
-- Priority add
rcall("ZADD", KEYS[4], priority, jobId)
local count = rcall("ZCOUNT", KEYS[4], 0, priority)

local len = rcall("LLEN", target)
local id = rcall("LINDEX", target, len - (count - 1))
if id then
rcall("LINSERT", target, "BEFORE", id, jobId)
else
rcall("RPUSH", target, jobId)
end
addJobWithPriority(KEYS[5], priority, jobId, target)
end

-- Emit waiting event (wait..ing@token)
Expand Down
22 changes: 15 additions & 7 deletions lib/commands/retryJob-6.lua → lib/commands/retryJob-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
Input:
KEYS[1] 'active',
KEYS[2] 'wait'
KEYS[3] jobId
KEYS[3] jobId key
KEYS[4] 'meta-paused'
KEYS[5] 'paused'
KEYS[6] stalled key
KEYS[7] 'priority'
ARGV[1] pushCmd
ARGV[2] jobId
Expand All @@ -22,6 +23,11 @@
-2 - Job Not locked
]]
local rcall = redis.call

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"

if rcall("EXISTS", KEYS[3]) == 1 then

-- Check for job lock
Expand All @@ -37,15 +43,17 @@ if rcall("EXISTS", KEYS[3]) == 1 then

rcall("LREM", KEYS[1], 0, ARGV[2])

local target
if rcall("EXISTS", KEYS[4]) ~= 1 then
target = KEYS[2]
local target = getTargetQueueList(KEYS[4], KEYS[2], KEYS[5])

local priority = tonumber(rcall("HGET", KEYS[3], "priority")) or 0

if priority == 0 then
-- LIFO or FIFO
rcall(ARGV[1], target, ARGV[2])
else
target = KEYS[5]
addJobWithPriority(KEYS[7], priority, ARGV[2], target)
end

rcall(ARGV[1], target, ARGV[2])

return 0
else
return -1
Expand Down
Loading

0 comments on commit 09ce146

Please sign in to comment.