Skip to content

Commit

Permalink
Merge branch 'develop' into refactor-read-commands
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed May 12, 2024
2 parents 15a1b32 + afcf11c commit a5e1684
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 27 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
## [4.12.3](https://github.com/OptimalBits/bull/compare/v4.12.2...v4.12.3) (2024-05-10)


### Bug Fixes

* **job:** validate jobKey in updateProgress and update ([#2730](https://github.com/OptimalBits/bull/issues/2730)) ([6d84156](https://github.com/OptimalBits/bull/commit/6d8415696606d3b7ec891f7fca9ab0508923c321))


### Performance Improvements

* **scripts:** remove token after moving to wait or delayed ([#2731](https://github.com/OptimalBits/bull/issues/2731)) ([7ee8f74](https://github.com/OptimalBits/bull/commit/7ee8f7430a68492c9ce768e7108443592f49d74c))

## [4.12.2](https://github.com/OptimalBits/bull/compare/v4.12.1...v4.12.2) (2024-01-17)


Expand Down
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: '3.2'
services:
redis:
image: redis:6.2-alpine
container_name: cache
ports:
- 6379:6379
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
KEYS[1] active key
KEYS[2] delayed key
KEYS[3] job key
KEYS[4] stalled key
ARGV[1] delayedTimestamp
ARGV[2] the id of the job
Expand All @@ -21,14 +22,13 @@
local rcall = redis.call

if rcall("EXISTS", KEYS[3]) == 1 then
local lockKey
local lock

-- Check for job lock
if ARGV[3] ~= "0" then
lockKey = KEYS[3] .. ':lock'
lock = rcall("GET", lockKey)
if lock ~= ARGV[3] then
local lockKey = KEYS[3] .. ':lock'
if rcall("GET", lockKey) == ARGV[3] then
rcall("DEL", lockKey)
rcall("SREM", KEYS[4], ARGV[2])
else
return -2
end
end
Expand All @@ -38,10 +38,6 @@ if rcall("EXISTS", KEYS[3]) == 1 then
rcall("PUBLISH", KEYS[2], (score / 0x1000))
rcall("LREM", KEYS[1], 0, ARGV[2])

if lock then
rcall("DEL", lockKey)
end

return 0
else
return -1
Expand Down
7 changes: 5 additions & 2 deletions lib/commands/retryJob-5.lua → lib/commands/retryJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
KEYS[3] jobId
KEYS[4] 'meta-paused'
KEYS[5] 'paused'
KEYS[6] stalled key
ARGV[1] pushCmd
ARGV[2] jobId
Expand All @@ -26,8 +27,10 @@ if rcall("EXISTS", KEYS[3]) == 1 then
-- Check for job lock
if ARGV[3] ~= "0" then
local lockKey = KEYS[3] .. ':lock'
local lock = rcall("GET", lockKey)
if lock ~= ARGV[3] then
if rcall("GET", lockKey) == ARGV[3] then
rcall("DEL", lockKey)
rcall("SREM", KEYS[6], ARGV[2])
else
return -2
end
end
Expand Down
20 changes: 20 additions & 0 deletions lib/commands/updateData-1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
--[[
Update job data
Input:
KEYS[1] Job id key
ARGV[1] data
Output:
0 - OK
-1 - Missing job.
]]
local rcall = redis.call

if rcall("EXISTS",KEYS[1]) == 1 then -- // Make sure job exists
rcall("HSET", KEYS[1], "data", ARGV[1])
return 0
else
return -1
end
10 changes: 8 additions & 2 deletions lib/commands/updateProgress-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,11 @@
Event:
progress(jobId, progress)
]]
redis.call("HSET", KEYS[1], "progress", ARGV[1])
redis.call("PUBLISH", KEYS[2], ARGV[2])
local rcall = redis.call
if rcall("EXISTS", KEYS[1]) == 1 then -- // Make sure job exists
rcall("HSET", KEYS[1], "progress", ARGV[1])
rcall("PUBLISH", KEYS[2], ARGV[2])
return 0
else
return -1
end
12 changes: 6 additions & 6 deletions lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ Job.prototype.progress = function(progress) {
return scripts.updateProgress(this, progress);
};

Job.prototype.update = function(data) {
Job.prototype.update = async function(data) {
this.data = data;
return this.queue.client.hset(
this.queue.toKey(this.id),
'data',
JSON.stringify(data)
);
const code = await scripts.updateData(this, data);

if (code < 0) {
throw scripts.finishedErrors(code, this.id, 'updateData');
}
};

Job.prototype.toJSON = function() {
Expand Down
22 changes: 19 additions & 3 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,27 @@ const scripts = {
progressJson,
JSON.stringify({ jobId: job.id, progress })
])
.then(() => {
.then((code) => {
if (code < 0) {
throw scripts.finishedErrors(code, job.id, 'updateProgress');
}
queue.emit('progress', job, progress);
});
},

updateData(job, data) {
const queue = job.queue;
const keys = [job.id].map(name => {
return queue.toKey(name);
});
const dataJson = JSON.stringify(data);

return queue.client
.updateData(keys, [
dataJson
]);
},

retryJobsArgs(queue, count) {
const keys = [
queue.toKey(''),
Expand Down Expand Up @@ -291,7 +307,7 @@ const scripts = {
timestamp = timestamp * 0x1000 + (jobId & 0xfff);
}

const keys = _.map(['active', 'delayed', jobId], name => {
const keys = _.map(['active', 'delayed', jobId, 'stalled'], name => {
return queue.toKey(name);
});
return keys.concat([
Expand Down Expand Up @@ -462,7 +478,7 @@ const scripts = {
const jobId = job.id;

const keys = _.map(
['active', 'wait', jobId, 'meta-paused', 'paused'],
['active', 'wait', jobId, 'meta-paused', 'paused', 'stalled'],
name => {
return queue.toKey(name);
}
Expand Down
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bull",
"version": "4.12.2",
"version": "4.12.3",
"description": "Job manager",
"engines": {
"node": ">=12"
Expand Down Expand Up @@ -63,8 +63,9 @@
},
"scripts": {
"clean:scripts": "rimraf rawScripts lib/scripts",
"dc:up": "docker-compose -f docker-compose.yml up -d",
"dc:down": "docker-compose -f docker-compose.yml down",
"generate:raw:scripts": "node generateRawScripts.js",
"transform:commands": "node ./commandTransform.js ./rawScripts ./lib/scripts",
"pretest": "npm-run-all clean:scripts generate:raw:scripts transform:commands lint",
"lint": "eslint lib test *.js",
"test": "NODE_ENV=test nyc mocha -- 'test/test_*' --recursive --exit",
Expand All @@ -73,7 +74,8 @@
"postpublish": "git push && git push --tags",
"prettier": "prettier --config package.json --write '**/*.js'",
"precommit": "lint-staged",
"build": "tsc"
"build": "tsc",
"transform:commands": "node ./commandTransform.js ./rawScripts ./lib/scripts"
},
"lint-staged": {
"*.{js,json}": [
Expand Down
22 changes: 21 additions & 1 deletion test/test_job.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,16 @@ describe('Job', () => {
});
});
});

describe('when job was removed', () => {
it('throws an error', async () => {
const job = await Job.create(queue, { foo: 'bar' });
await job.remove();
await job.update({baz: 'qux'}).catch(err => {
expect(err.message).to.be.equal('Missing key for job 1 updateData');
});
});
});
});

describe('.remove', () => {
Expand Down Expand Up @@ -520,7 +530,7 @@ describe('Job', () => {
it('can set and get progress as number', () => {
return Job.create(queue, { foo: 'bar' }).then(job => {
return job.progress(42).then(() => {
return Job.fromId(queue, job.id).then(storedJob => {
return Job.fromId(queue, job.id).then(async storedJob => {
expect(storedJob.progress()).to.be(42);
});
});
Expand All @@ -532,6 +542,16 @@ describe('Job', () => {
const storedJob = await Job.fromId(queue, job.id);
expect(storedJob.progress()).to.eql({ total: 120, completed: 40 });
});

describe('when job was removed', () => {
it('throws an error', async () => {
const job = await Job.create(queue, { foo: 'bar' });
await job.remove();
await job.progress({ total: 120, completed: 40 }).catch(err => {
expect(err.message).to.be.equal('Missing key for job 1 updateProgress');
});
});
});
});

describe('.log', () => {
Expand Down

0 comments on commit a5e1684

Please sign in to comment.