Skip to content

Commit

Permalink
Merge 11e3daf into 97614a5
Browse files Browse the repository at this point in the history
  • Loading branch information
tbence94 authored Jan 16, 2023
2 parents 97614a5 + 11e3daf commit e5c02cc
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 77 deletions.
11 changes: 11 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# http://editorconfig.org
root = true

[*]
indent_style = space
indent_size = 2
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
max_line_length = 240
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Added backward compatibility for object based connection URL configuration

6.0.0
- BREAKING: removed process.exit on connection close
- BREAKING: connection configs only support string AMQP URI syntax (https://www.rabbitmq.com/uri-spec.html)
Expand Down Expand Up @@ -36,7 +38,7 @@
- dependency updates, drop support for node v10

4.1.1
- dependency updates
- dependency updates

4.1.0
- promises rewrite to async-await in most cases
Expand Down
3 changes: 3 additions & 0 deletions src/ConnectionPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class ConnectionPool {
const { defaultConnectionName } = poolConfig || {}

this._logger = null
/**
* @type {Map<string, QueueManager>}
*/
this.connections = new Map()
this.defaultConnection = null
this.defaultConnectionName = defaultConnectionName || 'default'
Expand Down
39 changes: 31 additions & 8 deletions src/QueueConfig.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const { URL } = require('node:url')

class RabbitMqOptions {
constructor (options = {}) {
Expand All @@ -19,14 +20,6 @@ class RabbitMqOptions {
}

class QueueConfig {
static isValidConfig (obj) {
if (!obj || !obj.url) {
return false
}

return true
}

constructor (config = {}) {
const {
url = 'amqps://localhost:5672',
Expand All @@ -43,6 +36,36 @@ class QueueConfig {
this.rpcQueueMaxSize = rpcQueueMaxSize
this.logger = logger
}

static isValidConfig (obj) {
return !!(obj && obj.url)
}

static urlStringToObject (url) {
if (typeof url !== 'string') {
return url
}

const parsedUrl = new URL(url)
return {
protocol: parsedUrl.protocol ? parsedUrl.protocol.slice(0, -1) : undefined,
hostname: parsedUrl.hostname ? parsedUrl.hostname : undefined,
port: parsedUrl.port ? parseInt(parsedUrl.port, 10) : undefined,
username: parsedUrl.username ? parsedUrl.username : undefined,
password: parsedUrl.password ? parsedUrl.password : undefined,
vhost: parsedUrl.pathname ? parsedUrl.pathname.slice(1) : undefined
}
}

static urlObjectToLogString (urlObject) {
return [
urlObject.protocol || 'amqps',
'://',
urlObject.hostname,
urlObject.port ? `:${urlObject.port}` : '',
urlObject.vhost ? `/${urlObject.vhost}` : ''
].join('')
}
}

module.exports = QueueConfig
60 changes: 37 additions & 23 deletions src/QueueConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ const fs = require('fs')
const amqp = require('amqplib/channel_api')
const QueueConfig = require('./QueueConfig')
const EventEmitter = require('events')
const URL = require('node:url').URL

/**
* @class QueueConnection
* */
class QueueConnection extends EventEmitter {
/**
* @param {QueueConfig} config
* @param {Object|QueueConfig} config
*/
constructor (config) {
super()
Expand All @@ -19,7 +18,7 @@ class QueueConnection extends EventEmitter {
this._connectionPromise = null
this._channel = null
this._channelPromise = null
this._activeConnectionUrl = null
this._activeConnectionConfig = null
}

setLogger (logger) {
Expand Down Expand Up @@ -48,8 +47,7 @@ class QueueConnection extends EventEmitter {
}

this._connectionPromise = this._connect(this._config.url, options).then((conn) => {
const urlObject = new URL(this._activeConnectionUrl)
this._logger.info(`RabbitMQ connection established on '${urlObject.host}' host`)
this._logger.info(`RabbitMQ connection established: '${QueueConfig.urlObjectToLogString(this._activeConnectionConfig)}'`)

conn.on('error', (err) => {
if (err.message !== 'Connection closing') {
Expand Down Expand Up @@ -82,27 +80,43 @@ class QueueConnection extends EventEmitter {
}

async _connect (configUrl, options) {
if (Array.isArray(configUrl)) {
// handle multiple connection urls
for (const url of configUrl) {
try {
const connection = await amqp.connect(url, options)
this._activeConnectionUrl = url
return connection
} catch (err) {
// let the next connection url in the list by tried
const urlObject = new URL(url)
this._logger.warn('RabbitMQ connection failed to host:', urlObject.host)
}
// handle multiple connection hosts
if (Array.isArray(configUrl.hostname)) {
const urls = []
for (const host of configUrl.hostname) {
urls.push({
...configUrl, // copy given config
hostname: host // use hostname from current iteration
})
}
configUrl = urls
}

// handle multiple connection urls
if (Array.isArray(configUrl)) {
return this._connectWithMultipleUrls(configUrl, options)
}

throw new Error('RabbitMQ connection filed with multiple urls')
} else {
// assume simple url string
const connection = await amqp.connect(configUrl, options)
this._activeConnectionUrl = configUrl
return connection
// assume simple url string or standard url object
const connectionUrl = QueueConfig.urlStringToObject(configUrl)
const connection = await amqp.connect(configUrl, options)
this._activeConnectionConfig = connectionUrl
return connection
}

async _connectWithMultipleUrls (urls, options) {
for (const url of urls) {
const connectionUrl = QueueConfig.urlStringToObject(url)
try {
const connection = await amqp.connect(connectionUrl, options)
this._activeConnectionConfig = connectionUrl
return connection
} catch (err) {
this._logger.warn('RabbitMQ connection failed to host:', { ...connectionUrl, password: connectionUrl.password ? '***' : connectionUrl.password })
}
}

throw new Error('RabbitMQ connection filed with multiple urls')
}

/**
Expand Down
20 changes: 12 additions & 8 deletions src/QueueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class QueueManager {
OverrideClass = RPCClient
}

if (OverrideClass !== RPCClient && !(OverrideClass.prototype instanceof RPCClient)) {
if (!this._isSubClass(OverrideClass, RPCClient)) {
throw new Error('Override must be a subclass of RPCClient')
}

Expand Down Expand Up @@ -153,7 +153,7 @@ class QueueManager {
OverrideClass = RPCServer
}

if (OverrideClass !== RPCServer && !(OverrideClass.prototype instanceof RPCServer)) {
if (!this._isSubClass(OverrideClass, RPCServer)) {
throw new Error('Override must be a subclass of RPCServer')
}

Expand Down Expand Up @@ -185,7 +185,7 @@ class QueueManager {
OverrideClass = Publisher
}

if (OverrideClass !== Publisher && !(OverrideClass.prototype instanceof Publisher)) {
if (!this._isSubClass(OverrideClass, Publisher)) {
throw new Error('Override must be a subclass of Publisher')
}

Expand All @@ -212,7 +212,7 @@ class QueueManager {
OverrideClass = Subscriber
}

if (OverrideClass !== Subscriber && !(OverrideClass.prototype instanceof Subscriber)) {
if (!this._isSubClass(OverrideClass, Subscriber)) {
throw new Error('Override must be a subclass of Subscriber')
}

Expand Down Expand Up @@ -245,7 +245,7 @@ class QueueManager {
OverrideClass = GatheringClient
}

if (OverrideClass !== GatheringClient && !(OverrideClass.prototype instanceof GatheringClient)) {
if (!this._isSubClass(OverrideClass, GatheringClient)) {
throw new Error('Override must be a subclass of GatheringClient')
}

Expand All @@ -272,7 +272,7 @@ class QueueManager {
OverrideClass = GatheringServer
}

if (OverrideClass !== GatheringServer && !(OverrideClass.prototype instanceof GatheringServer)) {
if (!this._isSubClass(OverrideClass, GatheringServer)) {
throw new Error('Override must be a subclass of GatheringServer')
}

Expand Down Expand Up @@ -303,7 +303,7 @@ class QueueManager {
OverrideClass = QueueClient
}

if (OverrideClass !== QueueClient && !(OverrideClass.prototype instanceof QueueClient)) {
if (!this._isSubClass(OverrideClass, QueueClient)) {
throw new Error('Override must be a subclass of QueueClient')
}

Expand All @@ -330,7 +330,7 @@ class QueueManager {
OverrideClass = QueueServer
}

if (OverrideClass !== QueueServer && !(OverrideClass.prototype instanceof QueueServer)) {
if (!this._isSubClass(OverrideClass, QueueServer)) {
throw new Error('Override must be a subclass of QueueServer')
}

Expand All @@ -346,6 +346,10 @@ class QueueManager {

return queueServer
}

_isSubClass (TestClass, ParentClass) {
return TestClass === ParentClass || TestClass.prototype instanceof ParentClass
}
}

module.exports = QueueManager
6 changes: 3 additions & 3 deletions src/RPCClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ class RPCClient {
/**
* @param {String} action
* @param {*} data
* @param {Number} timeoutMs
* @param {Map} attachments
* @param {Number|null} timeoutMs
* @param {Map|null} attachments
* @return {Promise}
* */
callAction (action, data, timeoutMs, attachments) {
callAction (action, data, timeoutMs = null, attachments = null) {
return this.call({ action, data }, timeoutMs, attachments)
}

Expand Down
8 changes: 8 additions & 0 deletions test/ConnectionPool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ describe('ConnectionPool', () => {
Promise.resolve().then(() => {
return pool.connect()
}).then(() => {
// Assert every QueueConnection connected
pool.connections.forEach(manager => {
assert.isNotNull(manager.connection._connection)
})
done()
}).catch((err) => {
done(err)
Expand Down Expand Up @@ -93,6 +97,10 @@ describe('ConnectionPool', () => {
}).then(() => {
return pool.reconnect()
}).then(() => {
// Assert every QueueConnection connected
pool.connections.forEach(manager => {
assert.isNotNull(manager.connection._connection)
})
done()
}).catch((err) => {
done(err)
Expand Down
44 changes: 41 additions & 3 deletions test/QueueConnection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('QueueConnection', () => {
const connection = new QueueConnection(config)
connection.connect()
.then(() => {
assert.isNotNull(connection._connection)
done()
})
.catch((e) => {
Expand All @@ -33,6 +34,7 @@ describe('QueueConnection', () => {
connection.connect().then(() => {
done(new Error('Should not connect'))
}).catch(() => {
assert.isNull(connection._connection)
done()
})
})
Expand All @@ -44,17 +46,53 @@ describe('QueueConnection', () => {

const connection = new QueueConnection(multiUrlConfig)
await connection.connect()
assert.strictEqual(connection._activeConnectionUrl, config.url)
assert.deepEqual(connection._activeConnectionConfig, QueueConfig.urlStringToObject(config.url))
})

it('#connect() handles multiple string urls and tries the next url in the list if one is not working', async () => {
const multiUrlConfig = copyConfig({
url: [config.url.replace('localhost', 'localhost2'), config.url]
url: ['amqps://random-host:5671', config.url]
})

const connection = new QueueConnection(multiUrlConfig)
await connection.connect()
assert.strictEqual(connection._activeConnectionUrl, config.url)
assert.deepEqual(connection._activeConnectionConfig, QueueConfig.urlStringToObject(config.url))
})

it('#connect() handles multiple hosts in an url object and connects to the first working one', async () => {
let urlObject
if (typeof config.url === 'string') {
urlObject = QueueConfig.urlStringToObject(config.url)
} else {
urlObject = { ...config.url }
}
urlObject.hostname = [urlObject.hostname, 'invalid_host']

const multiUrlConfig = copyConfig({
url: urlObject
})

const connection = new QueueConnection(multiUrlConfig)
await connection.connect()
assert.deepEqual(connection._activeConnectionConfig, QueueConfig.urlStringToObject(config.url))
})

it('#connect() handles multiple hosts in an url object and tries the next url in the list if one is not working', async () => {
let urlObject
if (typeof config.url === 'string') {
urlObject = QueueConfig.urlStringToObject(config.url)
} else {
urlObject = { ...config.url }
}
urlObject.hostname = ['invalid_host', urlObject.hostname]

const multiUrlConfig = copyConfig({
url: urlObject
})

const connection = new QueueConnection(multiUrlConfig)
await connection.connect()
assert.deepEqual(connection._activeConnectionConfig, QueueConfig.urlStringToObject(config.url))
})

it('#close() closes connection to RabbitMQ', async () => {
Expand Down
Loading

0 comments on commit e5c02cc

Please sign in to comment.