diff --git a/src/QueueConnection.js b/src/QueueConnection.js index f3c81ee..f6410fe 100644 --- a/src/QueueConnection.js +++ b/src/QueueConnection.js @@ -28,10 +28,11 @@ class QueueConnection extends EventEmitter { /** * @return Promise * */ - connect () { + async connect () { if (this._connection) { - return Promise.resolve(this._connection) - } else if (this._connectionPromise) { + return this._connection + } + if (this._connectionPromise) { return this._connectionPromise } @@ -46,30 +47,11 @@ class QueueConnection extends EventEmitter { options.ca = options.ca.map((ca) => fs.readFileSync(ca)) } - this._connectionPromise = this._connect(this._config.url, options).then((conn) => { + this._connectionPromise = this._connect(this._config.url, options).then((connection) => { this._logger.info(`RabbitMQ connection established: '${QueueConfig.urlObjectToLogString(this._activeConnectionConfig)}'`) - - conn.on('error', (err) => { - if (err.message !== 'Connection closing') { - this._logger.error('RabbitMQ error', err) - this.emit('error', err) - } - }) - conn.on('close', (err) => { - this._logger.error('RabbitMQ closed', err) - this.emit('close', err) - }) - conn.on('blocked', (reason) => { - this._logger.error('RabbitMQ blocked', reason) - this.emit('blocked', reason) - }) - conn.on('unblocked', (reason) => { - this._logger.error('RabbitMQ unblocked', reason) - this.emit('unblocked', reason) - }) - - this._connection = conn - return conn + this.emitConnectionEvents(connection) + this._connection = connection + return connection }).catch((err) => { this._logger.error('RabbitMQ connection failed', err) @@ -79,6 +61,27 @@ class QueueConnection extends EventEmitter { return this._connectionPromise } + emitConnectionEvents (connection) { + connection.on('error', (err) => { + if (err.message !== 'Connection closing') { + this._logger.error('RabbitMQ error', err) + this.emit('error', err) + } + }) + connection.on('close', (err) => { + this._logger.error('RabbitMQ closed', err) + this.emit('close', err) + }) + connection.on('blocked', (reason) => { + this._logger.error('RabbitMQ blocked', reason) + this.emit('blocked', reason) + }) + connection.on('unblocked', (reason) => { + this._logger.error('RabbitMQ unblocked', reason) + this.emit('unblocked', reason) + }) + } + async _connect (configUrl, options) { // handle multiple connection hosts if (Array.isArray(configUrl.hostname)) { @@ -139,16 +142,18 @@ class QueueConnection extends EventEmitter { /** * @return Promise * */ - getChannel () { + async getChannel () { if (this._channel) { - return Promise.resolve(this._channel) - } else if (this._channelPromise) { + return this._channel + } + if (this._channelPromise) { return this._channelPromise } this._channelPromise = this.connect().then((connection) => { return connection.createConfirmChannel() }).then((channel) => { + this.emitChannelEvents(channel) this._channel = channel return channel }).catch((err) => { @@ -158,6 +163,23 @@ class QueueConnection extends EventEmitter { return this._channelPromise } + + emitChannelEvents (channel) { + channel.on('error', (err) => { + this._logger.error('RabbitMQ channel error', err) + this.emit('error', err) + }) + channel.on('close', (err) => { + this._logger.error('RabbitMQ channel closed', err) + this.emit('close', err) + }) + channel.on('return', (reason) => { + this.emit('return', reason) + }) + channel.on('drain', (reason) => { + this.emit('drain', reason) + }) + } } module.exports = QueueConnection