Skip to content

Commit

Permalink
Merge 5fcad86 into e239b6e
Browse files Browse the repository at this point in the history
  • Loading branch information
niftylettuce committed Aug 4, 2017
2 parents e239b6e + 5fcad86 commit 1bc0432
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 26 deletions.
20 changes: 18 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@ You can also specify it during instantiation
var agenda = new Agenda({defaultLockLifetime: 10000});
```

### sort(query)

Takes a `query` which specifies the sort query to be used for finding and locking the next job.

By default it is `{ nextRunAt: 1, priority: -1 }`, which obeys a first in first out approach, with respect to priority.

## Agenda Events

An instance of an agenda will emit the following events:
Expand Down Expand Up @@ -732,6 +738,15 @@ agenda.on('fail:send email', function(err, job) {

## Frequently Asked Questions

### What is the order in which jobs run?

Jobs are run **with priority in a first in first out order** (so they will be run in the order they were queued/created _AND_ with respect to highest priority). We utilize MongoDB's ObjectID through the job record property of `_id` as a sorting value (since it is a [timestamp](https://docs.mongodb.com/manual/reference/method/ObjectId.getTimestamp/) in itself).

For example, if we have two jobs named "send-email" queued (both with the same priority), and the first job is queued at 3:00 PM and second job is queued at 3:05 PM with the same `priority` value, then the first job will run first if we start to send "send-email" jobs at 3:10 PM. However if the first job has a priority of `5` and the second job has a priority of `10`, then the second will run first (priority takes precedence) at 3:10 PM.

The default MongoDB sort object is `{ nextRunAt: 1, priority: -1 }` and can be changed through the option `sort` when configuring Agenda.


### Sample Project Structure?

Agenda doesn't have a preferred project structure and leaves it to the user to
Expand Down Expand Up @@ -956,7 +971,7 @@ _This library treats months as 0-11 where as normally, cron months are parsed as

```
* * * * * *
| | | | | |
| | | | | |
| | | | | +-- Year (range: 1900-3000)
| | | | +---- Day of the Week (range: 1-7, 1 standing for Monday)
| | | +------ Month of the Year (range: 0-11) NOTE: Difference here
Expand All @@ -968,7 +983,7 @@ _This library treats months as 0-11 where as normally, cron months are parsed as
Starting in version `1.0.0`, cron will be parsed in the standard UNIX style:
```
* * * * * *
| | | | | |
| | | | | |
| | | | | +-- Year (range: 1900-3000)
| | | | +---- Day of the Week (range: 1-7, 1 standing for Monday)
| | | +------ Month of the Year (range: 1-12) NOTE: Difference here
Expand Down Expand Up @@ -1011,6 +1026,7 @@ Agenda has some great community members that help a great deal.
- [@liamdon](http://github.com/liamdon)
- [@loris](http://github.com/loris)
- [@jakeorr](http://github.com/jakeorr)
- [@niftylettuce](http://github.com/niftylettuce)


# License
Expand Down
51 changes: 29 additions & 22 deletions lib/agenda.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var Agenda = module.exports = function(config, cb) {
this._lockedJobs = [];
this._jobQueue = [];
this._defaultLockLifetime = config.defaultLockLifetime || 10 * 60 * 1000; // 10 minute default lockLifetime
this._sort = config.sort || { nextRunAt: 1, priority: -1 };

this._isLockingOnTheFly = false;
this._jobsToLock = [];
Expand Down Expand Up @@ -115,20 +116,18 @@ Agenda.prototype.db_init = function(collection, cb){
this._collection = this._mdb.collection(collection || 'agendaJobs');
var self = this;
debug('attempting index creation');
this._collection.createIndexes([{
"key": {"name" : 1, "priority" : -1, "lockedAt" : 1, "nextRunAt" : 1, "disabled" : 1},
"name": "findAndLockNextJobIndex1"
}, {
"key": {"name" : 1, "lockedAt" : 1, "priority" : -1, "nextRunAt" : 1, "disabled" : 1},
"name": "findAndLockNextJobIndex2"
}], function(err, result) {
if (err) {
debug('index creation failed, attempting legacy index creation next');
} else {
debug('index creation success');
this._collection.createIndex(
Object.assign({ name : 1 }, this._sort, { priority: -1, lockedAt: 1, nextRunAt: 1, disabled: 1 }),
{ name: 'findAndLockNextJobIndex' },
function(err, result) {
if (err) {
debug('index creation failed, attempting legacy index creation next');
} else {
debug('index creation success');
}
handleLegacyCreateIndex(err, result, self, cb)
}
handleLegacyCreateIndex(err, result, self, cb)
});
);
};

/**
Expand All @@ -147,12 +146,8 @@ function handleLegacyCreateIndex(err, result, self, cb) {
// Looks like a mongo.version < 2.4.x
err = null;
self._collection.ensureIndex(
{"name": 1, "priority": -1, "lockedAt": 1, "nextRunAt": 1, "disabled": 1},
{name: "findAndLockNextJobIndex1"}
);
self._collection.ensureIndex(
{"name": 1, "lockedAt": 1, "priority": -1, "nextRunAt": 1, "disabled": 1},
{name: "findAndLockNextJobIndex2"}
Object.assign({ name: 1 }, this._sort, { priority: -1, lockedAt: 1, nextRunAt: 1, disabled: 1 }),
{ name: 'findAndLockNextJobIndex' }
);
self.emit('ready');
}
Expand Down Expand Up @@ -240,6 +235,18 @@ Agenda.prototype.defaultLockLifetime = function(ms) {
return this;
};

/**
* Set the sort query for finding next job
* Default is { nextRunAt: 1, priority: -1 }
* @param {Object} query sort query object for MongoDB
* @returns {exports} agenda instance
*/
Agenda.prototype.sort = function(query) {
debug('Agenda.sort([Object])');
this._sort = query;
return this;
}

/**
* Given a name and some data, create a new job
* @param {String} name name of job
Expand Down Expand Up @@ -709,9 +716,9 @@ Agenda.prototype._findAndLockNextJob = function(jobName, definition, cb) {

/**
* Query used to affect what gets returned
* @type {{returnOriginal: boolean, priority: number}}
* @type {{returnOriginal: boolean, sort: object}}
*/
var JOB_RETURN_QUERY = {returnOriginal: false, priority: -1};
var JOB_RETURN_QUERY = {returnOriginal: false, sort: this._sort };

// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
this._collection.findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, JOB_RETURN_QUERY, function(err, result) {
Expand Down Expand Up @@ -1067,4 +1074,4 @@ function processJobs(extraJob) {
// Re-process jobs now that one has finished
jobProcessing();
}
}
}
11 changes: 10 additions & 1 deletion test/agenda.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,15 @@ describe('Agenda', () => {
expect(jobs._definitions.testDefaultLockLifetime.lockLifetime).to.be(7777);
});
});
describe('sort', () => {
it('returns itself', () => {
expect(jobs.sort({ nextRunAt: 1, priority: -1 })).to.be(jobs);
});
it('sets the default sort option', () => {
jobs.sort({ nextRunAt: -1 });
expect(jobs._sort).to.eql({ nextRunAt: -1 });
});
});
});

describe('job methods', () => {
Expand Down Expand Up @@ -460,4 +469,4 @@ describe('Agenda', () => {
});
});
});
});
});
110 changes: 109 additions & 1 deletion test/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,114 @@ describe('Job', () => {
jobs.schedule(new Date(now + 100), 'non-blocking', {i: 3});
}, 100);
});

it('should run jobs as first in first out (FIFO)', function(done) {

var results = [];

jobs.define('fifo', { concurrency: 1 }, function() {
// do something
});

jobs.on('start:fifo', checkResults);

jobs.now('fifo', function(err) {
if (err) return done(err);
setTimeout(function() {
jobs.now('fifo', function(err) {
if (err) return done(err);
setTimeout(function() {
jobs.now('fifo', function(err) {
if (err) return done(err);
jobs.start();
});
}, 100);
});
}, 100);
});

function checkResults(job) {
results.push(new Date(job.attrs.nextRunAt).getTime());
if (results.length !== 3) return;
expect(results.join('')).to.eql(results.sort().join(''));
done();
}

});

it('should run jobs as first in first out (FIFO) with respect to priority', function(done) {

var times = [];
var priorities = [];
var now = Date.now();

jobs.define('fifo-priority', { concurrency: 1 }, function() {
// do something
});

jobs.on('start:fifo-priority', checkResults);

jobs.create('fifo-priority').schedule(new Date(now)).priority('high').save(err => {
if (err) return done(err);
jobs.create('fifo-priority').schedule(new Date(now + 100)).priority('low').save(err => {
if (err) return done(err);
jobs.create('fifo-priority').schedule(new Date(now + 100)).priority('high').save(err => {
if (err) return done(err);
jobs.start();
});
});
});

function checkResults(job) {
priorities.push(job.attrs.priority);
times.push(new Date(job.attrs.nextRunAt).getTime());
if (priorities.length !== 3 || times.length !== 3) return;
expect(times.join('')).to.eql(times.sort().join(''));
expect(priorities).to.eql([10, 10, -10]);
done();
}

});

it('should run higher priority jobs first', function(done) {

// inspired by tests added by @lushc here:
// <https://github.com/agenda/agenda/pull/451/commits/336ff6445803606a6dc468a6f26c637145790adc>
var now = new Date();
var results = [];

jobs.define('priority', { concurrency: 1 }, function() {
// do something
});

jobs.on('start:priority', checkResults);

jobs.create('priority').schedule(now).save(function(err) {
if (err) return done(err);
jobs.create('priority').schedule(now).priority('low').save(function(err) {
if (err) return done(err);
jobs.create('priority').schedule(now).priority('high').save(function(err) {
if (err) return done(err);
jobs.start();
});
});
});

function checkResults(job) {
results.push(job.attrs.priority);
if (results.length !== 3) return;
expect(results).to.eql([10, 0, -10]);
done();
}

});

it('should support custom sort option', function() {
var sort = { foo: 1 };
var agenda = new Agenda({ sort: sort });
expect(agenda._sort).to.eql(sort);
});

});

describe('every running', () => {
Expand Down Expand Up @@ -1088,4 +1196,4 @@ describe('Job', () => {
});
});
});
});
});

0 comments on commit 1bc0432

Please sign in to comment.