From e5768a0703a16fa17888b83c2083a684c08765dd Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sun, 2 Sep 2018 14:40:06 +0200 Subject: [PATCH] emit waiting event when moving job from delayed to wait list --- lib/commands/index.js | 4 ---- lib/commands/updateDelaySet-4.lua | 34 +++++++++++++++++-------------- lib/scripts.js | 2 +- test/test_repeat.js | 23 ++++++++++++++++++++- 4 files changed, 42 insertions(+), 21 deletions(-) diff --git a/lib/commands/index.js b/lib/commands/index.js index 9bc5cc097..133205bc4 100644 --- a/lib/commands/index.js +++ b/lib/commands/index.js @@ -20,10 +20,6 @@ var utils = require('../utils'); fs = Promise.promisifyAll(fs); -// -// for some very strange reason, defining scripts with this code results in this error -// when executing the scripts: ERR value is not an integer or out of range -// module.exports = (function() { var scripts; diff --git a/lib/commands/updateDelaySet-4.lua b/lib/commands/updateDelaySet-4.lua index 4db63e55c..9e666b716 100644 --- a/lib/commands/updateDelaySet-4.lua +++ b/lib/commands/updateDelaySet-4.lua @@ -10,44 +10,48 @@ ARGV[1] queue.toKey('') ARGV[2] delayed timestamp + ARGV[3] queue token Events: 'removed' ]] -local RESULT = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES") +local rcall = redis.call; +local RESULT = rcall("ZRANGE", KEYS[1], 0, 0, "WITHSCORES") local jobId = RESULT[1] local score = RESULT[2] if (score ~= nil) then score = score / 0x1000 if (math.floor(score) <= tonumber(ARGV[2])) then - redis.call("ZREM", KEYS[1], jobId) - redis.call("LREM", KEYS[2], 0, jobId) + rcall("ZREM", KEYS[1], jobId) + rcall("LREM", KEYS[2], 0, jobId) - local priority = tonumber(redis.call("HGET", ARGV[1] .. jobId, "priority")) or 0 + local priority = tonumber(rcall("HGET", ARGV[1] .. jobId, "priority")) or 0 if priority == 0 then -- LIFO or FIFO - redis.call("LPUSH", KEYS[3], jobId) + rcall("LPUSH", KEYS[3], jobId) else -- Priority add - redis.call("ZADD", KEYS[4], priority, jobId) - local count = redis.call("ZCOUNT", KEYS[4], 0, priority) + rcall("ZADD", KEYS[4], priority, jobId) + local count = rcall("ZCOUNT", KEYS[4], 0, priority) - local len = redis.call("LLEN", KEYS[3]) - local id = redis.call("LINDEX", KEYS[3], len - (count-1)) + local len = rcall("LLEN", KEYS[3]) + local id = rcall("LINDEX", KEYS[3], len - (count-1)) if id then - redis.call("LINSERT", KEYS[3], "BEFORE", id, jobId) + rcall("LINSERT", KEYS[3], "BEFORE", id, jobId) else - redis.call("RPUSH", KEYS[3], jobId) + rcall("RPUSH", KEYS[3], jobId) end - end - redis.call("HSET", ARGV[1] .. jobId, "delay", 0) - local nextTimestamp = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2] + -- Emit waiting event (wait..ing@token) + rcall("PUBLISH", KEYS[3] .. "ing@" .. ARGV[3], jobId) + + rcall("HSET", ARGV[1] .. jobId, "delay", 0) + local nextTimestamp = rcall("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2] if(nextTimestamp ~= nil) then nextTimestamp = nextTimestamp / 0x1000 - redis.call("PUBLISH", KEYS[1], nextTimestamp) + rcall("PUBLISH", KEYS[1], nextTimestamp) end return nextTimestamp end diff --git a/lib/scripts.js b/lib/scripts.js index ba3c5c35d..fecf0ad09 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -330,7 +330,7 @@ var scripts = { queue.keys.wait, queue.keys.priority ]; - var args = [queue.toKey(''), delayedTimestamp]; + var args = [queue.toKey(''), delayedTimestamp, queue.token]; return queue.client.updateDelaySet(keys.concat(args)); }, diff --git a/test/test_repeat.js b/test/test_repeat.js index 6bb99dd03..08c976bac 100644 --- a/test/test_repeat.js +++ b/test/test_repeat.js @@ -334,7 +334,7 @@ describe('repeat', function() { }); }); - it('should not re-add a repeatable job after it has been deleted', function() { + it('should not re-add a repeatable job after it has been removed', function() { var _this = this; var date = new Date('2017-02-07 9:24:00'); var nextTick = 2 * ONE_SECOND; @@ -560,4 +560,25 @@ describe('repeat', function() { } ); }); + + it('should emit a waiting event when adding a repeatable job to the waiting list', function(done) { + var _this = this; + var date = new Date('2017-02-07 9:24:00'); + this.clock.tick(date.getTime()); + var nextTick = 2 * ONE_SECOND + 500; + + queue.on('waiting', function() { + done(); + }); + + queue + .add('repeat', { foo: 'bar' }, { repeat: { cron: '*/2 * * * * *' } }) + .then(function() { + _this.clock.tick(nextTick); + }); + + queue.process('repeat', function() { + console.error('hiasd'); + }); + }); });