diff --git a/example/server.js b/example/server.js new file mode 100644 index 0000000..ce74066 --- /dev/null +++ b/example/server.js @@ -0,0 +1,34 @@ +var Hapi = require('hapi'); +var mongoUrl = 'mongodb://localhost:27017/hapi-job-queue-test'; + +var server = new Hapi.Server(); + +server.connection({ port: process.argv[2] || 3000 }); + +server.register([ + { register: require('../'), options: { + connectionUrl: mongoUrl, + endpoint: '', + auth: false, + jobs: [ + { + name: 'test-job', + enabled: true, + schedule: 'every 5 seconds', + method: function(data, cb) { + console.log(data); + + setTimeout(cb, 100); + }, + tasks: ['Tick', 'Tock'] + } + ] + } } +], function(err) { + if (err) { + console.error(err); + } + server.start(function() { + console.log('Clock started'); + }); +}); diff --git a/lib/index.js b/lib/index.js index 991f2dc..977bf9b 100644 --- a/lib/index.js +++ b/lib/index.js @@ -169,15 +169,19 @@ exports.register = function(plugin, options, next) { } if (!jobData.locked && jobData.enabled) { - self.collection.update({name: job}, { + self.collection.update({name: job, locked: false}, { $set: { locked: true } - }, function(err) { + }, function(err, updated) { if (err) { return plugin.log(['hapi-job-queue', 'error'], {error: err}); } + if (!updated.result.n) { + return plugin.log(['hapi-job-queue', 'job-locked'], { message: 'Job locked', job: job }); + } + var startTime = +new Date(); async.eachLimit(data, self.settings.concurrentTasks, function(task, cb) { @@ -252,9 +256,10 @@ exports.register = function(plugin, options, next) { } var nextRun = later.schedule(self.jobs[job].parsedTime).next(1); - self.jobs[job].nextRun = nextRun; - if (jobData.locked || !jobData.enabled) { + if ((jobData.locked || !jobData.enabled) && +new Date() > +new Date(jobData.nextRun)) { + self.jobs[job].nextRun = nextRun; + self.collection.update({ name: job}, { $set: { nextRun: nextRun @@ -265,16 +270,22 @@ exports.register = function(plugin, options, next) { } }); } else { - self.collection.update({name: job}, { + self.jobs[job].nextRun = nextRun; + + self.collection.update({name: job, locked: false}, { $set: { locked: true, nextRun: nextRun } - }, function(err) { + }, function(err, updated) { if (err) { return plugin.log(['hapi-job-queue', 'error'], {error: err}); } + if (!updated.result.n) { + return plugin.log(['hapi-job-queue', 'job-locked'], { message: 'Job locked', job: job }); + } + var startTime = +new Date(); var tasks = jobData.tasks; diff --git a/test/jobs.test.js b/test/jobs.test.js index e57bfba..e076bbe 100644 --- a/test/jobs.test.js +++ b/test/jobs.test.js @@ -5,6 +5,7 @@ var Lab = require('lab'); var lab = exports.lab = Lab.script(); var Hapi = require('hapi'); var Joi = require('joi'); +var async = require('async'); var describe = lab.describe; var it = lab.it; @@ -15,14 +16,19 @@ var expect = Code.expect; var mongoUrl = 'mongodb://localhost:27017/hapi-job-queue-test'; var server; +var server2; var plugin; +var plugin2; var output = null; before(function(done) { server = new Hapi.Server(); server.connection({port: 3000}); + server2 = new Hapi.Server(); + server2.connection({port: 3001}); output = null; single = false; + counter = 0; server.register(require('hapi-auth-bearer-token'), function (err) { server.auth.strategy('simple', 'bearer-access-token', { @@ -38,6 +44,20 @@ before(function(done) { }); }); + server2.register(require('hapi-auth-bearer-token'), function (err) { + server2.auth.strategy('simple', 'bearer-access-token', { + allowQueryToken: true, + accessTokenName: 'token', + validateFunc: function( token, callback ) { + if(token === "1234"){ + callback(null, true, { token: token }); + } else { + callback(null, false, { token: token }); + } + } + }); + }); + MongoClient.connect(mongoUrl, function(err, db) { if (err) { return next(err); @@ -57,6 +77,7 @@ before(function(done) { schedule: 'every 1 seconds', method: function(data, cb) { output = data.time; + counter++; setTimeout(cb, 100); }, @@ -78,7 +99,47 @@ before(function(done) { }); plugin = server.plugins.jobs; - done(); + + + //second test server + server2.register([ + { register: require('../'), options: { + connectionUrl: mongoUrl, + endpoint: '/jobs', + auth: 'simple', + jobs: [ + { + name: 'test-job', + enabled: true, + schedule: 'every 1 seconds', + method: function(data, cb) { + output = data.time; + counter++; + + setTimeout(cb, 100); + }, + tasks: [ + { + time: 1 + }, + { + time: 2 + } + ] + } + ] + } } + ], function() { + server2.method('testMethod', function(data, cb) { + output = 'methodized'; + setTimeout(cb, 50); + }); + + plugin2 = server2.plugins.jobs; + + done(); + }); + }); }); @@ -295,18 +356,35 @@ describe('job queue', { timeout: 5000 }, function() { describe('runner', function() { it('should run a job at the specified time', function(done) { output = null; - plugin.enable('test-job', function(err) { - plugin.reschedule('test-job', { schedule: 'every 1 seconds' }, function(err) { - expect(err).to.not.exist(); + counter = 0; + + // tests multi server setups. + async.parallel([ + function(next) { + plugin.enable('test-job', function(err) { + plugin.reschedule('test-job', { schedule: 'every 1 seconds' }, function(err) { + expect(err).to.not.exist(); + next(); + }); + }); + }, + function(next) { + plugin2.enable('test-job', function(err) { + plugin2.reschedule('test-job', { schedule: 'every 1 seconds' }, function(err) { + expect(err).to.not.exist(); + next(); + }); + }); + } + ], function() { + setTimeout(function() { + expect(output).to.equal(2); + expect(counter).to.equal(2); + done(); + }, 1400); + }); - setTimeout(function() { - expect(err).to.not.exist(); - expect(output).to.equal(2); - done(); - }, 1400); - }); - }); }); it('should lock a running job', function(done) {