Skip to content
Permalink
Browse files

Fixes for failing test cases (#852)

These modifications try to fix an issue with failing test cases which are preventing contributions to make it to the repo (when these contributions have nothing to do with the tests themselves).

The changes are:
* new queuing mechanism since the previous didn't guarantee priority when executing jobs scheduled the same datetime (agenda uses setTiemout and sometimes the job with lowest priority was executed, though not processed, first). Test case involved: job concurrency -> 'should run jobs as first in first out (FIFO) with respect to priority'

* On repeatEvery -> 'sets the nextRunAt property with skipImmediate' test case we expect precise timing in milliseconds and that's not always the with the processing we do to set repeatEvery, sometimes we schedule the job one millisecond after the expected time, I find this acceptable and changed the test case to expect something in the range of 3 minutes sharp and 3 minutes + 2 milliseconds for nextRunAt.

* start/stop -> 'clears locks on stop' fails on many PRs not being able to find 'longRunningJob' on db. I provided the param job on define callback since not having params has caused trouble before and increased jobTimeout when we execute the tests on TravisCI (actually I only set the TRAVIS env var on .travis.yml, the tests were already prepared to increase this timeout when that env var is present). Hopefully it'll help, never suffered the issue when testing locally, I've only seen it on TravisCI.
  • Loading branch information
dmbarreiro authored and simison committed Nov 24, 2019
1 parent 4be084e commit e18b900680231e20bd24696c9446239f48e13265
Showing with 101 additions and 70 deletions.
  1. +1 −0 .travis.yml
  2. +2 −1 lib/agenda/index.js
  3. +79 −0 lib/agenda/job-processing-queue.js
  4. +6 −47 lib/utils/process-jobs.js
  5. +13 −22 test/job.js
@@ -22,6 +22,7 @@ env:
global:
# NodeJS v4+ requires gcc 4.8+
- CXX=g++-4.9 CC=gcc-4.9
- TRAVIS=true
script: npm test
before_script:
- mongo --version
@@ -1,5 +1,6 @@
const {EventEmitter} = require('events');
const humanInterval = require('human-interval');
const JobProcessingQueue = require('./job-processing-queue');

/**
* @class Agenda
@@ -38,7 +39,7 @@ class Agenda extends EventEmitter {
this._definitions = {};
this._runningJobs = [];
this._lockedJobs = [];
this._jobQueue = [];
this._jobQueue = new JobProcessingQueue();
this._defaultLockLifetime = config.defaultLockLifetime || 10 * 60 * 1000; // 10 minute default lockLifetime
this._sort = config.sort || {nextRunAt: 1, priority: -1};
this._indices = Object.assign({name: 1}, this._sort, {priority: -1, lockedAt: 1, nextRunAt: 1, disabled: 1});
@@ -0,0 +1,79 @@
/**
* @class
* @param {Object} args - Job Options
* @property {Object} agenda - The Agenda instance
* @property {Object} attrs
*/
class JobProcessingQueue {
constructor() {
this._queue = [];
}

get length() {
return this._queue.length;
}
}

/**
* Pops and returns last queue element (next job to be processed) without checking concurrency.
* @returns {Job} Next Job to be processed
*/
JobProcessingQueue.prototype.pop = function() {
return this._queue.pop();
};

/**
* Inserts job in first queue position
* @param {Job} job job to add to queue
* @returns {undefined}
*/
JobProcessingQueue.prototype.push = function(job) {
this._queue.push(job);
};

/**
* Inserts job in queue where it will be order from left to right in decreasing
* order of nextRunAt and priority (in case of same nextRunAt), if all values
* are even the first jobs to be introduced will have priority
* @param {Job} job job to add to queue
* @returns {undefined}
*/
JobProcessingQueue.prototype.insert = function(job) {
const matchIndex = this._queue.findIndex(element => {
if (element.attrs.nextRunAt.getTime() <= job.attrs.nextRunAt.getTime()) {
if (element.attrs.nextRunAt.getTime() === job.attrs.nextRunAt.getTime()) {
if (element.attrs.priority >= job.attrs.priority) {
return true;
}
} else {
return true;
}
}
return false;
});

if (matchIndex === -1) {
this._queue.push(job);
} else {
this._queue.splice(matchIndex, 0, job);
}
};

/**
* Returns (does not pop, element remains in queue) first element (always from the right)
* that can be processed (not blocked by concurrency execution)
* @param {Object} agendaDefinitions job to add to queue
* @returns {Job} Next Job to be processed
*/
JobProcessingQueue.prototype.returnNextConcurrencyFreeJob = function(agendaDefinitions) {
let next;
for (next = this._queue.length - 1; next > 0; next -= 1) {
const def = agendaDefinitions[this._queue[next].attrs.name];
if (def.concurrency > def.running) {
break;
}
}
return this._queue[next];
};

module.exports = JobProcessingQueue;
@@ -65,46 +65,13 @@ module.exports = function(extraJob) {
* @param {boolean} inFront puts the job in front of queue if true
* @returns {undefined}
*/
function enqueueJobs(jobs, inFront) {
function enqueueJobs(jobs) {
if (!Array.isArray(jobs)) {
jobs = [jobs];
}

jobs.forEach(job => {
let jobIndex;
let start;
let loopCondition;
let endCondition;
let inc;

if (inFront) {
start = jobQueue.length ? jobQueue.length - 1 : 0;
inc = -1;
loopCondition = function() {
return jobIndex >= 0;
};
endCondition = function(queuedJob) {
return !queuedJob || queuedJob.attrs.priority < job.attrs.priority;
};
} else {
start = 0;
inc = 1;
loopCondition = function() {
return jobIndex < jobQueue.length;
};
endCondition = function(queuedJob) {
return queuedJob.attrs.priority >= job.attrs.priority;
};
}

for (jobIndex = start; loopCondition(); jobIndex += inc) {
if (endCondition(jobQueue[jobIndex])) {
break;
}
}

// Insert the job to the queue at its prioritized position for processing
jobQueue.splice(jobIndex, 0, job);
jobQueue.insert(job);
});
}

@@ -233,17 +200,7 @@ module.exports = function(extraJob) {
const now = new Date();

// Get the next job that is not blocked by concurrency
let next;
for (next = jobQueue.length - 1; next > 0; next -= 1) {
const def = definitions[jobQueue[next].attrs.name];
if (def.concurrency > def.running) {
break;
}
}

// We now have the job we are going to process and its definition
const job = jobQueue.splice(next, 1)[0];
const jobDefinition = definitions[job.attrs.name];
const job = jobQueue.returnNextConcurrencyFreeJob(definitions);

debug('[%s:%s] about to process job', job.attrs.name, job.attrs._id);

@@ -264,6 +221,8 @@ module.exports = function(extraJob) {
*/
async function runOrRetry() {
if (self._processInterval) {
const job = jobQueue.pop();
const jobDefinition = definitions[job.attrs.name];
if (jobDefinition.concurrency > jobDefinition.running && self._runningJobs.length < self._maxConcurrency) {
// Get the deadline of when the job is not supposed to go past for locking
const lockDeadline = new Date(Date.now() - jobDefinition.lockLifetime);
@@ -295,7 +254,7 @@ module.exports = function(extraJob) {
} else {
// Run the job immediately by putting it on the top of the queue
debug('[%s:%s] concurrency preventing immediate run, pushing job to top of queue', job.attrs.name, job.attrs._id);
enqueueJobs(job, true);
enqueueJobs(job);
}
}
}
@@ -100,7 +100,9 @@ describe('Job', () => {
it('sets the nextRunAt property with skipImmediate', () => {
const now = new Date();
job.repeatEvery('3 minutes', {skipImmediate: true});
expect(job.attrs.nextRunAt).to.be(now.valueOf() + 180000);
const lowerBound = now.valueOf() + 180000;
const upperBound = now.valueOf() + 180000 + 2;
expect(job.attrs.nextRunAt).to.be.within(lowerBound, upperBound); // Inclusive
});
});

@@ -314,12 +316,11 @@ describe('Job', () => {
agenda.define('failBoat', () => {
throw new Error('Zomg fail');
});
job.run().catch(err => {
expect(err.message).to.be('Zomg fail');
});
await job.run();
expect(job.attrs.failReason).to.be('Zomg fail');
});

it('handles errors with q promises', () => {
it('handles errors with q promises', async() => {
job.attrs.name = 'failBoat2';
agenda.define('failBoat2', (job, cb) => {
Q.delay(100)
@@ -329,9 +330,8 @@ describe('Job', () => {
.fail(cb)
.done();
});
job.run().catch(err => {
expect(err).to.be.ok();
});
await job.run();
expect(job.attrs.failReason).to.be.ok();
});

it('allows async functions', async() => {
@@ -552,12 +552,8 @@ describe('Job', () => {
await j.remove();
await j.save();

agenda.jobs({name: 'another job'}, (err, res) => {
if (err) {
throw err;
}
expect(res).to.have.length(0);
});
const jobs = await agenda.jobs({name: 'another job'});
expect(jobs).to.have.length(0);
});

it('returns the job', async() => {
@@ -613,7 +609,7 @@ describe('Job', () => {
});

it('clears locks on stop', async() => {
agenda.define('longRunningJob', () => {
agenda.define('longRunningJob', job => { // eslint-disable-line no-unused-vars
// Job never finishes
});
agenda.every('10 seconds', 'longRunningJob');
@@ -622,13 +618,8 @@ describe('Job', () => {
await agenda.start();
await delay(jobTimeout);
await agenda.stop();

agenda._collection.findOne({name: 'longRunningJob'}, (err, job) => {
if (err) {
throw err;
}
expect(job.lockedAt).to.be(null);
});
const job = await agenda._collection.findOne({name: 'longRunningJob'});
expect(job.lockedAt).to.be(null);
});

describe('events', () => {

0 comments on commit e18b900

Please sign in to comment.
You can’t perform that action at this time.