Skip to content

Commit

Permalink
Merge branch 'develop' into patch-5
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Jul 20, 2020
2 parents cbbacc7 + 712df1d commit 5f8b0df
Show file tree
Hide file tree
Showing 14 changed files with 479 additions and 254 deletions.
4 changes: 2 additions & 2 deletions .eslintrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ env:
node: true

parserOptions:
ecmaVersion: 8
ecmaVersion: 2018

extends:
- eslint:recommended
Expand Down Expand Up @@ -38,4 +38,4 @@ rules:
mocha/no-sibling-hooks: 0
mocha/no-skipped-tests: 0

node/no-deprecated-api: 0
node/no-deprecated-api: 0
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ cache:

# test on node.js versions
node_js:
- '14'
- '12'
- '10'
- '8'

services:
- redis-server
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v.3.16.0

- feat(rate-limiter): add grouping support.

[Changes](https://github.com/OptimalBits/bull/compare/v3.15.0...v3.16.0)

## v.3.15.0

- feat: add isPaused fixes #1274
Expand Down
13 changes: 8 additions & 5 deletions REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ interface QueueOptions {

```typescript
interface RateLimiter {
max: number, // Max number of jobs processed
duration: number, // per duration in milliseconds
bounceBack: boolean = false; // When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue
max: number; // Max number of jobs processed
duration: number; // per duration in milliseconds
bounceBack?: boolean = false; // When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue
groupKey?: string; // allows grouping of jobs with the specified key from the data object passed to the Queue#add (ex. "network.handle")
}
```

Expand Down Expand Up @@ -370,6 +371,7 @@ removeJobs(pattern: string): Promise<void>
Removes all the jobs which jobId matches the given pattern. The pattern must follow redis glob-style pattern (syntax)[https://redis.io/commands/keys]

Example:

```js
myQueue.removeJobs('?oo*').then(function() {
console.log('done removing jobs');
Expand All @@ -386,7 +388,8 @@ Will remove jobs with ids such as: "boo", "foofighter", etc.
empty(): Promise
```

Empties a queue deleting all the input lists and associated jobs.
Drains a queue deleting all the *input* lists and associated jobs. Note, this function only remove the jobs that are
*waiting" to be processed by the queue or *delayed*.

---

Expand Down Expand Up @@ -460,7 +463,7 @@ parameter. If the specified job cannot be located, the promise will be resolved
getJobs(types: JobStatus[], start?: number, end?: number, asc?: boolean): Promise<Job[]>
```

Returns a promise that will return an array of job instances of the given job statuses. Optional parameters for range and ordering are provided.
Returns a promise that will return an array of job instances of the given job statuses. Optional parameters for range and ordering are provided.

Note: The `start` and `end` options are applied **per job statuses**. For example, if there are 10 jobs in state `completed` and 10 jobs in state `active`, `getJobs(['completed', 'active'], 0, 4)` will yield an array with 10 entries, representing the first 5 completed jobs (0 - 4) and the first 5 active jobs (0 - 4).

Expand Down
8 changes: 8 additions & 0 deletions lib/commands/moveToActive-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
ARGV[6] optional jobs per time unit (rate limiter)
ARGV[7] optional time unit (rate limiter)
ARGV[8] optional do not do anything with job if rate limit hit
ARGV[9] optional rate limit by key
]]

local jobId
Expand All @@ -50,6 +51,13 @@ if jobId then

if(maxJobs) then
local rateLimiterKey = KEYS[6];
if(ARGV[9]) then
local group = string.match(jobId, "[^:]+$")
if group ~= nil then
rateLimiterKey = rateLimiterKey .. ":" .. group
end
end

-- local jobCounter = tonumber(rcall("GET", rateLimiterKey))
local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
local bounceBack = ARGV[8]
Expand Down
118 changes: 57 additions & 61 deletions lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ Job.fromId = function(queue, jobId) {
return Promise.resolve();
}
return queue.client.hgetall(queue.toKey(jobId)).then(jobData => {
return utils.isEmpty(jobData) ? null : Job.fromJSON(queue, jobData, jobId);
return _.isEmpty(jobData) ? null : Job.fromJSON(queue, jobData, jobId);
});
};

Expand Down Expand Up @@ -244,68 +244,64 @@ Job.prototype.discard = function() {
* @param ignoreLock {boolean} True when wanting to ignore the redis lock on this job.
* @returns void
*/
Job.prototype.moveToFailed = function(err, ignoreLock) {
Job.prototype.moveToFailed = async function(err, ignoreLock) {
this.failedReason = err.message;
return this.queue.isReady().then(() => {
return new Promise(async (resolve, reject) => {
let command;
const multi = this.queue.client.multi();
this._saveAttempt(multi, err);

// Check if an automatic retry should be performed
let moveToFailed = false;
if (this.attemptsMade < this.opts.attempts && !this._discarded) {
// Check if backoff is needed
const delay = await backoffs.calculate(
this.opts.backoff,
this.attemptsMade,
this.queue.settings.backoffStrategies,
err
);

if (delay === -1) {
// If delay is -1, we should no continue retrying
moveToFailed = true;
} else if (delay) {
// If so, move to delayed (need to unlock job in this case!)
const args = scripts.moveToDelayedArgs(
this.queue,
this.id,
Date.now() + delay,
ignoreLock
);
multi.moveToDelayed(args);
command = 'delayed';
} else {
// If not, retry immediately
multi.retryJob(scripts.retryJobArgs(this, ignoreLock));
command = 'retry';
}
} else {
// If not, move to failed
moveToFailed = true;
}
await this.queue.isReady();

if (moveToFailed) {
this.finishedOn = Date.now();
const args = scripts.moveToFailedArgs(
this,
err.message,
this.opts.removeOnFail,
ignoreLock
);
multi.moveToFinished(args);
command = 'failed';
}
return multi.exec().then(results => {
const code = _.last(results)[1];
if (code < 0) {
return reject(scripts.finishedErrors(code, this.id, command));
}
resolve();
}, reject);
});
});
let command;
const multi = this.queue.client.multi();
this._saveAttempt(multi, err);

// Check if an automatic retry should be performed
let moveToFailed = false;
if (this.attemptsMade < this.opts.attempts && !this._discarded) {
// Check if backoff is needed
const delay = await backoffs.calculate(
this.opts.backoff,
this.attemptsMade,
this.queue.settings.backoffStrategies,
err
);

if (delay === -1) {
// If delay is -1, we should no continue retrying
moveToFailed = true;
} else if (delay) {
// If so, move to delayed (need to unlock job in this case!)
const args = scripts.moveToDelayedArgs(
this.queue,
this.id,
Date.now() + delay,
ignoreLock
);
multi.moveToDelayed(args);
command = 'delayed';
} else {
// If not, retry immediately
multi.retryJob(scripts.retryJobArgs(this, ignoreLock));
command = 'retry';
}
} else {
// If not, move to failed
moveToFailed = true;
}

if (moveToFailed) {
this.finishedOn = Date.now();
const args = scripts.moveToFailedArgs(
this,
err.message,
this.opts.removeOnFail,
ignoreLock
);
multi.moveToFinished(args);
command = 'failed';
}
const results = await multi.exec();
const code = _.last(results)[1];
if (code < 0) {
throw scripts.finishedErrors(code, this.id, command);
}
};

Job.prototype.moveToDelayed = function(timestamp, ignoreLock) {
Expand Down
23 changes: 19 additions & 4 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ const Queue = function Queue(name, url, opts) {
}

this.name = name;
this.token = uuid();
this.token = uuid.v4();

opts.redis = opts.redis || {};

Expand Down Expand Up @@ -702,8 +702,9 @@ Queue.prototype.add = function(name, data, opts) {
data = name;
name = Job.DEFAULT_JOB_NAME;
}
opts = _.cloneDeep(opts || {});
_.defaults(opts, this.defaultJobOptions);
opts = { ...opts, ...this.defaultJobOptions };

opts.jobId = jobIdForGroup(this.limiter, opts, data);

if (opts.repeat) {
return this.isReady().then(() => {
Expand All @@ -721,10 +722,15 @@ Queue.prototype.add = function(name, data, opts) {
*/
Queue.prototype.addBulk = function(jobs) {
const decoratedJobs = jobs.map(job => {
const jobId = jobIdForGroup(this.limiter, job.opts, job.data);
return {
...job,
name: typeof job.name !== 'string' ? Job.DEFAULT_JOB_NAME : job.name,
opts: _.defaults(job.opts, this.defaultJobOptions)
opts: {
...job.opts,
...this.defaultJobOptions,
jobId
}
};
});
return Job.createBulk(this, decoratedJobs);
Expand Down Expand Up @@ -1242,4 +1248,13 @@ function getRedisVersion(client) {
});
}

function jobIdForGroup(limiter, opts, data) {
const jobId = opts && opts.jobId;
const groupKey = _.get(limiter, 'groupKey');
if (groupKey) {
return `${jobId || uuid.v4()}:${_.get(data, groupKey)}`;
}
return jobId;
}

module.exports = Queue;
2 changes: 2 additions & 0 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ const scripts = {
queue.limiter.duration,
!!queue.limiter.bounceBack
);
queue.limiter.groupKey && args.push(true);
}

return queue.client.moveToActive(keys.concat(args)).then(raw2jobData);
},

Expand Down
10 changes: 0 additions & 10 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@ function tryCatch(fn, ctx, args) {
}
}

function isEmpty(obj) {
for (const key in obj) {
if (obj.hasOwnProperty(key)) {
return false;
}
}
return true;
}

/**
* Waits for a redis client to be ready.
* @param {Redis} redis client
Expand Down Expand Up @@ -45,5 +36,4 @@ function isRedisReady(client) {

module.exports.errorObject = errorObject;
module.exports.tryCatch = tryCatch;
module.exports.isEmpty = isEmpty;
module.exports.isRedisReady = isRedisReady;
13 changes: 7 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"name": "bull",
"version": "3.15.0",
"version": "3.16.0",
"description": "Job manager",
"engines": {
"node": ">=8"
"node": ">=10"
},
"main": "./index.js",
"repository": {
Expand All @@ -29,16 +29,16 @@
"promise.prototype.finally": "^3.1.2",
"semver": "^6.3.0",
"util.promisify": "^1.0.1",
"uuid": "^3.4.0"
"uuid": "^8.2.0"
},
"devDependencies": {
"@commitlint/cli": "^7.6.1",
"@commitlint/config-conventional": "^7.6.0",
"chai": "^4.2.0",
"coveralls": "^3.0.9",
"delay": "^4.3.0",
"eslint": "^5.16.0",
"eslint-plugin-mocha": "^6.2.1",
"eslint": "^7.4.0",
"eslint-plugin-mocha": "^7.0.1",
"eslint-plugin-node": "^8.0.1",
"expect.js": "^0.3.1",
"husky": "^1.3.1",
Expand All @@ -52,9 +52,10 @@
"sinon": "^7.5.0"
},
"scripts": {
"lint": "eslint lib test *.js",
"pretest": "npm run lint",
"lint": "eslint lib test *.js",
"test": "NODE_ENV=test mocha 'test/test_*'",
"test:nolint": "NODE_ENV=test mocha 'test/test_*'",
"coveralls": "istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- --exit -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage",
"postpublish": "git push && git push --tags",
"prettier": "prettier --config package.json --write '**/*.js'",
Expand Down
2 changes: 1 addition & 1 deletion test/test_job.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ describe('Job', () => {
});

beforeEach(() => {
queue = new Queue('test-' + uuid(), {
queue = new Queue('test-' + uuid.v4(), {
redis: { port: 6379, host: '127.0.0.1' }
});
});
Expand Down
Loading

0 comments on commit 5f8b0df

Please sign in to comment.