Skip to content

Commit

Permalink
fix: faster delay handling. fixes #1893
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Oct 30, 2020
1 parent 4fa8708 commit 4f5e388
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 177 deletions.
25 changes: 11 additions & 14 deletions lib/commands/moveToActive-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
KEYS[1] wait key
KEYS[2] active key
KEYS[3] priority key
KEYS[4] active event key
KEYS[4] active key
KEYS[5] stalled key
-- Rate limiting
Expand All @@ -34,12 +34,11 @@

local rcall = redis.call

local rateLimit = function(jobId, maxJobs)
local rateLimiterKey = KEYS[6];
local rateLimit = function(activeKey, rateLimiterKey, delayedKey, jobId, timestamp, maxJobs, timeUnit, bounceBack, groupLimit)
local limiterIndexTable = rateLimiterKey .. ":index"

-- Rate limit by group?
if(ARGV[9]) then
if(groupLimit) then
local group = string.match(jobId, "[^:]+$")
if group ~= nil then
rateLimiterKey = rateLimiterKey .. ":" .. group
Expand Down Expand Up @@ -68,7 +67,7 @@ local rateLimit = function(jobId, maxJobs)

if numLimitedJobs > 0 then
-- Note, add some slack to compensate for drift.
delay = ((numLimitedJobs * ARGV[7] * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
delay = ((numLimitedJobs * timeUnit * 1.1) / maxJobs) + tonumber(rcall("PTTL", rateLimiterKey))
end
end

Expand All @@ -80,31 +79,29 @@ local rateLimit = function(jobId, maxJobs)
if (delay == 0) and (jobCounter >= maxJobs) then
-- Seems like there are no current rated limited jobs, but the jobCounter has exceeded the number of jobs for this unit of time so we need to rate limit this job.
local exceedingJobs = jobCounter - maxJobs
delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs) * ARGV[7]) / maxJobs
delay = tonumber(rcall("PTTL", rateLimiterKey)) + ((exceedingJobs) * timeUnit) / maxJobs
end

if delay > 0 then
local bounceBack = ARGV[8]
if bounceBack == 'false' then
local timestamp = delay + tonumber(ARGV[4])
local timestamp = delay + tonumber(timestamp)
-- put job into delayed queue
rcall("ZADD", KEYS[7], timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
rcall("PUBLISH", KEYS[7], timestamp)
rcall("ZADD", delayedKey, timestamp * 0x1000 + bit.band(jobCounter, 0xfff), jobId)
rcall("PUBLISH", delayedKey, timestamp)
rcall("SADD", limitedSetKey, jobId)

-- store index so that we can delete rate limited data
rcall("HSET", limiterIndexTable, jobId, limitedSetKey)

end

-- remove from active queue
rcall("LREM", KEYS[2], 1, jobId)
rcall("LREM", activeKey, 1, jobId)
return true
else
-- false indicates not rate limited
-- increment jobCounter only when a job is not rate limited
if (jobCounter == 0) then
rcall("PSETEX", rateLimiterKey, ARGV[7], 1)
rcall("PSETEX", rateLimiterKey, timeUnit, 1)
else
rcall("INCR", rateLimiterKey)
end
Expand All @@ -127,7 +124,7 @@ if jobId then
local maxJobs = tonumber(ARGV[6])

if maxJobs then
if rateLimit(jobId, maxJobs) then
if rateLimit(KEYS[2], KEYS[6], KEYS[7], jobId, ARGV[4], maxJobs, ARGV[7], ARGV[8], ARGV[9]) then
return
end
end
Expand Down
118 changes: 0 additions & 118 deletions lib/commands/moveToFinished-7.lua

This file was deleted.

Loading

0 comments on commit 4f5e388

Please sign in to comment.