Skip to content

Commit

Permalink
Merge 85993ef into 1e37290
Browse files Browse the repository at this point in the history
  • Loading branch information
gab3sz committed Nov 28, 2022
2 parents 1e37290 + 85993ef commit edd17e7
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 6 deletions.
11 changes: 11 additions & 0 deletions src/ConnectionPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ class ConnectionPool {
await connection.connect()
}
}

/**
* @return {Promise}
*/
async reconnect () {
const connections = [...this.connections.values()]

for (const connection of connections) {
await connection.reconnect()
}
}
}

module.exports = ConnectionPool
34 changes: 28 additions & 6 deletions src/QueueConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ class QueueConnection {
this._channel = null
this._channelPromise = null
this._activeConnectionConfig = null
this._onClose = () => {
this._logger.error('RabbitMQ closed')
if (this._config.exitOnConnectionClose) {
process.exit(this._config.exitOnConnectionClose)
}
}
}

setLogger (logger) {
Expand Down Expand Up @@ -52,12 +58,7 @@ class QueueConnection {
this._logger.error('RabbitMQ error', err)
}
})
conn.on('close', () => {
this._logger.error('RabbitMQ closed')
if (this._config.exitOnConnectionClose) {
process.exit(this._config.exitOnConnectionClose)
}
})
conn.on('close', this._onClose)
conn.on('blocked', (reason) => {
this._logger.error('RabbitMQ blocked', reason)
})
Expand Down Expand Up @@ -115,6 +116,27 @@ class QueueConnection {
}
}

/**
* @return Promise
* */
async close (handleCloseEvent = false) {
if (this._connection) {
if (!handleCloseEvent) {
this._connection.off('close', this._onClose)
}

try {
await this._connection.close()
} catch (err) {
this._logger.error('RabbitMQ close connection failed', err)
throw err
}
}

this._connection = null
this._connectionPromise = null
}

onConnection (event, callback) {
if (this._connection) {
this._connection.on(event, callback)
Expand Down
10 changes: 10 additions & 0 deletions src/QueueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ class QueueManager {
}
}

async reconnect () {
try {
await this.connection.close()
await this.connect()
} catch (err) {
this._logger.error('Failed to reconnect to queue server', err)
throw err
}
}

setLogger (logger) {
this._logger = logger
this.connection.setLogger(logger)
Expand Down
47 changes: 47 additions & 0 deletions test/ConnectionPool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,51 @@ describe('ConnectionPool', () => {
done()
})
})

it('Should reconnect', (done) => {
const pool = new ConnectionPool()
pool.setLogger(logger)
pool.setupQueueManagers({
default: config
})
Promise.resolve().then(() => {
return pool.connect()
}).then(() => {
return pool.reconnect()
}).then(() => {
done()
}).catch((err) => {
done(err)
})
})

it('Should not reconnect to wrong config', (done) => {
const pool = new ConnectionPool()
pool.setLogger(logger)
pool.setupQueueManagers({
default: config
})
Promise.resolve().then(() => {
return pool.connect()
}).then(() => {
pool.setupQueueManagers({
default: new QueueConfig({
url: 'amqps://localhost:22',
rpcTimeoutMs: 10000,
rpcQueueMaxSize: 100,
logger,
options: {
timeout: 50
}
})
})

return pool.reconnect()
}).then(() => {
done('Reconnection with wrong config should not connect')
}).catch((err) => {
assert.instanceOf(err, Error, 'Reconnection with wrong config should throw an error ')
done()
})
})
})
64 changes: 64 additions & 0 deletions test/QueueConnection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,68 @@ describe('QueueConnection', () => {
await connection.connect()
assert.strictEqual(connection._activeConnectionConfig.hostname, url.hostname)
})

it('#close() closes connection to RabbitMQ', async () => {
const connection = new QueueConnection(config)
try {
await connection.connect()
} catch (e) {
throw new Error(`connect() failed: ${e}`)
}

try {
await connection.close()
} catch (e) {
throw new Error(`close() failed: ${e}`)
}

assert.strictEqual(connection._connection, null)
assert.strictEqual(connection._connectionPromise, null)
})

it('#close() closes connection to RabbitMQ and the close event callback is invoked', async () => {
const connection = new QueueConnection(config)

let callbackCalled = false
connection._onClose = () => {
callbackCalled = true
}

try {
await connection.connect()
} catch (e) {
throw new Error(`connect() failed: ${e}`)
}

try {
await connection.close(true)
} catch (e) {
throw new Error(`close() failed: ${e}`)
}

assert.strictEqual(callbackCalled, true)
})

it('#close() closes connection to RabbitMQ and the close event callback is not invoked', async () => {
const connection = new QueueConnection(config)

let callbackCalled = false
connection._onClose = () => {
callbackCalled = true
}

try {
await connection.connect()
} catch (e) {
throw new Error(`connect() failed: ${e}`)
}

try {
await connection.close()
} catch (e) {
throw new Error(`close() failed: ${e}`)
}

assert.strictEqual(callbackCalled, false)
})
})

0 comments on commit edd17e7

Please sign in to comment.