Skip to content

Commit

Permalink
fix: wait in queue to be ready in getNextJob fixes #1852
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Jul 15, 2021
1 parent 29e4115 commit 4e224e5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
3 changes: 0 additions & 3 deletions lib/process/sandbox.js
Expand Up @@ -36,9 +36,6 @@ module.exports = function(processFile, childPool) {
case 'log':
job.log(msg.value);
break;
case 'update':
job.update(msg.value);
break;
}
};

Expand Down
38 changes: 22 additions & 16 deletions lib/queue.js
Expand Up @@ -1129,7 +1129,7 @@ Queue.prototype.multi = function() {
/**
Returns a promise that resolves to the next job in queue.
*/
Queue.prototype.getNextJob = function() {
Queue.prototype.getNextJob = async function() {
if (this.closing) {
return Promise.resolve();
}
Expand All @@ -1138,27 +1138,33 @@ Queue.prototype.getNextJob = function() {
//
// Waiting for new jobs to arrive
//
return this.bclient
.brpoplpush(this.keys.wait, this.keys.active, this.settings.drainDelay)
.then(
jobId => {
if (jobId) {
return this.moveToActive(jobId);
}
},
err => {
// Swallow error if locally paused since we did force a disconnection
if (!(this.paused && err.message === 'Connection is closed.')) {
throw err;
}
}
try {
const jobId = await this.bclient.brpoplpush(
this.keys.wait,
this.keys.active,
this.settings.drainDelay
);

if (jobId) {
return this.moveToActive(jobId);
}
} catch (err) {
err => {
// Swallow error if locally paused since we did force a disconnection
if (!(this.paused && err.message === 'Connection is closed.')) {
throw err;
}
};
}
} else {
return this.moveToActive();
}
};

Queue.prototype.moveToActive = function(jobId) {
Queue.prototype.moveToActive = async function(jobId) {
// For manual retrieving jobs we need to wait for the queue to be ready.
await this.isReady();

return scripts.moveToActive(this, jobId).then(([jobData, jobId]) => {
return this.nextJobFromJobData(jobData, jobId);
});
Expand Down

0 comments on commit 4e224e5

Please sign in to comment.