Skip to content

Commit

Permalink
[backend] Handle rabbitmq close event to catch dropped connections
Browse files Browse the repository at this point in the history
  • Loading branch information
JeremyCloarec committed Apr 2, 2024
1 parent 7b0bcf3 commit d6227cf
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { parseCsvMapper } from '../../modules/internal/csvMapper/csvMapper-utils
import type { ConnectorConfig } from '../connector';
import { IMPORT_CSV_CONNECTOR } from './importCsv';
import { internalLoadById } from '../../database/middleware-loader';
import { UnknownError } from '../../config/errors';

Check warning on line 16 in opencti-platform/opencti-graphql/src/connector/importCsv/importCsv-connector.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/connector/importCsv/importCsv-connector.ts#L16

Added line #L16 was not covered by tests

const RETRY_CONNECTION_PERIOD = 10000;

Expand Down Expand Up @@ -102,7 +103,13 @@ const initImportCsvConnector = () => {

const handleCsvImport = async (context: AuthContext) => {
consumeQueue(context, connector.id, connectionSetterCallback, consumeQueueCallback).catch(() => {
if (rabbitMqConnection) rabbitMqConnection.close();
if (rabbitMqConnection) {
try {
rabbitMqConnection.close();
} catch (e) {
logApp.error(UnknownError('RabbitMQ connection close fail', { cause: e }));
}
}

Check warning on line 112 in opencti-platform/opencti-graphql/src/connector/importCsv/importCsv-connector.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/connector/importCsv/importCsv-connector.ts#L106-L112

Added lines #L106 - L112 were not covered by tests
setTimeout(handleCsvImport, RETRY_CONNECTION_PERIOD);
});
};
Expand Down
7 changes: 6 additions & 1 deletion opencti-platform/opencti-graphql/src/database/rabbitmq.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,16 @@ export const consumeQueue = async (context, connectorId, connectionSetterCallbac
} : amqpCred();
return new Promise((_, reject) => {
try {
amqp.connect(amqpUri(), connOptions, (err, conn) => {
amqp.connect(amqpUri(), { ...connOptions }, (err, conn) => {

Check warning on line 248 in opencti-platform/opencti-graphql/src/database/rabbitmq.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/database/rabbitmq.js#L248

Added line #L248 was not covered by tests
if (err) {
reject(err);
} else { // Connection success
logApp.info('Starting connector queue consuming');
conn.on('close', (onConnectError) => {
if (onConnectError) {
reject(onConnectError);
}
});

Check warning on line 257 in opencti-platform/opencti-graphql/src/database/rabbitmq.js

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/database/rabbitmq.js#L253-L257

Added lines #L253 - L257 were not covered by tests
conn.on('error', (onConnectError) => {
reject(onConnectError);
});
Expand Down

0 comments on commit d6227cf

Please sign in to comment.