From 0f638f9d258f9c03fe33fb897936efa20558065d Mon Sep 17 00:00:00 2001 From: gab3sz Date: Wed, 23 Nov 2022 17:07:00 +0100 Subject: [PATCH 1/5] FKDEV-363: mq reconnect logic --- src/QueueConnection.js | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/src/QueueConnection.js b/src/QueueConnection.js index eb21e3d..cc87215 100644 --- a/src/QueueConnection.js +++ b/src/QueueConnection.js @@ -17,12 +17,39 @@ class QueueConnection { this._channel = null this._channelPromise = null this._activeConnectionConfig = null + this._onClose = () => { + this._logger.error('RabbitMQ closed') + if (this._config.exitOnConnectionClose) { + process.exit(this._config.exitOnConnectionClose) + } + } } setLogger (logger) { this._logger = logger } + close () { + if (this._connection) { + this._connection.close((err) => { + this._logger.error('RabbitMQ close connection failed', err) + throw err + }) + } + } + + reconnect () { + if (this._connection) { + this._connection.off('close', this._onClose) + this.close() + } + + this._connection = null + this._connectionPromise = null + + return this.connect() + } + /** * @return Promise * */ @@ -53,10 +80,7 @@ class QueueConnection { } }) conn.on('close', () => { - this._logger.error('RabbitMQ closed') - if (this._config.exitOnConnectionClose) { - process.exit(this._config.exitOnConnectionClose) - } + this._onClose() }) conn.on('blocked', (reason) => { this._logger.error('RabbitMQ blocked', reason) From a4396e297796e273fa5a3ad961b472051870e5f4 Mon Sep 17 00:00:00 2001 From: gab3sz Date: Thu, 24 Nov 2022 11:25:06 +0100 Subject: [PATCH 2/5] FKDEV-363: mq reconnect logic --- src/ConnectionPool.js | 11 +++++++++++ src/QueueConnection.js | 40 +++++++++++++++++++--------------------- src/QueueManager.js | 10 ++++++++++ 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/src/ConnectionPool.js b/src/ConnectionPool.js index f239e73..ed66dc8 100644 --- a/src/ConnectionPool.js +++ b/src/ConnectionPool.js @@ -101,6 +101,17 @@ class ConnectionPool { await connection.connect() } } + + /** + * @return {Promise} + */ + async reconnect () { + const connections = [...this.connections.values()] + + for (const connection of connections) { + await connection.reconnect() + } + } } module.exports = ConnectionPool diff --git a/src/QueueConnection.js b/src/QueueConnection.js index cc87215..8f2115a 100644 --- a/src/QueueConnection.js +++ b/src/QueueConnection.js @@ -29,27 +29,6 @@ class QueueConnection { this._logger = logger } - close () { - if (this._connection) { - this._connection.close((err) => { - this._logger.error('RabbitMQ close connection failed', err) - throw err - }) - } - } - - reconnect () { - if (this._connection) { - this._connection.off('close', this._onClose) - this.close() - } - - this._connection = null - this._connectionPromise = null - - return this.connect() - } - /** * @return Promise * */ @@ -139,6 +118,25 @@ class QueueConnection { } } + /** + * @return Promise + * */ + close (invokeCloseEvent = false) { + if (this._connection) { + if (!invokeCloseEvent) { + this._connection.off('close', this._onClose) + } + + this._connection.close((err) => { + this._logger.error('RabbitMQ close connection failed', err) + throw err + }) + } + + this._connection = null + this._connectionPromise = null + } + onConnection (event, callback) { if (this._connection) { this._connection.on(event, callback) diff --git a/src/QueueManager.js b/src/QueueManager.js index 3473e4a..b3761cd 100644 --- a/src/QueueManager.js +++ b/src/QueueManager.js @@ -90,6 +90,16 @@ class QueueManager { } } + async reconnect () { + try { + await this.connection.close() + await this.connect() + } catch (err) { + this._logger.error('Failed to reconnect to queue server', err) + throw err + } + } + setLogger (logger) { this._logger = logger this.connection.setLogger(logger) From 60bba1ca231c546ebf4ba90072269c8a1bbca9a8 Mon Sep 17 00:00:00 2001 From: gab3sz Date: Thu, 24 Nov 2022 11:31:06 +0100 Subject: [PATCH 3/5] FKDEV-363: mq reconnect logic --- src/QueueConnection.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/QueueConnection.js b/src/QueueConnection.js index 8f2115a..38b4df3 100644 --- a/src/QueueConnection.js +++ b/src/QueueConnection.js @@ -58,9 +58,7 @@ class QueueConnection { this._logger.error('RabbitMQ error', err) } }) - conn.on('close', () => { - this._onClose() - }) + conn.on('close', () => this._onClose()) conn.on('blocked', (reason) => { this._logger.error('RabbitMQ blocked', reason) }) From 0ad60ec9dabd96ceb32ef055a3abaa6414fa5ea0 Mon Sep 17 00:00:00 2001 From: gab3sz Date: Thu, 24 Nov 2022 15:36:26 +0100 Subject: [PATCH 4/5] FKDEV-363: mq reconnect logic --- src/QueueConnection.js | 6 ++-- test/ConnectionPool.test.js | 47 ++++++++++++++++++++++++++++++++ test/QueueConnection.test.js | 18 ++++++++++++ test/config/TestConfig.js.config | 17 ------------ 4 files changed, 68 insertions(+), 20 deletions(-) delete mode 100644 test/config/TestConfig.js.config diff --git a/src/QueueConnection.js b/src/QueueConnection.js index 38b4df3..98dfef0 100644 --- a/src/QueueConnection.js +++ b/src/QueueConnection.js @@ -58,7 +58,7 @@ class QueueConnection { this._logger.error('RabbitMQ error', err) } }) - conn.on('close', () => this._onClose()) + conn.on('close', this._onClose) conn.on('blocked', (reason) => { this._logger.error('RabbitMQ blocked', reason) }) @@ -119,9 +119,9 @@ class QueueConnection { /** * @return Promise * */ - close (invokeCloseEvent = false) { + close (handleCloseEvent = false) { if (this._connection) { - if (!invokeCloseEvent) { + if (!handleCloseEvent) { this._connection.off('close', this._onClose) } diff --git a/test/ConnectionPool.test.js b/test/ConnectionPool.test.js index 0698c69..a825702 100644 --- a/test/ConnectionPool.test.js +++ b/test/ConnectionPool.test.js @@ -81,4 +81,51 @@ describe('ConnectionPool', () => { done() }) }) + + it('Should reconnect', (done) => { + const pool = new ConnectionPool() + pool.setLogger(logger) + pool.setupQueueManagers({ + default: config + }) + Promise.resolve().then(() => { + return pool.connect() + }).then(() => { + return pool.reconnect() + }).then(() => { + done() + }).catch((err) => { + done(err) + }) + }) + + it('Should not reconnect to wrong config', (done) => { + const pool = new ConnectionPool() + pool.setLogger(logger) + pool.setupQueueManagers({ + default: config + }) + Promise.resolve().then(() => { + return pool.connect() + }).then(() => { + pool.setupQueueManagers({ + default: new QueueConfig({ + url: 'amqps://localhost:22', + rpcTimeoutMs: 10000, + rpcQueueMaxSize: 100, + logger, + options: { + timeout: 50 + } + }) + }) + + return pool.reconnect() + }).then(() => { + done('Reconnection with wrong config should not connect') + }).catch((err) => { + assert.instanceOf(err, Error, 'Reconnection with wrong config should throw an error ') + done() + }) + }) }) diff --git a/test/QueueConnection.test.js b/test/QueueConnection.test.js index 5581309..065448a 100644 --- a/test/QueueConnection.test.js +++ b/test/QueueConnection.test.js @@ -93,4 +93,22 @@ describe('QueueConnection', () => { await connection.connect() assert.strictEqual(connection._activeConnectionConfig.hostname, url.hostname) }) + + it('#close() closes connection to RabbitMQ', async () => { + const connection = new QueueConnection(config) + try { + await connection.connect() + } catch (e) { + throw new Error(`connect() failed: ${e}`) + } + + try { + await connection.close() + } catch (e) { + throw new Error(`close() failed: ${e}`) + } + + assert.strictEqual(connection._connection, null) + assert.strictEqual(connection._connectionPromise, null) + }) }) diff --git a/test/config/TestConfig.js.config b/test/config/TestConfig.js.config deleted file mode 100644 index c957eb5..0000000 --- a/test/config/TestConfig.js.config +++ /dev/null @@ -1,17 +0,0 @@ -const QueueConfig = require('../../src/QueueConfig') -const ConsoleInspector = require('../consoleInspector') - -const certPath = '/workspace/' - -module.exports = new QueueConfig({ - url: 'amqps://localhost:5671', - options: { - rejectUnauthorized: false, - cert: certPath + '/client/cert.pem', - key: certPath + '/client/key.pem', - ca: [certPath + '/ca/cacert.pem'] - }, - rpcTimeoutMs: 10000, - rpcQueueMaxSize: 100, - logger: new ConsoleInspector(console) -}) From 85993efe78aae655cfe6d3e506f4e5d387649687 Mon Sep 17 00:00:00 2001 From: gab3sz Date: Mon, 28 Nov 2022 13:38:50 +0100 Subject: [PATCH 5/5] update --- src/QueueConnection.js | 8 +++--- test/QueueConnection.test.js | 46 ++++++++++++++++++++++++++++++++ test/config/TestConfig.js.config | 17 ++++++++++++ 3 files changed, 68 insertions(+), 3 deletions(-) create mode 100644 test/config/TestConfig.js.config diff --git a/src/QueueConnection.js b/src/QueueConnection.js index 98dfef0..4ba62e4 100644 --- a/src/QueueConnection.js +++ b/src/QueueConnection.js @@ -119,16 +119,18 @@ class QueueConnection { /** * @return Promise * */ - close (handleCloseEvent = false) { + async close (handleCloseEvent = false) { if (this._connection) { if (!handleCloseEvent) { this._connection.off('close', this._onClose) } - this._connection.close((err) => { + try { + await this._connection.close() + } catch (err) { this._logger.error('RabbitMQ close connection failed', err) throw err - }) + } } this._connection = null diff --git a/test/QueueConnection.test.js b/test/QueueConnection.test.js index 065448a..a32bcd4 100644 --- a/test/QueueConnection.test.js +++ b/test/QueueConnection.test.js @@ -111,4 +111,50 @@ describe('QueueConnection', () => { assert.strictEqual(connection._connection, null) assert.strictEqual(connection._connectionPromise, null) }) + + it('#close() closes connection to RabbitMQ and the close event callback is invoked', async () => { + const connection = new QueueConnection(config) + + let callbackCalled = false + connection._onClose = () => { + callbackCalled = true + } + + try { + await connection.connect() + } catch (e) { + throw new Error(`connect() failed: ${e}`) + } + + try { + await connection.close(true) + } catch (e) { + throw new Error(`close() failed: ${e}`) + } + + assert.strictEqual(callbackCalled, true) + }) + + it('#close() closes connection to RabbitMQ and the close event callback is not invoked', async () => { + const connection = new QueueConnection(config) + + let callbackCalled = false + connection._onClose = () => { + callbackCalled = true + } + + try { + await connection.connect() + } catch (e) { + throw new Error(`connect() failed: ${e}`) + } + + try { + await connection.close() + } catch (e) { + throw new Error(`close() failed: ${e}`) + } + + assert.strictEqual(callbackCalled, false) + }) }) diff --git a/test/config/TestConfig.js.config b/test/config/TestConfig.js.config new file mode 100644 index 0000000..c957eb5 --- /dev/null +++ b/test/config/TestConfig.js.config @@ -0,0 +1,17 @@ +const QueueConfig = require('../../src/QueueConfig') +const ConsoleInspector = require('../consoleInspector') + +const certPath = '/workspace/' + +module.exports = new QueueConfig({ + url: 'amqps://localhost:5671', + options: { + rejectUnauthorized: false, + cert: certPath + '/client/cert.pem', + key: certPath + '/client/key.pem', + ca: [certPath + '/ca/cacert.pem'] + }, + rpcTimeoutMs: 10000, + rpcQueueMaxSize: 100, + logger: new ConsoleInspector(console) +})