Skip to content

Commit

Permalink
FKDEV-363: mq reconnect logic
Browse files Browse the repository at this point in the history
  • Loading branch information
gab3sz committed Nov 24, 2022
1 parent 0f638f9 commit a4396e2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 21 deletions.
11 changes: 11 additions & 0 deletions src/ConnectionPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ class ConnectionPool {
await connection.connect()
}
}

/**
* @return {Promise}
*/
async reconnect () {
const connections = [...this.connections.values()]

for (const connection of connections) {
await connection.reconnect()
}
}
}

module.exports = ConnectionPool
40 changes: 19 additions & 21 deletions src/QueueConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,6 @@ class QueueConnection {
this._logger = logger
}

close () {
if (this._connection) {
this._connection.close((err) => {
this._logger.error('RabbitMQ close connection failed', err)
throw err
})
}
}

reconnect () {
if (this._connection) {
this._connection.off('close', this._onClose)
this.close()
}

this._connection = null
this._connectionPromise = null

return this.connect()
}

/**
* @return Promise<amqplib.Connection>
* */
Expand Down Expand Up @@ -139,6 +118,25 @@ class QueueConnection {
}
}

/**
* @return Promise
* */
close (invokeCloseEvent = false) {
if (this._connection) {
if (!invokeCloseEvent) {
this._connection.off('close', this._onClose)
}

this._connection.close((err) => {
this._logger.error('RabbitMQ close connection failed', err)
throw err
})
}

this._connection = null
this._connectionPromise = null
}

onConnection (event, callback) {
if (this._connection) {
this._connection.on(event, callback)
Expand Down
10 changes: 10 additions & 0 deletions src/QueueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ class QueueManager {
}
}

async reconnect () {
try {
await this.connection.close()
await this.connect()
} catch (err) {
this._logger.error('Failed to reconnect to queue server', err)
throw err
}
}

setLogger (logger) {
this._logger = logger
this.connection.setLogger(logger)
Expand Down

0 comments on commit a4396e2

Please sign in to comment.