diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..4ae945b --- /dev/null +++ b/.editorconfig @@ -0,0 +1,15 @@ +# EditorConfig helps developers define and maintain consistent +# coding styles between different editors and IDEs +# editorconfig.org + +root = true + +[*] + +indent_style = space +indent_size = 2 + +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = false \ No newline at end of file diff --git a/.gitignore b/.gitignore index c2658d7..40b878d 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1 @@ -node_modules/ +node_modules/ \ No newline at end of file diff --git a/.jshintrc b/.jshintrc new file mode 100644 index 0000000..97fa98b --- /dev/null +++ b/.jshintrc @@ -0,0 +1,53 @@ +{ + "bitwise": true, + "camelcase": false, + "curly": false, + "eqeqeq": true, + "es3": false, + "forin": true, + "immed": true, + "indent": 2, + "latedef": "nofunc", + "newcap": true, + "noarg": true, + "noempty": true, + "nonew": false, + "plusplus": false, + "quotmark": true, + "regexp": false, + "undef": true, + "unused": true, + "strict": true, + "trailing": true, + "maxparams": 5, + "maxdepth": 2, + "maxstatements": 30, + "maxcomplexity": 10, + "maxlen": 300, + + "asi": false, + "boss": false, + "debug": false, + "eqnull": true, + "esnext": false, + "evil": false, + "expr": false, + "funcscope": false, + "globalstrict": false, + "iterator": false, + "lastsemic": false, + "laxbreak": false, + "laxcomma": false, + "loopfunc": false, + "moz": false, + "multistr": true, + "proto": false, + "scripturl": false, + "smarttabs": false, + "shadow": false, + "sub": false, + "supernew": false, + "validthis": true, + + "node": true +} diff --git a/lib/driver/redis.js b/lib/driver/redis.js index ca1dce8..2b8f2f5 100644 --- a/lib/driver/redis.js +++ b/lib/driver/redis.js @@ -1,3 +1,5 @@ +'use strict'; + var redis = require('redis'); /** @@ -59,6 +61,16 @@ RedisDriver.prototype.hset = function (worker, key, data, callback) { this.client.hset(this.getCompleteWorkerName(worker), key, data, callback); }; +/** + * Get a data in the worker + * @param string worker Worker + * @param string key Key + * @param function callback Callback + */ +RedisDriver.prototype.hget = function (worker, key, callback) { + this.client.hget(this.getCompleteWorkerName(worker), key, callback); +}; + /** * Get all property of a worker * @param string worker Worker diff --git a/lib/queue.js b/lib/queue.js index 656be63..98664da 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -1,3 +1,5 @@ +'use strict'; + var _ = require('lodash'); /** diff --git a/lib/worker.js b/lib/worker.js index e189428..59752b2 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -1,3 +1,5 @@ +'use strict'; + var _ = require('lodash'); var util = require('util'); var EventEmitter = require('events').EventEmitter; @@ -18,7 +20,7 @@ var EventEmitter = require('events').EventEmitter; */ function Worker (queue, driver, name, action, options) { EventEmitter.call(this); - + this.options = _.extend({ type: 'FIFO', loopSleepTime: 0, @@ -33,7 +35,7 @@ function Worker (queue, driver, name, action, options) { this.action = action; this.completeName = this.queue.getName() + ':' + name; this.initialized = false; -}; +} util.inherits(Worker, EventEmitter); @@ -122,7 +124,7 @@ Worker.prototype.start = function (callback) { this.setStatus(Worker.STATUS_STARTED, next); var endDate = new Date(Date.now() + (this.options.expire * 1000)).toJSON(); - this.setEndDate(endDate); + this.setEndDate(endDate, _.noop); }, this)); }; @@ -207,14 +209,14 @@ Worker.prototype.setStatus = function (status, callback) { var date = (new Date()).toJSON(); - this.driver.hset(this.completeName, 'status_changedate', date); + this.driver.hset(this.completeName, 'status_changedate', date, _.noop); if (status === Worker.STATUS_WORKING) this.driver.hincrby(this.completeName, 'cpt_action', 1); if (status === Worker.STATUS_STARTED) { - this.driver.hset(this.completeName, 'cpt_action', 0); - this.driver.hset(this.completeName, 'start_date', date); + this.driver.hset(this.completeName, 'cpt_action', 0, _.noop); + this.driver.hset(this.completeName, 'start_date', date, _.noop); } }; diff --git a/package.json b/package.json index 3cce63a..0a37181 100644 --- a/package.json +++ b/package.json @@ -27,7 +27,17 @@ "dependencies": { "redis": "~0.8.3", "optimist": "~0.5.2", - "lodash": "~2.4.1" + "lodash": "^2.4.1", + "async": "^0.9.0", + "moment": "^2.6.0" + }, + "devDependencies": { + "chai": "~1.7.2", + "mocha": "~1.13.0", + "sinon": "~1.7.3", + "sinon-chai": "~2.4.0", + "chai-things": "~0.2.0", + "proxyquire": "~0.5.2" }, "licence": "MIT" -} \ No newline at end of file +} diff --git a/test/.jshintrc b/test/.jshintrc new file mode 100644 index 0000000..3089676 --- /dev/null +++ b/test/.jshintrc @@ -0,0 +1,76 @@ +{ + "bitwise": true, + "camelcase": false, + "curly": false, + "eqeqeq": true, + "es3": false, + "forin": true, + "immed": true, + "indent": 2, + "latedef": "nofunc", + "newcap": true, + "noarg": true, + "noempty": true, + "nonew": true, + "plusplus": false, + "quotmark": true, + "regexp": false, + "undef": true, + "unused": true, + "strict": false, + "trailing": true, + "maxparams": false, + "maxdepth": 2, + "maxstatements": 0, + "maxcomplexity": 10, + "maxlen": 110, + "globals": { + "describe": false, + "it": false, + "after": false, + "afterEach": false, + "before": false, + "beforeEach": false, + "define": false, + "xit": false, + "xdescribe": false, + "window": false, + "document": false, + "inject": false, + "mocha": false, + "$": false, + "requirejs": false, + "angular": false, + "sinon": true, + "expect": true, + "chai": true, + "e2e": true, + "app": true, + "localStorage": false + }, + "asi": false, + "boss": false, + "debug": false, + "eqnull": true, + "esnext": false, + "evil": false, + "expr": true, + "funcscope": false, + "globalstrict": false, + "iterator": false, + "lastsemic": false, + "laxbreak": false, + "laxcomma": false, + "loopfunc": false, + "moz": false, + "multistr": true, + "proto": false, + "scripturl": false, + "smarttabs": false, + "shadow": false, + "sub": false, + "supernew": false, + "validthis": false, + "node": true, + "-W024": false +} diff --git a/test/bootstrap.js b/test/bootstrap.js new file mode 100644 index 0000000..72b9511 --- /dev/null +++ b/test/bootstrap.js @@ -0,0 +1,16 @@ +// chai +chai = require('chai'); +expect = chai.expect; +chai.Assertion.includeStack = true; + +// sinon +sinon = require('sinon'); +chai.use(require('sinon-chai')); +chai.use(require('chai-things')); + +var base = __dirname + '/../lib/'; + +app = { + base: base, + config: require('./config') +}; \ No newline at end of file diff --git a/test/config.js b/test/config.js new file mode 100644 index 0000000..2cb5c73 --- /dev/null +++ b/test/config.js @@ -0,0 +1,9 @@ +module.exports = { + redis: { + run: { + host: 'localhost', + port: 6379 + }, + db: 12 + } +}; \ No newline at end of file diff --git a/test/unit/queue.js b/test/unit/queue.js new file mode 100644 index 0000000..c6f2715 --- /dev/null +++ b/test/unit/queue.js @@ -0,0 +1,131 @@ +'use strict'; + +var async = require('async'); +var Queue = require(app.base + 'queue').Queue; +var RedisDriver = require(app.base + 'driver/redis').RedisDriver; + +describe('Queue', function () { + var queue, driver; + + before(function (done) { + driver = new RedisDriver(app.config.redis.run); + driver.db = app.config.redis.db; + queue = new Queue('test', driver); + queue.initialize(done); + }); + + after(function (done) { + driver.client.flushdb(done); + }); + + it('should be correctly initialized', function () { + expect(queue.initialized).to.be.true; + }); + + it('should get name', function () { + expect(queue.getName()).to.equal('test'); + }); + + describe('#basic commands with empty db', function () { + afterEach(function (done) { + driver.client.flushdb(done); + }); + + it('should rpush', function (done) { + queue.rpush('hello', function (err, count) { + if (err) return done(err); + expect(count).to.equal(1); + done(); + }); + }); + + it('should lpush', function (done) { + queue.lpush('hello', function (err, count) { + if (err) return done(err); + expect(count).to.equal(1); + done(); + }); + }); + + it('should lpop', function (done) { + queue.lpop(1, function (err, data) { + if (err) return done(err); + expect(data).to.eql([ null ]); + done(); + }); + }); + + it('should rpop', function (done) { + queue.rpop(1, function (err, data) { + if (err) return done(err); + expect(data).to.eql([ null ]); + done(); + }); + }); + + it('should llen', function (done) { + queue.llen(function (err, count) { + if (err) return done(err); + expect(count).to.equal(0); + done(); + }); + }); + + }); + + + describe('#basic commands compiles', function () { + + beforeEach(function (done) { + async.waterfall([ + queue.rpush.bind(queue, 'first'), + function (count, callback) { + queue.rpush('second', callback); + } + ], done); + }); + + afterEach(function (done) { + driver.client.flushdb(done); + }); + + it('should rpop', function (done) { + queue.rpop(1, function (err, data) { + if (err) return done(err); + expect(data).to.eql([ 'second' ]); + done(); + }); + }); + + it('should lpop', function (done) { + queue.lpop(1, function (err, data) { + if (err) return done(err); + expect(data).to.eql([ 'first' ]); + done(); + }); + }); + + it('should lpush', function (done) { + async.waterfall([ + queue.lpush.bind(queue, 'third'), + function (count, callback) { + queue.lpop(1, callback); + } + ], function (err, data) { + if (err) return done(err); + expect(data).to.eql([ 'third' ]); + done(); + }); + }); + + it('should llen', function (done) { + queue.llen(function (err, count) { + if (err) return done(err); + expect(count).to.equal(2); + done(); + }); + }); + + }); + +}); \ No newline at end of file diff --git a/test/unit/worker.js b/test/unit/worker.js new file mode 100644 index 0000000..396ad24 --- /dev/null +++ b/test/unit/worker.js @@ -0,0 +1,408 @@ +'use strict'; + +var async = require('async'); +var moment = require('moment'); +var QueueWorker = require(app.base + 'worker').Worker; +var Queue = require(app.base + 'queue').Queue; +var RedisDriver = require(app.base + 'driver/redis').RedisDriver; + +describe('Worker with a redis queue', function () { + + var worker, queue; + var driver = new RedisDriver(app.config.redis.run); + + // mock + var action = sinon.stub().yields(); + var status = QueueWorker.prototype.setStatus = sinon.spy(QueueWorker.prototype.setStatus); + + // init queue and driver + driver.db = app.config.redis.db; + queue = new Queue('Q', driver); + + // init worker with options + var initWorker = function (options, callback) { + worker = new QueueWorker(queue, driver, 'W', action, options); + worker.start(callback); + }; + + // reinit state of bd and mocks + afterEach(function (done) { + action.reset(); + status.reset(); + driver.client.flushdb(done); + }); + + + + describe('#Controls', function () { + + beforeEach(function (done) { + initWorker(null, done); + }); + + describe('#start', function () { + + it('should be initialized', function () { + expect(worker.initialized).to.be.true; + }); + + it('should initialize queue', function () { + expect(worker.queue).to.eql(queue); + }); + + it('should initialize name', function () { + expect(worker.completeName).to.equal('Q:W'); + }); + + it('should record default parameters in storage', function (done) { + worker.driver.hgetall(worker.completeName, function (err, params) { + if (err) return done(err); + expect(params).to.have.property('waiting_timeout', '1'); + expect(params).to.have.property('loop_sleep', '0'); + expect(params).to.have.property('pause_sleep_time', '5'); + expect(params).to.have.property('data_per_tick', '1'); + expect(params).to.have.property('action', 'stub'); + expect(params).to.have.property('queue', 'Q'); + expect(params).to.have.property('type', 'FIFO'); + expect(params).to.have.property('language', 'nodejs'); + done(); + }); + }); + + it('should set end date', function (done) { + worker.driver.hget(worker.completeName, 'end_date', function (err, data) { + if (err) return done(err); + expect(moment(data).diff(moment(), 'days') + 1).to.equal(200); + done(); + }); + }); + + it('should set status', function (done) { + worker.driver.hget(worker.completeName, 'status', function (err, data) { + if (err) return done(err); + expect(data).to.equal(QueueWorker.STATUS_STARTED); + done(); + }); + }); + + it('should set cpt_action', function (done) { + worker.driver.hget(worker.completeName, 'cpt_action', function (err, data) { + if (err) return done(err); + expect(data).to.equal('0'); + done(); + }); + }); + + it('should set start_date', function (done) { + worker.driver.hget(worker.completeName, 'start_date', function (err, data) { + if (err) return done(err); + expect(moment(data)).to.have.property('_isAMomentObject', true); + done(); + }); + }); + + it('should set status change date', function (done) { + worker.driver.hget(worker.completeName, 'status_changedate', function (err, data) { + if (err) return done(err); + expect(moment(data)).to.have.property('_isAMomentObject', true); + done(); + }); + }); + + it('shouldn\'t launch an action', function () { + expect(action).not.to.be.called; + }); + + it('should fix worker status', function () { + expect(status).to.be.calledWith(QueueWorker.STATUS_STARTED); + }); + }); + + describe('#pause', function () { + + beforeEach(function (done) { + worker.pause(done); + }); + + it('should change status', function (done) { + worker.driver.hget(worker.completeName, 'status', function (err, data) { + if (err) return done(err); + expect(data).to.equal(QueueWorker.STATUS_PAUSED); + done(); + }); + }); + + it('shouldn\'t launch an action', function () { + expect(action).not.to.be.called; + }); + + it('should fix worker status', function () { + expect(status).to.be.calledWith(QueueWorker.STATUS_STARTED); + expect(status).to.be.calledWith(QueueWorker.STATUS_PAUSED); + }); + + }); + + describe('#stop', function () { + + beforeEach(function (done) { + worker.stop(done); + }); + + it('shouldn\'t launch an action', function () { + expect(action).not.to.be.called; + }); + + it('should set end date', function (done) { + worker.driver.hget(worker.completeName, 'end_date', function (err, data) { + if (err) return done(err); + expect(parseInt(moment(data).format('X'), 10)).to.be.below(Date.now()); + done(); + }); + }); + }); + }); + + + + + describe('#Process', function () { + + beforeEach(function () { + driver = new RedisDriver(app.config.redis.run); + driver.db = app.config.redis.db; + queue = new Queue('Q', driver); + }); + + describe('FIFO', function () { + + beforeEach(function (done) { + initWorker(null, done); + }); + + it('should treat an element', function (done) { + queue.rpush('first', function (err) { + if (err) return done(err); + worker.on('job complete', function () { + expect(action).to.be.calledWith(['first']); + done(); + }); + }); + }); + + it('should treat some elements in the good order', function (done) { + var elements = ['first', 'second', 'third']; + var expectElements = []; + + worker.on('job complete', function () { + expectElements.push(action.getCall(expectElements.length).args[0][0]); + + if (expectElements.length !== elements.length) return; + + expect(expectElements).to.eql(elements); + worker.removeAllListeners(); + done(); + }); + + async.eachSeries(elements, queue.rpush.bind(queue), function (err) { + if (err) return done(err); + }); + }); + + }); + + describe('LIFO', function () { + + beforeEach(function (done) { + initWorker({type: 'LIFO'}, done); + }); + + it('should treat an element', function (done) { + queue.rpush('first', function (err) { + if (err) return done(err); + worker.on('job complete', function () { + expect(action).to.be.calledWith(['first']); + done(); + }); + }); + }); + + it('should treat some elements in the good order', function (done) { + var elements = ['first', 'second', 'third']; + var expectElements = []; + + worker.on('job complete', function () { + expectElements.push(action.getCall(expectElements.length).args[0][0]); + + if (expectElements.length !== elements.length) return; + + expect(expectElements).to.eql(elements); + worker.removeAllListeners(); + done(); + }); + + async.eachSeries(elements, queue.lpush.bind(queue), function (err) { + if (err) return done(err); + }); + }); + + }); + + }); + +}); + + + + +describe('Worker processing', function () { + var mockQueue, mockDriver, worker, clock, action; + + function getTimeAddSeconds(seconds) { + var now = new Date(); + now.setSeconds(now.getSeconds() + seconds); + return now; + } + + beforeEach(function () { + mockQueue = { + driver: { close: sinon.spy() }, + getName: sinon.stub().returns('Q') + }; + mockDriver = { close: sinon.spy() }; + clock = sinon.useFakeTimers(); + sinon.stub(process, 'nextTick', setTimeout); + action = sinon.spy(); + }); + + afterEach(function () { + process.nextTick.restore(); + clock.restore(); + }); + + it('should close connexion if no infos are retrieved in redis', function () { + QueueWorker.prototype.getInfos = sinon.stub().yields(null); + worker = new QueueWorker(mockQueue, mockDriver, 'W', action); + worker.process(); + + clock.tick(1); + + expect(mockQueue.driver.close).to.be.called; + expect(mockDriver.close).to.be.called; + expect(action).not.to.be.called; + }); + + it('should close connexion if end_date is earlier than now', function () { + QueueWorker.prototype.getInfos = sinon.stub().yields(null, { + 'end_date': '1970-01-01T00:00:00.000Z' + }); + Date.now = sinon.spy(getTimeAddSeconds.bind(null, 10)); + + worker = new QueueWorker(mockQueue, mockDriver, 'W', action); + worker.process(); + + clock.tick(1); + + expect(mockQueue.driver.close).to.be.called; + expect(mockDriver.close).to.be.called; + expect(action).not.to.be.called; + }); + + it('should only relaunch a process if it is in pause', function () { + QueueWorker.prototype.getInfos = sinon.stub().yields(null, { + 'end_date': '1970-01-02T00:00:00.000Z', + 'pause_sleep_time': '5', + status: QueueWorker.STATUS_PAUSED + }); + + Date.now = sinon.spy(getTimeAddSeconds.bind(null, 10)); + + var setEndDate = QueueWorker.prototype.setEndDate = sinon.stub(); + + worker = new QueueWorker(mockQueue, mockDriver, 'W', action); + worker.process(); + + clock.tick(1); + + expect(setEndDate).to.be.calledWith('1970-01-01T00:00:10.000Z'); + expect(action).not.to.be.called; + }); + + it('should call lpop if it is in fifo, and no results', function () { + QueueWorker.prototype.getInfos = sinon.stub().yields(null, { + 'end_date': '1970-01-02T00:00:00.000Z', + 'data_per_tick': '1', + 'waiting_timeout': '1', + type: 'FIFO' + }); + + var setStatus = QueueWorker.prototype.setStatus = sinon.stub().yields(); + + mockQueue.lpop = sinon.stub().yields(null, []); + mockQueue.rpop = sinon.stub().yields(null, []); + + Date.now = sinon.spy(getTimeAddSeconds.bind(null, 10)); + + worker = new QueueWorker(mockQueue, mockDriver, 'W', action); + worker.process(); + + clock.tick(1); + + expect(mockQueue.rpop).not.to.be.called; + expect(mockQueue.lpop).to.be.calledWith('1'); + expect(setStatus).to.be.calledWith(QueueWorker.STATUS_WAITING); + expect(action).not.to.be.called; + }); + + it('should call rpop if it is in lifo, and no results', function () { + QueueWorker.prototype.getInfos = sinon.stub().yields(null, { + 'end_date': '1970-01-02T00:00:00.000Z', + 'data_per_tick': '1', + 'waiting_timeout': '1', + type: 'LIFO' + }); + + var setStatus = QueueWorker.prototype.setStatus = sinon.stub().yields(); + + mockQueue.lpop = sinon.stub().yields(null, []); + mockQueue.rpop = sinon.stub().yields(null, []); + + Date.now = sinon.spy(getTimeAddSeconds.bind(null, 10)); + + worker = new QueueWorker(mockQueue, mockDriver, 'W', action); + worker.process(); + + clock.tick(1); + + expect(mockQueue.lpop).not.to.be.called; + expect(mockQueue.rpop).to.be.calledWith('1'); + expect(setStatus).to.be.calledWith(QueueWorker.STATUS_WAITING); + expect(action).not.to.be.called; + }); + + it('should call the action if there are some results', function () { + QueueWorker.prototype.getInfos = sinon.stub().yields(null, { + 'end_date': '1970-01-02T00:00:00.000Z', + 'data_per_tick': '1', + 'waiting_timeout': '1', + 'loop_sleep': '0', + type: 'FIFO' + }); + + var setStatus = QueueWorker.prototype.setStatus = sinon.stub().yields(); + + mockQueue.lpop = sinon.stub().yields(null, ['first']); + + Date.now = sinon.spy(getTimeAddSeconds.bind(null, 10)); + + worker = new QueueWorker(mockQueue, mockDriver, 'W', action); + worker.process(); + + clock.tick(1); + + expect(setStatus).to.be.calledWith(QueueWorker.STATUS_WORKING); + expect(action).to.be.called; + }); + + +}); \ No newline at end of file