diff --git a/lib/agenda.js b/lib/agenda.js index 0efbb42a3..f5f1bbbf5 100644 --- a/lib/agenda.js +++ b/lib/agenda.js @@ -58,7 +58,7 @@ utils.inherits(Agenda, Emitter); * @param {Function} cb called when MongoDB connection fails or passes * @returns {exports} instance of Agenda */ -Agenda.prototype.mongo = function( mdb, collection, cb ){ +Agenda.prototype.mongo = function(mdb, collection, cb) { this._mdb = mdb; this.db_init(collection, cb); return this; @@ -87,7 +87,7 @@ Agenda.prototype.database = function(url, collection, options, cb) { options = options || {}; var self = this; - MongoClient.connect(url, options, function( error, db ){ + MongoClient.connect(url, options, function(error, db) { if (error) { debug('error connecting to MongoDB using collection: [%s]', collection); if (cb) { @@ -99,7 +99,7 @@ Agenda.prototype.database = function(url, collection, options, cb) { } debug('successful connection to MongoDB using collection: [%s]', collection); self._mdb = db; - self.db_init( collection, cb ); + self.db_init(collection, cb); }); return this; }; @@ -110,7 +110,7 @@ Agenda.prototype.database = function(url, collection, options, cb) { * @param {Function} cb called when the db is initialized * @returns {undefined} */ -Agenda.prototype.db_init = function( collection, cb ){ +Agenda.prototype.db_init = function(collection, cb){ debug('init database collection using name [%s]', collection); this._collection = this._mdb.collection(collection || 'agendaJobs'); var self = this; @@ -121,7 +121,7 @@ Agenda.prototype.db_init = function( collection, cb ){ }, { "key": {"name" : 1, "lockedAt" : 1, "priority" : -1, "nextRunAt" : 1, "disabled" : 1}, "name": "findAndLockNextJobIndex2" - }], function( err, result ){ + }], function(err, result) { if (err) { debug('index creation failed, attempting legacy index creation next'); } else { @@ -139,8 +139,8 @@ Agenda.prototype.db_init = function( collection, cb ){ * @param {Function} cb called when indices fail or pass * @returns {undefined} */ -function handleLegacyCreateIndex(err, result, self, cb){ - if (err && err.message !== 'no such cmd: createIndexes'){ +function handleLegacyCreateIndex(err, result, self, cb) { + if (err && err.message !== 'no such cmd: createIndexes') { debug('not attempting legacy index, emitting error'); self.emit('error', err); } else { @@ -234,7 +234,7 @@ Agenda.prototype.defaultLockLimit = function(num) { * @param {Number} ms time in ms to set default lock * @returns {exports} agenda instance */ -Agenda.prototype.defaultLockLifetime = function(ms){ +Agenda.prototype.defaultLockLifetime = function(ms) { debug('Agenda.defaultLockLifetime(%d)', ms); this._defaultLockLifetime = ms; return this; @@ -254,21 +254,20 @@ Agenda.prototype.create = function(name, data) { return job; }; - /** * Finds all jobs matching 'query' * @param {Object} query object for MongoDB * @param {Function} cb called when fails or passes * @returns {undefined} */ -Agenda.prototype.jobs = function( query, cb ){ +Agenda.prototype.jobs = function(query, cb) { var self = this; - this._collection.find( query ).toArray( function( error, result ){ + this._collection.find(query).toArray(function(error, result) { var jobs; - if ( !error ){ - jobs = result.map( createJob.bind( null, self ) ); + if (!error) { + jobs = result.map(createJob.bind(null, self)); } - cb( error, jobs ); + cb(error, jobs); }); }; @@ -281,7 +280,7 @@ Agenda.prototype.jobs = function( query, cb ){ Agenda.prototype.purge = function(cb) { var definedNames = Object.keys(this._definitions); debug('Agenda.purge(%o)'); - this.cancel( {name: {$not: {$in: definedNames}}}, cb ); + this.cancel({name: {$not: {$in: definedNames}}}, cb); }; /** @@ -384,7 +383,6 @@ Agenda.prototype.every = function(interval, names, data, options, cb) { } }); }); - } }; @@ -477,7 +475,6 @@ Agenda.prototype.now = function(name, data, cb) { return job; }; - /** * Cancels any jobs matching the passed MongoDB query, and removes them from the database. * @param {Object} query MongoDB query to use when cancelling @@ -487,14 +484,14 @@ Agenda.prototype.now = function(name, data, cb) { */ Agenda.prototype.cancel = function(query, cb) { debug('attempting to cancel all Agenda jobs', query); - this._collection.deleteMany( query, function( error, result ){ + this._collection.deleteMany(query, function(error, result) { if (cb) { if (error) { debug('error trying to delete jobs from MongoDB'); } else { debug('jobs cancelled'); } - cb( error, result && result.result ? result.result.n : undefined ); + cb(error, result && result.result ? result.result.n : undefined); } }); }; @@ -506,7 +503,6 @@ Agenda.prototype.cancel = function(query, cb) { * @returns {undefined} */ Agenda.prototype.saveJob = function(job, cb) { - debug('attempting to save a job into Agenda instance'); // Grab information needed to save job but that we don't want to persist in MongoDB @@ -526,19 +522,22 @@ Agenda.prototype.saveJob = function(job, cb) { debug('set job props: \n%O', props); // Grab current time and set default query options for MongoDB - var now = new Date(), protect = {}, update = { $set: props }; + var now = new Date(), protect = {}, update = {$set: props}; debug('current time stored as %s', now.toISOString()); // If the job already had an ID, then update the properties of the job // i.e, who last modified it, etc if (id) { - // Update the job and process the resulting data' debug('job already has _id, calling findOneAndUpdate() using _id as query'); - this._collection.findOneAndUpdate({ _id: id }, update, { returnOriginal: false }, processDbResult); - + this._collection.findOneAndUpdate({ + _id: id + }, + update, { + returnOriginal: false + }, + processDbResult); } else if (props.type === 'single') { - // Job type set to 'single' so... // NOTE: Again, not sure about difference between 'single' here and 'once' in job.js debug('job with type of "single" found'); @@ -559,32 +558,30 @@ Agenda.prototype.saveJob = function(job, cb) { // Try an upsert // NOTE: 'single' again, not exactly sure what it means debug('calling findOneAndUpdate() with job name and type of "single" as query'); - this._collection.findOneAndUpdate( - { name: props.name, type: 'single' }, - update, - { upsert: true, returnOriginal: false }, - processDbResult - ); - + this._collection.findOneAndUpdate({ + name: props.name, + type: 'single' + }, + update, { + upsert: true, + returnOriginal: false + }, + processDbResult); } else if (unique) { - // If we want the job to be unique, then we can upsert based on the 'unique' query object that was passed in var query = job.attrs.unique; query.name = props.name; if (uniqueOpts && uniqueOpts.insertOnly) { - update = { $setOnInsert: props }; + update = {$setOnInsert: props}; } // Use the 'unique' query object to find an existing job or create a new one debug('calling findOneAndUpdate() with unique object as query: \n%O', query); - this._collection.findOneAndUpdate(query, update, { upsert: true, returnOriginal: false }, processDbResult); - + this._collection.findOneAndUpdate(query, update, {upsert: true, returnOriginal: false}, processDbResult); } else { - // If all else fails, the job does not exist yet so we just insert it into MongoDB debug('using default behavior, inserting new job via insertOne() with props that were set: \n%O', props); this._collection.insertOne(props, processDbResult); - } /** @@ -596,7 +593,6 @@ Agenda.prototype.saveJob = function(job, cb) { * @returns {undefined} */ function processDbResult(err, result) { - // Check if there is an error and either cb(error) or throw if there is no callback if (err) { debug('processDbResult() received an error, job was not updated/created'); @@ -606,14 +602,12 @@ Agenda.prototype.saveJob = function(job, cb) { throw err; } } else if (result) { - debug('processDbResult() called with success, checking whether to process job immediately or not'); // We have a result from the above calls // findAndModify() returns different results than insertOne() so check for that var res = result.ops ? result.ops : result.value; if (res) { - // If it is an array, grab the first job if (Array.isArray(res)) { res = res[0]; @@ -629,14 +623,12 @@ Agenda.prototype.saveJob = function(job, cb) { processJobs.call(self, job); } } - } // If we have a callback, return the Job instance if (fn) { fn(null, job); } - } }; @@ -677,7 +669,6 @@ Agenda.prototype.stop = function(cb) { * @returns {undefined} */ Agenda.prototype._findAndLockNextJob = function(jobName, definition, cb) { - var self = this, now = new Date(), lockDeadline = new Date(Date.now().valueOf() - definition.lockLifetime); debug('_findAndLockNextJob(%s, [Function], cb)', jobName); @@ -688,54 +679,55 @@ Agenda.prototype._findAndLockNextJob = function(jobName, definition, cb) { debug('missing MongoDB connection, not attempting to find and lock a job'); cb(new Error('No MongoDB Connection')); } else { - /** * Query used to find job to run * @type {{$or: [*]}} */ var JOB_PROCESS_WHERE_QUERY = { - $or: [ - { name: jobName, lockedAt: null, nextRunAt: { $lte: this._nextScanAt }, disabled: { $ne: true } }, - { name: jobName, - lockedAt: { $exists: false }, - nextRunAt: { $lte: this._nextScanAt }, - disabled: { $ne: true } - }, - { name: jobName, lockedAt: { $lte: lockDeadline }, disabled: { $ne: true } } - ] + $or: [{ + name: jobName, + lockedAt: null, + nextRunAt: {$lte: this._nextScanAt}, + disabled: {$ne: true} + }, { + name: jobName, + lockedAt: {$exists: false}, + nextRunAt: {$lte: this._nextScanAt}, + disabled: {$ne: true} + }, { + name: jobName, + lockedAt: {$lte: lockDeadline}, + disabled: {$ne: true} + }] }; /** * Query used to set a job as locked * @type {{$set: {lockedAt: Date}}} */ - var JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } }; + var JOB_PROCESS_SET_QUERY = {$set: {lockedAt: now}}; /** * Query used to affect what gets returned * @type {{returnOriginal: boolean, priority: number}} */ - var JOB_RETURN_QUERY = { returnOriginal: false, 'priority': -1 }; + var JOB_RETURN_QUERY = {returnOriginal: false, priority: -1}; // Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed - this._collection.findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, JOB_RETURN_QUERY, - function(err, result) { - var job; - if (!err && result.value) { - debug('found a job available to lock, creating a new job on Agenda with id [%s]', result.value._id); - job = createJob(self, result.value); - } - if (err) { - debug('error occurred when running query to find and lock job'); - } - cb(err, job); + this._collection.findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, JOB_RETURN_QUERY, function(err, result) { + var job; + if (!err && result.value) { + debug('found a job available to lock, creating a new job on Agenda with id [%s]', result.value._id); + job = createJob(self, result.value); } - ); - + if (err) { + debug('error occurred when running query to find and lock job'); + } + cb(err, job); + }); } }; - /** * Create Job object from data * @param {Object} agenda instance of Agenda @@ -760,7 +752,7 @@ Agenda.prototype._unlockJobs = function(done) { return job.attrs._id; }); debug('about to unlock jobs with ids: %O', jobIds); - this._collection.updateMany({_id: { $in: jobIds } }, { $set: { lockedAt: null } }, done); + this._collection.updateMany({_id: {$in: jobIds}}, {$set: {lockedAt: null}}, done); }; /** @@ -769,7 +761,6 @@ Agenda.prototype._unlockJobs = function(done) { * @returns {undefined} */ function processJobs(extraJob) { - // Make sure an interval has actually been set // Prevents race condition with 'Agenda.stop' and already scheduled run if (!this._processInterval) { @@ -781,7 +772,6 @@ function processJobs(extraJob) { // Determine whether or not we have a direct process call! if (!extraJob) { - // Go through each jobName set in 'Agenda.process' and fill the queue with the next jobs for (jobName in definitions) { if ({}.hasOwnProperty.call(definitions, jobName)) { @@ -789,14 +779,11 @@ function processJobs(extraJob) { jobQueueFilling(jobName); } } - } else if (definitions[extraJob.attrs.name]) { - // Add the job to list of jobs to lock and then lock it immediately! debug('job was passed directly to processJobs(), locking and running immediately'); self._jobsToLock.push(extraJob); lockOnTheFly(); - } /** @@ -854,12 +841,13 @@ function processJobs(extraJob) { } for (jobIndex = start; loopCondition(); jobIndex += inc) { - if (endCondition(jobQueue[jobIndex])) break; + if (endCondition(jobQueue[jobIndex])) { + break; + } } // Insert the job to the queue at its prioritized position for processing jobQueue.splice(jobIndex, 0, job); - }); } @@ -870,7 +858,6 @@ function processJobs(extraJob) { * @returns {undefined} */ function lockOnTheFly() { - // Already running this? Return if (self._isLockingOnTheFly) { debug('lockOnTheFly() already running, returning'); @@ -906,16 +893,15 @@ function processJobs(extraJob) { _id: job.attrs._id, lockedAt: null, nextRunAt: job.attrs.nextRunAt, - disabled: { $ne: true } + disabled: {$ne: true} }; // Update / options for the MongoDB query - var update = { $set: { lockedAt: now } }; - var options = { returnOriginal: false }; + var update = {$set: {lockedAt: now}}; + var options = {returnOriginal: false}; // Lock the job in MongoDB! self._collection.findOneAndUpdate(criteria, update, options, function(err, resp) { - // Did the "job" get locked? Create a job object and run if (resp.value) { debug('found a job that can be locked on the fly in MongoDB'); @@ -931,7 +917,6 @@ function processJobs(extraJob) { // Re-run in case anything is in the queue lockOnTheFly(); - }); } @@ -941,7 +926,6 @@ function processJobs(extraJob) { * @returns {undefined} */ function jobQueueFilling(name) { - // Don't lock because of a limit we have set (lockLimit, etc) if (!shouldLock(name)) { debug('lock limit reached in queue filling for [%s]', name); @@ -954,7 +938,6 @@ function processJobs(extraJob) { // For this job name, find the next job to run and lock it! self._findAndLockNextJob(name, definitions[name], function(err, job) { - if (err) { debug('[%s] job lock failed while filling queue', name); throw err; @@ -973,7 +956,6 @@ function processJobs(extraJob) { jobQueueFilling(name); jobProcessing(); } - }); } @@ -982,7 +964,6 @@ function processJobs(extraJob) { * @returns {undefined} */ function jobProcessing() { - // Ensure we have jobs if (!jobQueue.length) { return; @@ -995,11 +976,14 @@ function processJobs(extraJob) { var next; for (next = jobQueue.length - 1; next > 0; next -= 1) { var def = definitions[jobQueue[next].attrs.name]; - if (def.concurrency > def.running) break; + if (def.concurrency > def.running) { + break; + } } // We now have the job we are going to process and its definition var job = jobQueue.splice(next, 1)[0], jobDefinition = definitions[job.attrs.name]; + debug('[%s:%s] about to process job', job.attrs.name, job.attrs._id); // If the 'nextRunAt' time is older than the current time, run the job @@ -1048,11 +1032,9 @@ function processJobs(extraJob) { jobProcessing(); } else { - // Run the job immediately by putting it on the top of the queue debug('[%s:%s] concurrency preventing immediate run, pushing job to top of queue', job.attrs.name, job.attrs._id); enqueueJobs(job, true); - } } } @@ -1065,7 +1047,6 @@ function processJobs(extraJob) { * @returns {undefined} */ function processJobResult(err, job) { - if (err && !job) throw (err); var name = job.attrs.name; @@ -1085,6 +1066,5 @@ function processJobs(extraJob) { // Re-process jobs now that one has finished jobProcessing(); - } -} +} \ No newline at end of file