diff --git a/backend/src/db/entity-migrations/v3.3.7-add-oia-message.ts b/backend/src/db/entity-migrations/v3.3.7-add-oia-message.ts new file mode 100644 index 0000000000..6c461bb365 --- /dev/null +++ b/backend/src/db/entity-migrations/v3.3.7-add-oia-message.ts @@ -0,0 +1,35 @@ +import { Knex } from 'knex'; +import { OIANALYTICS_MESSAGE_TABLE } from '../../repository/oianalytics-message.repository'; +import { ENGINES_TABLE } from '../../repository/engine.repository'; +import { version } from '../../../package.json'; +import CreateTableBuilder = Knex.CreateTableBuilder; +import { OIANALYTICS_MESSAGE_STATUS } from '../../../../shared/model/oianalytics-message.model'; + +function createDefaultEntityFields(table: CreateTableBuilder): void { + table.uuid('id').primary(); + table.timestamps(false, true); +} + +export async function up(knex: Knex): Promise { + await createOIAMessageTable(knex); + await addVersionInEngineSettings(knex); +} + +async function createOIAMessageTable(knex: Knex): Promise { + await knex.schema.createTable(OIANALYTICS_MESSAGE_TABLE, table => { + createDefaultEntityFields(table); + table.string('type').notNullable(); + table.json('content').notNullable(); + table.datetime('completed_date'); + table.string('error'); + table.enum('status', OIANALYTICS_MESSAGE_STATUS).notNullable().defaultTo('PENDING'); + }); +} + +async function addVersionInEngineSettings(knex: Knex) { + await knex.schema.raw(`ALTER TABLE ${ENGINES_TABLE} ADD oibus_version NOT NULL DEFAULT "${version}"`); +} + +export async function down(knex: Knex): Promise { + await knex.schema.dropTable(OIANALYTICS_MESSAGE_TABLE); +} diff --git a/backend/src/index.ts b/backend/src/index.ts index 94bf5a4d64..76e90fe24d 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -18,6 +18,7 @@ import HomeMetricsService from './service/home-metrics.service'; import CommandService from './service/oia/command.service'; import RegistrationService from './service/oia/registration.service'; import ProxyServer from './web-server/proxy-server'; +import OIAnalyticsMessageService from './service/oia/message.service'; const CONFIG_DATABASE = 'oibus.db'; const CRYPTO_DATABASE = 'crypto.db'; @@ -107,7 +108,15 @@ const LOG_DB_NAME = 'logs.db'; loggerService.logger! ); - const commandService = new CommandService(repositoryService, encryptionService, loggerService.logger!, binaryFolder); + const oianalyticsMessageService = new OIAnalyticsMessageService(repositoryService, encryptionService, loggerService.logger!); + oianalyticsMessageService.start(); + const commandService = new CommandService( + repositoryService, + encryptionService, + oianalyticsMessageService, + loggerService.logger!, + binaryFolder + ); commandService.start(); const oibusService = new OIBusService(engine, historyQueryEngine); @@ -138,6 +147,7 @@ const LOG_DB_NAME = 'logs.db'; engine, historyQueryEngine, oibusService, + oianalyticsMessageService, proxyServer ); @@ -145,6 +155,7 @@ const LOG_DB_NAME = 'logs.db'; repositoryService, encryptionService, commandService, + oianalyticsMessageService, reloadService, loggerService.logger! ); @@ -172,6 +183,7 @@ const LOG_DB_NAME = 'logs.db'; stopping = true; await oibusService.stopOIBus(); await commandService.stop(); + await oianalyticsMessageService.stop(); await proxyServer.stop(); await server.stop(); registrationService.stop(); diff --git a/backend/src/repository/engine.repository.spec.ts b/backend/src/repository/engine.repository.spec.ts index 4f7b7da230..5a4dba3bda 100644 --- a/backend/src/repository/engine.repository.spec.ts +++ b/backend/src/repository/engine.repository.spec.ts @@ -155,6 +155,7 @@ describe('Non-empty Engine repository', () => { id: 'id1', name: 'OIBus', port: 2223, + version: '3.3.3', proxyEnabled: false, proxyPort: 9000, consoleLogLevel: 'silent', @@ -188,6 +189,7 @@ describe('Non-empty Engine repository', () => { id: 'id1', name: 'OIBus', port: 2223, + version: '3.3.3', proxyEnabled: false, proxyPort: 9000, logParameters: { @@ -218,7 +220,7 @@ describe('Non-empty Engine repository', () => { }; const engineSettings = repository.getEngineSettings(); expect(database.prepare).toHaveBeenCalledWith( - 'SELECT id, name, port, proxy_enabled AS proxyEnabled, proxy_port AS proxyPort, log_console_level AS consoleLogLevel, log_file_level AS fileLogLevel, ' + + 'SELECT id, name, port, oibus_version AS version, proxy_enabled AS proxyEnabled, proxy_port AS proxyPort, log_console_level AS consoleLogLevel, log_file_level AS fileLogLevel, ' + 'log_file_max_file_size AS fileLogMaxFileSize, log_file_number_of_files AS fileLogNumberOfFiles, ' + 'log_database_level AS databaseLogLevel, log_database_max_number_of_logs AS databaseLogMaxNumberOfLogs, ' + 'log_loki_level AS lokiLogLevel, log_loki_interval AS lokiLogInterval, log_loki_address AS lokiLogAddress, ' + @@ -264,4 +266,11 @@ describe('Non-empty Engine repository', () => { repository.createEngineSettings(command); expect(generateRandomId).not.toHaveBeenCalled(); }); + + it('should update version', () => { + repository.updateVersion('3.4.0'); + + expect(database.prepare).toHaveBeenCalledWith('UPDATE engines SET oibus_version = ? WHERE rowid=(SELECT MIN(rowid) FROM engines);'); + expect(run).toHaveBeenCalledWith('3.4.0'); + }); }); diff --git a/backend/src/repository/engine.repository.ts b/backend/src/repository/engine.repository.ts index a114fdd35c..5a129fd598 100644 --- a/backend/src/repository/engine.repository.ts +++ b/backend/src/repository/engine.repository.ts @@ -53,7 +53,7 @@ export default class EngineRepository { */ getEngineSettings(): EngineSettingsDTO | null { const query = - 'SELECT id, name, port, proxy_enabled AS proxyEnabled, proxy_port AS proxyPort, ' + + 'SELECT id, name, port, oibus_version AS version, proxy_enabled AS proxyEnabled, proxy_port AS proxyPort, ' + 'log_console_level AS consoleLogLevel, ' + 'log_file_level AS fileLogLevel, ' + 'log_file_max_file_size AS fileLogMaxFileSize, ' + @@ -75,6 +75,7 @@ export default class EngineRepository { id: results[0].id, name: results[0].name, port: results[0].port, + version: results[0].version, proxyEnabled: Boolean(results[0].proxyEnabled), proxyPort: results[0].proxyPort, logParameters: { @@ -152,6 +153,15 @@ export default class EngineRepository { ); } + /** + * Update OIBus version + */ + updateVersion(version: string): void { + const query = `UPDATE ${ENGINES_TABLE} SET oibus_version = ? WHERE rowid=(SELECT MIN(rowid) FROM ${ENGINES_TABLE});`; + + this.database.prepare(query).run(version); + } + /** * Create engine settings in the database. */ diff --git a/backend/src/repository/oianalytics-message.repository.spec.ts b/backend/src/repository/oianalytics-message.repository.spec.ts new file mode 100644 index 0000000000..159a3fe68a --- /dev/null +++ b/backend/src/repository/oianalytics-message.repository.spec.ts @@ -0,0 +1,176 @@ +import SqliteDatabaseMock, { all, get, run } from '../tests/__mocks__/database.mock'; +import { Database } from 'better-sqlite3'; +import OianalyticsMessageRepository from './oianalytics-message.repository'; +import { Page } from '../../../shared/model/types'; +import { InfoMessageContent, OIAnalyticsMessageCommand, OIAnalyticsMessageDTO } from '../../../shared/model/oianalytics-message.model'; + +jest.mock('../tests/__mocks__/database.mock'); +jest.mock('../service/utils', () => ({ + generateRandomId: jest.fn(() => '123456') +})); +const nowDateString = '2020-02-02T02:02:02.222Z'; + +const existingMessage: OIAnalyticsMessageDTO = { + id: '1234', + status: 'ERRORED', + type: 'INFO', + content: {} as InfoMessageContent +}; + +let database: Database; +let repository: OianalyticsMessageRepository; +describe('OIAnalytics Message repository', () => { + beforeEach(() => { + jest.clearAllMocks(); + database = new SqliteDatabaseMock(); + all.mockReturnValue([existingMessage]); + database.prepare = jest.fn().mockReturnValue({ + run, + get, + all + }); + repository = new OianalyticsMessageRepository(database); + }); + + it('should create message', () => { + run.mockReturnValueOnce({ lastInsertRowid: 1 }); + get.mockReturnValueOnce({ ...existingMessage, content: '{}' }); + const command: OIAnalyticsMessageCommand = { + type: 'INFO', + content: {} as InfoMessageContent + }; + repository.createOIAnalyticsMessages(command); + expect(database.prepare).toHaveBeenCalledWith('INSERT INTO oianalytics_messages (id, type, status, content) VALUES (?, ?, ?, ?);'); + expect(run).toHaveBeenCalledWith('123456', command.type, 'PENDING', JSON.stringify(command.content)); + }); + + it('should update message', () => { + const command: OIAnalyticsMessageCommand = { + type: 'INFO', + content: {} as InfoMessageContent + }; + repository.updateOIAnalyticsMessages('id', command); + expect(database.prepare).toHaveBeenCalledWith('UPDATE oianalytics_messages SET content = ? WHERE id = ?;'); + expect(run).toHaveBeenCalledWith(JSON.stringify(command.content), 'id'); + }); + + it('should properly get messages page by search criteria', () => { + const expectedValue: Page = { + content: [ + { + id: '1234', + creationDate: '2023-01-01T00:00:00.000Z', + type: 'INFO', + status: 'PENDING', + content: {} as InfoMessageContent + }, + { + id: '1234', + creationDate: '2024-01-01T00:00:00.000Z', + type: 'INFO', + status: 'ERRORED', + content: {} as InfoMessageContent + } + ], + size: 50, + number: 0, + totalElements: 2, + totalPages: 1 + }; + all.mockReturnValueOnce([ + { + id: '1234', + creationDate: '2023-01-01T00:00:00.000Z', + type: 'INFO', + status: 'PENDING', + content: '{}' + }, + { + id: '1234', + creationDate: '2024-01-01T00:00:00.000Z', + type: 'INFO', + status: 'ERRORED', + content: '{}' + } + ]); + get.mockReturnValueOnce({ count: 2 }); + const logs = repository.searchMessagesPage( + { + types: ['INFO'], + status: ['PENDING', 'ERRORED'], + start: '2023-01-01T00:00:00.000Z', + end: '2023-01-02T00:00:00.000Z' + }, + 0 + ); + expect(database.prepare).toHaveBeenCalledWith( + 'SELECT id, created_at as creationDate, completed_date as compeltedDate, type, status, error, content FROM oianalytics_messages WHERE id IS NOT NULL ' + + 'AND type IN (?) AND status IN (?,?) AND created_at >= ? AND created_at <= ? ORDER BY created_at DESC LIMIT 50 OFFSET ?;' + ); + expect(logs).toEqual(expectedValue); + + expect(database.prepare).toHaveBeenCalledWith( + 'SELECT COUNT(*) as count FROM oianalytics_messages WHERE id IS NOT NULL AND type IN (?) AND status IN (?,?) AND created_at >= ? AND created_at <= ?;' + ); + }); + + it('should properly get messages list by search criteria', () => { + const expectedValue: Array = [ + { + id: '1234', + creationDate: '2023-01-01T00:00:00.000Z', + type: 'INFO', + status: 'PENDING', + content: {} as InfoMessageContent + }, + { + id: '1234', + creationDate: '2024-01-01T00:00:00.000Z', + type: 'INFO', + status: 'ERRORED', + content: {} as InfoMessageContent + } + ]; + all.mockReturnValueOnce([ + { + id: '1234', + creationDate: '2023-01-01T00:00:00.000Z', + type: 'INFO', + status: 'PENDING', + content: '{}' + }, + { + id: '1234', + creationDate: '2024-01-01T00:00:00.000Z', + type: 'INFO', + status: 'ERRORED', + content: '{}' + } + ]); + const messages = repository.searchMessagesList({ + types: ['INFO'], + status: ['PENDING', 'ERRORED'], + start: '2023-01-01T00:00:00.000Z', + end: '2023-01-02T00:00:00.000Z' + }); + expect(database.prepare).toHaveBeenCalledWith( + 'SELECT id, created_at as creationDate, completed_date as compeltedDate, type, status, error, content FROM oianalytics_messages WHERE id IS NOT NULL ' + + 'AND type IN (?) AND status IN (?,?) AND created_at >= ? AND created_at <= ? ORDER BY created_at DESC;' + ); + expect(messages).toEqual(expectedValue); + }); + + it('should mark a command as COMPLETED', () => { + repository.markAsCompleted('id1', nowDateString); + const query = `UPDATE oianalytics_messages SET status = 'COMPLETED', completed_date = ? WHERE id = ?;`; + expect(database.prepare).toHaveBeenCalledWith(query); + expect(run).toHaveBeenCalledWith(nowDateString, 'id1'); + }); + + it('should mark a command as ERRORED', () => { + repository.markAsErrored('id1', nowDateString, 'not ok'); + const query = `UPDATE oianalytics_messages SET status = 'ERRORED', completed_date = ?, error = ? WHERE id = ?;`; + expect(database.prepare).toHaveBeenCalledWith(query); + expect(run).toHaveBeenCalledWith(nowDateString, 'not ok', 'id1'); + }); +}); diff --git a/backend/src/repository/oianalytics-message.repository.ts b/backend/src/repository/oianalytics-message.repository.ts new file mode 100644 index 0000000000..190adf659c --- /dev/null +++ b/backend/src/repository/oianalytics-message.repository.ts @@ -0,0 +1,139 @@ +import { generateRandomId } from '../service/utils'; +import { Database } from 'better-sqlite3'; +import { Instant, Page } from '../../../shared/model/types'; +import { + OIAnalyticsMessageCommand, + OIAnalyticsMessageDTO, + OIAnalyticsMessageSearchParam +} from '../../../shared/model/oianalytics-message.model'; + +export const OIANALYTICS_MESSAGE_TABLE = 'oianalytics_messages'; +const PAGE_SIZE = 50; +/** + * Repository used for engine settings + */ +export default class OianalyticsMessageRepository { + constructor(private readonly database: Database) {} + + /** + * Search messages by page + */ + searchMessagesPage(searchParams: OIAnalyticsMessageSearchParam, page: number): Page { + const queryParams = []; + let whereClause = 'WHERE id IS NOT NULL'; + if (searchParams.types.length > 0) { + whereClause += ` AND type IN (${searchParams.types.map(() => '?')})`; + queryParams.push(...searchParams.types); + } + if (searchParams.status.length > 0) { + whereClause += ` AND status IN (${searchParams.status.map(() => '?')})`; + queryParams.push(...searchParams.status); + } + if (searchParams.start) { + whereClause += ` AND created_at >= ?`; + queryParams.push(searchParams.start); + } + if (searchParams.end) { + whereClause += ` AND created_at <= ?`; + queryParams.push(searchParams.end); + } + + const query = `SELECT id, created_at as creationDate, completed_date as compeltedDate, type, status, error, content FROM ${OIANALYTICS_MESSAGE_TABLE} ${whereClause} ORDER BY created_at DESC LIMIT ${PAGE_SIZE} OFFSET ?;`; + const results: Array = this.database.prepare(query).all(...queryParams, PAGE_SIZE * page); + const totalElements = ( + this.database.prepare(`SELECT COUNT(*) as count FROM ${OIANALYTICS_MESSAGE_TABLE} ${whereClause};`).get(...queryParams) as { + count: number; + } + ).count; + const totalPages = Math.ceil(totalElements / PAGE_SIZE); + + return { + content: results.map(result => ({ + id: result.id, + creationDate: result.creationDate, + type: result.type, + status: result.status, + error: result.error, + completedDate: result.completedDate, + content: JSON.parse(result.content) + })) as Array, + size: PAGE_SIZE, + number: page, + totalElements, + totalPages + }; + } + + /** + * Search messages by list + */ + searchMessagesList(searchParams: OIAnalyticsMessageSearchParam): Array { + const queryParams = []; + let whereClause = 'WHERE id IS NOT NULL'; + if (searchParams.types.length > 0) { + whereClause += ` AND type IN (${searchParams.types.map(() => '?')})`; + queryParams.push(...searchParams.types); + } + if (searchParams.status.length > 0) { + whereClause += ` AND status IN (${searchParams.status.map(() => '?')})`; + queryParams.push(...searchParams.status); + } + if (searchParams.start) { + whereClause += ` AND created_at >= ?`; + queryParams.push(searchParams.start); + } + if (searchParams.end) { + whereClause += ` AND created_at <= ?`; + queryParams.push(searchParams.end); + } + + const query = `SELECT id, created_at as creationDate, completed_date as compeltedDate, type, status, error, content FROM ${OIANALYTICS_MESSAGE_TABLE} ${whereClause} ORDER BY created_at DESC;`; + const results: Array = this.database.prepare(query).all(...queryParams); + return results.map(result => ({ + id: result.id, + creationDate: result.creationDate, + type: result.type, + status: result.status, + error: result.error, + completedDate: result.completedDate, + content: JSON.parse(result.content) + })) as Array; + } + + /** + * Update a message + */ + updateOIAnalyticsMessages(id: string, command: OIAnalyticsMessageCommand): void { + const query = `UPDATE ${OIANALYTICS_MESSAGE_TABLE} SET content = ? WHERE id = ?;`; + this.database.prepare(query).run(JSON.stringify(command.content), id); + } + + /** + * Create a message + */ + createOIAnalyticsMessages(command: OIAnalyticsMessageCommand): OIAnalyticsMessageDTO { + const insertQuery = `INSERT INTO ${OIANALYTICS_MESSAGE_TABLE} (id, type, status, content) VALUES (?, ?, ?, ?);`; + const insertResult = this.database + .prepare(insertQuery) + .run(generateRandomId(), command.type, 'PENDING', JSON.stringify(command.content)); + + const query = + `SELECT id, created_at as creationDate, completed_date as compeltedDate, type, status, error, content FROM ${OIANALYTICS_MESSAGE_TABLE} ` + + `WHERE ROWID = ?;`; + const result: any = this.database.prepare(query).get(insertResult.lastInsertRowid); + return { + ...result, + content: JSON.parse(result.content) + } as OIAnalyticsMessageDTO; + } + + markAsCompleted(id: string, completedDate: Instant): void { + const query = `UPDATE ${OIANALYTICS_MESSAGE_TABLE} SET status = 'COMPLETED', completed_date = ? WHERE id = ?;`; + this.database.prepare(query).run(completedDate, id); + } + + markAsErrored(id: string, completedDate: Instant, result: string): void { + const query = `UPDATE ${OIANALYTICS_MESSAGE_TABLE} SET status = 'ERRORED', completed_date = ?, error = ? WHERE id = ?;`; + this.database.prepare(query).run(completedDate, result, id); + } +} diff --git a/backend/src/service/oia/command.service.spec.ts b/backend/src/service/oia/command.service.spec.ts index bad51c960b..1afba4f3a1 100644 --- a/backend/src/service/oia/command.service.spec.ts +++ b/backend/src/service/oia/command.service.spec.ts @@ -10,6 +10,9 @@ import { downloadFile, getNetworkSettingsFromRegistration, getOIBusInfo, unzip } import { RegistrationSettingsDTO } from '../../../../shared/model/engine.model'; import fs from 'node:fs/promises'; import path from 'node:path'; +import { version } from '../../../package.json'; +import MessageServiceMock from '../../tests/__mocks__/message-service.mock'; +import OIAnalyticsMessageService from './message.service'; jest.mock('node:fs/promises'); jest.mock('node-fetch'); @@ -19,6 +22,7 @@ jest.mock('../utils'); jest.spyOn(process, 'exit').mockImplementation(() => {}); const repositoryService: RepositoryService = new RepositoryServiceMock('', ''); +const oianalyticsMessageService: OIAnalyticsMessageService = new MessageServiceMock('', ''); const encryptionService: EncryptionService = new EncryptionServiceMock('', ''); const nowDateString = '2020-02-02T02:02:02.222Z'; @@ -55,21 +59,20 @@ describe('Command service with running command', () => { beforeEach(() => { jest.clearAllMocks(); jest.useFakeTimers().setSystemTime(new Date(nowDateString)); - (getOIBusInfo as jest.Mock).mockReturnValue({ version: 'v3.2.0' }); - + (repositoryService.engineRepository.getEngineSettings as jest.Mock).mockReturnValue({ version: '3.2.0' }); (repositoryService.registrationRepository.getRegistrationSettings as jest.Mock).mockReturnValue(registration); (repositoryService.commandRepository.searchCommandsList as jest.Mock).mockReturnValue([command]); - service = new CommandService(repositoryService, encryptionService, logger, 'binaryFolder'); + service = new CommandService(repositoryService, encryptionService, oianalyticsMessageService, logger, 'binaryFolder'); }); - it('should properly start and stop', async () => { - expect(getOIBusInfo).toHaveBeenCalledTimes(1); + it('should properly start and stop after an update', async () => { + expect(repositoryService.engineRepository.getEngineSettings).toHaveBeenCalledTimes(1); expect(repositoryService.commandRepository.markAsCompleted).toHaveBeenCalledWith( command.id, nowDateString, - `OIBus updated to version v3.2.0` + `OIBus updated to version ${version}` ); - + expect(repositoryService.engineRepository.updateVersion).toHaveBeenCalledWith(version); service.run = jest.fn(); service.start(); expect(service.run).toHaveBeenCalledTimes(1); @@ -147,7 +150,7 @@ describe('Command service without command', () => { jest.clearAllMocks(); (repositoryService.commandRepository.searchCommandsList as jest.Mock).mockReturnValue([]); - service = new CommandService(repositoryService, encryptionService, logger, 'binaryFolder'); + service = new CommandService(repositoryService, encryptionService, oianalyticsMessageService, logger, 'binaryFolder'); }); it('should properly start when not registered', () => { @@ -226,3 +229,35 @@ describe('Command service without command', () => { service.setLogger(anotherLogger); }); }); + +describe('Command service with running command', () => { + const registration: RegistrationSettingsDTO = { + id: 'id', + host: 'http://localhost:4200', + acceptUnauthorized: false, + useProxy: false, + token: 'token', + activationCode: '1234', + status: 'REGISTERED', + activationDate: '2020-20-20T00:00:00.000Z', + activationExpirationDate: '' + }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.useFakeTimers().setSystemTime(new Date(nowDateString)); + (repositoryService.engineRepository.getEngineSettings as jest.Mock).mockReturnValue({ version }); + (repositoryService.registrationRepository.getRegistrationSettings as jest.Mock).mockReturnValue(registration); + (repositoryService.commandRepository.searchCommandsList as jest.Mock).mockReturnValue([command]); + service = new CommandService(repositoryService, encryptionService, oianalyticsMessageService, logger, 'binaryFolder'); + }); + + it('should properly start and stop after a failed update', async () => { + expect(repositoryService.engineRepository.getEngineSettings).toHaveBeenCalledTimes(1); + expect(repositoryService.engineRepository.updateVersion).not.toHaveBeenCalled(); + expect(repositoryService.commandRepository.markAsErrored).toHaveBeenCalledWith( + command.id, + `OIBus has not been updated. Rollback to version ${version}` + ); + }); +}); diff --git a/backend/src/service/oia/command.service.ts b/backend/src/service/oia/command.service.ts index 4ccc747523..cc53535cec 100644 --- a/backend/src/service/oia/command.service.ts +++ b/backend/src/service/oia/command.service.ts @@ -9,6 +9,9 @@ import DeferredPromise from '../deferred-promise'; import { DateTime } from 'luxon'; import { RegistrationSettingsDTO } from '../../../../shared/model/engine.model'; import path from 'node:path'; +import { version } from '../../../package.json'; +import { OIAnalyticsMessageInfoCommandDTO } from '../../../../shared/model/oianalytics-message.model'; +import OIAnalyticsMessageService from './message.service'; const DOWNLOAD_TIMEOUT = 600_000; const STOP_TIMEOUT = 30_000; @@ -23,18 +26,34 @@ export default class CommandService { constructor( private repositoryService: RepositoryService, private encryptionService: EncryptionService, + private oianalyticsMessageService: OIAnalyticsMessageService, private logger: pino.Logger, private binaryFolder: string ) { const currentUpgradeCommand = this.repositoryService.commandRepository.searchCommandsList({ status: ['RUNNING'], types: ['UPGRADE'] }); if (currentUpgradeCommand.length > 0) { const engineSettings = this.repositoryService.engineRepository.getEngineSettings()!; - const info = getOIBusInfo(engineSettings); - this.repositoryService.commandRepository.markAsCompleted( - currentUpgradeCommand[0].id, - DateTime.now().toUTC().toISO(), - `OIBus updated to version ${info.version}` - ); + if (engineSettings.version !== version) { + this.repositoryService.engineRepository.updateVersion(version); + this.repositoryService.commandRepository.markAsCompleted( + currentUpgradeCommand[0].id, + DateTime.now().toUTC().toISO(), + `OIBus updated to version ${version}` + ); + engineSettings.version = version; + const info = getOIBusInfo(engineSettings); + const infoMessageCommand: OIAnalyticsMessageInfoCommandDTO = { + type: 'INFO', + content: info + }; + const createdMessage = this.repositoryService.oianalyticsMessageRepository.createOIAnalyticsMessages(infoMessageCommand); + this.oianalyticsMessageService.addMessageToQueue(createdMessage); + } else { + this.repositoryService.commandRepository.markAsErrored( + currentUpgradeCommand[0].id, + `OIBus has not been updated. Rollback to version ${version}` + ); + } } } diff --git a/backend/src/service/oia/message.service.spec.ts b/backend/src/service/oia/message.service.spec.ts new file mode 100644 index 0000000000..afd494afb7 --- /dev/null +++ b/backend/src/service/oia/message.service.spec.ts @@ -0,0 +1,260 @@ +import RepositoryService from '../repository.service'; +import RepositoryServiceMock from '../../tests/__mocks__/repository-service.mock'; +import EncryptionServiceMock from '../../tests/__mocks__/encryption-service.mock'; +import EncryptionService from '../encryption.service'; +import pino from 'pino'; +import fetch from 'node-fetch'; +import PinoLogger from '../../tests/__mocks__/logger.mock'; +import { getNetworkSettingsFromRegistration, getOIBusInfo } from '../utils'; +import { RegistrationSettingsDTO } from '../../../../shared/model/engine.model'; +import OIAnalyticsMessageService from './message.service'; +import { InfoMessageContent, OIAnalyticsMessageDTO } from '../../../../shared/model/oianalytics-message.model'; + +jest.mock('node:fs/promises'); +jest.mock('node-fetch'); +jest.mock('../utils'); + +// @ts-ignore +jest.spyOn(process, 'exit').mockImplementation(() => {}); + +const repositoryService: RepositoryService = new RepositoryServiceMock('', ''); +const encryptionService: EncryptionService = new EncryptionServiceMock('', ''); + +const nowDateString = '2020-02-02T02:02:02.222Z'; +const logger: pino.Logger = new PinoLogger(); +const anotherLogger: pino.Logger = new PinoLogger(); + +const existingMessage: OIAnalyticsMessageDTO = { + id: '1234', + status: 'ERRORED', + type: 'INFO', + content: {} as InfoMessageContent +}; + +let service: OIAnalyticsMessageService; +describe('OIAnalytics message service with messages', () => { + const registration: RegistrationSettingsDTO = { + id: 'id', + host: 'http://localhost:4200', + acceptUnauthorized: false, + useProxy: false, + token: 'token', + activationCode: '1234', + status: 'REGISTERED', + activationDate: '2020-20-20T00:00:00.000Z', + activationExpirationDate: '' + }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.useFakeTimers().setSystemTime(new Date(nowDateString)); + (getOIBusInfo as jest.Mock).mockReturnValue({ version: 'v3.2.0' }); + + (repositoryService.registrationRepository.getRegistrationSettings as jest.Mock).mockReturnValue(registration); + (repositoryService.oianalyticsMessageRepository.searchMessagesList as jest.Mock).mockReturnValue([existingMessage]); + service = new OIAnalyticsMessageService(repositoryService, encryptionService, logger); + }); + + it('should properly start and stop', async () => { + service.run = jest.fn(); + service.start(); + expect(service.run).toHaveBeenCalledTimes(1); + expect(repositoryService.oianalyticsMessageRepository.searchMessagesList).toHaveBeenCalledWith({ + status: ['PENDING'], + types: [] + }); + + await service.stop(); + expect(logger.debug).toHaveBeenCalledWith(`Stopping OIAnalytics message service...`); + expect(logger.debug).toHaveBeenCalledWith(`OIAnalytics message service stopped`); + }); + + it('should properly send message and wait for it to finish before stopping', async () => { + service.sendMessage = jest.fn().mockImplementation(() => { + return new Promise(resolve => { + setTimeout(resolve, 1000); + }); + }); + + service.start(); + service.start(); + + service.stop(); + + jest.advanceTimersByTime(1000); + await service.stop(); + service.removeMessageFromQueue(existingMessage.id); + + expect(service.sendMessage).toHaveBeenCalledTimes(1); + expect(logger.debug).toHaveBeenCalledWith('Waiting for OIAnalytics message to finish'); + service.addMessageToQueue(existingMessage); + }); + + it('should properly send message and trigger timeout', async () => { + service.sendMessage = jest.fn().mockImplementation(() => { + return new Promise(resolve => { + setTimeout(resolve, 100_000); + }); + }); + + service.start(); + service.stop(); + jest.advanceTimersByTime(10_000); + + service.stop(); + + expect(logger.debug).toHaveBeenCalledWith('Waiting for OIAnalytics message to finish'); + jest.advanceTimersByTime(20_000); + + await service.stop(); + expect(logger.debug).toHaveBeenCalledWith(`OIAnalytics message service stopped`); + }); + + it('should properly catch command exception', async () => { + service.sendMessage = jest.fn().mockImplementation(() => { + return new Promise((resolve, reject) => { + setTimeout(() => reject(new Error('exception')), 1000); + }); + }); + + service.start(); + service.stop(); + + jest.advanceTimersByTime(1000); + + await service.stop(); + expect(logger.error).toHaveBeenCalledWith( + `Error while sending message ${existingMessage.id} (created ${existingMessage.creationDate}) of type ${existingMessage.type}. Error: exception` + ); + expect(service.sendMessage).toHaveBeenCalledTimes(1); + expect(logger.debug).toHaveBeenCalledWith('Waiting for OIAnalytics message to finish'); + }); +}); + +describe('OIAnalytics message service without message', () => { + beforeEach(() => { + jest.clearAllMocks(); + jest.useFakeTimers().setSystemTime(new Date(nowDateString)); + + (repositoryService.oianalyticsMessageRepository.searchMessagesList as jest.Mock).mockReturnValue([]); + (getNetworkSettingsFromRegistration as jest.Mock).mockReturnValue({ host: 'http://localhost:4200', headers: {}, agent: undefined }); + + service = new OIAnalyticsMessageService(repositoryService, encryptionService, logger); + }); + + it('should properly start when not registered', () => { + const registration: RegistrationSettingsDTO = { + id: 'id', + host: 'http://localhost:4200', + acceptUnauthorized: false, + useProxy: false, + token: 'token', + activationCode: '1234', + status: 'NOT_REGISTERED', + activationDate: '2020-20-20T00:00:00.000Z', + activationExpirationDate: '' + }; + (repositoryService.registrationRepository.getRegistrationSettings as jest.Mock).mockReturnValue(registration); + + expect(getOIBusInfo).not.toHaveBeenCalled(); + expect(repositoryService.oianalyticsMessageRepository.markAsCompleted).not.toHaveBeenCalled(); + + service.run = jest.fn(); + service.start(); + expect(logger.debug).toHaveBeenCalledWith(`Message service not started: OIAnalytics not registered`); + expect(service.run).not.toHaveBeenCalled(); + }); + + it('should properly start when registered', () => { + const registration: RegistrationSettingsDTO = { + id: 'id', + host: 'http://localhost:4200', + acceptUnauthorized: false, + useProxy: false, + token: 'token', + activationCode: '1234', + status: 'REGISTERED', + activationDate: '2020-20-20T00:00:00.000Z', + activationExpirationDate: '' + }; + (repositoryService.registrationRepository.getRegistrationSettings as jest.Mock).mockReturnValue(registration); + + service.run = jest.fn(); + service.start(); + expect(service.run).not.toHaveBeenCalled(); + }); + + it('should send message and trigger next run', async () => { + (fetch as unknown as jest.Mock).mockReturnValueOnce(Promise.resolve(new Response())); + + await service.sendMessage(existingMessage); + expect(logger.trace).toHaveBeenCalledWith( + `${existingMessage.id} (created ${existingMessage.creationDate}) of type ${existingMessage.type} sent` + ); + expect(fetch).toHaveBeenCalledWith('http://localhost:4200/api/oianalytics/oibus/message', { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify(existingMessage), + timeout: 15_000, + agent: undefined + }); + expect(repositoryService.oianalyticsMessageRepository.markAsCompleted).toHaveBeenCalledWith(existingMessage.id, nowDateString); + + jest.advanceTimersByTime(15000); + expect(fetch).toHaveBeenCalledTimes(1); + }); + + it('should send message and manage 404 error', async () => { + (fetch as unknown as jest.Mock).mockReturnValueOnce(Promise.resolve(new Response('invalid', { status: 404, statusText: 'Not Found' }))); + + await service.sendMessage(existingMessage); + expect(logger.error).toHaveBeenCalledWith( + `Error 404 while sending message ${existingMessage.id} (created ${existingMessage.creationDate}) of type ` + + `${existingMessage.type} on http://localhost:4200/api/oianalytics/oibus/message: Not Found` + ); + }); + + it('should send message and log error on fetch error', async () => { + const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout'); + + (fetch as unknown as jest.Mock).mockImplementation(() => { + throw new Error('error'); + }); + + await service.sendMessage(existingMessage); + expect(logger.debug).toHaveBeenCalledWith( + `Error while sending message ${existingMessage.id} (created ${existingMessage.creationDate}) of type ` + + `${existingMessage.type} on http://localhost:4200/api/oianalytics/oibus/message. ${new Error('error')}` + ); + + await service.stop(); + expect(clearTimeoutSpy).toHaveBeenCalledTimes(1); + + await service.sendMessage(existingMessage); + await service.sendMessage(existingMessage); + expect(clearTimeoutSpy).toHaveBeenCalledTimes(2); + jest.advanceTimersByTime(15000); + await service.stop(); + }); + + it('should send message, log error and retry on fetch auth error', async () => { + (fetch as unknown as jest.Mock).mockReturnValueOnce( + Promise.resolve(new Response('invalid', { status: 401, statusText: 'Unauthorized' })) + ); + + await service.sendMessage(existingMessage); + expect(logger.debug).toHaveBeenCalledWith( + `Error 401 while sending message ${existingMessage.id} (created ${existingMessage.creationDate}) of type ` + + `${existingMessage.type} on http://localhost:4200/api/oianalytics/oibus/message: Unauthorized` + ); + + jest.advanceTimersByTime(15000); + await service.stop(); + }); + + it('should change logger', () => { + service.setLogger(anotherLogger); + }); +}); diff --git a/backend/src/service/oia/message.service.ts b/backend/src/service/oia/message.service.ts new file mode 100644 index 0000000000..889fd22281 --- /dev/null +++ b/backend/src/service/oia/message.service.ts @@ -0,0 +1,171 @@ +import { getNetworkSettingsFromRegistration } from '../utils'; +import RepositoryService from '../repository.service'; +import EncryptionService from '../encryption.service'; +import pino from 'pino'; +import { EventEmitter } from 'node:events'; +import DeferredPromise from '../deferred-promise'; +import { DateTime } from 'luxon'; +import { RegistrationSettingsDTO } from '../../../../shared/model/engine.model'; +import fetch from 'node-fetch'; +import { OIAnalyticsMessageDTO } from '../../../../shared/model/oianalytics-message.model'; + +const STOP_TIMEOUT = 30_000; +const MESSAGE_TIMEOUT = 15_000; +const RETRY_TIMEOUT = 10_000; +const NEXT_TIMEOUT = 1_000; + +export default class OIAnalyticsMessageService { + private messagesQueue: Array = []; + private registration: RegistrationSettingsDTO | null = null; + private triggerRun: EventEmitter = new EventEmitter(); + private runProgress$: DeferredPromise | null = null; + private stopTimeout: NodeJS.Timeout | null = null; + private retryTimeout: NodeJS.Timeout | null = null; + + constructor( + private repositoryService: RepositoryService, + private encryptionService: EncryptionService, + private logger: pino.Logger + ) {} + + start(): void { + this.registration = this.repositoryService.registrationRepository.getRegistrationSettings()!; + if (this.registration.status !== 'REGISTERED') { + this.logger.debug(`Message service not started: OIAnalytics not registered`); + return; + } + this.messagesQueue = this.repositoryService.oianalyticsMessageRepository.searchMessagesList({ status: ['PENDING'], types: [] }); + + this.triggerRun.on('next', async () => { + if (!this.runProgress$) { + if (this.messagesQueue.length > 0) { + await this.run(); + } + } + }); + this.triggerRun.emit('next'); + } + + async run(): Promise { + this.runProgress$ = new DeferredPromise(); + const [message] = this.messagesQueue; + + try { + await this.sendMessage(message); + } catch (error: any) { + this.logger.error( + `Error while sending message ${message.id} (created ${message.creationDate}) of type ${message.type}. ${error.toString()}` + ); + this.repositoryService.oianalyticsMessageRepository.markAsErrored(message.id, DateTime.now().toUTC().toISO(), error.toString()); + this.removeMessageFromQueue(message.id); + } + + this.runProgress$.resolve(); + this.runProgress$ = null; + } + + /** + * Removes messages from the queue. + * @param messageId + * @returns + */ + removeMessageFromQueue(messageId: string): void { + const idx = this.messagesQueue.findIndex(message => message.id === messageId); + if (idx === -1) return; + + this.messagesQueue.splice(idx, 1); + } + + /** + * Add a message to the message queue and trigger the next run if no message is running + * @param message - The message to add + */ + addMessageToQueue(message: OIAnalyticsMessageDTO): void { + this.messagesQueue.push(message); + this.triggerRun.emit('next'); + } + + async sendMessage(message: OIAnalyticsMessageDTO): Promise { + if (this.retryTimeout) { + clearTimeout(this.retryTimeout); + this.retryTimeout = null; + } + const endpoint = `/api/oianalytics/oibus/message`; + const registrationSettings = this.repositoryService.registrationRepository.getRegistrationSettings(); + const connectionSettings = await getNetworkSettingsFromRegistration(registrationSettings, endpoint, this.encryptionService); + const url = `${connectionSettings.host}${endpoint}`; + + try { + const response = await fetch(url, { + method: 'POST', + body: JSON.stringify(message), + headers: { ...connectionSettings.headers, 'Content-Type': 'application/json' }, + timeout: MESSAGE_TIMEOUT, + agent: connectionSettings.agent + }); + if (!response.ok) { + if (response.status !== 401) { + this.logger.error( + `Error ${response.status} while sending message ${message.id} (created ${message.creationDate}) of type ${message.type} on ${url}: ${response.statusText}` + ); + this.repositoryService.oianalyticsMessageRepository.markAsErrored( + message.id, + DateTime.now().toUTC().toISO(), + response.statusText + ); + this.removeMessageFromQueue(message.id); + } else { + this.logger.debug( + `Error ${response.status} while sending message ${message.id} (created ${message.creationDate}) of type ${message.type} on ${url}: ${response.statusText}` + ); + this.retryTimeout = setTimeout(() => { + this.triggerRun.emit('next'); + }, RETRY_TIMEOUT); + } + return; + } + + this.repositoryService.oianalyticsMessageRepository.markAsCompleted(message.id, DateTime.now().toUTC().toISO()); + this.logger.trace(`${message.id} (created ${message.creationDate}) of type ${message.type} sent`); + this.removeMessageFromQueue(message.id); + this.retryTimeout = setTimeout(() => { + this.triggerRun.emit('next'); + }, NEXT_TIMEOUT); + } catch (fetchError) { + this.logger.debug( + `Error while sending message ${message.id} (created ${message.creationDate}) of type ${message.type} on ${url}. ${fetchError}` + ); + this.retryTimeout = setTimeout(() => { + this.triggerRun.emit('next'); + }, RETRY_TIMEOUT); + } + } + + /** + * Stop services and timer + */ + async stop(): Promise { + this.logger.debug(`Stopping OIAnalytics message service...`); + + this.triggerRun.removeAllListeners(); + if (this.runProgress$) { + if (!this.stopTimeout) { + this.stopTimeout = setTimeout(() => { + this.runProgress$!.resolve(); + }, STOP_TIMEOUT); + } + this.logger.debug('Waiting for OIAnalytics message to finish'); + await this.runProgress$.promise; + clearTimeout(this.stopTimeout); + } + if (this.retryTimeout) { + clearTimeout(this.retryTimeout); + this.retryTimeout = null; + } + this.logger.debug(`OIAnalytics message service stopped`); + } + + setLogger(logger: pino.Logger) { + this.logger = logger; + } +} diff --git a/backend/src/service/oia/registration.service.spec.ts b/backend/src/service/oia/registration.service.spec.ts index 7dc0518b6d..d8b7217ad4 100644 --- a/backend/src/service/oia/registration.service.spec.ts +++ b/backend/src/service/oia/registration.service.spec.ts @@ -11,6 +11,8 @@ import { OIBusCommandDTO } from '../../../../shared/model/command.model'; import { generateRandomId, getNetworkSettingsFromRegistration, getOIBusInfo } from '../utils'; import CommandService from './command.service'; import CommandServiceMock from '../../tests/__mocks__/command-service.mock'; +import OIAnalyticsMessageService from './message.service'; +import OIAnalyticsMessageServiceMock from '../../tests/__mocks__/message-service.mock'; import RegistrationService from './registration.service'; import ReloadServiceMock from '../../tests/__mocks__/reload-service.mock'; import ReloadService from '../reload.service'; @@ -24,6 +26,7 @@ jest.mock('../proxy-agent'); const repositoryService: RepositoryService = new RepositoryServiceMock('', ''); const encryptionService: EncryptionService = new EncryptionServiceMock('', ''); const commandService: CommandService = new CommandServiceMock(); +const oianalyticsMessageService: OIAnalyticsMessageService = new OIAnalyticsMessageServiceMock(); const reloadService: ReloadService = new ReloadServiceMock(); const nowDateString = '2020-02-02T02:02:02.222Z'; @@ -59,7 +62,14 @@ describe('Registration service', () => { jest.useFakeTimers().setSystemTime(new Date(nowDateString)); (createProxyAgent as jest.Mock).mockReturnValue(undefined); - service = new RegistrationService(repositoryService, encryptionService, commandService, reloadService, logger); + service = new RegistrationService( + repositoryService, + encryptionService, + commandService, + oianalyticsMessageService, + reloadService, + logger + ); }); it('should get NOT_REGISTERED registration settings', () => { @@ -486,7 +496,14 @@ describe('Registration service with PENDING registration', () => { jest.useFakeTimers().setSystemTime(new Date(nowDateString)); (createProxyAgent as jest.Mock).mockReturnValue(undefined); (repositoryService.registrationRepository.getRegistrationSettings as jest.Mock).mockReturnValue(mockResult); - service = new RegistrationService(repositoryService, encryptionService, commandService, reloadService, logger); + service = new RegistrationService( + repositoryService, + encryptionService, + commandService, + oianalyticsMessageService, + reloadService, + logger + ); }); it('should get PENDING registration settings', () => { @@ -550,7 +567,14 @@ describe('Registration service with REGISTERED registration', () => { jest.useFakeTimers().setSystemTime(new Date(nowDateString)); (createProxyAgent as jest.Mock).mockReturnValue(undefined); (repositoryService.registrationRepository.getRegistrationSettings as jest.Mock).mockReturnValue(mockResult); - service = new RegistrationService(repositoryService, encryptionService, commandService, reloadService, logger); + service = new RegistrationService( + repositoryService, + encryptionService, + commandService, + oianalyticsMessageService, + reloadService, + logger + ); }); it('should get REGISTERED registration settings', () => { @@ -629,7 +653,14 @@ describe('OIBus service should interact with OIA and', () => { (repositoryService.registrationRepository.getRegistrationSettings as jest.Mock).mockReturnValue(mockResult); (repositoryService.engineRepository.getEngineSettings as jest.Mock).mockReturnValue(mockEngineSettings); (getNetworkSettingsFromRegistration as jest.Mock).mockReturnValue({ host: 'http://localhost:4200', headers: {}, agent: undefined }); - service = new RegistrationService(repositoryService, encryptionService, commandService, reloadService, logger); + service = new RegistrationService( + repositoryService, + encryptionService, + commandService, + oianalyticsMessageService, + reloadService, + logger + ); }); it('should ack commands and return if no commands in OIBus', async () => { @@ -796,7 +827,7 @@ describe('OIBus service should interact with OIA and', () => { }); await service.retrieveCommands(); - expect(logger.error).toHaveBeenCalledWith( + expect(logger.debug).toHaveBeenCalledWith( `Error while retrieving commands on http://localhost:4200/api/oianalytics/oibus/commands/pending. ${new Error('error')}` ); }); diff --git a/backend/src/service/oia/registration.service.ts b/backend/src/service/oia/registration.service.ts index 31318fd502..1bfbd54d72 100644 --- a/backend/src/service/oia/registration.service.ts +++ b/backend/src/service/oia/registration.service.ts @@ -10,6 +10,7 @@ import fetch from 'node-fetch'; import { Instant } from '../../../../shared/model/types'; import CommandService from './command.service'; import ReloadService from '../reload.service'; +import OIAnalyticsMessageService from './message.service'; const CHECK_TIMEOUT = 10_000; export default class RegistrationService { @@ -22,6 +23,7 @@ export default class RegistrationService { private repositoryService: RepositoryService, private encryptionService: EncryptionService, private commandService: CommandService, + private oianalyticsMessageService: OIAnalyticsMessageService, private reloadService: ReloadService, private logger: pino.Logger ) {} @@ -184,6 +186,10 @@ export default class RegistrationService { } else { await this.activateRegistration(DateTime.now().toUTC().toISO()!, responseData.accessToken); this.logger.info(`OIBus registered on ${registrationSettings.host}`); + await this.commandService.stop(); + this.commandService.start(); + await this.oianalyticsMessageService.stop(); + this.oianalyticsMessageService.start(); const engineSettings = this.repositoryService.engineRepository.getEngineSettings()!; if (engineSettings.logParameters.oia.level !== 'silent') { await this.reloadService.restartLogger(engineSettings); @@ -323,7 +329,7 @@ export default class RegistrationService { } await this.sendAckCommands(); } catch (fetchError) { - this.logger.error(`Error while retrieving commands on ${url}. ${fetchError}`); + this.logger.debug(`Error while retrieving commands on ${url}. ${fetchError}`); } } diff --git a/backend/src/service/reload.service.spec.ts b/backend/src/service/reload.service.spec.ts index 2fa31eae58..f570cb92e2 100644 --- a/backend/src/service/reload.service.spec.ts +++ b/backend/src/service/reload.service.spec.ts @@ -31,15 +31,20 @@ import ProxyServer from '../web-server/proxy-server'; import ProxyServerMock from '../tests/__mocks__/proxy-server.mock'; import OIBusService from './oibus.service'; import OibusServiceMock from '../tests/__mocks__/oibus-service.mock'; +import { getOIBusInfo } from './utils'; +import OIAnalyticsMessageService from './oia/message.service'; +import MessageServiceMock from '../tests/__mocks__/message-service.mock'; jest.mock('./encryption.service'); jest.mock('./logger/logger.service'); jest.mock('./engine-metrics.service'); +jest.mock('./utils'); const oibusEngine: OIBusEngine = new OibusEngineMock(); const historyQueryEngine: HistoryQueryEngine = new HistoryQueryEngineMock(); const encryptionService: EncryptionService = new EncryptionServiceMock('', ''); const proxyServer: ProxyServer = new ProxyServerMock(); +const oianalyticsMessageService: OIAnalyticsMessageService = new MessageServiceMock(); const repositoryService: RepositoryService = new RepositoryServiceMock('', ''); const engineMetricsService: EngineMetricsService = new EngineMetricsServiceMock(); const homeMetrics: HomeMetricsService = new HomeMetricsServiceMock(); @@ -65,6 +70,7 @@ describe('reload service', () => { oibusEngine, historyQueryEngine, oibusService, + oianalyticsMessageService, proxyServer ); }); @@ -83,22 +89,27 @@ describe('reload service', () => { it('should update port', async () => { const changePortFn = jest.fn(); - const oldSettings = { port: 2223, proxyEnabled: false, proxyPort: 8888 }; - const newSettings = { port: 2224, proxyEnabled: true, proxyPort: 9000 }; + const oldSettings = { name: 'oibus name', port: 2223, proxyEnabled: false, proxyPort: 8888 }; + const newSettings = { name: 'oibus name', port: 2224, proxyEnabled: true, proxyPort: 9000 }; service.setWebServerChangePort(changePortFn); await service.onUpdateOibusSettings(oldSettings as EngineSettingsDTO, newSettings as EngineSettingsDTO); expect(changePortFn).toHaveBeenCalledTimes(1); + expect(getOIBusInfo).not.toHaveBeenCalled(); + expect(repositoryService.oianalyticsMessageRepository.createOIAnalyticsMessages).not.toHaveBeenCalled(); expect(proxyServer.stop).toHaveBeenCalledTimes(1); expect(proxyServer.start).toHaveBeenCalledTimes(1); await service.onUpdateOibusSettings(null, newSettings as EngineSettingsDTO); expect(changePortFn).toHaveBeenCalledTimes(2); + expect(getOIBusInfo).toHaveBeenCalledTimes(1); + expect(proxyServer.stop).toHaveBeenCalledTimes(2); expect(proxyServer.start).toHaveBeenCalledTimes(2); - await service.onUpdateOibusSettings(newSettings as EngineSettingsDTO, newSettings as EngineSettingsDTO); + await service.onUpdateOibusSettings({ ...newSettings, name: 'another name' } as EngineSettingsDTO, newSettings as EngineSettingsDTO); expect(changePortFn).toHaveBeenCalledTimes(2); + expect(getOIBusInfo).toHaveBeenCalledTimes(2); expect(proxyServer.stop).toHaveBeenCalledTimes(2); expect(proxyServer.start).toHaveBeenCalledTimes(2); }); @@ -108,6 +119,9 @@ describe('reload service', () => { const newSettings = { id: 'oibusId', name: 'oibusName', logParameters: {} as LogSettings }; service.setWebServerChangeLogger(changeLoggerFn); await service.onUpdateOibusSettings(null, newSettings as EngineSettingsDTO); + expect(getOIBusInfo).toHaveBeenCalledTimes(1); + expect(getOIBusInfo).toHaveBeenCalledWith(newSettings); + expect(repositoryService.oianalyticsMessageRepository.createOIAnalyticsMessages).toHaveBeenCalledTimes(1); expect(loggerService.stop).toHaveBeenCalledTimes(1); expect(loggerService.start).toHaveBeenCalledWith(newSettings.id, newSettings.name, newSettings.logParameters, { id: 'id1' }); expect(changeLoggerFn).toHaveBeenCalledTimes(1); diff --git a/backend/src/service/reload.service.ts b/backend/src/service/reload.service.ts index d3334186df..73c0a5d34a 100644 --- a/backend/src/service/reload.service.ts +++ b/backend/src/service/reload.service.ts @@ -21,6 +21,9 @@ import { ScanModeCommandDTO } from '../../../shared/model/scan-mode.model'; import HomeMetricsService from './home-metrics.service'; import ProxyServer from '../web-server/proxy-server'; import OIBusService from './oibus.service'; +import { getOIBusInfo } from './utils'; +import { OIAnalyticsMessageInfoCommandDTO } from '../../../shared/model/oianalytics-message.model'; +import OIAnalyticsMessageService from './oia/message.service'; export default class ReloadService { private webServerChangeLoggerCallback: (logger: pino.Logger) => void = () => {}; @@ -36,6 +39,7 @@ export default class ReloadService { private readonly _oibusEngine: OIBusEngine, private readonly _historyEngine: HistoryQueryEngine, private readonly _oibusService: OIBusService, + private readonly _oianalyticsMessageService: OIAnalyticsMessageService, private readonly _proxyServer: ProxyServer ) {} @@ -94,6 +98,16 @@ export default class ReloadService { oldSettings.name !== newSettings.name ) { await this.restartLogger(newSettings); + const registration = this.repositoryService.registrationRepository.getRegistrationSettings()!; + if (oldSettings?.name !== newSettings.name && registration.status !== 'NOT_REGISTERED') { + const info = getOIBusInfo(newSettings); + const infoMessageCommand: OIAnalyticsMessageInfoCommandDTO = { + type: 'INFO', + content: info + }; + const createdMessage = this.repositoryService.oianalyticsMessageRepository.createOIAnalyticsMessages(infoMessageCommand); + this._oianalyticsMessageService.addMessageToQueue(createdMessage); + } } if (!oldSettings || oldSettings.port !== newSettings.port) { await this.webServerChangePortCallback(newSettings.port); diff --git a/backend/src/service/repository.service.spec.ts b/backend/src/service/repository.service.spec.ts index 97394f34eb..b7de49ddba 100644 --- a/backend/src/service/repository.service.spec.ts +++ b/backend/src/service/repository.service.spec.ts @@ -22,6 +22,7 @@ import NorthConnectorMetricsRepository from '../repository/north-connector-metri import CertificateRepository from '../repository/certificate.repository'; import RegistrationRepository from '../repository/registration.repository'; import CommandRepository from '../repository/command.repository'; +import OianalyticsMessageRepository from '../repository/oianalytics-message.repository'; jest.mock('better-sqlite3', () => jest.fn(() => 'sqlite database')); jest.mock('../repository/external-source.repository'); @@ -44,6 +45,7 @@ jest.mock('../repository/subscription.repository'); jest.mock('../repository/certificate.repository'); jest.mock('../repository/registration.repository'); jest.mock('../repository/command.repository'); +jest.mock('../repository/oianalytics-message.repository'); describe('Repository service', () => { it('should properly initialize service', () => { @@ -72,6 +74,7 @@ describe('Repository service', () => { expect(RegistrationRepository).toHaveBeenCalledWith('sqlite database'); expect(CertificateRepository).toHaveBeenCalledWith('sqlite database'); expect(CommandRepository).toHaveBeenCalledWith('sqlite database'); + expect(OianalyticsMessageRepository).toHaveBeenCalledWith('sqlite database'); expect(repositoryService.engineRepository).toBeDefined(); expect(repositoryService.cryptoRepository).toBeDefined(); @@ -93,5 +96,6 @@ describe('Repository service', () => { expect(repositoryService.subscriptionRepository).toBeDefined(); expect(repositoryService.certificateRepository).toBeDefined(); expect(repositoryService.commandRepository).toBeDefined(); + expect(repositoryService.oianalyticsMessageRepository).toBeDefined(); }); }); diff --git a/backend/src/service/repository.service.ts b/backend/src/service/repository.service.ts index 3c754b5aab..9fe3d9f2b5 100644 --- a/backend/src/service/repository.service.ts +++ b/backend/src/service/repository.service.ts @@ -20,6 +20,7 @@ import EngineMetricsRepository from '../repository/engine-metrics.repository'; import CertificateRepository from '../repository/certificate.repository'; import RegistrationRepository from '../repository/registration.repository'; import CommandRepository from '../repository/command.repository'; +import OianalyticsMessageRepository from '../repository/oianalytics-message.repository'; export default class RepositoryService { private readonly _engineRepository: EngineRepository; @@ -42,6 +43,7 @@ export default class RepositoryService { private readonly _subscriptionRepository: SubscriptionRepository; private readonly _registrationRepository: RegistrationRepository; private readonly _commandRepository: CommandRepository; + private readonly _oianalyticsMessageRepository: OianalyticsMessageRepository; constructor(oibusDatabasePath: string, logsDatabasePath: string, cryptoDatabasePath: string, cacheDatabasePath: string) { const oibusDatabase = Database(oibusDatabasePath); @@ -64,6 +66,7 @@ export default class RepositoryService { this._subscriptionRepository = new SubscriptionRepository(oibusDatabase); this._registrationRepository = new RegistrationRepository(oibusDatabase); this._commandRepository = new CommandRepository(oibusDatabase); + this._oianalyticsMessageRepository = new OianalyticsMessageRepository(oibusDatabase); this._cryptoRepository = new CryptoRepository(cryptoDatabase); @@ -155,4 +158,8 @@ export default class RepositoryService { get commandRepository(): CommandRepository { return this._commandRepository; } + + get oianalyticsMessageRepository(): OianalyticsMessageRepository { + return this._oianalyticsMessageRepository; + } } diff --git a/backend/src/service/utils.spec.ts b/backend/src/service/utils.spec.ts index f659c9bea4..20667220bb 100644 --- a/backend/src/service/utils.spec.ts +++ b/backend/src/service/utils.spec.ts @@ -2,7 +2,6 @@ import path from 'node:path'; import fs from 'node:fs/promises'; import fsSync, { Dirent, Stats } from 'node:fs'; import zlib from 'node:zlib'; -import { version } from '../../package.json'; import minimist from 'minimist'; @@ -1133,12 +1132,12 @@ describe('Service utils', () => { hostname: os.hostname(), operatingSystem: `${os.type()} ${os.release()}`, processId: process.pid.toString(), - version: version, + version: '3.3.3', oibusId: 'id', oibusName: 'name', platform: getPlatformFromOsType(os.type()) }; - const result = getOIBusInfo({ id: 'id', name: 'name' } as EngineSettingsDTO); + const result = getOIBusInfo({ id: 'id', name: 'name', version: '3.3.3' } as EngineSettingsDTO); expect(result).toEqual(expectedResult); }); diff --git a/backend/src/service/utils.ts b/backend/src/service/utils.ts index 484aab3593..93553ac3d6 100644 --- a/backend/src/service/utils.ts +++ b/backend/src/service/utils.ts @@ -15,7 +15,6 @@ import https from 'node:https'; import http from 'node:http'; import { EngineSettingsDTO, OIBusInfo, RegistrationSettingsDTO } from '../../../shared/model/engine.model'; import os from 'node:os'; -import { version } from '../../package.json'; import { NorthCacheFiles } from '../../../shared/model/north-connector.model'; import EncryptionService from './encryption.service'; import { createProxyAgent } from './proxy-agent'; @@ -488,7 +487,7 @@ export const getOIBusInfo = (oibusSettings: EngineSettingsDTO): OIBusInfo => { hostname: os.hostname(), operatingSystem: `${os.type()} ${os.release()}`, architecture: process.arch, - version, + version: oibusSettings.version, oibusId: oibusSettings.id, oibusName: oibusSettings.name, platform: getPlatformFromOsType(os.type()) diff --git a/backend/src/tests/__mocks__/message-service.mock.ts b/backend/src/tests/__mocks__/message-service.mock.ts new file mode 100644 index 0000000000..06a7e1a7a9 --- /dev/null +++ b/backend/src/tests/__mocks__/message-service.mock.ts @@ -0,0 +1,11 @@ +/** + * Create a mock object for Command Service + */ +export default jest.fn().mockImplementation(() => { + return { + start: jest.fn(), + stop: jest.fn(), + addMessageToQueue: jest.fn(), + removeMessageFromQueue: jest.fn(pass => pass) + }; +}); diff --git a/backend/src/tests/__mocks__/repository-service.mock.ts b/backend/src/tests/__mocks__/repository-service.mock.ts index d8f7211943..f7cc9377d2 100644 --- a/backend/src/tests/__mocks__/repository-service.mock.ts +++ b/backend/src/tests/__mocks__/repository-service.mock.ts @@ -128,9 +128,18 @@ export default jest.fn().mockImplementation(() => ({ markAsAcknowledged: jest.fn(), delete: jest.fn() }, + oianalyticsMessageRepository: { + searchMessagesPage: jest.fn(), + searchMessagesList: jest.fn(), + updateOIAnalyticsMessages: jest.fn(), + createOIAnalyticsMessages: jest.fn(), + markAsCompleted: jest.fn(), + markAsErrored: jest.fn() + }, engineRepository: { getEngineSettings: jest.fn(), - updateEngineSettings: jest.fn() + updateEngineSettings: jest.fn(), + updateVersion: jest.fn() }, subscriptionRepository: { getNorthSubscriptions: jest.fn(), diff --git a/backend/src/web-server/controllers/oibus.controller.spec.ts b/backend/src/web-server/controllers/oibus.controller.spec.ts index 8d1aa09cfe..626c3b25ec 100644 --- a/backend/src/web-server/controllers/oibus.controller.spec.ts +++ b/backend/src/web-server/controllers/oibus.controller.spec.ts @@ -35,6 +35,7 @@ describe('Oibus controller', () => { } as EngineSettingsCommandDTO; engine = { id: '1', + version: '3.3.4', ...engineCommand }; }); diff --git a/frontend/src/app/engine/edit-engine/edit-engine.component.spec.ts b/frontend/src/app/engine/edit-engine/edit-engine.component.spec.ts index a9b709541b..acff34b195 100644 --- a/frontend/src/app/engine/edit-engine/edit-engine.component.spec.ts +++ b/frontend/src/app/engine/edit-engine/edit-engine.component.spec.ts @@ -101,6 +101,7 @@ describe('EditEngineComponent', () => { id: 'id', name: 'OIBus Test', port: 2223, + version: '3.3.3', proxyEnabled: true, proxyPort: 9000, logParameters: { diff --git a/shared/model/engine.model.ts b/shared/model/engine.model.ts index d3d8acb01b..7943f51bd8 100644 --- a/shared/model/engine.model.ts +++ b/shared/model/engine.model.ts @@ -70,6 +70,7 @@ export interface LogSettings { export interface EngineSettingsDTO extends BaseEntity { name: string; port: number; + version: string; proxyEnabled: boolean; proxyPort: number; logParameters: LogSettings; diff --git a/shared/model/oianalytics-message.model.ts b/shared/model/oianalytics-message.model.ts new file mode 100644 index 0000000000..897931e44a --- /dev/null +++ b/shared/model/oianalytics-message.model.ts @@ -0,0 +1,48 @@ +import { BaseEntity, Instant } from './types'; + +export const OIANALYTICS_MESSAGE_STATUS = ['PENDING', 'COMPLETED', 'ERRORED'] as const; +export type OIAnalyticsMessageStatus = (typeof OIANALYTICS_MESSAGE_STATUS)[number]; + +export const OIANALYTICS_MESSAGE_TYPES = ['INFO'] as const; +export type OIAnalyticsMessageType = (typeof OIANALYTICS_MESSAGE_TYPES)[number]; + +interface BaseOIAnalyticsMessageDTO extends BaseEntity { + type: OIAnalyticsMessageType; + status: OIAnalyticsMessageStatus; + error?: string; + completedDate?: Instant; + content: T; +} + +interface OIAnalyticsMessageCommandDTO extends Omit, 'id' | 'creationDate' | 'lastEditInstant' | 'status'| 'error' | 'completedDate' > {} + +export interface OIAnalyticsMessageSearchParam { + types: Array; + status: Array; + start?: Instant; + end?: Instant; +} + +export interface InfoMessageContent { + version: string; + oibusName: string; + oibusId: string; + dataDirectory: string; + binaryDirectory: string; + processId: string; + hostname: string; + operatingSystem: string; + architecture: string; + platform: string; +} + +export interface OIAnalyticsMessageInfoDTO extends BaseOIAnalyticsMessageDTO { + type: 'INFO' +} + +export interface OIAnalyticsMessageInfoCommandDTO extends OIAnalyticsMessageCommandDTO { + type: 'INFO' +} + +export type OIAnalyticsMessageCommand = OIAnalyticsMessageInfoCommandDTO +export type OIAnalyticsMessageDTO = OIAnalyticsMessageInfoDTO