Skip to content

Commit

Permalink
Worker tests && Worker shutdown && many methods > 10
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-essam committed Feb 10, 2018
1 parent fabdede commit e7ce261
Show file tree
Hide file tree
Showing 10 changed files with 470 additions and 15 deletions.
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ WTSQSWorker takes care of asynchronously fetching jobs from sqs while processing
* [new WTSQSWorker(options)](#new_WTSQSWorker_new)
* _instance_
* [.run(handler)](#WTSQSWorker+run)
* [.shutdown()](#WTSQSWorker+shutdown) ⇒ <code>Promise</code>
* _inner_
* [~runHandler](#WTSQSWorker..runHandler) ⇒ <code>Promise</code>

Expand All @@ -394,7 +395,7 @@ Constructs WTSQSWorker object.
| [options.maxConcurrency] | <code>Integer</code> | <code>20</code> | Maximum number of concurrent jobs. |
| [options.pollWaitTime] | <code>Integer</code> | <code>5</code> | Duration (in seconds) for which read calls wait for a job to arrive in the queue before returning. |
| [options.visibilityTimeout] | <code>Integer</code> | <code>30</code> | Duration (in seconds) that the received jobs are hidden from subsequent retrieve requests. |
| [options.logger] | <code>Object</code> \| <code>String</code> | <code></code> | Object with trace, debug, info, warn, error methods to use for logging. Or a string with log level to use default internal logger. |
| [options.logger] | <code>Object</code> \| <code>String</code> | <code></code> | Object with debug, info, warn, error methods to use for logging. Or a string with log level to use default internal logger. |

**Example**
```js
Expand Down Expand Up @@ -428,6 +429,16 @@ Start fetching and processing jobs.
| handler | [<code>runHandler</code>](#WTSQSWorker..runHandler) | Async function to process a single job. |


* * *

<a name="WTSQSWorker+shutdown"></a>

### worker.shutdown() ⇒ <code>Promise</code>
Shutsdown the worker and drain active jobs.

**Kind**: instance method of [<code>WTSQSWorker</code>](#WTSQSWorker)
**Returns**: <code>Promise</code> - Resolves when all active jobs have been drained.

* * *

<a name="WTSQSWorker..runHandler"></a>
Expand Down
28 changes: 26 additions & 2 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,33 @@ class SimpleLogger {
warn (...args) { this.log('warn', ...args) }
info (...args) { this.log('info', ...args) }
debug (...args) { this.log('debug', ...args) }
trace (...args) { this.log('trace', ...args) }
}

const flatten = (arr) => Array.prototype.concat(...arr)

const chunk = (arr, size) => {
// Adapted from lodash

if (!arr.length || size < 1) return []
let index = 0
let resIndex = 0
const result = new Array(Math.ceil(arr.length / size))

while (index < arr.length) {
result[resIndex++] = arr.slice(index, (index += size))
}
return result
}

const chunkNumber = (n, x) => {
const result = (new Array(parseInt(n / x)).fill(x))
if (n % x) result.push(n % x)
return result
}

module.exports = {
SimpleLogger
SimpleLogger,
flatten,
chunk,
chunkNumber
}
34 changes: 28 additions & 6 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class WTSQSWorker {
* @param {Integer} [options.maxConcurrency=20] Maximum number of concurrent jobs.
* @param {Integer} [options.pollWaitTime=5] Duration (in seconds) for which read calls wait for a job to arrive in the queue before returning.
* @param {Integer} [options.visibilityTimeout=30] Duration (in seconds) that the received jobs are hidden from subsequent retrieve requests.
* @param {(Object|String)} [options.logger=null] Object with trace, debug, info, warn, error methods to use for logging. Or a string with log level to use default internal logger.
* @param {(Object|String)} [options.logger=null] Object with debug, info, warn, error methods to use for logging. Or a string with log level to use default internal logger.
*/
constructor ({ wtsqs, maxConcurrency, pollWaitTime, visibilityTimeout, logger }) {
if (!(wtsqs instanceof WTSQS)) throw new InvalidArgument('wtsqs is required')
Expand All @@ -64,6 +64,7 @@ class WTSQSWorker {
else if (typeof logger === 'string') this._logger = new SimpleLogger(logger)
else if (typeof logger === 'object') this._logger = logger

this._drain = false
this._handler = null
this._currentConcurrency = 0
this._minAvailableLocks = Math.min(this.maxConcurrency, 10)
Expand All @@ -74,12 +75,29 @@ class WTSQSWorker {
* @param {WTSQSWorker~runHandler} handler Async function to process a single job.
*/
run (handler) {
if (this._drain) throw new WTSQSWorkerError('WTSQSWorker has already been shutdown')
if (this._handler) throw new WTSQSWorkerError('WTSQSWorker is already running')
if (!handler) throw new InvalidArgument('handler method is required')
this._handler = handler
this._workIt()
}

/**
* Shutsdown the worker and drain active jobs.
* @return {Promise} Resolves when all active jobs have been drained.
*/
async shutdown () {
if (this._drain) throw new WTSQSWorkerError('WTSQSWorker has already been shutdown')
if (!this._handler) if (this._drain) throw new WTSQSWorkerError('WTSQSWorker is not running')
this._drain = true

return new Promise((resolve, reject) => {
this._resolveDrain = resolve
})
}

_workIt () {
if (this._drain) return
setImmediate(this._launchRequiredPolls.bind(this))
}

Expand All @@ -88,21 +106,25 @@ class WTSQSWorker {
}

_acquireLocks (count) {
this._logger.trace(`WTSQSWorker::_acquireLocks(${count})`)
this._logger.debug(`WTSQSWorker::_acquireLocks(${count})`)
this._currentConcurrency += count
}

_releaseLocks (count) {
this._logger.trace(`WTSQSWorker::_releaseLocks(${count})`)
this._logger.debug(`WTSQSWorker::_releaseLocks(${count})`)
this._currentConcurrency -= count

if (this._drain && this._currentConcurrency === 0) {
return this._resolveDrain()
}
if (this._availableLocks >= this._minAvailableLocks) {
this._workIt()
}
}

async _launchRequiredPolls () {
this._logger.trace('WTSQSWorker::_launchRequiredPolls')
this._logger.debug('WTSQSWorker::_launchRequiredPolls')

const pollPromises = []
while (this._availableLocks > 0) {
const maxJobs = Math.min(this._availableLocks, 10)
Expand All @@ -113,7 +135,7 @@ class WTSQSWorker {
}

async _launchPoll (maxJobs) {
this._logger.trace(`WTSQSWorker::_launchPoll(${maxJobs})`)
this._logger.debug(`WTSQSWorker::_launchPoll(${maxJobs})`)

const jobs = []

Expand All @@ -124,7 +146,7 @@ class WTSQSWorker {
this._logger.error('WTSQSWorker::_launchPoll', e)
}

this._logger.debug(`WTSQSWorker::_launchPoll.jobs.length = ${jobs.length}`)
this._logger.info(`WTSQSWorker::_launchPoll.jobs.length = ${jobs.length}`)

if (jobs.length < maxJobs) this._releaseLocks(maxJobs - jobs.length)
if (jobs.length === 0) return
Expand Down
24 changes: 24 additions & 0 deletions lib/wtsqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const safeJsonStringify = require('safe-json-stringify')
const UUIDv4 = require('uuid/v4')

const { InvalidArgument } = require('./errors')
const { flatten, chunk, chunkNumber } = require('./utils')

/**
* Received SQS Message
Expand Down Expand Up @@ -142,6 +143,13 @@ class WTSQS {
async enqueueMany (payloads, { messageGroupId } = {}, sqsOptions = {}) {
if (!(payloads instanceof Array)) throw new InvalidArgument('payloads must be of type array')

if (payloads.length > 10) {
const chunks = chunk(payloads, 10)
return Promise.all(chunks.map(
chunk => this.enqueueMany(chunk, { messageGroupId }, sqsOptions)
))
}

const entries = payloads.map((payload) => {
const jsonPayload = safeJsonStringify(payload)
const id = UUIDv4()
Expand Down Expand Up @@ -221,6 +229,15 @@ class WTSQS {
* ]
*/
async peekMany (maxNumberOfMessages = 10, { pollWaitTime, visibilityTimeout } = {}, sqsOptions = {}) {
if (maxNumberOfMessages > 10) {
const lengths = chunkNumber(maxNumberOfMessages, 10)
const results = await Promise.all(lengths.map(
l => this.peekMany(l, { pollWaitTime, visibilityTimeout }, sqsOptions)
))

return flatten(results)
}

const params = {
QueueUrl: this.url,
MaxNumberOfMessages: maxNumberOfMessages,
Expand Down Expand Up @@ -274,6 +291,13 @@ class WTSQS {
* await wtsqs.deleteMany(myMessageList)
*/
async deleteMany (messages) {
if (messages.length > 10) {
const chunks = chunk(messages, 10)
return Promise.all(chunks.map(
chunk => this.deleteMany(chunk)
))
}

const entries = messages.map((msg) => ({
Id: msg.id,
ReceiptHandle: msg.receiptHandle
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
{
"name": "wtsqs",
"version": "1.1.0",
"version": "1.2.0",
"description": "AWS SQS Worker Wrapper",
"main": "lib/index.js",
"scripts": {
"lint": "standard -v",
"test": "nyc --reporter=text-summary mocha",
"coveralls": "nyc report --reporter=text-lcov | coveralls",
"cover:report": "nyc report --reporter=html && opn ./coverage/index.html",
"docs": "jsdoc2md -t jsdoc-template.md --separators lib/wtsqs.js lib/worker.js > README.md"
},
"engines": {
Expand Down
2 changes: 1 addition & 1 deletion test/mocha.opts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
--timeout 10000
--timeout 20000
--retries 3
40 changes: 40 additions & 0 deletions test/utils.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// setup chai
const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect

const utils = require('../lib/utils')

describe('_utils', () => {
describe('#flatten()', () => {
it('should flatten arrays', async function () {
const arrs = [[1, 2], [3], [4, 5]]
const flatArr = utils.flatten(arrs)
expect(flatArr).to.eql([1, 2, 3, 4, 5])
})
})

describe('#chunk()', () => {
it('should allow chunking empty array', async function () {
expect(utils.chunk([])).to.eql([])
})

it('should chunk array to size', async function () {
const arr = [1, 2, 3, 4, 5]
const chunks = utils.chunk(arr, 2)
expect(chunks).to.deep.eql([[1, 2], [3, 4], [5]])
})
})

describe('#chunkNumber()', () => {
it('should chunk number to exact slices', async function () {
const chunks = utils.chunkNumber(6, 2)
expect(chunks).to.eql([2, 2, 2])
})

it('should chunk number with extra slices', async function () {
const chunks = utils.chunkNumber(7, 2)
expect(chunks).to.eql([2, 2, 2, 1])
})
})
})
Loading

0 comments on commit e7ce261

Please sign in to comment.