Skip to content

Commit

Permalink
Merge 0ad60ec into 1e37290
Browse files Browse the repository at this point in the history
  • Loading branch information
gab3sz committed Nov 24, 2022
2 parents 1e37290 + 0ad60ec commit 90166d0
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 23 deletions.
11 changes: 11 additions & 0 deletions src/ConnectionPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ class ConnectionPool {
await connection.connect()
}
}

/**
* @return {Promise}
*/
async reconnect () {
const connections = [...this.connections.values()]

for (const connection of connections) {
await connection.reconnect()
}
}
}

module.exports = ConnectionPool
32 changes: 26 additions & 6 deletions src/QueueConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ class QueueConnection {
this._channel = null
this._channelPromise = null
this._activeConnectionConfig = null
this._onClose = () => {
this._logger.error('RabbitMQ closed')
if (this._config.exitOnConnectionClose) {
process.exit(this._config.exitOnConnectionClose)
}
}
}

setLogger (logger) {
Expand Down Expand Up @@ -52,12 +58,7 @@ class QueueConnection {
this._logger.error('RabbitMQ error', err)
}
})
conn.on('close', () => {
this._logger.error('RabbitMQ closed')
if (this._config.exitOnConnectionClose) {
process.exit(this._config.exitOnConnectionClose)
}
})
conn.on('close', this._onClose)
conn.on('blocked', (reason) => {
this._logger.error('RabbitMQ blocked', reason)
})
Expand Down Expand Up @@ -115,6 +116,25 @@ class QueueConnection {
}
}

/**
* @return Promise
* */
close (handleCloseEvent = false) {
if (this._connection) {
if (!handleCloseEvent) {
this._connection.off('close', this._onClose)
}

this._connection.close((err) => {
this._logger.error('RabbitMQ close connection failed', err)
throw err
})
}

this._connection = null
this._connectionPromise = null
}

onConnection (event, callback) {
if (this._connection) {
this._connection.on(event, callback)
Expand Down
10 changes: 10 additions & 0 deletions src/QueueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ class QueueManager {
}
}

async reconnect () {
try {
await this.connection.close()
await this.connect()
} catch (err) {
this._logger.error('Failed to reconnect to queue server', err)
throw err
}
}

setLogger (logger) {
this._logger = logger
this.connection.setLogger(logger)
Expand Down
47 changes: 47 additions & 0 deletions test/ConnectionPool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,51 @@ describe('ConnectionPool', () => {
done()
})
})

it('Should reconnect', (done) => {
const pool = new ConnectionPool()
pool.setLogger(logger)
pool.setupQueueManagers({
default: config
})
Promise.resolve().then(() => {
return pool.connect()
}).then(() => {
return pool.reconnect()
}).then(() => {
done()
}).catch((err) => {
done(err)
})
})

it('Should not reconnect to wrong config', (done) => {
const pool = new ConnectionPool()
pool.setLogger(logger)
pool.setupQueueManagers({
default: config
})
Promise.resolve().then(() => {
return pool.connect()
}).then(() => {
pool.setupQueueManagers({
default: new QueueConfig({
url: 'amqps://localhost:22',
rpcTimeoutMs: 10000,
rpcQueueMaxSize: 100,
logger,
options: {
timeout: 50
}
})
})

return pool.reconnect()
}).then(() => {
done('Reconnection with wrong config should not connect')
}).catch((err) => {
assert.instanceOf(err, Error, 'Reconnection with wrong config should throw an error ')
done()
})
})
})
18 changes: 18 additions & 0 deletions test/QueueConnection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,22 @@ describe('QueueConnection', () => {
await connection.connect()
assert.strictEqual(connection._activeConnectionConfig.hostname, url.hostname)
})

it('#close() closes connection to RabbitMQ', async () => {
const connection = new QueueConnection(config)
try {
await connection.connect()
} catch (e) {
throw new Error(`connect() failed: ${e}`)
}

try {
await connection.close()
} catch (e) {
throw new Error(`close() failed: ${e}`)
}

assert.strictEqual(connection._connection, null)
assert.strictEqual(connection._connectionPromise, null)
})
})
17 changes: 0 additions & 17 deletions test/config/TestConfig.js.config

This file was deleted.

0 comments on commit 90166d0

Please sign in to comment.