diff --git a/lib/commands/cleanJobsInSet-3.lua b/lib/commands/cleanJobsInSet-3.lua index a8298d1a1..4daabc1ea 100644 --- a/lib/commands/cleanJobsInSet-3.lua +++ b/lib/commands/cleanJobsInSet-3.lua @@ -7,7 +7,7 @@ KEYS[3] rate limiter key ARGV[1] jobId - ARGV[2] timestamp + ARGV[2] maxTimestamp ARGV[3] limit the number of jobs to be removed. 0 is unlimited ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed' ]] @@ -21,15 +21,31 @@ local maxTimestamp = ARGV[2] local limitStr = ARGV[3] local setName = ARGV[4] -local command = "ZRANGE" local isList = false local rcall = redis.call if setName == "wait" or setName == "active" or setName == "paused" then - command = "LRANGE" isList = true end +-- We use ZRANGEBYSCORE to make the case where we're deleting a limited number +-- of items in a sorted set only run a single iteration. If we simply used +-- ZRANGE, we may take a long time traversing through jobs that are within the +-- grace period. +local function shouldUseZRangeByScore(isList, limit) + return not isList and limit > 0 +end + +local function getJobs(setKey, isList, rangeStart, rangeEnd, maxTimestamp, limit) + if isList then + return rcall("LRANGE", setKey, rangeStart, rangeEnd) + elseif shouldUseZRangeByScore(isList, limit) then + return rcall("ZRANGEBYSCORE", setKey, 0, maxTimestamp, "LIMIT", 0, limit) + else + return rcall("ZRANGE", setKey, rangeStart, rangeEnd) + end +end + local limit = tonumber(limitStr) local rangeStart = 0 local rangeEnd = -1 @@ -44,7 +60,7 @@ if limit > 0 then rangeEnd = -1 end -local jobIds = rcall(command, setKey, rangeStart, rangeEnd) +local jobIds = getJobs(setKey, isList, rangeStart, rangeEnd, maxTimestamp, limit) local deleted = {} local deletedCount = 0 local jobTS @@ -101,9 +117,9 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do end end - -- If we didn't have a limit, return immediately. We should have deleted - -- all the jobs we can - if limit <= 0 then + -- If we didn't have a limit or used the single-iteration ZRANGEBYSCORE + -- function, return immediately. We should have deleted all the jobs we can + if limit <= 0 or shouldUseZRangeByScore(isList, limit) then break end @@ -111,7 +127,7 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do -- We didn't delete enough. Look for more to delete rangeStart = rangeStart - limit rangeEnd = rangeEnd - limit - jobIds = rcall(command, setKey, rangeStart, rangeEnd) + jobIds = getJobs(setKey, isList, rangeStart, rangeEnd, maxTimestamp, limit) end end diff --git a/test/test_queue.js b/test/test_queue.js index 0706515f8..99322f648 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -3098,5 +3098,59 @@ describe('Queue', () => { done(); }); }); + + it('should clean completed jobs outside grace period', async () => { + queue.process((job, jobDone) => { + jobDone(); + }); + const [jobToClean] = await Promise.all([ + queue.add({ some: 'oldJob' }), + queue.add({ some: 'gracePeriodJob' }, { delay: 50 }), + ]); + await delay(100); + + const cleaned = await queue.clean(75, 'completed'); + + expect(cleaned.length).to.be.eql(1); + expect(cleaned[0]).to.eql(jobToClean.id); + }); + + it('should clean completed jobs outside grace period with limit', async () => { + queue.process((job, jobDone) => { + jobDone(); + }); + const [jobToClean] = await Promise.all([ + queue.add({ some: 'oldJob' }), + queue.add({ some: 'gracePeriodJob' }, { delay: 50 }), + ]); + await delay(100); + + const cleaned = await queue.clean(75, 'completed', 10); + + expect(cleaned.length).to.be.eql(1); + expect(cleaned[0]).to.eql(jobToClean.id); + }); + + it('should clean completed jobs respecting limit', async () => { + queue.process((job, jobDone) => { + jobDone(); + }); + const jobsToCleanPromises = []; + for (let i = 0; i < 3; i++) { + jobsToCleanPromises.push(queue.add({ some: 'jobToClean' })); + } + + const [jobsToClean] = await Promise.all([ + Promise.all(jobsToCleanPromises), + queue.add({ some: 'gracePeriodJob' }, { delay: 50 }), + ]); + await delay(100); + + const cleaned = await queue.clean(75, 'completed', 1); + + expect(cleaned.length).to.be.eql(1); + const jobsToCleanIds = jobsToClean.map(job => job.id); + expect(jobsToCleanIds).to.include(cleaned[0]); + }); }); });