Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.DS_Store
coverage/*
scripts/*.svg
node_modules/
19 changes: 19 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
language: node_js
cache:
directories:
- node_modules
notifications:
slack: dadi:pnhiL60xOrm7GOglHUmb7xHK
email: false
node_js:
- '7'
- '6'
before_script:
- npm prune
after_success:
- npm run semantic-release
branches:
except:
- /^v\d+\.\d+\.\d+$/
services:
- redis-server
193 changes: 111 additions & 82 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
'use strict'

const RSMQ = require('rsmq')

/*
Expand All @@ -6,98 +8,125 @@ const RSMQ = require('rsmq')
* respects message signatures that should be deferred until
* a later time (specified in config)
*/
module.exports = function Queue (opts) {
// instantiate message queue
const rsmq = new RSMQ({
host: opts.host,
port: opts.port
})

let QueueWrapper = function (options) {
this.options = options

// initialise queue
this.rsmq = this.initialiseQueue()

// initialise connection state
var connected = null
rsmq.on('connect', () => connected = true)
rsmq.on('disconnect', () => connected = false)

// do request, fail or wait
function request (req, err) {
if (connected === true) return req()
if (connected === false) return err()

rsmq.on('connect', connect)
rsmq.on('disconnect', disconnect)

function connect () {
req()
removeListeners()
}

function disconnect () {
err()
removeListeners()
}

function removeListeners () {
rsmq.removeListener('connect', connect)
rsmq.removeListener('disconnect', disconnect)
this.connected = null

this.rsmq.on('connect', () => {
this.connected = true
})

this.rsmq.on('disconnect', () => {
this.connected = false
})
}

// instantiate message queue
QueueWrapper.prototype.initialiseQueue = function () {
return new RSMQ({
host: this.options.host,
port: this.options.port
})
}

// public send function
QueueWrapper.prototype.send = function (message, done) {
const send = () => {
let options = {
qname: this.options.name,
message: message,
delay: this.getDelay(message)
}

console.log(options)

this.rsmq.sendMessage(options, done)
}

// public send function
this.send = function (message, done) {
function send () {
var options = {
qname: opts.name,
message: message,
delay: getDelay(message)
}
rsmq.sendMessage(options, done)
}

function error () {
done(new Error('Queue server connection refused'))
}
function error () {
done(new Error('Queue server connection refused'))
}

this.request(send, error)
}

request(send, error)
// do request, fail or wait
QueueWrapper.prototype.request = function (req, err) {
let rsmq = this.rsmq

if (this.connected === true) {
return req()
}

// determine message delay (or 0)
function getDelay (message) {
return isDeferred(message)
? untilStart() / 1000
: 0
if (this.connected === false) {
return err()
}

// is message signature in deferred list?
function isDeferred (message) {
if (!opts.deferred) return false
return opts.deferred.messages.some((value) => {
return message.startsWith(value)
})
function connect () {
req()
removeListeners()
}

// how long until deferred message window?
function untilStart () {
var now = new Date()
var start = parseTime(opts.deferred.start)
var stop = parseTime(opts.deferred.stop)

if (now >= start) { // in or later than window
if (now < stop || stop < start) return 0 // in window or rollover
return 24 * 60 * 60 * 1000 - (now - start) // later than window
}
if (now < start) { // in or before window
if (now < stop && stop < start) return 0 // in window or rollover
return start - now // earlier than window
}

function disconnect () {
err()
removeListeners()
}

function removeListeners () {
rsmq.removeListener('connect', connect)
rsmq.removeListener('disconnect', disconnect)
}

this.rsmq.on('connect', connect)
this.rsmq.on('disconnect', disconnect)
}

// determine message delay (or 0)
QueueWrapper.prototype.getDelay = function (message) {
return this.isDeferred(message)
? this.untilStart() / 1000
: 0
}

// is message signature in deferred list?
QueueWrapper.prototype.isDeferred = function (message) {
if (!this.options.deferred) return false

if (!Array.isArray(this.options.deferred.messages)) return false

return this.options.deferred.messages.some((value) => {
return message.startsWith(value)
})
}

// how long until deferred message window?
QueueWrapper.prototype.untilStart = function () {
const now = new Date()
const start = this.parseTime(this.options.deferred.start)
const stop = this.parseTime(this.options.deferred.stop)

if (now >= start) { // in or later than window
if (now < stop || stop < start) return 0 // in window or rollover
return 24 * 60 * 60 * 1000 - (now - start) // later than window
}

// parse time string from config file
function parseTime (string) {
var time = new Date()
var [hrs, mins] = string.split(':')
time.setUTCHours(hrs, mins)
return time

if (now < start) { // in or before window
if (now < stop && stop < start) return 0 // in window or rollover
return start - now // earlier than window
}

return this
}

// parse time string from configured options
QueueWrapper.prototype.parseTime = function (string) {
const time = new Date()
const timeParts = string.split(':')
time.setUTCHours(timeParts[0], timeParts[1])
return time
}

module.exports = QueueWrapper
Loading