Commit

feat: add support for removeOn based on time
manast committed Jan 26, 2022
1 parent 99a51c7 commit 90f040c
Showing 6 changed files with 195 additions and 47 deletions.
29 changes: 27 additions & 2 deletions REFERENCE.md
@@ -290,15 +290,40 @@ interface JobOpts {
// jobId is unique. If you attempt to add a job with an id that
// already exists, it will not be added (see caveat below about repeatable jobs).

removeOnComplete: boolean | number; // If true, removes the job when it successfully
removeOnComplete: boolean | number | KeepJobs; // If true, removes the job when it successfully
// completes. A number specifies the number of jobs to keep. Default behavior is to keep the job in the completed set.
// See KeepJobs if using that interface instead.

removeOnFail: boolean | number; // If true, removes the job when it fails after all attempts. A number specifies the number of jobs to keep
removeOnFail: boolean | number | KeepJobs; // If true, removes the job when it fails after all attempts. A number specifies the number of jobs to keep; see KeepJobs if using that interface instead.
// Default behavior is to keep the job in the failed set.
stackTraceLimit: number; // Limits the amount of stack trace lines that will be recorded in the stacktrace.
}
```

#### KeepJobs Options
```typescript
/**
* KeepJobs
*
* Specify which jobs to keep after finishing. If both age and count are
* specified, then the jobs kept will be the ones that satisfy both
* properties.
*/
export interface KeepJobs {
/**
* Maximum age in *seconds* for job to be kept.
*/
age?: number;

/**
* Maximum count of jobs to be kept.
*/
count?: number;
}
```
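
As an illustration of the options above, here is a minimal usage sketch (not part of this commit); the queue name and payloads are placeholders, and `age` is given in seconds:

```js
// Usage sketch only. Assumes a local Redis and Bull's standard Queue API.
const Queue = require('bull');

const queue = new Queue('my-queue');

async function addJobs() {
  // A plain number keeps only the latest N jobs in the completed set.
  await queue.add({ foo: 'bar' }, { removeOnComplete: 100 });

  // New in this commit: a KeepJobs object prunes by age (seconds) and/or count.
  await queue.add(
    { foo: 'bar' },
    {
      removeOnComplete: { age: 3600, count: 1000 }, // at most 1 hour old, at most 1000 kept
      removeOnFail: { age: 24 * 3600 } // drop failed jobs older than one day
    }
  );
}

addJobs().catch(console.error);
```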

---

#### Timeout Implementation

It is important to note that jobs are _not_ proactively stopped after the given `timeout`. The job is marked as failed
56 changes: 27 additions & 29 deletions lib/commands/moveToFinished-8.lua
@@ -51,48 +51,46 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
rcall("LREM", KEYS[1], -1, ARGV[1])

-- Remove job?
local removeJobs = tonumber(ARGV[6])
if removeJobs ~= 1 then
local keepJobs = cmsgpack.unpack(ARGV[6])
local maxCount = keepJobs['count']
local maxAge = keepJobs['age']
local targetSet = KEYS[2]
local timestamp = ARGV[2]

if maxCount ~= 0 then

-- Add to complete/failed set
rcall("ZADD", KEYS[2], ARGV[2], ARGV[1])
rcall("HMSET", KEYS[3], ARGV[3], ARGV[4], "finishedOn", ARGV[2]) -- "returnvalue" / "failedReason" and "finishedOn"
rcall("ZADD", targetSet, timestamp, ARGV[1])
rcall("HMSET", KEYS[3], ARGV[3], ARGV[4], "finishedOn", timestamp) -- "returnvalue" / "failedReason" and "finishedOn"

-- Remove old jobs?
if removeJobs and removeJobs > 1 then
local start = removeJobs - 1
local jobIds = rcall("ZREVRANGE", KEYS[2], start, -1)
local function removeJobs(jobIds)
for i, jobId in ipairs(jobIds) do
local jobKey = ARGV[9] .. jobId
local jobLogKey = jobKey .. ':logs'
rcall("DEL", jobKey, jobLogKey)
end
rcall("ZREMRANGEBYRANK", KEYS[2], 0, -removeJobs);
end

-- Remove old jobs?
if maxAge ~= nil then
local start = timestamp - maxAge * 1000
local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf")
removeJobs(jobIds)
rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start)
end

if maxCount ~= nil and maxCount > 0 then
local start = maxCount
local jobIds = rcall("ZREVRANGE", targetSet, start, -1)
removeJobs(jobIds)
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1));
end
else
local jobLogKey = KEYS[3] .. ':logs'
rcall("DEL", KEYS[3], jobLogKey)
end

rcall("PUBLISH", KEYS[2], ARGV[7])

-- -- Check if we should get from the delayed set instead of the waiting list
-- local delayedJobId = rcall("ZRANGEBYSCORE", KEYS[7], 0, tonumber(ARGV[2]) * 0x1000, "LIMIT", 0, 1)[1]
-- if delayedJobId ~= nil then
-- local jobId = delayedJobId
-- if jobId then
-- local jobKey = ARGV[9] .. jobId
-- local lockKey = jobKey .. ':lock'

-- -- get a lock
-- rcall("SET", lockKey, ARGV[11], "PX", ARGV[10])

-- rcall("ZREM", KEYS[5], jobId) -- remove from priority
-- rcall("PUBLISH", KEYS[6], jobId)
-- rcall("HSET", jobKey, "processedOn", ARGV[2])

-- return {rcall("HGETALL", jobKey), jobId} -- get job data
-- end
-- end
rcall("PUBLISH", targetSet, ARGV[7])

-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
-- and not rate limited.
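In the updated script, ARGV[6] now carries a msgpack-encoded `KeepJobs` object rather than a plain number: a `count` of 0 deletes the finished job outright, while any other value adds the job to the completed/failed sorted set (scored by `finishedOn`) and then prunes that set by age and/or count. Below is a rough JavaScript sketch of the pruning step, assuming an ioredis client and illustrative key names; the real logic runs atomically inside the Lua script:

```js
// Illustration only: a non-atomic JavaScript rendering of the pruning that
// moveToFinished-8.lua performs on the completed/failed sorted set.
const Redis = require('ioredis');
const redis = new Redis();

async function pruneFinishedSet(setKey, jobKeyPrefix, keepJobs, now) {
  // Delete job hashes and their log lists for a list of job ids.
  const removeJobs = async jobIds => {
    for (const jobId of jobIds) {
      await redis.del(jobKeyPrefix + jobId, jobKeyPrefix + jobId + ':logs');
    }
  };

  if (typeof keepJobs.age === 'number') {
    // Jobs whose finishedOn is at or before `now - age` (in ms) are dropped.
    const cutoff = now - keepJobs.age * 1000;
    const oldIds = await redis.zrevrangebyscore(setKey, cutoff, '-inf');
    await removeJobs(oldIds);
    await redis.zremrangebyscore(setKey, '-inf', cutoff);
  }

  if (typeof keepJobs.count === 'number' && keepJobs.count > 0) {
    // Keep only the `count` most recently finished entries.
    const extraIds = await redis.zrevrange(setKey, keepJobs.count, -1);
    await removeJobs(extraIds);
    await redis.zremrangebyrank(setKey, 0, -(keepJobs.count + 1));
  }
}
```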
30 changes: 18 additions & 12 deletions lib/scripts.js
@@ -5,6 +5,14 @@
'use strict';

const _ = require('lodash');
const msgpackr = require('msgpackr');

const packer = new msgpackr.Packr({
useRecords: false,
encodeUndefinedAsNil: true
});

const pack = packer.pack;

const scripts = {
isJobInList(client, listKey, jobId) {
@@ -129,19 +137,21 @@ const scripts = {
queueKeys.stalled
];

if (typeof shouldRemove === 'boolean') {
shouldRemove = shouldRemove ? '1' : '0';
} else if (typeof shouldRemove === 'number') {
shouldRemove = `${shouldRemove + 1}`;
}
const keepJobs = pack(
typeof shouldRemove === 'object'
? shouldRemove
: typeof shouldRemove === 'number'
? { count: shouldRemove }
: { count: shouldRemove ? 0 : -1 }
);

const args = [
job.id,
job.finishedOn,
propVal,
_.isUndefined(val) ? 'null' : val,
ignoreLock ? '0' : queue.token,
shouldRemove,
keepJobs,
JSON.stringify({ jobId: job.id, val: val }),
notFetch || queue.paused || queue.closing || queue.limiter ? 0 : 1,
queueKeys[''],
@@ -326,17 +336,13 @@ const scripts = {
return allRemoved;
},

extendLock(
queue,
jobId,
duration,
) {
extendLock(queue, jobId, duration) {
return queue.client.extendLock([
queue.toKey(jobId) + ':lock',
queue.keys.stalled,
queue.token,
duration,
jobId,
jobId
]);
},

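In scripts.js, the `shouldRemove` option was previously encoded as the string '0', '1', or the count plus one; it is now normalized into a `KeepJobs` object and packed with `msgpackr` before being passed to the Lua script. A minimal sketch of that mapping (the helper name is illustrative, not from the codebase):

```js
// Sketch of the normalization done inline in scripts.js above.
function normalizeKeepJobs(shouldRemove) {
  if (typeof shouldRemove === 'object') {
    // Already a KeepJobs object, e.g. { age: 3600, count: 100 }.
    return shouldRemove;
  }
  if (typeof shouldRemove === 'number') {
    // Keep at most N jobs in the finished set.
    return { count: shouldRemove };
  }
  // true  -> { count: 0 }  : remove the job as soon as it finishes
  // false -> { count: -1 } : keep every finished job (the previous default)
  return { count: shouldRemove ? 0 : -1 };
}
```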
1 change: 1 addition & 0 deletions package.json
@@ -25,6 +25,7 @@
"get-port": "^5.1.1",
"ioredis": "^4.27.0",
"lodash": "^4.17.21",
"msgpackr": "^1.5.2",
"p-timeout": "^3.2.0",
"semver": "^7.3.2",
"uuid": "^8.3.0"
101 changes: 97 additions & 4 deletions test/test_queue.js
@@ -475,6 +475,73 @@ describe('Queue', () => {
});

describe('auto job removal', () => {
async function testRemoveOnFinish(opts, expectedCount, fail) {
const clock = sinon.useFakeTimers();
clock.reset();
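// The completed/failed handler below advances the fake clock by 1 s per
// finished job, so finishedOn timestamps end up one second apart and a
// KeepJobs age of N seconds is expected to leave the N most recently
// finished jobs, which is why the age tests pass the same value for
// opts.age and expectedCount.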

queue.process(async job => {
await job.log('test log');
if (fail) {
throw new Error('job failed');
}
});

const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];

const processing = new Promise(resolve => {
queue.on(fail ? 'failed' : 'completed', async job => {
clock.tick(1000);

if (job.data == 14) {
const counts = await queue.getJobCounts(
fail ? 'failed' : 'completed'
);

if (fail) {
expect(counts.failed).to.be.equal(expectedCount);
} else {
expect(counts.completed).to.be.equal(expectedCount);
}

await Promise.all(
jobIds.map(async (jobId, index) => {
const job = await queue.getJob(jobId);
const logs = await queue.getJobLogs(jobId);

try {
if (index >= datas.length - expectedCount) {
expect(job).to.not.be.equal(null);
expect(logs.logs).to.not.be.empty;
} else {
expect(job).to.be.equal(null);
expect(logs.logs).to.be.empty;
}
} catch (err) {
console.error(err);
}
})
);

resolve();
}
});
});

const jobOpts = {};
if (fail) {
jobOpts.removeOnFail = opts;
} else {
jobOpts.removeOnComplete = opts;
}

const jobIds = (
await Promise.all(datas.map(async data => queue.add(data, jobOpts)))
).map(job => job.id);

await processing;
clock.restore();
}

it('should remove job after completed if removeOnComplete', done => {
queue
.process((job, jobDone) => {
@@ -545,6 +612,29 @@
.catch(done);
});

it('should keep specified number of jobs after completed with removeOnComplete', async () => {
const keepJobs = 3;
await testRemoveOnFinish(keepJobs, keepJobs);
});

it('should keep jobs newer than the specified age after completed with removeOnComplete', async () => {
const age = 7;
await testRemoveOnFinish({ age }, age);
});

it('should keep jobs newer than the specified age, up to a count, after completed with removeOnComplete', async () => {
const age = 7;
const count = 5;
await testRemoveOnFinish({ age, count }, count);
});

it('should keep jobs newer than the specified age, up to a count, after failed with removeOnFail', async () => {
const age = 7;
const count = 5;
await testRemoveOnFinish({ age, count }, count, true);
});

/*
it('should keep specified number of jobs after completed with removeOnComplete', async () => {
const keepJobs = 3;
queue.process(async job => {
@@ -584,6 +674,7 @@
});
});
});
*/

it('should keep specified number of jobs after completed with global removeOnComplete', async () => {
const keepJobs = 3;
@@ -2121,7 +2212,6 @@ describe('Queue', () => {
queue.on('failed', cb);
queue.on('error', done);
});

});

describe('Retries and backoffs', () => {
@@ -2838,11 +2928,14 @@
});
});

it('should clean the number of jobs requested even if first jobs timestamp doesn\'t match', async () => {
it("should clean the number of jobs requested even if first jobs timestamp doesn't match", async () => {
// This job shouldn't get deleted due to the 5000 grace
await queue.add({ some: 'data' });
// This job should get cleaned since 10000 > 5000 grace
const jobToClean = await queue.add({ some: 'data' }, { timestamp: Date.now() - 10000 });
const jobToClean = await queue.add(
{ some: 'data' },
{ timestamp: Date.now() - 10000 }
);
// This job shouldn't get deleted due to the 5000 grace
await queue.add({ some: 'data' });

@@ -2854,7 +2947,7 @@
expect(len).to.be.eql(2);
});

it('shouldn\'t clean anything if all jobs are in grace period', async () => {
it("shouldn't clean anything if all jobs are in grace period", async () => {
await queue.add({ some: 'data' });
await queue.add({ some: 'data' });

25 changes: 25 additions & 0 deletions yarn.lock
@@ -4519,11 +4519,31 @@ ms@2.1.3, ms@^2.0.0, ms@^2.1.1, ms@^2.1.2:
resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2"
integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==

msgpackr-extract@^1.0.14:
version "1.0.16"
resolved "https://registry.yarnpkg.com/msgpackr-extract/-/msgpackr-extract-1.0.16.tgz#701c4f6e6f25c100ae84557092274e8fffeefe45"
integrity sha512-fxdRfQUxPrL/TizyfYfMn09dK58e+d65bRD/fcaVH4052vj30QOzzqxcQIS7B0NsqlypEQ/6Du3QmP2DhWFfCA==
dependencies:
nan "^2.14.2"
node-gyp-build "^4.2.3"

msgpackr@^1.5.2:
version "1.5.2"
resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.5.2.tgz#b400c9885642bdec27b284f8bdadbd6570b448b7"
integrity sha512-OCguCkbG34x1ddO4vAzEm/4J1GTo512k9SoxV8K+EGfI/onFdpemRf0HpsVRFpxadXr4JBFgHsQUitgTlw7ZYQ==
optionalDependencies:
msgpackr-extract "^1.0.14"

mute-stream@~0.0.4:
version "0.0.8"
resolved "https://registry.yarnpkg.com/mute-stream/-/mute-stream-0.0.8.tgz#1630c42b2251ff81e2a283de96a5497ea92e5e0d"
integrity sha512-nnbWWOkoWyUsTjKrhgD0dcz22mdkSnpYqbEjIm2nhwhuxlSkpywJmBo8h0ZqJdkp73mb90SssHkN4rsRaBAfAA==

nan@^2.14.2:
version "2.15.0"
resolved "https://registry.yarnpkg.com/nan/-/nan-2.15.0.tgz#3f34a473ff18e15c1b5626b62903b5ad6e665fee"
integrity sha512-8ZtvEnA2c5aYCZYd1cvgdnU6cqwixRoYg70xPLWUws5ORTa/lnw+u4amixRS/Ac5U5mQVgp9pnlSUnbNWFaWZQ==

nanoid@3.1.20:
version "3.1.20"
resolved "https://registry.yarnpkg.com/nanoid/-/nanoid-3.1.20.tgz#badc263c6b1dcf14b71efaa85f6ab4c1d6cfc788"
@@ -4589,6 +4609,11 @@
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.1.tgz#045bd323631f76ed2e2b55573394416b639a0052"
integrity sha512-V4aYg89jEoVRxRb2fJdAg8FHvI7cEyYdVAh94HH0UIK8oJxUfkjlDQN9RbMx+bEjP7+ggMiFRprSti032Oipxw==

node-gyp-build@^4.2.3:
version "4.3.0"
resolved "https://registry.yarnpkg.com/node-gyp-build/-/node-gyp-build-4.3.0.tgz#9f256b03e5826150be39c764bf51e993946d71a3"
integrity sha512-iWjXZvmboq0ja1pUGULQBexmxq8CV4xBhX7VDOTbL7ZR4FOowwY/VOtRxBN/yKxmdGoIp4j5ysNT4u3S2pDQ3Q==

node-gyp@^7.1.0, node-gyp@^7.1.2:
version "7.1.2"
resolved "https://registry.yarnpkg.com/node-gyp/-/node-gyp-7.1.2.tgz#21a810aebb187120251c3bcec979af1587b188ae"
