Skip to content

Commit

Permalink
handle multiple connection urls and hosts
Browse files Browse the repository at this point in the history
  • Loading branch information
tunderdomb committed Aug 10, 2022
1 parent ae327e9 commit 6d26abe
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 4 deletions.
50 changes: 48 additions & 2 deletions src/QueueConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class QueueConnection {
this._connectionPromise = null
this._channel = null
this._channelPromise = null
this._activeConnectionConfig = null
}

setLogger (logger) {
Expand Down Expand Up @@ -43,7 +44,7 @@ class QueueConnection {
options.ca = options.ca.map((ca) => fs.readFileSync(ca))
}

this._connectionPromise = amqp.connect(this._config.url, options).then((conn) => {
this._connectionPromise = this._connect(this._config.url, options).then((conn) => {
this._logger.info('RabbitMQ connection established')

conn.on('error', (err) => {
Expand All @@ -67,14 +68,59 @@ class QueueConnection {
this._connection = conn
return conn
}).catch((err) => {
this._logger.error(err)
this._logger.error('RabbitMQ connection failed', err)

throw err
})

return this._connectionPromise
}

async _connect (configUrl, options) {
if (Array.isArray(configUrl)) {
// handle multiple connection urls
for (const url of configUrl) {
try {
const connection = await amqp.connect(url, options)
this._activeConnectionConfig = url
return connection
} catch (err) {
// let the next connection url in the list by tried
this._logger.warn('RabbitMQ connection failed to url:', url)
}
}

throw new Error('RabbitMQ connection filed with multiple urls')
} else if (Array.isArray(configUrl.hostname)) {
// handle multiple connection hosts
for (const host of configUrl.hostname) {
const connectionUrl = {
...this._config.url, // copy given config
hostname: host // use hostname from current iteration
}

try {
const connection = await amqp.connect(connectionUrl, options)
this._activeConnectionConfig = connectionUrl
return connection
} catch (err) {
this._logger.warn('RabbitMQ connection failed to host:', { ...connectionUrl, password: connectionUrl.password ? '***' : connectionUrl.password })
}
}

throw new Error('RabbitMQ connection filed with multiple hosts')
} else {
// assume simple url string or standard url object
return amqp.connect(configUrl, options)
}
}

onConnection (event, callback) {
if (this._connection) {
this._connection.on(event, callback)
}
}

/**
* @return Promise<amqplib.ConfirmChannel>
* */
Expand Down
85 changes: 83 additions & 2 deletions test/QueueConnection.test.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,96 @@
const QueueConnection = require('../src/QueueConnection')
const QueueConfig = require('../src/QueueConfig')
const config = require('./config/LoadConfig')
const chai = require('chai')
const URL = require('node:url').URL
const assert = chai.assert

function copyConfig (obj) {
return new QueueConfig({
...JSON.parse(JSON.stringify(config)),
...obj,
logger: config.logger
})
}

describe('QueueConnection', () => {
const connection = new QueueConnection(config)
it('#connect() creates a connection to RabbitMQ', (done) => {
const connection = new QueueConnection(config)
connection.connect()
.then((c) => {
.then(() => {
done()
})
.catch((e) => {
done(new Error(`connect() failed: ${e}`))
})
})

it('#connect() fails to connect for invalid connection url', (done) => {
const multiUrlConfig = copyConfig({
url: 'invalid_url'
})

const connection = new QueueConnection(multiUrlConfig)
connection.connect().then(() => {
done(new Error('Should not connect'))
}).catch(() => {
done()
})
})

it('#connect() handles multiple string urls and connects to the first working one', async () => {
const multiUrlConfig = copyConfig({
url: [config.url, 'invalid_url']
})

const connection = new QueueConnection(multiUrlConfig)
await connection.connect()
assert.strictEqual(connection._activeConnectionConfig, config.url)
})

it('#connect() handles multiple string urls and tries the next url in the list if one is not working', async () => {
const multiUrlConfig = copyConfig({
url: ['invalid_url', config.url]
})

const connection = new QueueConnection(multiUrlConfig)
await connection.connect()
assert.strictEqual(connection._activeConnectionConfig, config.url)
})

it('#connect() handles multiple hosts in an url object and connects to the first working one', async () => {
const url = new URL(config.url)
const urlObject = {
protocol: url.protocol.replace(':', ''),
hostname: [url.hostname, 'invalid_host'],
port: parseInt(url.port, 10),
username: url.username ? url.username : undefined,
password: url.password ? url.password : undefined
}
const multiUrlConfig = copyConfig({
url: urlObject
})

const connection = new QueueConnection(multiUrlConfig)
await connection.connect()
assert.strictEqual(connection._activeConnectionConfig.hostname, url.hostname)
})

it('#connect() handles multiple hosts in an url object and tries the next url in the list if one is not working', async () => {
const url = new URL(config.url)
const urlObject = {
protocol: url.protocol.replace(':', ''),
hostname: [url.hostname, 'invalid_host'],
port: parseInt(url.port, 10),
username: url.username ? url.username : undefined,
password: url.password ? url.password : undefined
}
const multiUrlConfig = copyConfig({
url: urlObject
})

const connection = new QueueConnection(multiUrlConfig)
await connection.connect()
assert.strictEqual(connection._activeConnectionConfig.hostname, url.hostname)
})
})

0 comments on commit 6d26abe

Please sign in to comment.