Skip to content

Commit

Permalink
handle QueueManager level exchange and queue assertion options
Browse files Browse the repository at this point in the history
  • Loading branch information
tbence94 committed Jan 19, 2023
1 parent c66edde commit 1fe8e9d
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 74 deletions.
8 changes: 4 additions & 4 deletions src/GatheringClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ class GatheringClient {
queueMaxSize,
timeoutMs,
serverCount = 0,
assertQueueOptions,
assertQueueOptions = {},
assertExchange = true,
assertExchangeOptions = null
assertExchangeOptions = {}
} = options

this._rpcQueueMaxSize = queueMaxSize
this._rpcTimeoutMs = timeoutMs
this._gatheringServerCount = serverCount

this._assertExchange = assertExchange === true
this._assertQueueOptions = Object.assign({ exclusive: true, autoDelete: true }, assertQueueOptions || {})
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {})
this._assertQueueOptions = Object.assign({ exclusive: true, autoDelete: true }, assertQueueOptions)
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions)
}

async initialize () {
Expand Down
12 changes: 6 additions & 6 deletions src/GatheringServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class GatheringServer {
* @param {String} name
* @param {Object} options
*/
constructor (queueConnection, logger, name, options) {
constructor (queueConnection, logger, name, options = {}) {
this._connection = queueConnection
this._logger = logger
this.name = name
Expand All @@ -17,17 +17,17 @@ class GatheringServer {
const {
prefetchCount,
timeoutMs,
assertQueueOptions,
assertQueueOptions = {},
assertExchange = true,
assertExchangeOptions = null
} = options || {}
assertExchangeOptions = {}
} = options

this._prefetchCount = prefetchCount
this._responseTimeoutMs = timeoutMs

this._assertExchange = assertExchange === true
this._assertQueueOptions = Object.assign({ exclusive: true, autoDelete: true }, assertQueueOptions || {})
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {})
this._assertQueueOptions = Object.assign({ exclusive: true, autoDelete: true }, assertQueueOptions)
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions)

this.actions = new Map()
}
Expand Down
16 changes: 8 additions & 8 deletions src/Publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@ class Publisher {
* @param {String} exchange
* @param {Object} options
*/
constructor (queueConnection, logger, exchange, options) {
constructor (queueConnection, logger, exchange, options = {}) {
this._connection = queueConnection
this._logger = logger
this.exchange = exchange
this.routingKey = ''
this.options = options

const {
MessageModel,
ContentSchema,
MessageModel = QueueMessage,
ContentSchema = JSON,
assertExchange = true,
assertExchangeOptions = null
} = options || {}
assertExchangeOptions = {}
} = options

this._assertExchange = assertExchange === true
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {})
this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions)

this.MessageModel = MessageModel || QueueMessage
this.ContentSchema = ContentSchema || JSON
this.MessageModel = MessageModel
this.ContentSchema = ContentSchema
}

/**
Expand Down
8 changes: 4 additions & 4 deletions src/QueueClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ class QueueClient extends Publisher {
* @param {String} name
* @param {Object} options
*/
constructor (queueConnection, logger, name, options) {
constructor (queueConnection, logger, name, options = {}) {
super(queueConnection, logger, '', options)
this.routingKey = name
this._assertQueue = null

const {
assertQueue = true,
assertQueueOptions
} = options || {}
assertQueueOptions = {}
} = options

this._assertQueue = assertQueue === true
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions || {})
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions)
}

/**
Expand Down
48 changes: 47 additions & 1 deletion src/QueueConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,29 @@ class QueueConfig {
options = {},
rpcTimeoutMs = 10000,
rpcQueueMaxSize = 100,
logger = console
logger = console,

// Queue & exchange options
rpcClientAssertReplyQueueOptions = {},
rpcClientExchangeOptions = {},

rpcServerAssertQueueOptions = {},
rpcServerExchangeOptions = {},

publisherAssertExchangeOptions = {},

subscriberAssertQueueOptions = {},
subscriberAssertExchangeOptions = {},

gatheringClientAssertQueueOptions = {},
gatheringClientAssertExchangeOptions = {},

gatheringServerAssertQueueOptions = {},
gatheringServerAssertExchangeOptions = {},

queueClientAssertQueueOptions = {},

queueServerAssertQueueOptions = {}
} = config

const rabbitMqOptions = new RabbitMqOptions(options)
Expand All @@ -35,6 +57,30 @@ class QueueConfig {
this.rpcTimeoutMs = rpcTimeoutMs
this.rpcQueueMaxSize = rpcQueueMaxSize
this.logger = logger

// Queue & exchange options
this.rpcClientAssertReplyQueueOptions = rpcClientAssertReplyQueueOptions
this.rpcClientExchangeOptions = rpcClientExchangeOptions

this.rpcServerAssertQueueOptions = rpcServerAssertQueueOptions
this.rpcServerExchangeOptions = rpcServerExchangeOptions

this.publisherAssertExchangeOptions = publisherAssertExchangeOptions

this.subscriberAssertQueueOptions = subscriberAssertQueueOptions
this.subscriberAssertExchangeOptions = subscriberAssertExchangeOptions

this.subscriberAssertExchangeOptions = subscriberAssertExchangeOptions

this.gatheringClientAssertQueueOptions = gatheringClientAssertQueueOptions
this.gatheringClientAssertExchangeOptions = gatheringClientAssertExchangeOptions

this.gatheringServerAssertQueueOptions = gatheringServerAssertQueueOptions
this.gatheringServerAssertExchangeOptions = gatheringServerAssertExchangeOptions

this.queueClientAssertQueueOptions = queueClientAssertQueueOptions

this.queueServerAssertQueueOptions = queueServerAssertQueueOptions
}

static isValidConfig (obj) {
Expand Down
38 changes: 30 additions & 8 deletions src/QueueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ class QueueManager {

const settings = Object.assign({
queueMaxSize: this._config.rpcQueueMaxSize,
timeoutMs: this._config.rpcTimeoutMs
timeoutMs: this._config.rpcTimeoutMs,
assertReplyQueueOptions: this._config.rpcClientAssertReplyQueueOptions,
exchangeOptions: this._config.rpcClientExchangeOptions
}, options)

const rpcClient = new OverrideClass(this.connection, this._logger, rpcName, settings)
Expand Down Expand Up @@ -159,7 +161,9 @@ class QueueManager {

const settings = Object.assign({
prefetchCount: 1,
timeoutMs: this._config.rpcTimeoutMs
timeoutMs: this._config.rpcTimeoutMs,
assertQueueOptions: this._config.rpcServerAssertQueueOptions,
exchangeOptions: this._config.rpcServerExchangeOptions
}, options)

const rpcServer = new OverrideClass(this.connection, this._logger, rpcName, settings)
Expand Down Expand Up @@ -189,7 +193,11 @@ class QueueManager {
throw new Error('Override must be a subclass of Publisher')
}

const publisher = new OverrideClass(this.connection, this._logger, exchangeName, options)
const settings = Object.assign({
assertExchangeOptions: this._config.publisherAssertExchangeOptions
}, options)

const publisher = new OverrideClass(this.connection, this._logger, exchangeName, settings)

this.publishers.set(exchangeName, publisher)

Expand Down Expand Up @@ -219,7 +227,9 @@ class QueueManager {
const settings = Object.assign({
prefetchCount: 1,
maxRetry: 5,
timeoutMs: this._config.rpcTimeoutMs
timeoutMs: this._config.rpcTimeoutMs,
assertQueueOptions: this._config.subscriberAssertQueueOptions,
assertExchangeOptions: this._config.subscriberAssertExchangeOptions
}, options)

const subscriber = new OverrideClass(this.connection, this._logger, exchangeName, settings)
Expand Down Expand Up @@ -249,7 +259,12 @@ class QueueManager {
throw new Error('Override must be a subclass of GatheringClient')
}

const gatheringClient = new OverrideClass(this.connection, this._logger, exchangeName, options)
const settings = Object.assign({
assertQueueOptions: this._config.gatheringClientAssertQueueOptions,
assertExchangeOptions: this._config.gatheringClientAssertExchangeOptions
}, options)

const gatheringClient = new OverrideClass(this.connection, this._logger, exchangeName, settings)

this.gatheringClients.set(exchangeName, gatheringClient)

Expand Down Expand Up @@ -277,7 +292,9 @@ class QueueManager {
}

const settings = Object.assign({
timeoutMs: this._config.rpcTimeoutMs
timeoutMs: this._config.rpcTimeoutMs,
assertQueueOptions: this._config.gatheringServerAssertQueueOptions,
assertExchangeOptions: this._config.gatheringServerAssertExchangeOptions
}, options)

const gatheringServer = new OverrideClass(this.connection, this._logger, exchangeName, settings)
Expand Down Expand Up @@ -307,7 +324,11 @@ class QueueManager {
throw new Error('Override must be a subclass of QueueClient')
}

const queueClient = new OverrideClass(this.connection, this._logger, queueName, options)
const settings = Object.assign({
assertQueueOptions: this._config.queueClientAssertQueueOptions
}, options)

const queueClient = new OverrideClass(this.connection, this._logger, queueName, settings)

this.queueClients.set(queueName, queueClient)

Expand Down Expand Up @@ -337,7 +358,8 @@ class QueueManager {
const settings = Object.assign({
prefetchCount: 1,
maxRetry: 5,
timeoutMs: this._config.rpcTimeoutMs
timeoutMs: this._config.rpcTimeoutMs,
assertQueueOptions: this._config.queueServerAssertQueueOptions
}, options)

const queueServer = new OverrideClass(this.connection, this._logger, queueName, settings)
Expand Down
8 changes: 4 additions & 4 deletions src/QueueServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ class QueueServer extends Subscriber {
* @param {String} name
* @param {Object} options
*/
constructor (queueConnection, logger, name, options) {
constructor (queueConnection, logger, name, options = {}) {
super(queueConnection, logger, name, options)
const {
assertQueue = true,
assertQueueOptions = null,
assertQueueOptions = {},
prefetchCount
} = options || {}
} = options

this._assertQueue = null
this._prefetchCount = prefetchCount

this._assertQueue = assertQueue === true
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions || {})
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions)
}

/**
Expand Down
30 changes: 15 additions & 15 deletions src/RPCClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,39 @@ class RPCClient {
* @param {String} rpcName
* @param {Object} options
*/
constructor (queueConnection, logger, rpcName, options) {
constructor (queueConnection, logger, rpcName, options = {}) {
const {
queueMaxSize,
timeoutMs,
RequestMessageModel,
ResponseMessageModel,
RequestContentSchema,
ResponseContentSchema,
RequestMessageModel = QueueMessage,
ResponseMessageModel = QueueMessage,
RequestContentSchema = JSON,
ResponseContentSchema = JSON,
replyQueueName = '',
assertReplyQueue = true,
assertReplyQueueOptions = null,
assertReplyQueueOptions = {},
bindDirectExchangeName = null,
exchangeOptions = null
} = options || {}
exchangeOptions = {}
} = options

this._connection = queueConnection
this._logger = logger
this.name = rpcName
this._replyQueue = replyQueueName || ''
this._replyQueue = replyQueueName
this._correlationIdMap = new Map()

this._assertReplyQueue = assertReplyQueue === true
this._assertReplyQueueOptions = Object.assign({ exclusive: true, autoDelete: true }, assertReplyQueueOptions || {})
this._assertReplyQueueOptions = Object.assign({ exclusive: true, autoDelete: true }, assertReplyQueueOptions)

this._bindDirectExchangeName = bindDirectExchangeName
this._exchangeOptions = exchangeOptions || {}
this._exchangeOptions = exchangeOptions

this._rpcQueueMaxSize = queueMaxSize
this._rpcTimeoutMs = timeoutMs
this.RequestMessageModel = RequestMessageModel || QueueMessage
this.ResponseMessageModel = ResponseMessageModel || QueueMessage
this.RequestContentSchema = RequestContentSchema || JSON
this.ResponseContentSchema = ResponseContentSchema || JSON
this.RequestMessageModel = RequestMessageModel
this.ResponseMessageModel = ResponseMessageModel
this.RequestContentSchema = RequestContentSchema
this.ResponseContentSchema = ResponseContentSchema
}

async initialize () {
Expand Down
28 changes: 14 additions & 14 deletions src/RPCServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,36 @@ class RPCServer {
* @param {String} rpcName
* @param {Object} options
*/
constructor (queueConnection, logger, rpcName, options) {
constructor (queueConnection, logger, rpcName, options = {}) {
const {
prefetchCount,
timeoutMs,
RequestMessageModel,
ResponseMessageModel,
RequestContentSchema,
ResponseContentSchema,
RequestMessageModel = QueueMessage,
ResponseMessageModel = QueueMessage,
RequestContentSchema = JSON,
ResponseContentSchema = JSON,
assertQueue = true,
assertQueueOptions = null,
assertQueueOptions = {},
bindDirectExchangeName = null,
exchangeOptions = null
} = options || {}
exchangeOptions = {}
} = options

this._connection = queueConnection
this._logger = logger
this.name = rpcName

this._assertQueue = assertQueue === true
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions || {})
this._assertQueueOptions = Object.assign({ durable: true }, assertQueueOptions)

this._bindDirectExchangeName = bindDirectExchangeName
this._exchangeOptions = exchangeOptions || {}
this._exchangeOptions = exchangeOptions

this._prefetchCount = prefetchCount
this._timeoutMs = timeoutMs
this.RequestModel = RequestMessageModel || QueueMessage
this.ResponseModel = ResponseMessageModel || QueueMessage
this.RequestContentSchema = RequestContentSchema || JSON
this.ResponseContentSchema = ResponseContentSchema || JSON
this.RequestModel = RequestMessageModel
this.ResponseModel = ResponseMessageModel
this.RequestContentSchema = RequestContentSchema
this.ResponseContentSchema = ResponseContentSchema

this.actions = new Map()
}
Expand Down

0 comments on commit 1fe8e9d

Please sign in to comment.