Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tunderdomb committed Jun 22, 2022
1 parent 90af6a7 commit 72b4ca7
Show file tree
Hide file tree
Showing 13 changed files with 114 additions and 35 deletions.
25 changes: 22 additions & 3 deletions src/GatheringClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,37 @@ class GatheringClient {
this._correlationIdMap = new Map()
this._replyQueue = ''

const { queueMaxSize, timeoutMs, serverCount = 0 } = options
const {
queueMaxSize,
timeoutMs,
serverCount = 0,
assertQueueOptions,
assertExchange = true
} = options
this._rpcQueueMaxSize = queueMaxSize
this._rpcTimeoutMs = timeoutMs
this._gatheringServerCount = serverCount
this._assertExchange = null

this._assertQueueOptions = assertQueueOptions
? Object.assign(assertQueueOptions || {}, { exclusive: true })
: { exclusive: true } // defaults

if (assertExchange) {
this._assertExchange = assertExchange === true
? { durable: true }
: assertExchange
}
}

async initialize () {
try {
const channel = await this._connection.getChannel()
await channel.assertExchange(this.name, 'fanout', { durable: true })
if (this._assertExchange) {
await channel.assertExchange(this.name, 'fanout', this._assertExchange)
}

const replyQueue = await channel.assertQueue('', { exclusive: true })
const replyQueue = await channel.assertQueue('', this._assertQueueOptions)
this._replyQueue = replyQueue.queue

await channel.consume(this._replyQueue, (reply) => {
Expand Down
24 changes: 21 additions & 3 deletions src/GatheringServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,36 @@ class GatheringServer {
this.name = name
this.statusQueue = name

const { prefetchCount, timeoutMs } = options || {}
const {
prefetchCount,
timeoutMs,
assertQueueOptions,
assertExchange = true
} = options || {}
this._prefetchCount = prefetchCount
this._responseTimeoutMs = timeoutMs
this._assertExchange = null

this._assertQueueOptions = assertQueueOptions
? Object.assign(assertQueueOptions || {}, { exclusive: true })
: { exclusive: true } // defaults

if (assertExchange) {
this._assertExchange = assertExchange === true
? { durable: true }
: assertExchange
}

this.actions = new Map()
}

async initialize () {
try {
const channel = await this._connection.getChannel()
await channel.assertExchange(this.name, 'fanout', { durable: true })
const serverQueue = await channel.assertQueue('', { exclusive: true })
if (this._assertExchange) {
await channel.assertExchange(this.name, 'fanout', this._assertExchange)
}
const serverQueue = await channel.assertQueue('', this._assertQueueOptions)
const serverQueueName = serverQueue.queue

await channel.prefetch(this._prefetchCount)
Expand Down
2 changes: 1 addition & 1 deletion src/QueueConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class QueueConnection {
})
conn.on('close', () => {
this._logger.error('RabbitMQ closed')
process.exit(2)
// process.exit(2)
})
conn.on('blocked', (reason) => {
this._logger.error('RabbitMQ blocked', reason)
Expand Down
15 changes: 13 additions & 2 deletions src/QueueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,24 @@ class QueueManager {
/**
* @param {String} exchangeName
* @param {Publisher|function() : Publisher} OverrideClass
* @param {Object} [options]
* @return Publisher
*/
getPublisher (exchangeName, OverrideClass = Publisher) {
getPublisher (exchangeName, OverrideClass = Publisher, options = {}) {
if (this.publishers.has(exchangeName)) {
return this.publishers.get(exchangeName)
}

if (arguments.length === 2 && typeof OverrideClass !== 'function') {
options = OverrideClass
OverrideClass = Publisher
}

if (OverrideClass !== Publisher && !(OverrideClass.prototype instanceof Publisher)) {
throw new Error('Override must be a subclass of Publisher')
}

const publisher = new OverrideClass(this.connection, this._logger, exchangeName)
const publisher = new OverrideClass(this.connection, this._logger, exchangeName, options)

this.publishers.set(exchangeName, publisher)

Expand Down Expand Up @@ -282,6 +288,11 @@ class QueueManager {
return this.queueClients.get(queueName)
}

if (arguments.length === 2 && typeof OverrideClass !== 'function') {
options = OverrideClass
OverrideClass = QueueClient
}

if (OverrideClass !== QueueClient && !(OverrideClass.prototype instanceof QueueClient)) {
throw new Error('Override must be a subclass of QueueClient')
}
Expand Down
2 changes: 1 addition & 1 deletion src/QueueServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class QueueServer extends Subscriber {
this._processMessage(channel, msg)
})
} catch (err) {
this._logger.error('CANNOT INITIALIZE QUEUE SERVER', err)
this._logger.error('CANNOT INITIALIZE QUEUE SERVER', this.name, this._assertQueue, err)
throw err
}
}
Expand Down
9 changes: 5 additions & 4 deletions test/Gathering-multi-server.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ const ConsoleInspector = require('./consoleInspector')
const config = require('./config/LoadConfig')

describe('GatheringClient && multiple GatheringServer', () => {
const gatheringName = 'test-gathering-multi'
const gatheringName = 'techteamer-mq-js-test-gathering-multi'
const logger = new ConsoleInspector(console)
const timeoutMs = 1000
const assertExchangeOptions = { durable: false, autoDelete: true }

const queueManager = new QueueManager(config)
queueManager.setLogger(logger)

const gatheringClient = queueManager.getGatheringClient(gatheringName, { queueMaxSize: 100, timeoutMs })
const gatheringServer1 = new GatheringServer(queueManager.connection, queueManager._logger, gatheringName, { prefetchCount: 1, timeoutMs })
const gatheringServer2 = new GatheringServer(queueManager.connection, queueManager._logger, gatheringName, { prefetchCount: 1, timeoutMs })
const gatheringClient = queueManager.getGatheringClient(gatheringName, { queueMaxSize: 100, timeoutMs, assertExchange: assertExchangeOptions })
const gatheringServer1 = new GatheringServer(queueManager.connection, queueManager._logger, gatheringName, { prefetchCount: 1, timeoutMs, assertExchange: assertExchangeOptions })
const gatheringServer2 = new GatheringServer(queueManager.connection, queueManager._logger, gatheringName, { prefetchCount: 1, timeoutMs, assertExchange: assertExchangeOptions })

before(() => {
return queueManager.connect().then(() => {
Expand Down
5 changes: 3 additions & 2 deletions test/Gathering.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ describe('GatheringClient && GatheringServer', () => {
const gatheringName = 'test-gathering'
const logger = new ConsoleInspector(console)
const timeoutMs = 1000
const assertExchangeOptions = { durable: false, autoDelete: true }

const queueManager = new QueueManager(config)
queueManager.setLogger(logger)

const gatheringClient = queueManager.getGatheringClient(gatheringName, { queueMaxSize: 100, timeoutMs })
const gatheringServer1 = queueManager.getGatheringServer(gatheringName, { prefetchCount: 1, timeoutMs })
const gatheringClient = queueManager.getGatheringClient(gatheringName, { queueMaxSize: 100, timeoutMs, assertExchange: assertExchangeOptions })
const gatheringServer1 = queueManager.getGatheringServer(gatheringName, { prefetchCount: 1, timeoutMs, assertExchange: assertExchangeOptions })

before(() => {
return queueManager.connect()
Expand Down
7 changes: 4 additions & 3 deletions test/GatheringAction.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ const chai = require('chai')
const expect = chai.expect

describe('GatheringClient && GatheringServer actions', () => {
const gatheringName = 'test-gathering-action'
const gatheringName = 'techteamer-mq-js-test-test-gathering-action'
const logger = new ConsoleInspector(console)
const timeoutMs = 1000
const assertExchangeOptions = { durable: false, autoDelete: true }

const queueManager = new QueueManager(config)
queueManager.setLogger(logger)

const gatheringClient = queueManager.getGatheringClient(gatheringName, { queueMaxSize: 100, timeoutMs })
const gatheringServer1 = queueManager.getGatheringServer(gatheringName, { prefetchCount: 1, timeoutMs })
const gatheringClient = queueManager.getGatheringClient(gatheringName, { queueMaxSize: 100, timeoutMs, assertExchange: assertExchangeOptions })
const gatheringServer1 = queueManager.getGatheringServer(gatheringName, { prefetchCount: 1, timeoutMs, assertExchange: assertExchangeOptions })

before(() => {
return queueManager.connect()
Expand Down
13 changes: 10 additions & 3 deletions test/PubSubAction.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@ const ConsoleInspector = require('./consoleInspector')
const config = require('./config/LoadConfig')

describe('Publisher && Subscriber actions', () => {
const publisherName = 'test-publisher-action'
const publisherName = 'techteamer-mq-js-test-publisher-action'
const logger = new ConsoleInspector(console)
const maxRetry = 5
const assertExchangeOptions = { durable: false, autoDelete: true }

const publisherManager = new QueueManager(config)
publisherManager.setLogger(logger)
const publisher = publisherManager.getPublisher(publisherName)
const publisher = publisherManager.getPublisher(publisherName, {
assertExchange: assertExchangeOptions
})

const subscriberManager = new QueueManager(config)
subscriberManager.setLogger(logger)
const subscriber = subscriberManager.getSubscriber(publisherName, { maxRetry, timeoutMs: 10000 })
const subscriber = subscriberManager.getSubscriber(publisherName, {
maxRetry,
timeoutMs: 10000,
assertExchange: assertExchangeOptions
})

before(() => {
return publisherManager.connect().then(() => {
Expand Down
19 changes: 15 additions & 4 deletions test/Pubsub.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@ const SeedRandom = require('seed-random')
const config = require('./config/LoadConfig')

describe('Publisher && Subscriber', () => {
const publisherName = 'test-publisher'
const publisherName = 'techteamer-mq-js-test-publisher'
const logger = new ConsoleInspector(console)
const maxRetry = 5
const assertExchangeOptions = { durable: false, autoDelete: true }

const publisherManager = new QueueManager(config)
publisherManager.setLogger(logger)
const publisher = publisherManager.getPublisher(publisherName)
const publisher = publisherManager.getPublisher(publisherName, {
assertExchange: assertExchangeOptions
})

const subscriberManager = new QueueManager(config)
subscriberManager.setLogger(logger)
const subscriber = subscriberManager.getSubscriber(publisherName, { maxRetry, timeoutMs: 10000 })
const subscriber = subscriberManager.getSubscriber(publisherName, {
maxRetry,
timeoutMs: 10000,
assertExchange: assertExchangeOptions
})

before(() => {
return publisherManager.connect().then(() => {
Expand Down Expand Up @@ -125,7 +132,11 @@ describe('Publisher && Subscriber', () => {
it('Publisher.send() sends a message and each subscriber receives it', (done) => {
const otherManager = new QueueManager(config)
otherManager.setLogger(logger)
const otherSubscriber = otherManager.getSubscriber(publisherName, { maxRetry, timeoutMs: 10000 })
const otherSubscriber = otherManager.getSubscriber(publisherName, {
maxRetry,
timeoutMs: 10000,
assertExchange: assertExchangeOptions
})

otherManager.connect().then(() => {
const stringMessage = 'foobar'
Expand Down
14 changes: 11 additions & 3 deletions test/Queue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,25 @@ const SeedRandom = require('seed-random')
const config = require('./config/LoadConfig')

describe('QueueClient && QueueServer', () => {
const queueName = 'test-queue'
const queueName = 'techteamer-mq-js-test-queue'
const logger = new ConsoleInspector(console)
const maxRetry = 5
const assertQueueOptions = { durable: false, exclusive: true }

const clientManager = new QueueManager(config)
clientManager.setLogger(logger)
const queueClient = clientManager.getQueueClient(queueName)
const queueClient = clientManager.getQueueClient(queueName, {
assertQueue: false
})

const serverManager = new QueueManager(config)
serverManager.setLogger(logger)
const options = { prefetchCount: 1, maxRetry, timeoutMs: 10000 }
const options = {
prefetchCount: 1,
maxRetry,
timeoutMs: 10000,
assertQueue: assertQueueOptions
}
const queueServer = serverManager.getQueueServer(queueName, options)

before(() => {
Expand Down
9 changes: 5 additions & 4 deletions test/RPC.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ const SeedRandom = require('seed-random')
const config = require('./config/LoadConfig')

describe('RPCClient && RPCServer', () => {
const rpcName = 'test-rpc'
const shortRpcName = 'short-test-rpc'
const rpcName = 'techteamer-mq-js-test-rpc'
const shortRpcName = 'techteamer-mq-js-test-rpc-short'
const logger = new ConsoleInspector(console)
const timeoutMs = 1000
const assertQueueOptions = { durable: false, exclusive: true }

const queueManager = new QueueManager(config)
queueManager.setLogger(logger)

const rpcClient = queueManager.getRPCClient(rpcName, { queueMaxSize: 100, timeoutMs })
const rpcServer = queueManager.getRPCServer(rpcName, { prefetchCount: 1, timeoutMs })
const rpcServer = queueManager.getRPCServer(rpcName, { prefetchCount: 1, timeoutMs, assertQueue: assertQueueOptions })
const shortRpcClient = queueManager.getRPCClient(shortRpcName, { queueMaxSize: 1, timeoutMs })
const shortRpcServer = queueManager.getRPCServer(shortRpcName, { prefetchCount: 1, timeoutMs })
const shortRpcServer = queueManager.getRPCServer(shortRpcName, { prefetchCount: 1, timeoutMs, assertQueue: assertQueueOptions })

before(() => {
return queueManager.connect()
Expand Down
5 changes: 3 additions & 2 deletions test/RPCAction.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ const chai = require('chai')
const expect = chai.expect

describe('RPCClient && RPCServer actions', function () {
const rpcName = 'test-rpc-action'
const rpcName = 'techteamer-mq-js-test-rpc-action'
const logger = new ConsoleInspector(console)
const timeoutMs = 1000
const assertQueueOptions = { durable: false, exclusive: true }

const queueManager = new QueueManager(config)
queueManager.setLogger(logger)

const rpcClient = queueManager.getRPCClient(rpcName, { queueMaxSize: 100, timeoutMs })
const rpcServer = queueManager.getRPCServer(rpcName, { prefetchCount: 1, timeoutMs })
const rpcServer = queueManager.getRPCServer(rpcName, { prefetchCount: 1, timeoutMs, assertQueue: assertQueueOptions })

before(() => {
return queueManager.connect()
Expand Down

0 comments on commit 72b4ca7

Please sign in to comment.