Merge branch 'develop' into feat/kill-sandboxed-workers-gracefully
GabrielCastro committed Jul 24, 2020
2 parents bb26001 + c23ed74 commit ce35559
Showing 12 changed files with 151 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -7,9 +7,9 @@ cache:

# test on node.js versions
node_js:
- '14'
- '12'
- '10'
- '8'

services:
- redis-server
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog

## v.3.16.0

- feat(rate-limiter): add grouping support.

[Changes](https://github.com/OptimalBits/bull/compare/v3.15.0...v3.16.0)

## v.3.15.0

- feat: add isPaused fixes #1274
Expand Down
8 changes: 7 additions & 1 deletion PATTERNS.md
@@ -66,7 +66,13 @@ The most robust and scalable way to accomplish this is by combining the standard
Reusing Redis Connections
-------------------------

A standard queue requires **3 connections** to the Redis server. In some situations you might want to re-use connections—for example on Heroku where the connection count is restricted. You can do this with the `createClient` option in the `Queue` constructor (note: bclient connections [cannot be re-used](https://github.com/OptimalBits/bull/issues/880)):
A standard queue requires **3 connections** to the Redis server. In some situations you might want to re-use connections—for example on Heroku where the connection count is restricted. You can do this with the `createClient` option in the `Queue` constructor.

Notes:
- bclient connections [cannot be re-used](https://github.com/OptimalBits/bull/issues/880), so you should return a new connection each time this is called.
- client and subscriber connections can be shared and will not be closed when the queue is closed. When shutting down the process, first close the queues, then the shared connections (if they are shared).
- if you are not sharing connections but still use `createClient` for custom connection logic, you may still need to keep a list of all the connections you created so you can close them manually after the queues shut down, if your process needs a graceful shutdown.
- do not set a `keyPrefix` on the connection you create; use bull's built-in prefix feature if you need a key prefix.
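The shutdown order described in the notes above can be sketched as a small helper. This is a hypothetical sketch, not part of bull's API; `queues` and `sharedConnections` are illustrative names for lists your app would maintain:

```javascript
// Hypothetical shutdown helper: close every queue first, then disconnect
// the connections the queues were sharing. Bull does not disconnect
// shared client/subscriber connections itself.
async function gracefulShutdown(queues, sharedConnections) {
  // 1. close the queues so no job is picked up mid-shutdown
  await Promise.all(queues.map(queue => queue.close()));
  // 2. only now drop the shared Redis connections
  await Promise.all(sharedConnections.map(conn => conn.disconnect()));
}
```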

```js
var {REDIS_URL} = process.env
27 changes: 22 additions & 5 deletions REFERENCE.md
@@ -63,6 +63,7 @@ The optional `url` argument allows specifying a redis connection string such as

```typescript
interface QueueOptions {
createClient?(type: 'client' | 'subscriber' | 'bclient', config?: Redis.RedisOptions): Redis.Redis | Redis.Cluster;
limiter?: RateLimiter;
redis?: RedisOpts;
prefix?: string = 'bull'; // prefix for all queue keys.
@@ -73,9 +74,10 @@ interface QueueOptions {

```typescript
interface RateLimiter {
max: number, // Max number of jobs processed
duration: number, // per duration in milliseconds
bounceBack: boolean = false; // When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue
max: number; // Max number of jobs processed
duration: number; // per duration in milliseconds
bounceBack?: boolean = false; // When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue
groupKey?: string; // allows grouping of jobs with the specified key from the data object passed to the Queue#add (ex. "network.handle")
}
```
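An example of the new `groupKey` field in use (illustrative values; `accountId` is a hypothetical field of the job data):

```javascript
// Limiter options: at most one job per second, counted separately for
// each distinct data.accountId rather than queue-wide.
const limiter = {
  max: 1,               // max jobs processed
  duration: 1000,       // per 1000 ms
  groupKey: 'accountId' // read from the data object passed to Queue#add
};
// Jobs added as queue.add({ accountId: 42, ... }) are then throttled
// per account: two accounts can each run one job in the same second.
```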

@@ -104,6 +106,19 @@ interface AdvancedSettings {
}
```

**Custom or Shared IORedis Connections**

`createClient` is passed a `type` specifying the kind of connection Bull is trying to create, along with the options `bull` would like to set for that connection.

You can merge the provided options with some of your own and create an `ioredis` connection.

When type is `client` or `subscriber` you can return the same connection for multiple queues, which can reduce the number of connections you open to the Redis server. Bull does not close or disconnect these connections when queues are closed, so if your app needs a graceful shutdown you will need to keep references to these Redis connections somewhere and disconnect them after you have shut down all the queues.

The `bclient` connection, however, is a "blocking client" used to wait for new jobs on a single queue at a time. For this reason it cannot be shared, and a new connection should be returned each time.
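A minimal sketch of a `createClient` factory implementing these rules. To keep the sketch runnable without a Redis server, `makeConnection` stands in for something like `opts => new Redis(opts)` from `ioredis`:

```javascript
// Share 'client' and 'subscriber' connections across queues; always
// create a fresh 'bclient' (blocking) connection.
function makeCreateClient(makeConnection) {
  const shared = {};
  return function createClient(type, opts) {
    if (type === 'bclient') {
      // blocking client: unique per queue, never shared
      return makeConnection(opts);
    }
    // cache one connection per type and hand it to every queue
    if (!shared[type]) {
      shared[type] = makeConnection(opts);
    }
    return shared[type];
  };
}
```

You would pass the resulting function as `new Queue('name', { createClient })`, and remember to disconnect the shared connections yourself after closing all queues.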

**Advanced Settings**

**Warning:** Do not override these advanced settings unless you understand the internals of the queue.
@@ -356,6 +371,7 @@ removeJobs(pattern: string): Promise<void>
Removes all the jobs whose jobId matches the given pattern. The pattern must follow redis glob-style [syntax](https://redis.io/commands/keys).

Example:

```js
myQueue.removeJobs('?oo*').then(function() {
console.log('done removing jobs');
@@ -372,7 +388,8 @@ Will remove jobs with ids such as: "boo", "foofighter", etc.
empty(): Promise
```

Empties a queue deleting all the input lists and associated jobs.
Drains a queue, deleting all the *input* lists and associated jobs. Note: this function only removes jobs that are
*waiting* to be processed by the queue or *delayed*.

---

@@ -446,7 +463,7 @@ parameter. If the specified job cannot be located, the promise will be resolved
getJobs(types: JobStatus[], start?: number, end?: number, asc?: boolean): Promise<Job[]>
```

Returns a promise that will return an array of job instances of the given job statuses. Optional parameters for range and ordering are provided.

Note: The `start` and `end` options are applied **per job statuses**. For example, if there are 10 jobs in state `completed` and 10 jobs in state `active`, `getJobs(['completed', 'active'], 0, 4)` will yield an array with 10 entries, representing the first 5 completed jobs (0 - 4) and the first 5 active jobs (0 - 4).
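The per-status range semantics described in the note can be sketched as follows (a standalone sketch, not bull's implementation; `jobsByStatus` is a hypothetical map from status to job list):

```javascript
// The start/end window is applied to each status list independently,
// and the per-status slices are concatenated (end is inclusive; -1
// means "to the end of the list", as in redis ranges).
function sliceJobsPerStatus(jobsByStatus, types, start = 0, end = -1) {
  return types.reduce((acc, type) => {
    const jobs = jobsByStatus[type] || [];
    const slice = end === -1 ? jobs.slice(start) : jobs.slice(start, end + 1);
    return acc.concat(slice);
  }, []);
}
```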

8 changes: 8 additions & 0 deletions lib/commands/moveToActive-8.lua
@@ -29,6 +29,7 @@
ARGV[6] optional jobs per time unit (rate limiter)
ARGV[7] optional time unit (rate limiter)
ARGV[8] optional do not do anything with job if rate limit hit
ARGV[9] optional rate limit by key
]]

local jobId
@@ -50,6 +51,13 @@ if jobId then

if(maxJobs) then
local rateLimiterKey = KEYS[6];
if(ARGV[9]) then
local group = string.match(jobId, "[^:]+$")
if group ~= nil then
rateLimiterKey = rateLimiterKey .. ":" .. group
end
end

-- local jobCounter = tonumber(rcall("GET", rateLimiterKey))
local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
local bounceBack = ARGV[8]
23 changes: 19 additions & 4 deletions lib/queue.js
@@ -120,7 +120,7 @@ const Queue = function Queue(name, url, opts) {
}

this.name = name;
this.token = uuid();
this.token = uuid.v4();

opts.redis = opts.redis || {};

@@ -713,8 +713,9 @@ Queue.prototype.add = function(name, data, opts) {
data = name;
name = Job.DEFAULT_JOB_NAME;
}
opts = _.cloneDeep(opts || {});
_.defaults(opts, this.defaultJobOptions);
opts = { ...opts, ...this.defaultJobOptions };

opts.jobId = jobIdForGroup(this.limiter, opts, data);

if (opts.repeat) {
return this.isReady().then(() => {
@@ -732,10 +733,15 @@
*/
Queue.prototype.addBulk = function(jobs) {
const decoratedJobs = jobs.map(job => {
const jobId = jobIdForGroup(this.limiter, job.opts, job.data);
return {
...job,
name: typeof job.name !== 'string' ? Job.DEFAULT_JOB_NAME : job.name,
opts: _.defaults(job.opts, this.defaultJobOptions)
opts: {
...job.opts,
...this.defaultJobOptions,
jobId
}
};
});
return Job.createBulk(this, decoratedJobs);
@@ -1253,4 +1259,13 @@ function getRedisVersion(client) {
});
}

function jobIdForGroup(limiter, opts, data) {
const jobId = opts && opts.jobId;
const groupKey = _.get(limiter, 'groupKey');
if (groupKey) {
return `${jobId || uuid.v4()}:${_.get(data, groupKey)}`;
}
return jobId;
}

module.exports = Queue;
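The `jobIdForGroup` helper added above can be exercised in isolation. In this sketch, `_.get` is replaced with a tiny path lookup and `uuid.v4()` with a fixed placeholder so it runs without dependencies; the real code generates a random uuid:

```javascript
// Resolve a dotted path like 'network.handle' against a data object.
function get(obj, path) {
  return path.split('.').reduce((o, key) => (o == null ? undefined : o[key]), obj);
}

// When limiter.groupKey is set, the group value from the job data is
// appended to the job id after a colon. The moveToActive Lua script
// later recovers the group with string.match(jobId, "[^:]+$"),
// i.e. everything after the last colon.
function jobIdForGroup(limiter, opts, data) {
  const jobId = opts && opts.jobId;
  const groupKey = limiter && limiter.groupKey;
  if (groupKey) {
    return `${jobId || 'some-uuid'}:${get(data, groupKey)}`;
  }
  return jobId;
}
```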
2 changes: 2 additions & 0 deletions lib/scripts.js
@@ -83,7 +83,9 @@ const scripts = {
queue.limiter.duration,
!!queue.limiter.bounceBack
);
queue.limiter.groupKey && args.push(true);
}

return queue.client.moveToActive(keys.concat(args)).then(raw2jobData);
},

4 changes: 2 additions & 2 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "bull",
"version": "3.15.0",
"version": "3.16.0",
"description": "Job manager",
"engines": {
"node": ">=10"
@@ -29,7 +29,7 @@
"promise.prototype.finally": "^3.1.2",
"semver": "^6.3.0",
"util.promisify": "^1.0.1",
"uuid": "^3.4.0"
"uuid": "^8.2.0"
},
"devDependencies": {
"@commitlint/cli": "^7.6.1",
2 changes: 1 addition & 1 deletion test/test_job.js
@@ -17,7 +17,7 @@ describe('Job', () => {
});

beforeEach(() => {
queue = new Queue('test-' + uuid(), {
queue = new Queue('test-' + uuid.v4(), {
redis: { port: 6379, host: '127.0.0.1' }
});
});
25 changes: 14 additions & 11 deletions test/test_queue.js
@@ -474,7 +474,7 @@ describe('Queue', () => {

it('should remove a job after completed if the default job options specify removeOnComplete', done => {
utils
.newQueue('test-' + uuid(), {
.newQueue('test-' + uuid.v4(), {
defaultJobOptions: {
removeOnComplete: true
}
@@ -557,7 +557,7 @@ describe('Queue', () => {
it('should keep specified number of jobs after completed with global removeOnComplete', async () => {
const keepJobs = 3;

const localQueue = await utils.newQueue('test-' + uuid(), {
const localQueue = await utils.newQueue('test-' + uuid.v4(), {
defaultJobOptions: {
removeOnComplete: keepJobs
}
@@ -626,7 +626,7 @@ describe('Queue', () => {

it('should remove a job after fail if the default job options specify removeOnFail', done => {
utils
.newQueue('test-' + uuid(), {
.newQueue('test-' + uuid.v4(), {
defaultJobOptions: {
removeOnFail: true
}
@@ -707,7 +707,7 @@ describe('Queue', () => {
it('should keep specified number of jobs after completed with global removeOnFail', async () => {
const keepJobs = 3;

const localQueue = await utils.newQueue('test-' + uuid(), {
const localQueue = await utils.newQueue('test-' + uuid.v4(), {
defaultJobOptions: {
removeOnFail: keepJobs
}
@@ -1335,7 +1335,7 @@ describe('Queue', () => {
it('process stalled jobs without requiring a queue restart', function(done) {
this.timeout(12000);

const queue2 = utils.buildQueue('running-stalled-job-' + uuid(), {
const queue2 = utils.buildQueue('running-stalled-job-' + uuid.v4(), {
settings: {
lockRenewTime: 5000,
lockDuration: 500,
@@ -1376,7 +1376,7 @@ describe('Queue', () => {
const FAILED_MESSAGE = 'job stalled more than allowable limit';
this.timeout(10000);

const queue2 = utils.buildQueue('running-stalled-job-' + uuid(), {
const queue2 = utils.buildQueue('running-stalled-job-' + uuid.v4(), {
settings: {
lockRenewTime: 2500,
lockDuration: 250,
@@ -1755,7 +1755,7 @@ describe('Queue', () => {
it('should process delayed jobs in correct order even in case of restart', function(done) {
this.timeout(15000);

const QUEUE_NAME = 'delayed queue multiple' + uuid();
const QUEUE_NAME = 'delayed queue multiple' + uuid.v4();
let order = 1;

queue = new Queue(QUEUE_NAME);
@@ -1800,7 +1800,7 @@ describe('Queue', () => {
});

it('should process delayed jobs with exact same timestamps in correct order (FIFO)', done => {
const QUEUE_NAME = 'delayed queue multiple' + uuid();
const QUEUE_NAME = 'delayed queue multiple' + uuid.v4();
queue = new Queue(QUEUE_NAME);
let order = 1;

@@ -1915,7 +1915,7 @@ describe('Queue', () => {
queue.add({});
queue.add({});

queue.on('completed', _.after(2, () => done()));
queue.on(
'completed',
_.after(2, () => done())
);
});

//This job use delay to check that at any time we have 4 process in parallel.
@@ -2484,7 +2487,7 @@ describe('Queue', () => {
let queue;

beforeEach(() => {
queue = utils.buildQueue('cleaner' + uuid());
queue = utils.buildQueue('cleaner' + uuid.v4());
});

afterEach(function() {
@@ -2519,7 +2522,7 @@
});

it('should clean an empty queue', done => {
const testQueue = utils.buildQueue('cleaner' + uuid());
const testQueue = utils.buildQueue('cleaner' + uuid.v4());
testQueue.isReady().then(() => {
return testQueue.clean(0);
});
66 changes: 65 additions & 1 deletion test/test_rate_limiter.js
@@ -55,7 +55,7 @@ describe('Rate limiter', () => {
}
});

it('should obey the rate limit', done => {
it.skip('should obey the rate limit', done => {
const startTime = new Date().getTime();
const numJobs = 4;

@@ -155,4 +155,68 @@
});
});
});

it('should rate limit by grouping', async function() {
this.timeout(20000);
const numGroups = 4;
const numJobs = 20;
const startTime = Date.now();

const rateLimitedQueue = utils.buildQueue('test rate limiter with group', {
limiter: {
max: 1,
duration: 1000,
groupKey: 'accountId'
}
});

rateLimitedQueue.process(() => {
return Promise.resolve();
});

const completed = {};

const running = new Promise((resolve, reject) => {
const afterJobs = _.after(numJobs, () => {
try {
const timeDiff = Date.now() - startTime;
expect(timeDiff).to.be.gte(numGroups * 1000);
expect(timeDiff).to.be.below((numGroups + 1) * 1500);

for (const group in completed) {
let prevTime = completed[group][0];
for (let i = 1; i < completed[group].length; i++) {
const diff = completed[group][i] - prevTime;
expect(diff).to.be.below(2100);
expect(diff).to.be.gte(900);
prevTime = completed[group][i];
}
}
resolve();
} catch (err) {
reject(err);
}
});

rateLimitedQueue.on('completed', ({ id }) => {
const group = _.last(id.split(':'));
completed[group] = completed[group] || [];
completed[group].push(Date.now());

afterJobs();
});

rateLimitedQueue.on('failed', async err => {
await queue.close();
reject(err);
});
});

for (let i = 0; i < numJobs; i++) {
rateLimitedQueue.add({ accountId: i % numGroups });
}

await running;
await rateLimitedQueue.close();
});
});
