From 9b30402905a3d6cd0fd3920317576e8d1c8d69a5 Mon Sep 17 00:00:00 2001 From: darkterra Date: Thu, 4 Jan 2018 02:54:59 +0000 Subject: [PATCH] Add fix + feature --- README.md | 99 +++++++++++++++++++++++++------- lib/helper.js | 99 ++++++++++++++++++-------------- lib/scheduler.js | 145 +++++++++++++++++++++++++---------------------- 3 files changed, 214 insertions(+), 129 deletions(-) diff --git a/README.md b/README.md index f24cf70..187c29b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -mongo-scheduler [![Build Status](https://travis-ci.org/jamplify/mongo-scheduler.png)](https://travis-ci.org/jamplify/mongo-scheduler) +mongo-scheduler ================== Persistent event scheduler using mongodb as storage @@ -16,17 +16,17 @@ Usage ### Initialization ```javascript -var Scheduler = require('mongo-scheduler') +var Scheduler = require('mongo-scheduer') var scheduler = new Scheduler(connection, options) ``` __Arguments__ -* connectionString - mongodb connections string (i.e.: "mongodb://localhost:27017/scheduler-db") or a mongoose connection object -* options - Options object +* connectionString \ - mongodb connections string (i.e.: "mongodb://localhost:27017/scheduler-db") or a mongoose connection object +* options \ - Options object __Valid Options__ -* pollInterval - Frequency in ms that the scheduler should poll the db. Default: 60000 (1 minute) -* doNotFire - If set to true, this instance will only schedule events, not fire them. Default: false +* pollInterval \ - Frequency in ms that the scheduler should poll the db. Default: 60000 (1 minute) +* doNotFire \ - If set to true, this instance will only schedule events, not fire them. Default: false --------------------------------------- @@ -40,23 +40,22 @@ scheduler.schedule(event) ``` __Arguments__ -* event - Event details -* [callback] +* event\ - Event details +* [callback] \Function> - callabck __Event Fields__ -* name - Name of event that should be fired -* [cron] - A cron string representing a frequency this should fire on -* [collection] - Info about the documents this event corresponds to -* [id] - Value of the _id field of the document this event corresponds to -* [after] - Time that the event should be triggered at, if left blank it will trigger the next time the scheduler polls -* [query] - a MongoDB query expression to select records that this event should be triggered for -* [data] - Extra data to attach to the event - +* name \ - Name of event that should be fired +* [cron] \ - A cron string representing a frequency this should fire on +* [collection] \ - Info about the documents this event corresponds to +* [id] \ - Value of the _id field of the document this event corresponds to +* [after] \ - Time that the event should be triggered at, if left blank it will trigger the next time the scheduler polls +* [query] \ - a MongoDB query expression to select records that this event should be triggered for +* [data] \ - Extra data to attach to the event --------------------------------------- -### on +### scheduler.on Event handler. @@ -68,9 +67,69 @@ scheduler.on('breakfast', function(meal, event) { }) ``` __Arguments__ -* eventName - Name of event -* handler - handler +* eventName \ - Name of event +* handler \ - handler + +--------------------------------------- + +### scheduler.list + +List all events. + +```javascript +scheduler.list('breakfast', function(err, events) { + // Do something with events +}) +``` + +__Arguments__ +* handler \ - handler + +--------------------------------------- + +### scheduler.find + +Find an event. + +```javascript +scheduler.find('breakfast', function(err, event) { + // Do something with event +}) +``` + +__Arguments__ +* eventName \ - Name of event +* handler \ - handler + +--------------------------------------- + +### scheduler.remove + +Remove an event. + +```javascript +scheduler.remove('breakfast', function(err, event) { + // Event has been removed +}) +``` + +__Arguments__ +* eventName \ - Name of event +* handler \ - handler + +--------------------------------------- + +### scheduler.enable + +Enable scheduler. +--------------------------------------- + +### scheduler.disable + +Disable scheduler. + +--------------------------------------- #### Error handling If the scheduler encounters an error it will emit an 'error' event. In this case the handler, will receive two arguments: the Error object, and the event doc (if applicable). @@ -78,4 +137,4 @@ If the scheduler encounters an error it will emit an 'error' event. In this case License ------- -MIT License +MIT License \ No newline at end of file diff --git a/lib/helper.js b/lib/helper.js index 80b7b32..5a425d8 100644 --- a/lib/helper.js +++ b/lib/helper.js @@ -1,71 +1,86 @@ -var _ = require('lodash'), - parser = require('cron-parser') +'use strict'; + +const _ = require('lodash'); +const parser = require('cron-parser'); function SchedulerError(message) { - this.name = "SchedulerError" - this.message = message || "Unexpected Scheduler Error" + this.name = "SchedulerError"; + this.message = message || "Unexpected Scheduler Error"; } -SchedulerError.prototype = new Error() + +SchedulerError.prototype = new Error(); SchedulerError.prototype.constructor = SchedulerError; function translateQueryfields(queryfields) { - return _.map(queryfields.split(' '), function(field) { - if(field === 'collection' || field === 'id') - return 'storage.' + field - else if(field === 'query' || field === 'after') - return 'conditions.' + field - else if(field === 'name') return 'event' - else return field - }) + return _.map(queryfields.split(' '), (field) => { + if(field === 'collection' || field === 'id') { + return 'storage.' + field; + } + else if(field === 'query' || field === 'after') { + return 'conditions.' + field; + } + else if(field === 'name') { + return 'event'; + } + else { + return field; + } + }); } module.exports = { - buildSchedule: function(details) { - var storage = _.extend({}, _.pick(details, 'collection', 'id')) - var conditions = _.extend({}, _.pick(details, 'query', 'after')) - var options = _.defaults(details.options || {}, { + buildSchedule: (details) => { + const storage = _.extend({}, _.pick(details, 'collection', 'id')); + const conditions = _.extend({}, _.pick(details, 'query', 'after')); + const options = _.defaults(details.options || {}, { emitPerDoc: false, - queryFields: 'name collection id' - }) + queryFields: 'name collection id after' + }); - var doc = { + const doc = { status: details.status || 'ready', event: details.name, - storage: storage, - conditions: conditions, + storage, + conditions, data: details.data, - options: options - } + options + }; - var queryFields = translateQueryfields(options.queryFields) - var query = _.transform(queryFields, function(memo, f) { - memo[f] = _.get(doc, f) - }, {}) + const queryFields = translateQueryfields(options.queryFields); + const query = _.transform(queryFields, function(memo, f) { + memo[f] = _.get(doc, f); + }, {}); if (!!details.cron) { - doc.cron = details.cron - doc.conditions.after = parser.parseExpression(details.cron).next() + doc.cron = details.cron; + doc.conditions.after = parser.parseExpression(details.cron).next(); } - return { doc: doc, query: query } + return { doc, query }; }, - buildEvent: function(doc) { - if (!doc) return; + buildEvent: (doc) => { + if (!doc) { + return; + } - doc.conditions.query = doc.conditions.query || {} - if (typeof doc.conditions.query === 'string') - doc.conditions.query = JSON.parse(doc.conditions.query) - if(doc.storage && doc.storage.id) - _.extend(doc.conditions.query, {_id: doc.storage.id}) - return doc + doc.conditions.query = doc.conditions.query || {}; + + if (typeof doc.conditions.query === 'string') { + doc.conditions.query = JSON.parse(doc.conditions.query); + } + if(doc.storage && doc.storage.id) { + _.extend(doc.conditions.query, {_id: doc.storage.id}); + } + + return doc; }, shouldExit: function(err, result) { - return !!err || !!(result.lastErrorObject && result.lastErrorObject.err) + return !!err || !!(result.lastErrorObject && result.lastErrorObject.err); }, buildError: function(err, result) { - return err || new SchedulerError(result.lastErrorObject.err) + return err || new SchedulerError(result.lastErrorObject.err); } -} +}; diff --git a/lib/scheduler.js b/lib/scheduler.js index 51e6338..bf95d10 100644 --- a/lib/scheduler.js +++ b/lib/scheduler.js @@ -1,38 +1,38 @@ -var helper = require('./helper') -var mongo = require('mongodb') -var ObjectId = mongo.ObjectId -var events = require("events") -var parser = require("cron-parser") +const helper = require('./helper'); +const mongo = require('mongodb'); +const ObjectId = mongo.ObjectId; +const events = require("events"); +const parser = require("cron-parser"); function Scheduler(connection, opts) { var self = this, MongoClient = mongo.MongoClient, ready = false, options = opts || {}, - db + db; - events.EventEmitter.call(this) + events.EventEmitter.call(this); if (connection instanceof Object) { - db = connection.db - ready = true + db = connection.db; + ready = true; } else { MongoClient.connect(connection, function(err, database) { - if (err) throw err - db = database - ready = true - }) + if (err) throw err; + db = database; + ready = true; + }); } function emit(event, doc, cb) { var command = { findAndModify: 'scheduled_events', query: {_id: event._id}, - } + }; - self.emit(event.event, doc, event) - if(!!cb) cb() + self.emit(event.event, doc, event); + if(!!cb) cb(); setTimeout(function() { if (!!event.cron) { @@ -41,14 +41,14 @@ function Scheduler(connection, opts) { status: 'ready', 'conditions.after': parser.parseExpression(event.cron).next() } - } - } else command.remove = true + }; + } else command.remove = true; db.command(command, function(err, result) { if (err) - return self.emit('error', helper.buildError(err, result)) - }) - }, 1000) + return self.emit('error', helper.buildError(err, result)); + }); + }, 1000); } function poll() { @@ -58,7 +58,7 @@ function Scheduler(connection, opts) { {'conditions.after': {'$exists': 0}}, {'conditions.after': {'$lte': new Date()}} ] - } + }; db.command({ findAndModify: 'scheduled_events', @@ -66,73 +66,83 @@ function Scheduler(connection, opts) { update: {$set: {status: 'running'}} }, function(err, result) { if(helper.shouldExit(err, result)) - return self.emit('error', helper.buildError(err, result)) + return self.emit('error', helper.buildError(err, result)); - var event = helper.buildEvent(result.value) + var event = helper.buildEvent(result.value); if (!event) return; if (!event.storage.collection) return emit(event, null, poll); db.collection(event.storage.collection, function(err, coll) { - if(err) return self.emit('error', err, event) + if(err) return self.emit('error', err, event); coll.find(event.conditions.query, function(err, cursor) { - if (err) return self.emit('error', err, event) + if (err) return self.emit('error', err, event); if(event.options.emitPerDoc || !!event.storage.id) cursor.each(function(err, doc) { - if (err) return self.emit('error', err, event) + if (err) return self.emit('error', err, event); if (!doc) return poll(); - emit(event, doc) - }) + emit(event, doc); + }); else cursor.toArray(function(err, results) { - if (err) return self.emit('error', err, event) - if (results.length !== 0) emit(event, results) - poll() - }) - }) - }) - }) + if (err) return self.emit('error', err, event); + if (results.length !== 0) emit(event, results); + poll(); + }); + }); + }); + }); } function whenReady(op) { return function() { - if(ready) return op.apply(self, arguments) + if(ready) return op.apply(self, arguments); - var args = arguments + var args = arguments; var id = setInterval(function() { if (!ready) return; - clearInterval(id) - op.apply(self, args) - }, 10) - } + clearInterval(id); + op.apply(self, args); + }, 10); + }; } function initialize() { - poll() - setInterval(poll, options.pollInterval || 60000) + poll(); + setInterval(poll, options.pollInterval || 60000); } function schedule(details, cb) { var info = helper.buildSchedule(details), - callback = cb || function() {} + callback = cb || function() {}; db.createCollection('scheduled_events', function(err, coll) { - coll.findAndModify(info.query, - ['event', 'asc'], - info.doc, - {new: true, upsert: true}, - callback) - }) + if (err) { + console.error(err); + } + else { + coll.findAndModify(info.query, + ['event', 'asc'], + info.doc, + {new: true, upsert: true}, + callback); + } + }); } function list(cb) { - var collection = db.collection('scheduled_events') - collection.find({}).toArray(cb) + var collection = db.collection('scheduled_events'); + collection.find({}).toArray(cb); } function find(name, cb) { - var collection = db.collection('scheduled_events') - collection.findOne({event: name}, cb) + var collection = db.collection('scheduled_events'); + collection.findOne({event: name}, cb); + } + + function remove(name, cb) { + var collection = db.collection('scheduled_events'); + collection.remove({event: name}, cb); } function updateStatus(status) { @@ -142,22 +152,23 @@ function Scheduler(connection, opts) { { status : status, 'conditions.after': parser.parseExpression(event.cron).next() } - } + }; collection.findAndModify({_id : ObjectId(event._id)}, ['event', 'asc'], update, {new: true}, function(err, result) { - cb(err, result.value) - }) - } + cb(err, result.value); + }); + }; } - this.schedule = whenReady(schedule) - this.list = whenReady(list) - this.find = whenReady(find) - this.enable = whenReady(updateStatus('ready')) - this.disable = whenReady(updateStatus('disabled')) + this.schedule = whenReady(schedule); + this.list = whenReady(list); + this.find = whenReady(find); + this.remove = whenReady(remove); + this.enable = whenReady(updateStatus('ready')); + this.disable = whenReady(updateStatus('disabled')); - if(!opts.doNotFire) whenReady(initialize)() + if(!opts.doNotFire) whenReady(initialize)(); } -Scheduler.prototype = new events.EventEmitter() -module.exports = Scheduler +Scheduler.prototype = new events.EventEmitter(); +module.exports = Scheduler; \ No newline at end of file