Skip to content

Commit

Permalink
fix queue and exchange assertions
Browse files Browse the repository at this point in the history
  • Loading branch information
tunderdomb committed Jun 22, 2022
1 parent bcb8828 commit ce1b7d5
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 47 deletions.
12 changes: 10 additions & 2 deletions src/Publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ class Publisher {
this.exchange = exchange
this.routingKey = ''
this.options = options
const { MessageModel, ContentSchema } = options || {}
const { MessageModel, ContentSchema, assertExchange = true } = options || {}
this._assertExchange = null
if (assertExchange) {
this._assertExchange = assertExchange === true
? { durable: true } // defaults
: assertExchange
}
this.MessageModel = MessageModel || QueueMessage
this.ContentSchema = ContentSchema || JSON
}
Expand All @@ -25,7 +31,9 @@ class Publisher {
* @returns {Promise}
*/
assertExchangeOrQueue (channel) {
return channel.assertExchange(this.exchange, 'fanout', { durable: true })
if (this._assertExchange) {
return channel.assertExchange(this.exchange, 'fanout', this._assertExchange)
}
}

async initialize () {
Expand Down
14 changes: 13 additions & 1 deletion src/QueueClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,26 @@ class QueueClient extends Publisher {
constructor (queueConnection, logger, name, options) {
super(queueConnection, logger, '', options)
this.routingKey = name
this._assertQueue = null
const {
assertQueue
} = options || {}

if (assertQueue) {
this._assertQueue = assertQueue === true
? { durable: true } // defaults
: assertQueue
}
}

/**
* @param channel
* @returns {Promise}
*/
assertExchangeOrQueue (channel) {
return channel.assertQueue(this.routingKey, { durable: true })
if (this._assertQueue) {
return channel.assertQueue(this.routingKey, this._assertQueue)
}
}
}

Expand Down
17 changes: 8 additions & 9 deletions src/QueueServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,17 @@ class QueueServer extends Subscriber {
constructor (queueConnection, logger, name, options) {
super(queueConnection, logger, name, options)
const {
assertQueueOptions = true,
assertQueue = true,
prefetchCount
} = options || {}

this._assertQueueOptions = null
this._assertQueue = null
this._prefetchCount = prefetchCount

if (assertQueueOptions) {
const defaultOptions = { durable: true }
this._assertQueueOptions = assertQueueOptions === true
? defaultOptions
: assertQueueOptions
if (assertQueue) {
this._assertQueue = assertQueue === true
? { durable: true } // defaults
: assertQueue
}
}

Expand All @@ -31,8 +30,8 @@ class QueueServer extends Subscriber {
async initialize () {
try {
const channel = await this._connection.getChannel()
if (this._assertQueueOptions) {
await channel.assertQueue(this.name, this._assertQueueOptions)
if (this._assertQueue) {
await channel.assertQueue(this.name, this._assertQueue)
}
await channel.prefetch(this._prefetchCount)
await channel.consume(this.name, (msg) => {
Expand Down
17 changes: 8 additions & 9 deletions src/RPCClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,20 @@ class RPCClient {
RequestContentSchema,
ResponseContentSchema,
replyQueueName = '',
replyQueueOptions = true
assertReplyQueue = true
} = options || {}

this._connection = queueConnection
this._logger = logger
this.name = rpcName
this._replyQueue = replyQueueName || ''
this._replyQueueOptions = null
this._assertReplyQueue = null
this._correlationIdMap = new Map()

if (replyQueueOptions) {
const defaultOptions = { exclusive: true }
this._replyQueueOptions = replyQueueOptions === true
? defaultOptions
: replyQueueOptions
if (assertReplyQueue) {
this._assertReplyQueue = assertReplyQueue === true
? { exclusive: true } // defaults
: assertReplyQueue
}

this._rpcQueueMaxSize = queueMaxSize
Expand Down Expand Up @@ -157,8 +156,8 @@ class RPCClient {
* */
async _getReplyQueue (ch) {
try {
if (this._replyQueueOptions) {
const assertResult = await ch.assertQueue(this._replyQueue, this._replyQueueOptions)
if (this._assertReplyQueue) {
const assertResult = await ch.assertQueue(this._replyQueue, this._assertReplyQueue)
this._replyQueue = assertResult.queue
}

Expand Down
17 changes: 8 additions & 9 deletions src/RPCServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ class RPCServer {
ResponseMessageModel,
RequestContentSchema,
ResponseContentSchema,
queueOptions = true
assertQueue = true
} = options || {}

this._connection = queueConnection
this._logger = logger
this.name = rpcName
this._queueOptions = null
this._assertQueue = null

if (queueOptions) {
const defaultOptions = { durable: true }
this._queueOptions = queueOptions === true
? defaultOptions
: queueOptions
if (assertQueue) {
this._assertQueue = assertQueue === true
? { durable: true } // defaults
: assertQueue
}

this._prefetchCount = prefetchCount
Expand Down Expand Up @@ -82,8 +81,8 @@ class RPCServer {
async initialize () {
try {
const channel = await this._connection.getChannel()
if (this._queueOptions) {
await channel.assertQueue(this.name, this._queueOptions)
if (this._assertQueue) {
await channel.assertQueue(this.name, this._assertQueue)
}
await channel.prefetch(this._prefetchCount)
await channel.consume(this.name, (msg) => {
Expand Down
68 changes: 51 additions & 17 deletions src/Subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,27 @@ class Subscriber {
this._logger = logger
this.name = name

const { maxRetry, timeoutMs, MessageModel, ContentSchema } = options || {}
const {
maxRetry,
timeoutMs,
MessageModel,
ContentSchema,
assertQueueOptions,
assertExchange = true
} = options || {}
this._maxRetry = maxRetry
this._timeoutMs = timeoutMs
this.MessageModel = MessageModel || QueueMessage
this.ContentSchema = ContentSchema || JSON
this._assertQueueOptions = assertQueueOptions
? Object.assign(assertQueueOptions || {}, { exclusive: true })
: { exclusive: true } // defaults
this._assertExchange = null
if (assertExchange) {
this._assertExchange = assertExchange === true
? { durable: true } // defaults
: assertExchange
}

this._retryMap = new Map()

Expand Down Expand Up @@ -53,8 +69,10 @@ class Subscriber {
async initialize () {
try {
const channel = await this._connection.getChannel()
await channel.assertExchange(this.name, 'fanout', { durable: true })
const queue = await channel.assertQueue('', { exclusive: true })
if (this._assertExchange) {
await channel.assertExchange(this.name, 'fanout', this._assertExchange)
}
const queue = await channel.assertQueue('', this._assertQueueOptions)

await channel.bindQueue(queue.queue, this.name, '')

Expand Down Expand Up @@ -113,16 +131,11 @@ class Subscriber {
/**
* @param channel
* @param msg
* @return {Promise}
* @param request
* @returns {boolean} true if too many retries reached
* @private
*/
async _processMessage (channel, msg) {
const request = this._parseMessage(msg)
if (!request) {
this._ack(channel, msg)
return
}

_handleMessageRetry (channel, msg, request) {
if (msg.fields && msg.fields.redelivered && msg.fields.consumerTag) {
let counter = 1
if (this._retryMap.has(msg.fields.consumerTag)) {
Expand All @@ -138,10 +151,31 @@ class Subscriber {
if (msg.fields.consumerTag) {
this._retryMap.delete(msg.fields.consumerTag)
}
return
return true
}
}

return false
}

/**
* @param channel
* @param msg
* @return {Promise}
* @private
*/
async _processMessage (channel, msg) {
const request = this._parseMessage(msg)
if (!request) {
this._ack(channel, msg)
return
}

const tooManyRetries = this._handleMessageRetry(channel, msg, request)
if (tooManyRetries) {
return
}

let timedOut = false
const timeoutMs = typeof request.timeOut === 'number' ? request.timeOut : this._timeoutMs
const timer = setTimeout(() => {
Expand All @@ -150,23 +184,23 @@ class Subscriber {
this._nack(channel, msg)
}, timeoutMs)

return Promise.resolve().then(() => {
return this._callback(request.data, msg.properties, request, msg)
}).then(() => {
try {
await this._callback(request.data, msg.properties, request, msg)

if (!timedOut) {
clearTimeout(timer)
this._ack(channel, msg)
if (msg.fields && msg.fields.redelivered && msg.fields.consumerTag) {
this._retryMap.delete(msg.fields.consumerTag)
}
}
}).catch((err) => {
} catch (err) {
if (!timedOut) {
clearTimeout(timer)
this._logger.error('Cannot process Subscriber consume', err)
this._nack(channel, msg)
}
})
}
}

/**
Expand Down

0 comments on commit ce1b7d5

Please sign in to comment.