Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: cleanup whenCurrentJobsFinished #1542

Merged
merged 1 commit into from Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 15 additions & 2 deletions REFERENCE.md
Expand Up @@ -6,6 +6,7 @@
- [Queue#add](#queueadd)
- [Queue#pause](#queuepause)
- [Queue#resume](#queueresume)
- [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished)
- [Queue#count](#queuecount)
- [Queue#empty](#queueempty)
- [Queue#clean](#queueclean)
Expand Down Expand Up @@ -149,7 +150,7 @@ process(name: string, concurrency: number, processor: ((job, done?) => Promise<a

Defines a processing function for the jobs in a given Queue.

The callback is called everytime a job is placed in the queue. It is passed an instance of the job as first argument.
The callback is called every time a job is placed in the queue. It is passed an instance of the job as first argument.

If the callback signature contains the second optional `done` argument, the callback will be passed a `done` callback to be called after the job has been completed. The `done` callback can be called with an Error instance, to signal that the job did not complete successfully, or with a result as second argument (e.g.: `done(null, result);`) when the job is successful. Errors will be passed as a second argument to the "failed" event;
results, as a second argument to the "completed" event.
Expand Down Expand Up @@ -301,11 +302,13 @@ interface BackoffOpts {
### Queue#pause

```ts
pause(isLocal?: boolean): Promise
pause(isLocal?: boolean, doNotWaitActive?: boolean): Promise
```

Returns a promise that resolves when the queue is paused. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized. The pause can be either global or local. If global, all workers in all queue instances for a given queue will be paused. If local, just this worker will stop processing new jobs after the current lock expires. This can be useful to stop a worker from taking new jobs prior to shutting down.

If `doNotWaitActive` is `true`, `pause` will *not* wait for any active jobs to finish before resolving. Otherwise, `pause` *will* wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information.

Pausing a queue that is already paused does nothing.

---
Expand All @@ -322,6 +325,16 @@ Resuming a queue that is not paused does nothing.

---

### Queue#whenCurrentJobsFinished

```ts
whenCurrentJobsFinished(): Promise<Void>
```

Returns a promise that resolves when all jobs currently being processed by this worker have finished.

---

### Queue#count

```ts
Expand Down
53 changes: 12 additions & 41 deletions lib/queue.js
Expand Up @@ -1174,49 +1174,20 @@ Queue.prototype.clean = function(grace, type, limit) {
* @returns {Promise}
*/
Queue.prototype.whenCurrentJobsFinished = function() {
return new Promise((resolve, reject) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the unnecessary new Promise.

if (!this.bclientInitialized) {
// bclient not yet initialized, so no jobs to wait for
return resolve();
}

//
// Force reconnection of blocking connection to abort blocking redis call immediately.
//
const forcedReconnection = redisClientDisconnect(this.bclient).then(() => {
return this.bclient.connect();
});
if (!this.bclientInitialized) {
// bclient not yet initialized, so no jobs to wait for
return Promise.resolve();
}

Promise.all(this.processing)
.then(() => {
return forcedReconnection;
})
.then(resolve, reject);

/*
this.bclient.disconnect();
this.bclient.once('end', function(){
console.error('ENDED!');
setTimeout(function(){
this.bclient.connect();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed all this commented code.

}, 0);
});
//
// Force reconnection of blocking connection to abort blocking redis call immediately.
//
const forcedReconnection = redisClientDisconnect(this.bclient).then(() => {
return this.bclient.connect();
});

/*
var stream = this.bclient.connector.stream;
if(stream){
stream.on('finish', function(){
console.error('FINISHED!');
this.bclient.connect();
});
stream.on('error', function(err){
console.error('errir', err);
this.bclient.connect();
});
this.bclient.connect();
}
*/
//this.bclient.connect();
return Promise.all([this.processing[0]]).then(() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After upgrade to latest version of bull we are seeing that on shutdown, it doesn't properly wait for all jobs to complete.

It seems to me that this is likely the change that caused the problem. Was it intentional to change this to only wait for the first job?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that seems like a dumb mistake on my part. Thanks for catching it @holm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix: #1586

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick fix!

return forcedReconnection;
});
};

Expand Down
97 changes: 97 additions & 0 deletions test/test_when_current_jobs_finished.js
@@ -0,0 +1,97 @@
'use strict';

const expect = require('chai').expect;
const redis = require('ioredis');
const utils = require('./utils');
const delay = require('delay');
const sinon = require('sinon');

describe('.whenCurrentJobsFinished', () => {
let client;
beforeEach(() => {
client = new redis();
return client.flushdb();
});

afterEach(async () => {
sinon.restore();
await utils.cleanupQueues();
await client.flushdb();
return client.quit();
});

it('should handle queue with no processor', async () => {
const queue = await utils.newQueue();
expect(await queue.whenCurrentJobsFinished()).to.equal(undefined);
});

it('should handle queue with no jobs', async () => {
const queue = await utils.newQueue();
queue.process(() => Promise.resolve());
expect(await queue.whenCurrentJobsFinished()).to.equal(undefined);
});

it('should wait for job to complete', async () => {
const queue = await utils.newQueue();
await queue.add({});

let finishJob;

// wait for job to be active
await new Promise(resolve => {
queue.process(() => {
resolve();

return new Promise(resolve => {
finishJob = resolve;
});
});
});

let isFulfilled = false;
const finished = queue.whenCurrentJobsFinished().then(() => {
isFulfilled = true;
});

await delay(100);
expect(isFulfilled).to.equal(false);

finishJob();
expect(await finished).to.equal(
undefined,
'whenCurrentJobsFinished should resolve once jobs are finished'
);
});

it('should wait for job to fail', async () => {
const queue = await utils.newQueue();
await queue.add({});

let rejectJob;

// wait for job to be active
await new Promise(resolve => {
queue.process(() => {
resolve();

return new Promise((resolve, reject) => {
rejectJob = reject;
});
});
});

let isFulfilled = false;
const finished = queue.whenCurrentJobsFinished().then(() => {
isFulfilled = true;
});

await delay(100);
expect(isFulfilled).to.equal(false);

rejectJob();
expect(await finished).to.equal(
undefined,
'whenCurrentJobsFinished should resolve once jobs are finished'
);
});
});