feature: external limiter support
colinskow committed Feb 18, 2020
1 parent b9ffac9 commit edc69f4
Showing 4 changed files with 379 additions and 31 deletions.
53 changes: 52 additions & 1 deletion README.md
@@ -37,6 +37,7 @@ Thanks to the folks at [Mixmax](https://mixmax.com), Bee-Queue is once again bei
- Concurrent processing
- Job timeouts, retries, and retry strategies
- Scheduled jobs
- External rate limiter support
- Pass events via Pub/Sub
- Progress reporting
- Send job results back to producers
@@ -189,6 +190,48 @@ subQueue.process(10, function (job, done) {
});
```

## Use With a Rate Limiter

You can integrate Bee-Queue with any external rate limiter of your choice to throttle jobs.

`Queue.process` accepts an optional limiter query function as a final argument, after the handler. If provided, it runs just before each job executes, so you can query an external limiter and reschedule the job when it is not yet cleared to run. Because the query runs before the handler, the queue can rapidly scan for jobs that are ready to run instead of tying up your concurrency with jobs that are still waiting on the limiter.

The limiter query function can either accept a callback or return a promise. In both cases it should produce an object that, at minimum, contains a boolean `ready` property: `true` if the job is cleared to run.

```js
limiterQueue.process(
  async (job) => {
    console.log(`Processing job ${job.id} after ${job.data.tries} tries`);
    // Do some work
  },
  async (job) => {
    const tries = job.data.tries + 1;
    const data = { ...job.data, tries };
    // You can plug in any non-blocking external rate limiter here
    const result = await limiter.check();
    if (result.ok) {
      return {
        ready: true,
        // (optional) this will replace the existing job data
        data
      };
    } else {
      return {
        ready: false,
        // (optional) this will replace the existing job data
        data,
        // (optional) reschedule the job to try again at this timestamp;
        // if 0 or not specified, the job goes to the end of the waiting queue
        delayUntil: Date.now() + result.msUntilReady
      };
    }
  }
);
```

Additionally, if the limiter query promise rejects or an error is passed to its callback, the job fails immediately and no retry logic is applied.
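
The same limiter query can also be written in callback style. This is a minimal sketch: the callback-based `limiter.check()` API is a hypothetical stand-in for whatever external limiter you use, not part of Bee-Queue.

```js
limiterQueue.process(
  (job, done) => {
    // Do some work, then signal completion
    done(null, { processed: true });
  },
  (job, done) => {
    // `limiter.check` is a hypothetical callback-based limiter API
    limiter.check((err, result) => {
      if (err) return done(err); // the job is failed immediately, no retries
      if (result.ok) {
        done(null, { ready: true });
      } else {
        done(null, { ready: false, delayUntil: Date.now() + result.msUntilReady });
      }
    });
  }
);
```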


## Progress Reporting

Handlers can send progress reports, which will be received as events on the original job instance:
@@ -508,7 +551,7 @@ Looks up jobs by their queue type. When looking up jobs of type `waiting`, `acti

Note that large values of the attributes of `page` may cause excess load on the Redis server.

#### Queue#process([concurrency], handler(job, done))
#### Queue#process([concurrency], handler(job, done), [limiterQuery(job, done)])

Begins processing jobs with the provided handler function.

@@ -527,6 +570,14 @@ The handler function should either:

_N.B. If the handler returns a `Promise`, calls to the `done` callback will be ignored._

The limiter query function is invoked exactly like the handler function above, but its promise or callback should produce an object with the following properties:

- `ready`: boolean, `true` if the job is ready to run
- `delayUntil`: (optional) Date or integer, the timestamp the job will be rescheduled for. If 0 or not specified, the job will be placed at the end of the waiting queue.
- `data`: (optional) any, replaces `job.data` if present

If the promise rejects or an error is passed to the callback, the job will be failed immediately without any retry logic.
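
The limiter query can be combined with a concurrency setting. A minimal sketch, assuming a hypothetical `emailQueue`, a `sendEmail` helper, and an external token bucket exposing a promise-based `tryRemoveTokens()` method (none of these are part of Bee-Queue):

```js
emailQueue.process(
  5, // concurrency: up to 5 handlers run at once
  async (job) => sendEmail(job.data),
  async (job) => {
    // Ask the external limiter for permission before running the handler
    const allowed = await bucket.tryRemoveTokens(1);
    return allowed
      ? { ready: true }
      : { ready: false, delayUntil: Date.now() + 1000 }; // try again in ~1s
  }
);
```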

#### Queue#checkStalledJobs([interval], [cb])

Checks for jobs that appear to be stalling and thus need to be retried, then re-enqueues them.
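
A typical usage sketch, assuming `queue` is an existing Bee-Queue worker instance:

```js
// Check every 5 seconds; the callback reports how many stalled jobs were found.
queue.checkStalledJobs(5000, (err, numStalled) => {
  if (err) console.error('stalled-job check failed', err);
  else console.log(`re-enqueued ${numStalled} stalled jobs`);
});
```
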
28 changes: 24 additions & 4 deletions index.d.ts
@@ -29,10 +29,24 @@ declare class BeeQueue {
getJobs(type: string, page: BeeQueue.Page, cb: (jobs: BeeQueue.Job[]) => void): void;
getJobs(type: string, page: BeeQueue.Page): Promise<BeeQueue.Job[]>;

process<T>(handler: (job: BeeQueue.Job) => Promise<T>): void;
process<T>(concurrency: number, handler: (job: BeeQueue.Job) => Promise<T>): void;
process<T>(handler: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<T>) => void): void;
process<T>(concurrency: number, handler: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<T>) => void): void;
process<T>(
  handler: (job: BeeQueue.Job) => Promise<T>,
  limiterQuery?: (job: BeeQueue.Job) => Promise<BeeQueue.LimiterQueryResult>
): void;
process<T>(
  concurrency: number,
  handler: (job: BeeQueue.Job) => Promise<T>,
  limiterQuery?: (job: BeeQueue.Job) => Promise<BeeQueue.LimiterQueryResult>
): void;
process<T>(
  handler: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<T>) => void,
  limiterQuery?: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<BeeQueue.LimiterQueryResult>) => void
): void;
process<T>(
  concurrency: number,
  handler: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<T>) => void,
  limiterQuery?: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<BeeQueue.LimiterQueryResult>) => void
): void;

checkStalledJobs(interval?: number): Promise<number>;
checkStalledJobs(interval: number, cb: (err: Error, numStalled: number) => void): void
@@ -110,6 +124,12 @@ declare namespace BeeQueue {
newestJob?: string;
}

interface LimiterQueryResult {
  ready: boolean;
  delayUntil?: Date | number;
  data?: any;
}

type DoneCallback<T> = (error: Error | null, result?: T) => void;
}

160 changes: 134 additions & 26 deletions lib/queue.js
@@ -494,6 +494,77 @@ class Queue extends Emitter {
return jobPromise;
}

_checkLimiter(job, limiterQuery) {
  // Returns a promise that resolves to true if the job is ready to run,
  // otherwise false
  return limiterQuery(job)
    .then(result => {
      if (result.ready) {
        if (result.data) {
          // Update the job if data has changed
          job.data = result.data;
          this.jobs.set(job.id, job);
          const promise = helpers.deferred();
          this.client.multi()
            .hset(this.toKey('jobs'), job.id, job.toData())
            .exec(promise.defer());
          return promise.then(() => true);
        }
        return true;
      } else {
        // The job is not ready to run;
        // remove it from the active jobs list
        const multi = this.client.multi();
        this._deactivateJob(job, multi);
        // If the retry is delayed, add it to the delayed queue
        if (result.delayUntil) {
          job.delayUntil(result.delayUntil);
        }
        if (job.options.delay) {
          this._delayJob(job, multi, job.options.delay);
        } else {
          // Otherwise move it to the end of the waiting queue
          multi.lpush(this.toKey('waiting'), job.id);
        }
        // If the job data has been modified, save it
        if (result.data) {
          job.data = result.data;
          multi.hset(this.toKey('jobs'), job.id, job.toData());
        }
        this.jobs.set(job.id, job);
        const promise = helpers.deferred();
        multi.exec(promise.defer());
        return promise.then(() => {
          this.emit('limited', job, job.options.delay || 0);
          return false;
        });
      }
    }, (err) => {
      // If the limiterQuery function rejects,
      // fail the job immediately with no retry
      const errInfo = err.stack || err.message || err;
      job.options.stacktraces.unshift(errInfo);
      const multi = this.client.multi();
      this._deactivateJob(job, multi);
      this._removeFailedJob(job, multi);
      /* istanbul ignore else */
      if (this.settings.sendEvents) {
        const jobEvent = {
          id: job.id,
          event: job.status,
          data: errInfo
        };
        multi.publish(this.toKey('events'), JSON.stringify(jobEvent));
      }
      const promise = helpers.deferred();
      multi.exec(promise.defer());
      return promise.then(() => {
        this.emit('error', err);
        return false;
      });
    });
}

_preventStall(jobId) {
const promise = helpers.deferred(), cb = promise.defer();
this.client.srem(this.toKey('stalling'), jobId, cb);
@@ -508,9 +579,8 @@
throw new Error(`unable to update the status of ${status} job ${job.id}`);
}

const multi = this.client.multi()
.lrem(this.toKey('active'), 0, job.id)
.srem(this.toKey('stalling'), job.id);
const multi = this.client.multi();
this._deactivateJob(job, multi);

const jobEvent = {
id: job.id,
@@ -530,13 +600,7 @@
: null;
const delay = strategy ? strategy(job) : -1;
if (delay < 0) {
job.status = 'failed';
if (this.settings.removeOnFailure) {
multi.hdel(this.toKey('jobs'), job.id);
} else {
multi.hset(this.toKey('jobs'), job.id, job.toData());
multi.sadd(this.toKey('failed'), job.id);
}
this._removeFailedJob(job, multi);
} else {
job.options.retries -= 1;
job.status = 'retrying';
@@ -546,8 +610,7 @@
multi.lpush(this.toKey('waiting'), job.id);
} else {
const time = Date.now() + delay;
multi.zadd(this.toKey('delayed'), time, job.id)
.publish(this.toKey('earlierDelayed'), time);
this._delayJob(job, multi, time);
}
}
} else {
@@ -572,7 +635,7 @@
return promise.then(() => [status, result]);
}

process(concurrency, handler) {
process(concurrency, handler, limiterQuery) {
if (!this.settings.isWorker) {
throw new Error('Cannot call Queue#process on a non-worker');
}
@@ -586,6 +649,7 @@
}

if (typeof concurrency === 'function') {
limiterQuery = handler;
handler = concurrency;
concurrency = defaults['#process'].concurrency;
}
@@ -598,6 +662,10 @@
this.queued = 1;
this.concurrency = concurrency;

if (limiterQuery) {
this.limiterQuery = helpers.wrapAsync(limiterQuery, catchExceptions);
}

const jobTick = () => {
if (this.paused) {
this.queued -= 1;
@@ -607,6 +675,21 @@
// invariant: in this code path, this.running < this.concurrency, always
// after spoolup, this.running + this.queued === this.concurrency
this._getNextJob().then((job) => {
const execJob = () => {
  return this._runJob(job).catch((err) => {
    this.emit('error', err);
  }).then((results) => {
    this.running -= 1;
    this.queued += 1;
    setImmediate(jobTick);
    /* istanbul ignore else */
    if (results) {
      const status = results[0], result = results[1];
      this.emit(status, job, result);
    }
  });
};

// We're shutting down.
if (this.paused) {
// This job will get picked up later as a stalled job if we happen to
@@ -633,20 +716,23 @@
return;
}

return this._runJob(job).catch((err) => {
this.emit('error', err);
}).then((results) => {
this.running -= 1;
this.queued += 1;

setImmediate(jobTick);
// If a limiterQuery function is provided, check the limiter before
// running the job
if (this.limiterQuery) {
  return this._checkLimiter(job, this.limiterQuery)
    .then(ready => {
      if (ready) {
        return execJob();
      } else {
        this.running -= 1;
        this.queued += 1;
        setImmediate(jobTick);
      }
    });
} else {
  return execJob();
}

/* istanbul ignore else */
if (results) {
const status = results[0], result = results[1];
this.emit(status, job, result);
}
});
}, (err) => {
setImmediate(jobTick);
throw err;
@@ -659,6 +745,28 @@
return this;
}

_deactivateJob(job, multi) {
  multi
    .lrem(this.toKey('active'), 0, job.id)
    .srem(this.toKey('stalling'), job.id);
}

_removeFailedJob(job, multi) {
  job.status = 'failed';
  if (this.settings.removeOnFailure) {
    multi.hdel(this.toKey('jobs'), job.id);
  } else {
    multi.hset(this.toKey('jobs'), job.id, job.toData());
    multi.sadd(this.toKey('failed'), job.id);
  }
}

_delayJob(job, multi, time) {
  multi
    .zadd(this.toKey('delayed'), time, job.id)
    .publish(this.toKey('earlierDelayed'), time);
}

_doStalledJobCheck() {
return this._evalScript('checkStalledJobs', 4, this.toKey('stallBlock'),
this.toKey('stalling'), this.toKey('waiting'), this.toKey('active'),
