Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomic move #496

Merged
merged 17 commits into from Apr 16, 2017
@@ -1,3 +1,10 @@
v.3.0.0-alpha
=============

- job.jobId changed to job.id.
- refactored error messages into separate error module.
- completed and failed job states are now represented in ZSETs.

This comment has been minimized.

Copy link
@xdc0

xdc0 Apr 17, 2017

Contributor

Could you please add the reasoning behind this move?

This comment has been minimized.

Copy link
@manast

manast Apr 18, 2017

Author Member
  • job.jobId -> job.id to reduce redundancy.
  • error messages refactored so that we can unit test without needing to copy/paste the messages.
  • ZSETs allows us to efficiently get ranges of completed and failed jobs, very useful for building UIs and scripts.

v.2.2.6
=======

@@ -0,0 +1,99 @@
--[[
Adds a job to the queue by doing the following:
- Increases the job counter if needed.
- Creates a new job key with the job data.
- if delayed:
- computes timestamp.
- adds to delayed zset.
- Emits a global event 'delayed' if the job is delayed.
- if not delayed
- Adds the jobId to the wait/paused list in one of three ways:
- LIFO
- FIFO
- prioritized.
- Emits a global event 'waiting' if not paused.
Input:
KEYS[1] 'wait',
KEYS[2] 'paused'
KEYS[3] 'meta-paused'
KEYS[4] 'added'
KEYS[5] 'id'
KEYS[6] 'delayed'
KEYS[7] 'priority'
ARGV[1] key prefix,
ARGV[2] custom id (will not generate one automatically)
ARGV[3] name
ARGV[4] data (json stringified job data)
ARGV[5] opts (json stringified job opts)
ARGV[6] timestamp
ARGV[7] delay
ARGV[8] delayedTimestamp
ARGV[9] priority
ARGV[10] LIFO
Events:
'waiting'
]]
local jobCounter = redis.call("INCR", KEYS[5])
local jobId
if ARGV[2] == "" then
jobId = jobCounter
else
jobId = ARGV[2]
end

local jobIdKey = ARGV[1] .. jobId
if redis.call("EXISTS", jobIdKey) == 1 then
return jobId .. "" -- convert to string
end

-- Store the job.
redis.call("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5], "timestamp", ARGV[6], "delay", ARGV[7])

-- Check if job is delayed
if(ARGV[8] ~= "0") then
local timestamp = tonumber(ARGV[8]) * 0x1000 + bit.band(jobCounter, 0xfff)
redis.call("ZADD", KEYS[6], timestamp, jobId)
redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))
return jobId .. "" -- convert to string
else
local direction
local target

if ARGV[10] == "LIFO" then
direction = "RPUSH"
else
direction = "LPUSH"
end

-- Whe check for the meta-paused key to decide if we are paused or not
-- (since an empty list and !EXISTS are not really the same)
if redis.call("EXISTS", KEYS[3]) ~= 1 then
target = KEYS[1]
else
target = KEYS[2]
end

if ARGV[9] == "0" then
-- Standard add
redis.call(direction, target, jobId)
else
-- Priority add
redis.call("ZADD", KEYS[7], ARGV[9], jobId)
local count = redis.call("ZCOUNT", KEYS[7], 0, ARGV[9])

local len = redis.call("LLEN", target)
local id = redis.call("LINDEX", target, len - (count-1))
if id then
redis.call("LINSERT", target, "BEFORE", id, jobId)
else
redis.call("RPUSH", target, jobId)
end
end

redis.call("PUBLISH", KEYS[4], jobId)

This comment has been minimized.

Copy link
@xdc0

xdc0 Apr 17, 2017

Contributor

Is the top documentation correct? It says that this will emit a waiting event if not added but it looks like it never emits a waiting event, it emits an added event regardless if the queue is paused or not.

This comment has been minimized.

Copy link
@manast

manast Apr 18, 2017

Author Member

good catch. I will fix it.

return jobId .. "" -- convert to string
end
@@ -0,0 +1,19 @@
--[[
Extend lock
Input:
KEYS[1] 'lock',
ARGV[1] token
ARGV[2] lock duration in milliseconds
Output:
"OK" if lock extented succesfully.
]]

if redis.call("GET", KEYS[1]) == ARGV[1] then
if redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2]) then
return 1
end
end
return 0
@@ -0,0 +1,45 @@
/**
* Load redis lua scripts.
* The name of the script must have the following format:
*
* cmdName-numKeys.lua
*
* cmdName must be in camel case format.
*
* For example:
* moveToFinish-3.lua
*
*/
'use strict';

var fs = require('fs');
var path = require('path');
var Promise = require('bluebird');

fs = Promise.promisifyAll(fs);

//
// for some very strange reason, defining scripts with this code results in this error
// when executing the scripts: ERR value is not an integer or out of range
//
module.exports = function(client){
return loadScripts(client, __dirname);
}

function loadScripts(client, dir) {
return fs.readdirAsync(dir).then(function(files){
return Promise.all(files.filter(function (file) {
return path.extname(file) === '.lua';
}).map(function (file) {
var longName = path.basename(file, '.lua');
var name = longName.split('-')[0];
var numberOfKeys = parseInt(longName.split('-')[1]);

return fs.readFileAsync(path.join(dir, file)).then(function(lua){
client.defineCommand(name, { numberOfKeys: numberOfKeys, lua: lua.toString() });
}, function(err){
console.log('Error reading script file', err)
});
}));
});
};
@@ -0,0 +1,22 @@
--[[
Checks if a job is finished (.i.e. is in the completed or failed set)
Input:
KEYS[1] completed key
KEYS[2] failed key
ARGV[1] job id
Output:
0 - not finished.
1 - completed.
2 - failed.
]]
if redis.call("ZSCORE", KEYS[1], ARGV[1]) ~= false then
return 1
end

if redis.call("ZSCORE", KEYS[2], ARGV[1]) ~= false then
return 2
end

return redis.call("ZSCORE", KEYS[2], ARGV[1])
@@ -0,0 +1,20 @@
--[[
Checks if job is in a given list.
Input:
KEYS[1]
ARGV[1]
Output:
1 if element found in the list.
]]
local function item_in_list (list, item)
for _, v in pairs(list) do
if v == item then
return 1
end
end
return nil
end
local items = redis.call("LRANGE", KEYS[1] , 0, -1)
return item_in_list(items, ARGV[1])
@@ -0,0 +1,35 @@
--[[
Move next job to be processed to active, lock it and fetch its data. The job
may be delayed, in that case we need to move it to the delayed set instead.
This operation guarantees that the worker owns the job during the locks
expiration time. The worker is responsible of keeping the lock fresh
so that no other worker picks this job again.
Note: This command only works in non-distributed redis deployments.
Input:
KEYS[1] wait key
KEYS[2] active key
KEYS[3] priority key
ARGV[1] key prefix
ARGV[2] lock token
ARGV[3] lock duration in milliseconds
]]

local jobId = redis.call("LINDEX", KEYS[1], -1)

if jobId then
local jobKey = ARGV[1] .. jobId
local lockKey = jobKey .. ':lock'

-- get a the lock
redis.call("SET", lockKey, ARGV[2], "PX", ARGV[3])
redis.call("LREM", KEYS[1], 1, jobId) -- remove from wait
redis.call("ZREM", KEYS[3], jobId) -- remove from priority
redis.call("LPUSH", KEYS[2], jobId) -- push in active

return {redis.call("HGETALL", jobKey), jobId} -- get job data
end

@@ -0,0 +1,32 @@
--[[
Moves job from active to delayed set.
Input:
KEYS[1] active key
KEYS[2] delayed key
KEYS[3] job key
ARGV[1] delayedTimestamp
ARGV[2] the id of the job
ARGV[3] timestamp
Output:
0 - OK
-1 - Missing job.
Events:
- delayed key.
]]
if redis.call("EXISTS", KEYS[3]) == 1 then
local score = tonumber(ARGV[1])
if score ~= 0 then
redis.call("ZADD", KEYS[2], score, ARGV[2])
redis.call("PUBLISH", KEYS[2], (score / 0x1000))
else
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
end
redis.call("LREM", KEYS[1], 0, ARGV[2])
return 0;
else
return -1
end
@@ -0,0 +1,56 @@
--[[
Move job from active to a finished status (completed o failed)
A job can only be moved to completed if it was active.
The job must be locked before it can be moved to a finished status,
and the lock must be released in this script.
Input:
KEYS[1] active key
KEYS[2] completed/failed key
KEYS[3] jobId key
ARGV[1] jobId
ARGV[2] timestamp
ARGV[3] msg property
ARGV[4] return value / failed reason
ARGV[5] token
ARGV[6] shouldRemove
ARGV[7] event channel
ARGV[8] event data (? maybe just send jobid).
Output:
0 OK
-1 Missing key.

This comment has been minimized.

Copy link
@xdc0

xdc0 Apr 17, 2017

Contributor

Looking at the lua script here, -1 means both, that the job is locked and that the key is missing.

This comment has been minimized.

Copy link
@manast

manast Apr 18, 2017

Author Member

you are right. I added a -2 error code when a lock is missing. I noticed however that these error codes are not being used, I need to fix it as well, it may be hidding errors right now.

Events:
'completed'
]]

if redis.call("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
if ARGV[5] ~= "0" then
local lockKey = KEYS[3] .. ':lock'

This comment has been minimized.

Copy link
@xdc0

xdc0 Apr 17, 2017

Contributor

I've noticed that other scripts, such as moveToDelayed, pause, takeLock doesn't check for lock existence, but other scripts such as this does. It should be consistent how we do lock checking.

I know that if these operations are executed it is implied that a lock exists, but even then, we've run into issues in the past due workers stepping in each other toes (hopefully this fixing some of these problems!) so I'd vote to always check for locks and have Bull complain loudly when it happens, that way these race condition issues can be better understood and controlled.

This comment has been minimized.

Copy link
@manast

manast Apr 18, 2017

Author Member

I will add issue for such an improvement.

if redis.call("GET", lockKey) == ARGV[5] then
redis.call("DEL", lockKey)
else
return -1
end
end

-- Remove from active list
redis.call("LREM", KEYS[1], -1, ARGV[1])

-- Remove job?
if ARGV[6] == "1" then
redis.call("DEL", KEYS[3])
else
-- Add to complete/failed set
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
redis.call("HSET", KEYS[3], ARGV[3], ARGV[4]) -- "returnvalue" / "failedReason"
end

-- TODO PUBLISH EVENT, this is the most optimal way.
--redis.call("PUBLISH", ...)
return 0
else
return -1
end
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.