Skip to content

Commit

Permalink
Merge pull request #1043 from OptimalBits/emit-waiting-delayed
Browse files Browse the repository at this point in the history
emit waiting event when moving job from delayed to wait list
  • Loading branch information
manast committed Sep 2, 2018
2 parents af1dd4c + e5768a0 commit 3d15fff
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 21 deletions.
4 changes: 0 additions & 4 deletions lib/commands/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ var utils = require('../utils');

fs = Promise.promisifyAll(fs);

//
// for some very strange reason, defining scripts with this code results in this error
// when executing the scripts: ERR value is not an integer or out of range
//
module.exports = (function() {
var scripts;

Expand Down
34 changes: 19 additions & 15 deletions lib/commands/updateDelaySet-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,44 +10,48 @@
ARGV[1] queue.toKey('')
ARGV[2] delayed timestamp
ARGV[3] queue token
Events:
'removed'
]]
local RESULT = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")
local rcall = redis.call;
local RESULT = rcall("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")
local jobId = RESULT[1]
local score = RESULT[2]
if (score ~= nil) then
score = score / 0x1000
if (math.floor(score) <= tonumber(ARGV[2])) then
redis.call("ZREM", KEYS[1], jobId)
redis.call("LREM", KEYS[2], 0, jobId)
rcall("ZREM", KEYS[1], jobId)
rcall("LREM", KEYS[2], 0, jobId)

local priority = tonumber(redis.call("HGET", ARGV[1] .. jobId, "priority")) or 0
local priority = tonumber(rcall("HGET", ARGV[1] .. jobId, "priority")) or 0

if priority == 0 then
-- LIFO or FIFO
redis.call("LPUSH", KEYS[3], jobId)
rcall("LPUSH", KEYS[3], jobId)
else
-- Priority add
redis.call("ZADD", KEYS[4], priority, jobId)
local count = redis.call("ZCOUNT", KEYS[4], 0, priority)
rcall("ZADD", KEYS[4], priority, jobId)
local count = rcall("ZCOUNT", KEYS[4], 0, priority)

local len = redis.call("LLEN", KEYS[3])
local id = redis.call("LINDEX", KEYS[3], len - (count-1))
local len = rcall("LLEN", KEYS[3])
local id = rcall("LINDEX", KEYS[3], len - (count-1))
if id then
redis.call("LINSERT", KEYS[3], "BEFORE", id, jobId)
rcall("LINSERT", KEYS[3], "BEFORE", id, jobId)
else
redis.call("RPUSH", KEYS[3], jobId)
rcall("RPUSH", KEYS[3], jobId)
end

end

redis.call("HSET", ARGV[1] .. jobId, "delay", 0)
local nextTimestamp = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2]
-- Emit waiting event (wait..ing@token)
rcall("PUBLISH", KEYS[3] .. "ing@" .. ARGV[3], jobId)

rcall("HSET", ARGV[1] .. jobId, "delay", 0)
local nextTimestamp = rcall("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2]
if(nextTimestamp ~= nil) then
nextTimestamp = nextTimestamp / 0x1000
redis.call("PUBLISH", KEYS[1], nextTimestamp)
rcall("PUBLISH", KEYS[1], nextTimestamp)
end
return nextTimestamp
end
Expand Down
2 changes: 1 addition & 1 deletion lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ var scripts = {
queue.keys.wait,
queue.keys.priority
];
var args = [queue.toKey(''), delayedTimestamp];
var args = [queue.toKey(''), delayedTimestamp, queue.token];
return queue.client.updateDelaySet(keys.concat(args));
},

Expand Down
23 changes: 22 additions & 1 deletion test/test_repeat.js
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ describe('repeat', function() {
});
});

it('should not re-add a repeatable job after it has been deleted', function() {
it('should not re-add a repeatable job after it has been removed', function() {
var _this = this;
var date = new Date('2017-02-07 9:24:00');
var nextTick = 2 * ONE_SECOND;
Expand Down Expand Up @@ -560,4 +560,25 @@ describe('repeat', function() {
}
);
});

it('should emit a waiting event when adding a repeatable job to the waiting list', function(done) {
var _this = this;
var date = new Date('2017-02-07 9:24:00');
this.clock.tick(date.getTime());
var nextTick = 2 * ONE_SECOND + 500;

queue.on('waiting', function() {
done();
});

queue
.add('repeat', { foo: 'bar' }, { repeat: { cron: '*/2 * * * * *' } })
.then(function() {
_this.clock.tick(nextTick);
});

queue.process('repeat', function() {
console.error('hiasd');
});
});
});

0 comments on commit 3d15fff

Please sign in to comment.