diff --git a/REFERENCE.md b/REFERENCE.md index 4c418d3d2..e47af6f54 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -6,6 +6,7 @@ - [Queue#add](#queueadd) - [Queue#pause](#queuepause) - [Queue#resume](#queueresume) + - [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) - [Queue#count](#queuecount) - [Queue#empty](#queueempty) - [Queue#clean](#queueclean) @@ -149,7 +150,7 @@ process(name: string, concurrency: number, processor: ((job, done?) => Promise +``` + +Returns a promise that resolves when all jobs currently being processed by this worker have finished. + +--- + ### Queue#count ```ts diff --git a/lib/queue.js b/lib/queue.js index fbc1d0e11..7f067a7fe 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -1174,49 +1174,20 @@ Queue.prototype.clean = function(grace, type, limit) { * @returns {Promise} */ Queue.prototype.whenCurrentJobsFinished = function() { - return new Promise((resolve, reject) => { - if (!this.bclientInitialized) { - // bclient not yet initialized, so no jobs to wait for - return resolve(); - } - - // - // Force reconnection of blocking connection to abort blocking redis call immediately. - // - const forcedReconnection = redisClientDisconnect(this.bclient).then(() => { - return this.bclient.connect(); - }); + if (!this.bclientInitialized) { + // bclient not yet initialized, so no jobs to wait for + return Promise.resolve(); + } - Promise.all(this.processing) - .then(() => { - return forcedReconnection; - }) - .then(resolve, reject); - - /* - this.bclient.disconnect(); - this.bclient.once('end', function(){ - console.error('ENDED!'); - setTimeout(function(){ - this.bclient.connect(); - }, 0); - }); + // + // Force reconnection of blocking connection to abort blocking redis call immediately. + // + const forcedReconnection = redisClientDisconnect(this.bclient).then(() => { + return this.bclient.connect(); + }); - /* - var stream = this.bclient.connector.stream; - if(stream){ - stream.on('finish', function(){ - console.error('FINISHED!'); - this.bclient.connect(); - }); - stream.on('error', function(err){ - console.error('errir', err); - this.bclient.connect(); - }); - this.bclient.connect(); - } - */ - //this.bclient.connect(); + return Promise.all([this.processing[0]]).then(() => { + return forcedReconnection; }); }; diff --git a/test/test_when_current_jobs_finished.js b/test/test_when_current_jobs_finished.js new file mode 100644 index 000000000..67e3f161c --- /dev/null +++ b/test/test_when_current_jobs_finished.js @@ -0,0 +1,97 @@ +'use strict'; + +const expect = require('chai').expect; +const redis = require('ioredis'); +const utils = require('./utils'); +const delay = require('delay'); +const sinon = require('sinon'); + +describe('.whenCurrentJobsFinished', () => { + let client; + beforeEach(() => { + client = new redis(); + return client.flushdb(); + }); + + afterEach(async () => { + sinon.restore(); + await utils.cleanupQueues(); + await client.flushdb(); + return client.quit(); + }); + + it('should handle queue with no processor', async () => { + const queue = await utils.newQueue(); + expect(await queue.whenCurrentJobsFinished()).to.equal(undefined); + }); + + it('should handle queue with no jobs', async () => { + const queue = await utils.newQueue(); + queue.process(() => Promise.resolve()); + expect(await queue.whenCurrentJobsFinished()).to.equal(undefined); + }); + + it('should wait for job to complete', async () => { + const queue = await utils.newQueue(); + await queue.add({}); + + let finishJob; + + // wait for job to be active + await new Promise(resolve => { + queue.process(() => { + resolve(); + + return new Promise(resolve => { + finishJob = resolve; + }); + }); + }); + + let isFulfilled = false; + const finished = queue.whenCurrentJobsFinished().then(() => { + isFulfilled = true; + }); + + await delay(100); + expect(isFulfilled).to.equal(false); + + finishJob(); + expect(await finished).to.equal( + undefined, + 'whenCurrentJobsFinished should resolve once jobs are finished' + ); + }); + + it('should wait for job to fail', async () => { + const queue = await utils.newQueue(); + await queue.add({}); + + let rejectJob; + + // wait for job to be active + await new Promise(resolve => { + queue.process(() => { + resolve(); + + return new Promise((resolve, reject) => { + rejectJob = reject; + }); + }); + }); + + let isFulfilled = false; + const finished = queue.whenCurrentJobsFinished().then(() => { + isFulfilled = true; + }); + + await delay(100); + expect(isFulfilled).to.equal(false); + + rejectJob(); + expect(await finished).to.equal( + undefined, + 'whenCurrentJobsFinished should resolve once jobs are finished' + ); + }); +});