Skip to content

Commit

Permalink
emit channel events to avoid uncaught exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
tbence94 committed Jan 18, 2023
1 parent 0a3605c commit 1365987
Showing 1 changed file with 51 additions and 29 deletions.
80 changes: 51 additions & 29 deletions src/QueueConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ class QueueConnection extends EventEmitter {
/**
* @return Promise<amqplib.Connection>
* */
connect () {
async connect () {
if (this._connection) {
return Promise.resolve(this._connection)
} else if (this._connectionPromise) {
return this._connection
}
if (this._connectionPromise) {
return this._connectionPromise
}

Expand All @@ -46,30 +47,11 @@ class QueueConnection extends EventEmitter {
options.ca = options.ca.map((ca) => fs.readFileSync(ca))
}

this._connectionPromise = this._connect(this._config.url, options).then((conn) => {
this._connectionPromise = this._connect(this._config.url, options).then((connection) => {
this._logger.info(`RabbitMQ connection established: '${QueueConfig.urlObjectToLogString(this._activeConnectionConfig)}'`)

conn.on('error', (err) => {
if (err.message !== 'Connection closing') {
this._logger.error('RabbitMQ error', err)
this.emit('error', err)
}
})
conn.on('close', (err) => {
this._logger.error('RabbitMQ closed', err)
this.emit('close', err)
})
conn.on('blocked', (reason) => {
this._logger.error('RabbitMQ blocked', reason)
this.emit('blocked', reason)
})
conn.on('unblocked', (reason) => {
this._logger.error('RabbitMQ unblocked', reason)
this.emit('unblocked', reason)
})

this._connection = conn
return conn
this.emitConnectionEvents(connection)
this._connection = connection
return connection
}).catch((err) => {
this._logger.error('RabbitMQ connection failed', err)

Expand All @@ -79,6 +61,27 @@ class QueueConnection extends EventEmitter {
return this._connectionPromise
}

emitConnectionEvents (connection) {
connection.on('error', (err) => {
if (err.message !== 'Connection closing') {
this._logger.error('RabbitMQ error', err)
this.emit('error', err)
}
})
connection.on('close', (err) => {
this._logger.error('RabbitMQ closed', err)
this.emit('close', err)
})
connection.on('blocked', (reason) => {
this._logger.error('RabbitMQ blocked', reason)
this.emit('blocked', reason)
})
connection.on('unblocked', (reason) => {
this._logger.error('RabbitMQ unblocked', reason)
this.emit('unblocked', reason)
})
}

async _connect (configUrl, options) {
// handle multiple connection hosts
if (Array.isArray(configUrl.hostname)) {
Expand Down Expand Up @@ -139,16 +142,18 @@ class QueueConnection extends EventEmitter {
/**
* @return Promise<amqplib.ConfirmChannel>
* */
getChannel () {
async getChannel () {
if (this._channel) {
return Promise.resolve(this._channel)
} else if (this._channelPromise) {
return this._channel
}
if (this._channelPromise) {
return this._channelPromise
}

this._channelPromise = this.connect().then((connection) => {
return connection.createConfirmChannel()
}).then((channel) => {
this.emitChannelEvents(channel)
this._channel = channel
return channel
}).catch((err) => {
Expand All @@ -158,6 +163,23 @@ class QueueConnection extends EventEmitter {

return this._channelPromise
}

emitChannelEvents (channel) {
channel.on('error', (err) => {
this._logger.error('RabbitMQ channel error', err)
this.emit('error', err)
})
channel.on('close', (err) => {
this._logger.error('RabbitMQ channel closed', err)
this.emit('close', err)
})
channel.on('return', (reason) => {
this.emit('return', reason)
})
channel.on('drain', (reason) => {
this.emit('drain', reason)
})
}
}

module.exports = QueueConnection

0 comments on commit 1365987

Please sign in to comment.