Skip to content

Commit

Permalink
FKITDEV-2146: Fix the reconnecting (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
bencelaszlo committed Mar 4, 2024
1 parent 11043a9 commit d38aeb4
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/QueueConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class QueueConnection extends EventEmitter {
return connection
}).catch((err) => {
this._logger.error('RabbitMQ connection failed', err)

this._connectionPromise = null
throw err
})

Expand Down Expand Up @@ -146,7 +146,9 @@ class QueueConnection extends EventEmitter {
await this._connection.close()
} catch (err) {
this._logger.error('RabbitMQ close connection failed', err)
throw err
if (!err.message.startsWith('Connection closed')) {
throw err
}
}
}

Expand Down
19 changes: 19 additions & 0 deletions test/ConnectionPool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,25 @@ describe('ConnectionPool', () => {
})
})

it('should reconnect if the connection is already closed', async () => {
const pool = new ConnectionPool()
pool.setLogger(logger)
pool.setupQueueManagers({
default: config
})
try {
await pool.connect()
const queueManager = pool.getConnection('default')
await queueManager.connection.close()
await pool.reconnect()

assert.isNotNull(queueManager.connection._connection)
} catch (error) {
// eslint-disable-next-line no-console
console.error(error)
}
})

it('Should not reconnect to wrong config', (done) => {
const pool = new ConnectionPool()
pool.setLogger(logger)
Expand Down
25 changes: 25 additions & 0 deletions test/QueueConnection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,29 @@ describe('QueueConnection', () => {

assert.strictEqual(callbackCalled, false)
})

it('#close() already closed connection should not throw error', async () => {
const connection = new QueueConnection(config)
try {
await connection.connect()
} catch (connectionConnectError) {
throw new Error(`connect() failed: ${connectionConnectError}`)
}

try {
await connection.close()
} catch (connectionCloseError) {
throw new Error(`close() failed: ${connectionCloseError}`)
}

try {
await connection.close()
} catch (connectionCloseError) {
throw new Error(`close() on already closed connection failed: ${connectionCloseError}`)
}

assert.strictEqual(connection._connection, null)
assert.strictEqual(connection._connectionPromise, null)
assert.doesNotThrow(connection.close, Error)
})
})

0 comments on commit d38aeb4

Please sign in to comment.