diff --git a/src/GatheringClient.js b/src/GatheringClient.js index a24dcd2..c54cc49 100644 --- a/src/GatheringClient.js +++ b/src/GatheringClient.js @@ -31,7 +31,7 @@ class GatheringClient { this._gatheringServerCount = serverCount this._assertExchange = assertExchange === true - this._assertQueueOptions = Object.assign({ exclusive: true }, assertQueueOptions || {}) + this._assertQueueOptions = Object.assign({ exclusive: true, autoDelete: true }, assertQueueOptions || {}) this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {}) } diff --git a/src/GatheringServer.js b/src/GatheringServer.js index 629c47b..3d09d0a 100644 --- a/src/GatheringServer.js +++ b/src/GatheringServer.js @@ -26,7 +26,7 @@ class GatheringServer { this._responseTimeoutMs = timeoutMs this._assertExchange = assertExchange === true - this._assertQueueOptions = Object.assign({ exclusive: true }, assertQueueOptions || {}) + this._assertQueueOptions = Object.assign({ exclusive: true, autoDelete: true }, assertQueueOptions || {}) this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {}) this.actions = new Map() diff --git a/src/Publisher.js b/src/Publisher.js index 2a64e54..9badc4c 100644 --- a/src/Publisher.js +++ b/src/Publisher.js @@ -62,15 +62,18 @@ class Publisher { * @param {String} [correlationId] * @param {Number} [timeOut] * @param {Map} [attachments] + * @param {object} [sendOptions] * @return {Promise} */ - async send (message, correlationId = null, timeOut = null, attachments = null) { - const options = {} + async send (message, correlationId = null, timeOut = null, attachments = null, sendOptions = {}) { + const defaultOptions = {} if (correlationId) { - options.correlationId = correlationId + defaultOptions.correlationId = correlationId } + const options = Object.assign(defaultOptions, sendOptions || {}) + try { const channel = await this._connection.getChannel() let param diff --git a/src/RPCClient.js b/src/RPCClient.js index 7795db8..d305a1e 100644 --- a/src/RPCClient.js +++ b/src/RPCClient.js @@ -32,7 +32,7 @@ class RPCClient { this._correlationIdMap = new Map() this._assertReplyQueue = assertReplyQueue === true - this._assertReplyQueueOptions = Object.assign({ exclusive: true }, assertReplyQueueOptions || {}) + this._assertReplyQueueOptions = Object.assign({ exclusive: true, autoDelete: true }, assertReplyQueueOptions || {}) this._rpcQueueMaxSize = queueMaxSize this._rpcTimeoutMs = timeoutMs @@ -99,9 +99,10 @@ class RPCClient { * @param {Number} timeoutMs * @param {Map} attachments * @param {Boolean} [resolveWithFullResponse=false] + * @param {Object} sendOptions * @return {Promise} * */ - async call (message, timeoutMs = null, attachments = null, resolveWithFullResponse = false) { + async call (message, timeoutMs = null, attachments = null, resolveWithFullResponse = false, sendOptions = {}) { try { if (this._correlationIdMap.size > this._rpcQueueMaxSize) { throw new Error('RPCCLIENT QUEUE FULL ' + this.name) @@ -124,10 +125,9 @@ class RPCClient { return await new Promise((resolve, reject) => { const correlationId = this._registerMessage(resolve, reject, timeoutMs, resolveWithFullResponse) - channel.sendToQueue(this.name, param.serialize(), { - correlationId, - replyTo: this._replyQueue - }) + const options = Object.assign({ correlationId, replyTo: this._replyQueue }, sendOptions || {}) + + channel.sendToQueue(this.name, param.serialize(), options) }) } catch (err) { this._logger.error('RPCCLIENT: cannot make rpc call', err) diff --git a/src/Subscriber.js b/src/Subscriber.js index f060c37..7e6e3d1 100644 --- a/src/Subscriber.js +++ b/src/Subscriber.js @@ -27,7 +27,7 @@ class Subscriber { this.MessageModel = MessageModel || QueueMessage this.ContentSchema = ContentSchema || JSON - this._assertQueueOptions = Object.assign({ exclusive: true }, assertQueueOptions || {}) + this._assertQueueOptions = Object.assign({ exclusive: true, autoDelete: true }, assertQueueOptions || {}) this._assertExchange = assertExchange === true this._assertExchangeOptions = Object.assign({ durable: true }, assertExchangeOptions || {}) diff --git a/test/Queue.test.js b/test/Queue.test.js index 7ae3187..a850b7a 100644 --- a/test/Queue.test.js +++ b/test/Queue.test.js @@ -8,7 +8,7 @@ describe('QueueClient && QueueServer', () => { const queueName = 'techteamer-mq-js-test-queue' const logger = new ConsoleInspector(console) const maxRetry = 5 - const assertQueueOptions = { durable: false, exclusive: true } + const assertQueueOptions = { durable: false, exclusive: true, autoDelete: true } const clientManager = new QueueManager(config) clientManager.setLogger(logger) diff --git a/test/RPC.test.js b/test/RPC.test.js index e6a0be9..8cf4783 100644 --- a/test/RPC.test.js +++ b/test/RPC.test.js @@ -8,7 +8,7 @@ describe('RPCClient && RPCServer', () => { const shortRpcName = 'techteamer-mq-js-test-rpc-short' const logger = new ConsoleInspector(console) const timeoutMs = 1000 - const assertQueueOptions = { durable: false, exclusive: true } + const assertQueueOptions = { durable: false, exclusive: true, autoDelete: true } const queueManager = new QueueManager(config) queueManager.setLogger(logger) diff --git a/test/RPCAction.test.js b/test/RPCAction.test.js index 313cfee..99ffeeb 100644 --- a/test/RPCAction.test.js +++ b/test/RPCAction.test.js @@ -8,7 +8,7 @@ describe('RPCClient && RPCServer actions', function () { const rpcName = 'techteamer-mq-js-test-rpc-action' const logger = new ConsoleInspector(console) const timeoutMs = 1000 - const assertQueueOptions = { durable: false, exclusive: true } + const assertQueueOptions = { durable: false, exclusive: true, autoDelete: true } const queueManager = new QueueManager(config) queueManager.setLogger(logger)