diff --git a/lib/concurrent/TaskQueue.js b/lib/concurrent/TaskQueue.js new file mode 100644 index 0000000..5945f1b --- /dev/null +++ b/lib/concurrent/TaskQueue.js @@ -0,0 +1,281 @@ +var timeout = require('./timeout').timeout +var Future = require('./Future').Future +var Slf4j = require('../logger/slf4j').Slf4j + +function RejectionException (message) { + this.message = message || 'Task has been rejected' + this.stack = (new Error()).stack +} + +RejectionException.prototype = Object.create(Error.prototype) +RejectionException.prototype.constructor = RejectionException +RejectionException.prototype.name = 'RejectionException' + +/** + * @typedef {Object} TaskQueue~Options + * + * @property {LoggerOptions} logger + * @property {string} [name] Queue name + */ + +/** + * @typedef {Object} TaskQueue~TaskOptions + * + * @property {int} [timeout] + * @property {string} [name] + */ + +/** + * @typedef {Object} TaskQueue~Task + * + * @property {int} id + * @property {Function} factory + * @property {int} timeout + * @property {Promise.<*>|Thenable.<*>} completion + * @property {string} name + */ + +/** + * @typedef {Object} TaskQueue~Statistics + * + * @property {int} enqueued + * @property {int} completed + * @property {int} successful + * @property {int} discarded + * @property {int} rejected + */ + +/** + * Task queue that allows sequential task processing. + * + * @class + * @param {TaskQueue~Options} [options] + */ +function TaskQueue (options) { + options = options || {} + + var queue = [] + + var enqueued = 0 + var completed = 0 + var successful = 0 + var rejected = 0 + var discarded = 0 + + var paused = true + var closed = false + var termination = new Future() + /** + * @type {TaskQueue~Task|null} + */ + var current = null + + var logger = Slf4j.factory(options.logger, 'ama-team.voxengine-sdk.concurrent.task-queue') + + function setName (name) { + logger.attach('name', name) + } + + /** + * Sets queue name which will turn up in logs. + * + * @function TaskQueue#setName + * + * @param {string} name + */ + this.setName = setName + + if (options.name) { + setName(options.name) + } + + /** + * Executed provided task + * + * @param {TaskQueue~Task} task + */ + function execute (task) { + try { + return timeout(Promise.resolve(task.factory()), task.timeout) + } catch (e) { + return Promise.reject(e) + } + } + + /** + * Handler to be run after task has been fulfilled. + * + * @param {*} value + */ + function taskFulfillmentHandler (value) { + logger.debug('Task "{}", #{} has completed successfully', current.name, + current.id) + completed++ + successful++ + current.completion.resolve(value) + } + + /** + * Handler to be run after task has been rejected + * + * @param {Error|*} error + */ + function taskRejectionHandler (error) { + completed++ + logger.debug('Task "{}", #{} has rejected with {}', current.name, + current.id, (error && error.name ? error.name : error)) + current.completion.reject(error) + } + + /** + * Cleanup handler to be run after task has been handled + */ + function postCompletionHook () { + current = null + if (closed && queue.length === 0) { + termination.resolve() + } else { + proceed() + } + } + + /** + * Pick up next task for processing, if necessary + */ + function proceed () { + if (paused || current || queue.length === 0) { + return + } + current = queue.shift() + logger.debug('Executing task "{}", #{}', current.name, current.id) + execute(current) + .then(taskFulfillmentHandler, taskRejectionHandler) + .then(postCompletionHook) + } + + /** + * Adds new task to queue. + * + * @param {Function} factory Function representing task execution. It + * should return Promise if it relies on I/O. + * @param {TaskQueue~TaskOptions} [options] + * @return {Future.<*>} + */ + this.push = function (factory, options) { + if (closed) { + rejected++ + var error = new RejectionException('Can\'t enqueue task: queue is closed') + return Promise.reject(error) + } + options = options || {} + enqueued++ + var task = { + id: enqueued, + name: options.name || 'Task #' + enqueued, + factory: factory, + timeout: options.timeout, + completion: new Future() + } + logger.debug('Registering task "{}", #{}', task.name, task.id) + queue.push(task) + proceed() + return task.completion + } + + /** + * Start processing + * + * @return {TaskQueue} + */ + this.start = function () { + logger.debug('Starting queue processing') + paused = false + proceed() + return this + } + + /** + * Pause processing until #start() is called. + * + * @return {Promise.<*>|Thenable.<*>} + */ + this.pause = function () { + logger.debug('Pausing queue processing') + paused = true + return current ? current.completion : Promise.resolve() + } + + this.isPaused = function () { + return paused + } + + this.isClosed = function () { + return closed + } + + function close () { + logger.debug('Shutting down queue') + closed = true + var last = queue.length > 0 ? queue[queue.length - 1] : current + if (last) { + return last.completion.then(getStatistics, getStatistics) + } + return Promise.resolve(getStatistics()) + } + + /** + * Abruptly terminates queue, discarding all tasks in queue + * + * @return {Promise.} + */ + this.terminate = function () { + logger.debug('Terminating queue, discarding awaiting {} tasks', + queue.length) + while (queue.length > 0) { + discarded++ + var task = queue.shift() + logger.trace('Discarding task "{}", #{}', task.name, task.id) + } + return close() + } + + /** + * Closes queue for processing, waiting for all remaining tasks to + * complete and then resolving returned promise. + * + * @function TaskQueue#close + * + * @return {Promise.} + */ + this.close = close + + function getStatistics () { + return { + enqueued: enqueued, + completed: completed, + successful: successful, + rejected: rejected, + discarded: discarded + } + } + + /** + * @function TaskQueue#getStatistics + * + * @return {TaskQueue~Statistics} + */ + this.getStatistics = getStatistics + + this.getLength = function () { + return queue.length + } +} + +TaskQueue.started = function (options) { + return new TaskQueue(options).start() +} + +module.exports = { + TaskQueue: TaskQueue, + RejectionException: RejectionException +} diff --git a/lib/concurrent/index.js b/lib/concurrent/index.js index 897f2ea..cf0e7b9 100644 --- a/lib/concurrent/index.js +++ b/lib/concurrent/index.js @@ -1,7 +1,10 @@ var timeout = require('./timeout') +var TaskQueue = require('./TaskQueue') module.exports = { Future: require('./Future').Future, TimeoutException: timeout.TimeoutException, + TaskQueue: TaskQueue.TaskQueue, + RejectionException: TaskQueue.RejectionException, timeout: timeout.timeout } diff --git a/test/suites/integration/concurrent/TaskQueue.spec.js b/test/suites/integration/concurrent/TaskQueue.spec.js new file mode 100644 index 0000000..3509814 --- /dev/null +++ b/test/suites/integration/concurrent/TaskQueue.spec.js @@ -0,0 +1,405 @@ +/* eslint-env mocha */ +/* eslint-disable no-unused-expressions */ + +var Sinon = require('sinon') +var Chai = require('chai') +var expect = Chai.expect +var Concurrent = require('../../../../lib').Concurrent +var TaskQueue = Concurrent.TaskQueue +var RejectionException = Concurrent.RejectionException +var TimeoutException = Concurrent.TimeoutException +var Future = Concurrent.Future + +var branchStopper = function () { + throw new Error('This branch should not have been entered') +} + +describe('Integration', function () { + describe('/concurrent', function () { + describe('/TaskQueue.js', function () { + describe('.TaskQueue', function () { + describe('< new', function () { + it('may be created without any options', function () { + return new TaskQueue() + }) + + it('is paused by default', function () { + expect(new TaskQueue().isPaused()).to.be.true + }) + + it('is not terminated by default', function () { + expect(new TaskQueue().isClosed()).to.be.false + }) + + it('allows to set name', function () { + // yet another test for 100% coverage + /* eslint-disable no-new */ + var logger = {attach: Sinon.stub()} + var name = 'custom-name' + new TaskQueue({name: name, logger: {instance: logger}}) + expect(logger.attach.callCount).to.eq(1) + expect(logger.attach.getCall(0).args[0]).to.eq('name') + expect(logger.attach.getCall(0).args[1]).to.eq(name) + }) + }) + + describe('#setName', function () { + it('allows to set name externally', function () { + var logger = {attach: Sinon.stub()} + var name = 'custom-name' + var queue = new TaskQueue({logger: {instance: logger}}) + queue.setName(name) + expect(logger.attach.callCount).to.eq(1) + expect(logger.attach.getCall(0).args[0]).to.eq('name') + expect(logger.attach.getCall(0).args[1]).to.eq(name) + }) + }) + + describe('#start', function () { + it('processes single task submitted before start', function () { + var queue = new TaskQueue() + var factory = Sinon.stub() + var promise = queue.push(factory) + queue.start() + return promise + .then(function () { + expect(factory.callCount).to.eq(1) + }) + }) + + it('processes single task submitted after start', function () { + var queue = new TaskQueue() + var factory = Sinon.stub() + queue.start() + var promises = [ + queue.push(factory), + queue.push(factory), + queue.push(factory) + ] + return Promise + .all(promises) + .then(function () { + expect(factory.callCount).to.eq(3) + }) + }) + + it('processes several tasks submitted before start', function () { + var queue = new TaskQueue() + var factory = Sinon.stub() + queue.push(factory) + queue.push(factory) + var promise = queue.push(factory) + queue.start() + return promise + .then(function () { + expect(factory.callCount).to.eq(3) + }) + }) + + it('processes several tasks submitted after start', function () { + var queue = new TaskQueue() + var factory = Sinon.stub() + queue.start() + var promises = [ + queue.push(factory), + queue.push(factory), + queue.push(factory) + ] + return Promise + .all(promises) + .then(function () { + expect(factory.callCount).to.eq(3) + }) + }) + + it('processes several tasks submitted before and after start', function () { + var queue = new TaskQueue() + var factory = Sinon.stub() + var promises = [ + queue.push(factory), + queue.push(factory) + ] + queue.start() + promises.push(queue.push(factory)) + promises.push(queue.push(factory)) + return Promise + .all(promises) + .then(function () { + expect(factory.callCount).to.eq(4) + }) + }) + }) + + describe('#push', function () { + it('times out task exceeding specified timeout', function () { + var queue = TaskQueue.started() + var factory = Sinon.spy(function () { + return new Promise(function () {}) + }) + return queue + .push(factory, {timeout: 0}) + .then(branchStopper, function (error) { + expect(factory.callCount).to.eq(1) + expect(error).to.be.instanceOf(TimeoutException) + }) + }) + + it('returns promise rejected with thrown error', function () { + var queue = new TaskQueue().start() + var error = new Error() + var factory = function () { throw error } + var future = queue.push(factory) + return expect(future).to.eventually.be.rejectedWith(error) + }) + }) + + describe('#pause', function () { + it('returns promise resolving after current task completion', function () { + var queue = TaskQueue.started() + var barrier = new Future() + var invoked = Sinon.spy(function () { return barrier }) + var suppressed = Sinon.stub() + queue.push(invoked) + queue.push(suppressed) + var pause = queue.pause() + barrier.resolve() + return pause + .then(function () { + expect(invoked.callCount).to.eq(1) + expect(suppressed.callCount).to.eq(0) + }) + }) + + it('stops task processing after current task completion', function () { + var queue = TaskQueue.started() + var barrier = new Future() + var invoked = Sinon.spy(function () { return barrier }) + var suppressed = Sinon.stub() + var paused = false + queue.push(invoked) + queue.push(suppressed) + var pause = queue.pause() + pause.then(function () { paused = true }) + barrier.resolve() + var future = new Promise(function (resolve) { + setTimeout(resolve, 1) + }) + return future + .then(function () { + expect(invoked.callCount).to.eq(1) + expect(suppressed.callCount).to.eq(0) + expect(paused).to.be.true + }) + }) + + it('correctly executes if there is no current task', function () { + return TaskQueue.started().pause() + }) + }) + + describe('#close', function () { + it('works with no tasks in queue', function () { + return (new TaskQueue()).close() + }) + + it('returns when all current tasks are resolved', function () { + var queue = new TaskQueue() + var expectation = [] + var result = [] + var barriers = [] + for (var i = 0; i < 3; i++) { + var barrier = new Future() + barriers.push(barrier) + expectation.push(i) + var closure = function (i, future) { + queue.push(function () { + return future.then(function () { + result.push(i) + }) + }) + } + closure(i, barrier) + } + var finalization = queue.close() + queue.start() + barriers.forEach(function (barrier) { + barrier.resolve() + }) + return finalization + .then(function () { + expect(result).to.deep.eq(expectation) + }) + }) + + it('returns successfully even if tasks fail', function () { + var queue = new TaskQueue().start() + queue.push(function () { return Promise.reject(new Error()) }) + return queue.close() + }) + + it('forces rejection of new tasks', function () { + var queue = new TaskQueue() + queue.close() + var future = queue.push(function () {}) + return expect(future).to.eventually.be.rejected + }) + }) + + describe('#terminate', function () { + it('returns promised statistics instantly if there is no current task', function () { + var queue = new TaskQueue() + var expectation = { + enqueued: 0, + completed: 0, + successful: 0, + discarded: 0, + rejected: 0 + } + return expect(queue.terminate()).to.eventually.deep.eq(expectation) + }) + + it('returns promise resolving as soon as current task completes', function () { + var queue = TaskQueue.started() + queue.push(function () { return Promise.resolve() }) + var expectation = { + enqueued: 1, + completed: 1, + successful: 1, + discarded: 0, + rejected: 0 + } + return expect(queue.terminate()).to.eventually.deep.eq(expectation) + }) + + it('discards extra tasks', function () { + var barrier = new Future() + var queue = TaskQueue.started() + var discarded = Sinon.stub() + var factory = Sinon.spy(function () { return barrier }) + var expectation = { + enqueued: 2, + completed: 1, + successful: 1, + discarded: 1, + rejected: 0 + } + queue.push(factory) + queue.push(discarded) + var termination = queue.terminate() + barrier.resolve() + return termination + .then(function (statistics) { + expect(statistics).to.deep.eq(expectation) + expect(factory.callCount).to.eq(1) + expect(discarded.callCount).to.eq(0) + }) + }) + + it('forces rejection of new tasks', function () { + var queue = TaskQueue.started() + queue.terminate() + var task = queue.push(function () {}) + return expect(task).to.eventually.be.rejectedWith(RejectionException) + }) + }) + + describe('#statistics', function () { + it('returns correct numbers', function () { + var queue = new TaskQueue().start() + queue.push(function () {}) + var future = queue.push(function () { throw new Error() }) + queue.push(function () { return new Promise(function () {}) }) + return future + .then(branchStopper, function () { + expect(queue.getStatistics().enqueued).to.eq(3) + expect(queue.getStatistics().completed).to.eq(2) + expect(queue.getStatistics().successful).to.eq(1) + }) + }) + }) + + describe('#getLength', function () { + it('returns zero for fresh queue', function () { + expect(new TaskQueue().getLength()).to.eq(0) + }) + + it('returns actual length if queue has been populated', function () { + var queue = new TaskQueue() + var limit = 4 + for (var i = 0; i < limit; i++) { + queue.push(function () { return new Promise(function () {}) }) + } + expect(queue.getLength()).to.eq(limit) + }) + + it('returns zero when last task is completed', function () { + var queue = TaskQueue.started() + var limit = 4 + for (var i = 0; i < limit; i++) { + queue.push(function () {}) + } + return queue + .push(function () {}) + .then(function () { + expect(queue.getLength()).to.eq(0) + }) + }) + }) + + describe('> invariants', function () { + it('executes tasks sequentially', function () { + var queue = TaskQueue.started() + var barriers = [] + var limit = 4 + var result = [] + var expectation = [] + var i + for (i = 0; i < limit; i++) { + expectation.push(i) + var closure = function (i) { + var future = new Future() + barriers[i] = future + queue.push(function () { + return future + .then(function () { + result.push(i) + }) + }) + } + closure(i) + } + for (i = limit - 1; i >= 0; i--) { + barriers[i].resolve() + } + return queue + .close() + .then(function () { + expect(result).to.deep.eq(expectation) + }) + }) + + it('tolerates non-error rejection', function () { + var value = {} + var rejected = Promise.reject(value) + var queue = TaskQueue.started() + var task = queue.push(function () { + return rejected + }) + return expect(task).to.eventually.be.rejectedWith(value) + }) + }) + }) + + describe('.RejectionException', function () { + it('is supplied with corresponding name', function () { + expect(new RejectionException()).to.have.property('name').eq('RejectionException') + }) + + it('provides default message', function () { + // yes i'm still serious about that 100% coverage that nobody needs + expect(new RejectionException()).to.have.property('message') + }) + }) + }) + }) +})