diff --git a/src/config/configuration.ts b/src/config/configuration.ts index d8f2ffad..6cffe4c5 100644 --- a/src/config/configuration.ts +++ b/src/config/configuration.ts @@ -18,6 +18,7 @@ export default (): any => { backend: { baseurl: process.env.BACKEND_BASEURL || "https://localhost:3000", deviceStatsIntervalInDays: parseInt(process.env.DEVICE_STATS_INTERVAL_IN_DAYS, 10) || 29, + datatargetLogMaxEvents: process.env.DATATARGET_LOG_MAX_EVENTS || 250, }, kombit: { entryPoint: diff --git a/src/controllers/admin-controller/data-target-log.controller.ts b/src/controllers/admin-controller/data-target-log.controller.ts new file mode 100644 index 00000000..506eaf6a --- /dev/null +++ b/src/controllers/admin-controller/data-target-log.controller.ts @@ -0,0 +1,33 @@ +import { ComposeAuthGuard } from "@auth/compose-auth.guard"; +import { Read } from "@auth/roles.decorator"; +import { RolesGuard } from "@auth/roles.guard"; +import { ApiAuth } from "@auth/swagger-auth-decorator"; +import { DatatargetLog } from "@entities/datatarget-log.entity"; +import { Controller, Get, Param, ParseIntPipe, UseGuards } from "@nestjs/common"; +import { ApiForbiddenResponse, ApiTags, ApiUnauthorizedResponse } from "@nestjs/swagger"; +import { InjectRepository } from "@nestjs/typeorm"; +import { Repository } from "typeorm"; + +@ApiTags("Data Target Logs") +@Controller("datatarget-log") +@UseGuards(ComposeAuthGuard, RolesGuard) +@ApiAuth() +@Read() +@ApiForbiddenResponse() +@ApiUnauthorizedResponse() +export class DatatargetLogController { + constructor( + @InjectRepository(DatatargetLog) + private datatargetLogRepository: Repository + ) {} + + @Get(":datatargetId") + async getDatatargetLogs(@Param("datatargetId", new ParseIntPipe()) datatargetId: number): Promise { + return await this.datatargetLogRepository.find({ + where: { + datatarget: { id: datatargetId }, + }, + relations: ["iotDevice"], + }); + } +} diff --git a/src/controllers/admin-controller/data-target.controller.ts b/src/controllers/admin-controller/data-target.controller.ts index 686368bc..6d570d01 100644 --- a/src/controllers/admin-controller/data-target.controller.ts +++ b/src/controllers/admin-controller/data-target.controller.ts @@ -75,7 +75,7 @@ export class DataTargetController { @ApiOperation({ summary: "Find DataTarget by id" }) async findOne(@Req() req: AuthenticatedRequest, @Param("id", new ParseIntPipe()) id: number): Promise { try { - const dataTarget = await this.dataTargetService.findOne(id); + const dataTarget = await this.dataTargetService.findOneWithHasRecentError(id); checkIfUserHasAccessToApplication(req, dataTarget.application.id, ApplicationAccessScope.Read); return dataTarget; } catch (err) { diff --git a/src/entities/data-target.entity.ts b/src/entities/data-target.entity.ts index c8bd464d..3e346caf 100644 --- a/src/entities/data-target.entity.ts +++ b/src/entities/data-target.entity.ts @@ -19,6 +19,9 @@ export abstract class DataTarget extends DbBaseEntity { @Column() name: string; + @Column({ nullable: true }) + lastMessageDate?: Date; + @ManyToOne( // eslint-disable-next-line @typescript-eslint/no-unused-vars type => Application, diff --git a/src/entities/datatarget-log.entity.ts b/src/entities/datatarget-log.entity.ts new file mode 100644 index 00000000..d5690319 --- /dev/null +++ b/src/entities/datatarget-log.entity.ts @@ -0,0 +1,31 @@ +import { SendStatus } from "@enum/send-status.enum"; +import { Column, Entity, Index, JoinColumn, ManyToOne } from "typeorm"; +import { DbBaseEntity } from "./base.entity"; +import { DataTarget } from "./data-target.entity"; +import { IoTDevice } from "./iot-device.entity"; +import { PayloadDecoder } from "./payload-decoder.entity"; + +@Entity("datatarget-log") +@Index(["datatarget", "createdAt"]) +export class DatatargetLog extends DbBaseEntity { + @ManyToOne(() => DataTarget, { onDelete: "CASCADE" }) + @JoinColumn() + datatarget: DataTarget; + + @Column() + type: SendStatus; + + @Column({ nullable: true }) + statusCode?: number; + + @Column({ nullable: true }) + message?: string; + + @ManyToOne(() => IoTDevice, { onDelete: "SET NULL", nullable: true }) + @JoinColumn() + iotDevice?: IoTDevice; + + @ManyToOne(() => PayloadDecoder, { onDelete: "SET NULL", nullable: true }) + @JoinColumn() + payloadDecoder?: PayloadDecoder; +} diff --git a/src/entities/dto/list-all-data-targets-response.dto.ts b/src/entities/dto/list-all-data-targets-response.dto.ts index 85859694..231275a2 100644 --- a/src/entities/dto/list-all-data-targets-response.dto.ts +++ b/src/entities/dto/list-all-data-targets-response.dto.ts @@ -1,4 +1,8 @@ import { ListAllEntitiesResponseDto } from "@dto/list-all-entities-response.dto"; import { DataTarget } from "@entities/data-target.entity"; -export class ListAllDataTargetsResponseDto extends ListAllEntitiesResponseDto {} +export type DataTargetDto = DataTarget & { + hasRecentErrors?: boolean; +}; + +export class ListAllDataTargetsResponseDto extends ListAllEntitiesResponseDto {} diff --git a/src/entities/interfaces/data-target-send-status.interface.ts b/src/entities/interfaces/data-target-send-status.interface.ts index a97940e5..ba43ce03 100644 --- a/src/entities/interfaces/data-target-send-status.interface.ts +++ b/src/entities/interfaces/data-target-send-status.interface.ts @@ -3,4 +3,6 @@ import { SendStatus } from "@enum/send-status.enum"; export interface DataTargetSendStatus { errorMessage?: string; status: SendStatus; + statusCode?: number; + statusText?: string; } diff --git a/src/migration/1726653509863-datatarget-log.ts b/src/migration/1726653509863-datatarget-log.ts new file mode 100644 index 00000000..cb64c3b2 --- /dev/null +++ b/src/migration/1726653509863-datatarget-log.ts @@ -0,0 +1,22 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class DatatargetLog1726653509863 implements MigrationInterface { + name = 'DatatargetLog1726653509863' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "data_target" ADD "lastMessageDate" TIMESTAMP`); + await queryRunner.query(`CREATE TABLE "datatarget-log" ("id" SERIAL NOT NULL, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), "type" character varying NOT NULL, "statusCode" integer, "message" character varying, "createdById" integer, "updatedById" integer, "datatargetId" integer, "iotDeviceId" integer, "payloadDecoderId" integer, CONSTRAINT "PK_f853c6517b8a428fa350221a0f2" PRIMARY KEY ("id"))`); + await queryRunner.query(`CREATE INDEX "IDX_1d333574ef086bee54ff78b2e0" ON "datatarget-log" ("datatargetId", "createdAt") `); + await queryRunner.query(`ALTER TABLE "datatarget-log" ADD CONSTRAINT "FK_2f62220b03e524c7a056890fb3b" FOREIGN KEY ("createdById") REFERENCES "user"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`); + await queryRunner.query(`ALTER TABLE "datatarget-log" ADD CONSTRAINT "FK_7bcebf642504f26aa8c03074cca" FOREIGN KEY ("updatedById") REFERENCES "user"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`); + await queryRunner.query(`ALTER TABLE "datatarget-log" ADD CONSTRAINT "FK_0e1507ac15e7d6755273691345e" FOREIGN KEY ("datatargetId") REFERENCES "data_target"("id") ON DELETE CASCADE ON UPDATE NO ACTION`); + await queryRunner.query(`ALTER TABLE "datatarget-log" ADD CONSTRAINT "FK_4fd74ac0a22db2f38e6b420a657" FOREIGN KEY ("iotDeviceId") REFERENCES "iot_device"("id") ON DELETE SET NULL ON UPDATE NO ACTION`); + await queryRunner.query(`ALTER TABLE "datatarget-log" ADD CONSTRAINT "FK_28c126089e5dac0360926acf819" FOREIGN KEY ("payloadDecoderId") REFERENCES "payload_decoder"("id") ON DELETE SET NULL ON UPDATE NO ACTION`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "data_target" DROP COLUMN "lastMessageDate"`); + await queryRunner.query(`DROP TABLE "datatarget-log"`); + } + +} diff --git a/src/modules/device-management/data-target.module.ts b/src/modules/device-management/data-target.module.ts index da007996..285f3fcf 100644 --- a/src/modules/device-management/data-target.module.ts +++ b/src/modules/device-management/data-target.module.ts @@ -1,3 +1,4 @@ +import { DatatargetLogController } from "@admin-controller/data-target-log.controller"; import { DataTargetController } from "@admin-controller/data-target.controller"; import configuration from "@config/configuration"; import { ApplicationModule } from "@modules/device-management/application.module"; @@ -8,6 +9,7 @@ import { ConfigModule } from "@nestjs/config"; import { DataTargetService } from "@services/data-targets/data-target.service"; import { OS2IoTMail } from "@services/os2iot-mail.service"; import { CLIENT_SECRET_PROVIDER, PlainTextClientSecretProvider } from "../../helpers/fiware-token.helper"; +import { DataTargetLogService } from "@services/data-targets/data-target-log.service"; @Module({ imports: [ @@ -16,10 +18,11 @@ import { CLIENT_SECRET_PROVIDER, PlainTextClientSecretProvider } from "../../hel OrganizationModule, ConfigModule.forRoot({ load: [configuration] }), ], - exports: [DataTargetService], - controllers: [DataTargetController], + exports: [DataTargetService, DataTargetLogService], + controllers: [DataTargetController, DatatargetLogController], providers: [ DataTargetService, + DataTargetLogService, OS2IoTMail, { provide: CLIENT_SECRET_PROVIDER, diff --git a/src/modules/shared.module.ts b/src/modules/shared.module.ts index c50574c7..4daa177c 100644 --- a/src/modules/shared.module.ts +++ b/src/modules/shared.module.ts @@ -1,39 +1,40 @@ import { Module } from "@nestjs/common"; import { TypeOrmModule } from "@nestjs/typeorm"; +import { ApiKey } from "@entities/api-key.entity"; +import { ApplicationDeviceType } from "@entities/application-device-type.entity"; import { Application } from "@entities/application.entity"; +import { ControlledProperty } from "@entities/controlled-property.entity"; import { DataTarget } from "@entities/data-target.entity"; +import { DatatargetLog } from "@entities/datatarget-log.entity"; +import { DeviceModel } from "@entities/device-model.entity"; +import { Downlink } from "@entities/downlink.entity"; +import { FiwareDataTarget } from "@entities/fiware-data-target.entity"; +import { GatewayStatusHistory } from "@entities/gateway-status-history.entity"; +import { Gateway } from "@entities/gateway.entity"; import { GenericHTTPDevice } from "@entities/generic-http-device.entity"; import { HttpPushDataTarget } from "@entities/http-push-data-target.entity"; -import { FiwareDataTarget } from "@entities/fiware-data-target.entity"; import { IoTDevicePayloadDecoderDataTargetConnection } from "@entities/iot-device-payload-decoder-data-target-connection.entity"; import { IoTDevice } from "@entities/iot-device.entity"; import { LoRaWANDevice } from "@entities/lorawan-device.entity"; +import { LorawanMulticastDefinition } from "@entities/lorawan-multicast.entity"; +import { MqttDataTarget } from "@entities/mqtt-data-target.entity"; +import { MQTTExternalBrokerDevice } from "@entities/mqtt-external-broker-device.entity"; +import { MQTTInternalBrokerDevice } from "@entities/mqtt-internal-broker-device.entity"; +import { Multicast } from "@entities/multicast.entity"; +import { OpenDataDkDataset } from "@entities/open-data-dk-dataset.entity"; +import { OpenDataDkDataTarget } from "@entities/open-data-dk-push-data-target.entity"; import { Organization } from "@entities/organization.entity"; import { PayloadDecoder } from "@entities/payload-decoder.entity"; +import { PermissionTypeEntity } from "@entities/permissions/permission-type.entity"; import { Permission } from "@entities/permissions/permission.entity"; -import { ReceivedMessage } from "@entities/received-message.entity"; import { ReceivedMessageMetadata } from "@entities/received-message-metadata.entity"; +import { ReceivedMessageSigFoxSignals } from "@entities/received-message-sigfox-signals.entity"; +import { ReceivedMessage } from "@entities/received-message.entity"; import { SigFoxDevice } from "@entities/sigfox-device.entity"; import { SigFoxGroup } from "@entities/sigfox-group.entity"; import { User } from "@entities/user.entity"; -import { DeviceModel } from "@entities/device-model.entity"; -import { OpenDataDkDataset } from "@entities/open-data-dk-dataset.entity"; import { AuditLog } from "@services/audit-log.service"; -import { ApiKey } from "@entities/api-key.entity"; -import { Multicast } from "@entities/multicast.entity"; -import { LorawanMulticastDefinition } from "@entities/lorawan-multicast.entity"; -import { ControlledProperty } from "@entities/controlled-property.entity"; -import { ApplicationDeviceType } from "@entities/application-device-type.entity"; -import { ReceivedMessageSigFoxSignals } from "@entities/received-message-sigfox-signals.entity"; -import { MqttDataTarget } from "@entities/mqtt-data-target.entity"; -import { PermissionTypeEntity } from "@entities/permissions/permission-type.entity"; -import { GatewayStatusHistory } from "@entities/gateway-status-history.entity"; -import { OpenDataDkDataTarget } from "@entities/open-data-dk-push-data-target.entity"; -import { MQTTInternalBrokerDevice } from "@entities/mqtt-internal-broker-device.entity"; -import { MQTTExternalBrokerDevice } from "@entities/mqtt-external-broker-device.entity"; -import { Gateway } from "@entities/gateway.entity"; -import { Downlink } from "@entities/downlink.entity"; @Module({ imports: [ @@ -41,6 +42,7 @@ import { Downlink } from "@entities/downlink.entity"; User, Application, DataTarget, + DatatargetLog, GenericHTTPDevice, HttpPushDataTarget, FiwareDataTarget, @@ -70,7 +72,7 @@ import { Downlink } from "@entities/downlink.entity"; MQTTInternalBrokerDevice, MQTTExternalBrokerDevice, Gateway, - Downlink + Downlink, ]), ], providers: [AuditLog], diff --git a/src/services/data-targets/base-data-target.service.ts b/src/services/data-targets/base-data-target.service.ts index 63975586..16135475 100644 --- a/src/services/data-targets/base-data-target.service.ts +++ b/src/services/data-targets/base-data-target.service.ts @@ -10,18 +10,20 @@ import { DataTarget } from "@entities/data-target.entity"; export abstract class BaseDataTargetService { protected readonly baseLogger = new Logger(BaseDataTargetService.name); - success(receiver: string): DataTargetSendStatus { + success(receiver: string, statusCode?: number, statusText?: string): DataTargetSendStatus { this.baseLogger.debug(`Send to ${receiver} sucessful!`); - return { status: SendStatus.OK }; + return { status: SendStatus.OK, statusCode, statusText }; } - failure(receiver: string, errorMessage: string, dataTarget: DataTarget): DataTargetSendStatus { + failure(receiver: string, errorMessage: string, dataTarget: DataTarget, statusCode?: number, statusText?: string): DataTargetSendStatus { this.baseLogger.error( `Datatarget {Id: ${dataTarget.id}, Name: ${dataTarget.name}} Send to ${receiver} failed with error ${errorMessage}` ); return { status: SendStatus.ERROR, errorMessage: errorMessage.toString(), + statusCode, + statusText }; } } diff --git a/src/services/data-targets/data-target-kafka-listener.service.ts b/src/services/data-targets/data-target-kafka-listener.service.ts index b2ffc43e..84efa543 100644 --- a/src/services/data-targets/data-target-kafka-listener.service.ts +++ b/src/services/data-targets/data-target-kafka-listener.service.ts @@ -4,14 +4,14 @@ import { IoTDevice } from "@entities/iot-device.entity"; import { DataTargetType } from "@enum/data-target-type.enum"; import { KafkaTopic } from "@enum/kafka-topic.enum"; import { DataTargetSendStatus } from "@interfaces/data-target-send-status.interface"; -import { Injectable, Logger, NotImplementedException } from "@nestjs/common"; +import { Injectable, Logger } from "@nestjs/common"; import { DataTargetService } from "@services/data-targets/data-target.service"; import { HttpPushDataTargetService } from "@services/data-targets/http-push-data-target.service"; -import { IoTDevicePayloadDecoderDataTargetConnectionService } from "@services/device-management/iot-device-payload-decoder-data-target-connection.service"; import { IoTDeviceService } from "@services/device-management/iot-device.service"; import { AbstractKafkaConsumer } from "@services/kafka/kafka.abstract.consumer"; import { CombinedSubscribeTo } from "@services/kafka/kafka.decorator"; import { KafkaPayload } from "@services/kafka/kafka.message"; +import { DataTargetLogService } from "./data-target-log.service"; import { FiwareDataTargetService } from "./fiware-data-target.service"; import { MqttDataTargetService } from "./mqtt-data-target.service"; @@ -25,7 +25,7 @@ export class DataTargetKafkaListenerService extends AbstractKafkaConsumer { private httpPushDataTargetService: HttpPushDataTargetService, private fiwareDataTargetService: FiwareDataTargetService, private mqttDataTargetService: MqttDataTargetService, - private ioTDevicePayloadDecoderDataTargetConnectionService: IoTDevicePayloadDecoderDataTargetConnectionService + private dataTargetLogService: DataTargetLogService ) { super(); } @@ -72,22 +72,22 @@ export class DataTargetKafkaListenerService extends AbstractKafkaConsumer { if (target.type == DataTargetType.HttpPush) { try { const status = await this.httpPushDataTargetService.send(target, dto); - this.logger.debug(`Sent to HttpPush target: ${JSON.stringify(status)}`); + await this.onSendDone(status, DataTargetType.HttpPush, target, dto); } catch (err) { - this.logger.error(`Error while sending to Http Push DataTarget: ${err}`); + await this.onSendError(err, DataTargetType.HttpPush, target, dto); } } else if (target.type == DataTargetType.Fiware) { try { const status = await this.fiwareDataTargetService.send(target, dto); - this.logger.debug(`Sent to FIWARE target: ${JSON.stringify(status)}`); + await this.onSendDone(status, DataTargetType.Fiware, target, dto); } catch (err) { - this.logger.error(`Error while sending to FIWARE DataTarget: ${err}`); + await this.onSendError(err, DataTargetType.Fiware, target, dto); } } else if (target.type === DataTargetType.MQTT) { try { - this.mqttDataTargetService.send(target, dto, this.onSendDone); + this.mqttDataTargetService.send(target, dto, this.onSendDone, this.onSendError); } catch (err) { - this.logger.error(`Error while sending to MQTT DataTarget: ${err}`); + await this.onSendError(err, DataTargetType.MQTT, target, dto); } } else if (target.type === DataTargetType.OpenDataDK) { // OpenDataDk data targets are handled uniquely and ignored here. @@ -97,7 +97,24 @@ export class DataTargetKafkaListenerService extends AbstractKafkaConsumer { }); } - private onSendDone = (status: DataTargetSendStatus, targetType: DataTargetType) => { + private onSendDone = async ( + status: DataTargetSendStatus, + targetType: DataTargetType, + datatarget: DataTarget, + payloadDto: TransformedPayloadDto + ) => { this.logger.debug(`Sent to ${targetType} target: ${JSON.stringify(status)}`); + await this.dataTargetService.updateLastMessageDate(datatarget.id); + await this.dataTargetLogService.onSendDone(status, datatarget, payloadDto); + }; + private onSendError = async ( + err: Error, + targetType: DataTargetType, + datatarget: DataTarget, + payloadDto: TransformedPayloadDto + ) => { + this.logger.error(`Error while sending to ${targetType} DataTarget: ${err}`); + await this.dataTargetService.updateLastMessageDate(datatarget.id); + await this.dataTargetLogService.onSendError(err, datatarget, payloadDto); }; } diff --git a/src/services/data-targets/data-target-log.service.ts b/src/services/data-targets/data-target-log.service.ts new file mode 100644 index 00000000..7a0cf9f9 --- /dev/null +++ b/src/services/data-targets/data-target-log.service.ts @@ -0,0 +1,113 @@ +import { TransformedPayloadDto } from "@dto/kafka/transformed-payload.dto"; +import { DataTarget } from "@entities/data-target.entity"; +import { DatatargetLog } from "@entities/datatarget-log.entity"; +import { IoTDevice } from "@entities/iot-device.entity"; +import { PayloadDecoder } from "@entities/payload-decoder.entity"; +import { SendStatus } from "@enum/send-status.enum"; +import { DataTargetSendStatus } from "@interfaces/data-target-send-status.interface"; +import { Injectable } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; +import { InjectRepository } from "@nestjs/typeorm"; +import { In, MoreThan, Repository } from "typeorm"; + +@Injectable() +export class DataTargetLogService { + private datatargetLogMaxEvents: number; + + constructor( + configService: ConfigService, + @InjectRepository(DatatargetLog) + private datatargetLogRepository: Repository + ) { + this.datatargetLogMaxEvents = configService.get("backend.datatargetLogMaxEvents"); + } + + public onSendDone = async ( + status: DataTargetSendStatus, + datatarget: DataTarget, + payloadDto: TransformedPayloadDto + ) => { + // If this is just an OK-event, we only want to log it if the latest event before it was an error, for this datatarget. Otherwise early return + if (status.status === SendStatus.OK) { + const datatargetLastestEvent = await this.datatargetLogRepository.findOne({ + where: { datatarget: { id: datatarget.id } }, + order: { createdAt: "DESC" }, + }); + if (datatargetLastestEvent?.type !== SendStatus.ERROR) return; + } + await this.handleDataTargetLogCommon( + datatarget, + status?.status, + status?.statusCode, + status?.statusText, + payloadDto?.iotDeviceId, + payloadDto?.payloadDecoderId + ); + }; + + public onSendError = async (err: Error, datatarget: DataTarget, payloadDto: TransformedPayloadDto) => { + await this.handleDataTargetLogCommon( + datatarget, + SendStatus.ERROR, + undefined, + // TODO: Is there ANY risk of user-data in this error? E.g. if JSON-serialize fails on a specific prop?? + "" + err, + payloadDto?.iotDeviceId, + payloadDto?.payloadDecoderId + ); + }; + + private async handleDataTargetLogCommon( + datatarget: DataTarget, + status: SendStatus, + statusCode?: number, + message?: string, + iotDeviceId?: number, + payloadDecoderId?: number + ) { + // Ensures we only keep the last X number of events (specified in config) for each datatarget, before we delete the oldest ones + if (this.datatargetLogMaxEvents) { + const oldEventsToDelete = await this.datatargetLogRepository.find({ + where: { datatarget: { id: datatarget.id } }, + order: { createdAt: "DESC" }, + skip: this.datatargetLogMaxEvents - 1, + }); + if (oldEventsToDelete?.length) { + await this.datatargetLogRepository.remove(oldEventsToDelete); + } + } + // Insert new event + const logEntity: DatatargetLog = { + // Meta-columns should be auto-populated by DB/ORM + id: undefined, + createdAt: undefined, + updatedAt: undefined, + // Actual data and references + datatarget, + type: status, + message, + statusCode, + iotDevice: iotDeviceId ? ({ id: iotDeviceId } as IoTDevice) : undefined, + payloadDecoder: payloadDecoderId ? ({ id: payloadDecoderId } as PayloadDecoder) : undefined, + }; + await this.datatargetLogRepository.insert(logEntity); + } + + public async getDatatargetWithRecentError(datatargetIds: number[]): Promise> { + const dateLimit = new Date(); + dateLimit.setHours(dateLimit.getHours() - 24); + + const res = await this.datatargetLogRepository + .createQueryBuilder() + .where({ + datatarget: { id: In(datatargetIds) }, + type: SendStatus.ERROR, + createdAt: MoreThan(dateLimit), + }) + .select('"datatargetId"') + .distinct(true) + .getRawMany(); + + return new Set(res.map(row => row.datatargetId)); + } +} diff --git a/src/services/data-targets/data-target.service.ts b/src/services/data-targets/data-target.service.ts index cee648f7..71560b2e 100644 --- a/src/services/data-targets/data-target.service.ts +++ b/src/services/data-targets/data-target.service.ts @@ -1,6 +1,6 @@ import { CreateDataTargetDto } from "@dto/create-data-target.dto"; import { CreateOpenDataDkDatasetDto } from "@dto/create-open-data-dk-dataset.dto"; -import { ListAllDataTargetsResponseDto } from "@dto/list-all-data-targets-response.dto"; +import { DataTargetDto, ListAllDataTargetsResponseDto } from "@dto/list-all-data-targets-response.dto"; import { ListAllDataTargetsDto } from "@dto/list-all-data-targets.dto"; import { OddkMailInfo } from "@dto/oddk-mail-info.dto"; import { UpdateDataTargetDto } from "@dto/update-data-target.dto"; @@ -9,6 +9,7 @@ import { FiwareDataTarget } from "@entities/fiware-data-target.entity"; import { HttpPushDataTarget } from "@entities/http-push-data-target.entity"; import { MqttDataTarget } from "@entities/mqtt-data-target.entity"; import { OpenDataDkDataset } from "@entities/open-data-dk-dataset.entity"; +import { User } from "@entities/user.entity"; import { dataTargetTypeMap } from "@enum/data-target-type-mapping"; import { DataTargetType } from "@enum/data-target-type.enum"; import { ErrorCodes } from "@enum/error-codes.enum"; @@ -18,7 +19,7 @@ import { ApplicationService } from "@services/device-management/application.serv import { OS2IoTMail } from "@services/os2iot-mail.service"; import { DeleteResult, Repository, SelectQueryBuilder } from "typeorm"; import { CLIENT_SECRET_PROVIDER, ClientSecretProvider } from "../../helpers/fiware-token.helper"; -import { User } from "@entities/user.entity"; +import { DataTargetLogService } from "./data-target-log.service"; @Injectable() export class DataTargetService { @@ -31,7 +32,8 @@ export class DataTargetService { private applicationService: ApplicationService, @Inject(CLIENT_SECRET_PROVIDER) private clientSecretProvider: ClientSecretProvider, - private oS2IoTMail: OS2IoTMail + private oS2IoTMail: OS2IoTMail, + private dataTargetLogService: DataTargetLogService ) {} private readonly logger = new Logger(DataTargetService.name); @@ -51,8 +53,13 @@ export class DataTargetService { const [result, total] = await queryBuilder.getManyAndCount(); + const idsWithRecentError = await this.dataTargetLogService.getDatatargetWithRecentError(result.map(dt => dt.id)); + const resultWithErrorInfo = result.map( + dt => ({ ...dt, hasRecentErrors: idsWithRecentError.has(dt.id) } as DataTargetDto) + ); + return { - data: result, + data: resultWithErrorInfo, count: total, }; } @@ -84,6 +91,12 @@ export class DataTargetService { }); } + public async findOneWithHasRecentError(id: number): Promise { + const datatarget = await this.findOne(id); + const idsWithRecentError = await this.dataTargetLogService.getDatatargetWithRecentError([id]); + return { ...datatarget, hasRecentErrors: idsWithRecentError.has(id) }; + } + async findDataTargetsByApplicationId(applicationId: number): Promise { return await this.dataTargetRepository.findBy({ application: { id: applicationId }, @@ -261,4 +274,12 @@ export class DataTargetService { }); return true; } + + public async updateLastMessageDate(datatargetId: number) { + this.dataTargetRepository.update( + { id: datatargetId }, + // Note: The "updatedAt"-part here prevents the updatedAt/updatedBy to be overwritten with unhelpful data from the automatic update of lastMessageDate + { lastMessageDate: new Date(), updatedAt: () => '"updatedAt"' } + ); + } } diff --git a/src/services/data-targets/fiware-data-target.service.ts b/src/services/data-targets/fiware-data-target.service.ts index 6ed81d7e..f8bea708 100644 --- a/src/services/data-targets/fiware-data-target.service.ts +++ b/src/services/data-targets/fiware-data-target.service.ts @@ -1,5 +1,5 @@ import { Injectable, Logger } from "@nestjs/common"; -import { AxiosRequestConfig } from "axios"; +import { AxiosError, AxiosRequestConfig, isAxiosError } from "axios"; import { AuthorizationType } from "@enum/authorization-type.enum"; import { DataTarget } from "@entities/data-target.entity"; @@ -47,13 +47,15 @@ export class FiwareDataTargetService extends BaseDataTargetService { this.logger.debug(`FiwareDataTarget result: '${JSON.stringify(result.data)}'`); if (!result.status.toString().startsWith("2")) { + // Note: Axios actually throws on non-2xx codes, so this should never happen (will be handled in catch-block instead) this.logger.warn(`Got a non-2xx status-code: ${result.status.toString()} and message: ${result.statusText}`); } - return this.success(target); + return this.success(target, result.status, result.statusText); } catch (err) { this.logger.error(`FiwareDataTarget got error: ${err}`); await this.authenticationTokenProvider.clearConfig(config); - return this.failure(target, err, dataTarget); + const axiosErrResp = isAxiosError(err) ? (err as AxiosError)?.response : undefined; + return this.failure(target, err, dataTarget, axiosErrResp?.status, axiosErrResp?.statusText); } } diff --git a/src/services/data-targets/http-push-data-target.service.ts b/src/services/data-targets/http-push-data-target.service.ts index 45d29a8d..ba131e68 100644 --- a/src/services/data-targets/http-push-data-target.service.ts +++ b/src/services/data-targets/http-push-data-target.service.ts @@ -8,7 +8,7 @@ import { HttpPushDataTargetData } from "@interfaces/http-push-data-target-data.i import { HttpService } from "@nestjs/axios"; import { Injectable, Logger } from "@nestjs/common"; import { BaseDataTargetService } from "@services/data-targets/base-data-target.service"; -import { AxiosRequestConfig } from "axios"; +import { AxiosError, AxiosRequestConfig, isAxiosError } from "axios"; @Injectable() export class HttpPushDataTargetService extends BaseDataTargetService { @@ -35,13 +35,15 @@ export class HttpPushDataTargetService extends BaseDataTargetService { this.logger.debug(`HttpPushDataTarget result: '${JSON.stringify(result.data)}'`); if (!result.status.toString().startsWith("2")) { + // Note: Axios actually throws on non-2xx codes, so this should never happen (will be handled in catch-block instead) this.logger.warn(`Got a non-2xx status-code: ${result.status.toString()} and message: ${result.statusText}`); } - return this.success(target); + return this.success(target, result.status, result.statusText); } catch (err) { // TODO: Error handling for common errors this.logger.error(`HttpPushDataTarget got error: ${err}`); - return this.failure(target, err, datatarget); + const axiosErrResp = isAxiosError(err) ? (err as AxiosError)?.response : undefined; + return this.failure(target, err, datatarget, axiosErrResp?.status, axiosErrResp?.statusText); } } diff --git a/src/services/data-targets/mqtt-data-target.service.ts b/src/services/data-targets/mqtt-data-target.service.ts index c81ef85c..610ec1bc 100644 --- a/src/services/data-targets/mqtt-data-target.service.ts +++ b/src/services/data-targets/mqtt-data-target.service.ts @@ -5,14 +5,13 @@ import { MqttDataTarget } from "@entities/mqtt-data-target.entity"; import { DataTargetType } from "@enum/data-target-type.enum"; import { DataTargetSendStatus } from "@interfaces/data-target-send-status.interface"; import { MqttDataTargetConfiguration } from "@interfaces/mqtt-data-target-configuration.interface"; -import { HttpService } from "@nestjs/axios"; import { Injectable, Logger } from "@nestjs/common"; import * as mqtt from "mqtt"; import { BaseDataTargetService } from "./base-data-target.service"; @Injectable() export class MqttDataTargetService extends BaseDataTargetService { - constructor(private httpService: HttpService) { + constructor() { super(); } @@ -21,7 +20,18 @@ export class MqttDataTargetService extends BaseDataTargetService { send( datatarget: DataTarget, dto: TransformedPayloadDto, - onDone: (status: DataTargetSendStatus, targetType: DataTargetType) => void + onDone: ( + status: DataTargetSendStatus, + targetType: DataTargetType, + datatarget: DataTarget, + payloadDto: TransformedPayloadDto + ) => void, + onSendError: ( + err: Error, + targetType: DataTargetType, + datatarget: DataTarget, + payloadDto: TransformedPayloadDto + ) => void ): void { const config: MqttDataTargetConfiguration = (datatarget as MqttDataTarget).toConfiguration(); @@ -38,21 +48,54 @@ export class MqttDataTargetService extends BaseDataTargetService { }); const targetForLogging = `MqttDataTarget(URL '${config.url}', topic '${config.topic}')`; - client.once("connect", () => { - client.publish(config.topic, JSON.stringify(dto.payload), { qos: config.qos }, (err, packet) => { - try { - if (err) { - const status = this.failure(targetForLogging, err?.message, datatarget); - onDone(status, DataTargetType.MQTT); - } else { - this.logger.debug("Packet received: " + JSON.stringify(packet)); - const status = this.success(targetForLogging); - onDone(status, DataTargetType.MQTT); + client + .once("connect", () => { + client.publish(config.topic, JSON.stringify(dto.payload), { qos: config.qos }, (err, packet) => { + try { + const responseInfo = this.decodeMqttResponse(packet); + if (err) { + const status = this.failure( + targetForLogging, + err?.message, + datatarget, + responseInfo?.reasonCode, + responseInfo?.reasonString + ); + onDone(status, DataTargetType.MQTT, datatarget, dto); + } else { + this.logger.debug("Packet received: " + JSON.stringify(packet)); + const status = this.success(targetForLogging, responseInfo?.reasonCode, responseInfo?.reasonString); + onDone(status, DataTargetType.MQTT, datatarget, dto); + } + } finally { + client.end(); } - } finally { - client.end(); - } - }); - }); + }); + }) + .once("error", err => onSendError(err, DataTargetType.MQTT, datatarget, dto)); + } + + private decodeMqttResponse( + packet: mqtt.Packet | undefined + ): { reasonString?: string; reasonCode?: number } | undefined { + // Some of the packet-types have no usefull info at all + if ( + !packet || + packet.cmd === "pingreq" || + packet.cmd === "pingresp" || + packet.cmd === "publish" || + packet.cmd === "connect" + ) { + return undefined; + } + // A few special packets have reason-info other than the reasonString + if (packet.cmd === "connack") { + return { reasonCode: packet.reasonCode, reasonString: "" + packet.returnCode }; + } + // The remaining packet-types (which is most of them) will have the reasonString, and all except 2 will also have reasonCode + return { + reasonString: packet.properties?.reasonString, + reasonCode: packet.cmd !== "subscribe" && packet.cmd !== "unsubscribe" ? packet.reasonCode : undefined, + }; } }