-
Notifications
You must be signed in to change notification settings - Fork 4
/
nest-rabbit-tasks.module.ts
69 lines (61 loc) · 2.41 KB
/
nest-rabbit-tasks.module.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import { OnModuleInit, Module as ModuleDecorator, DynamicModule, Logger } from '@nestjs/common';
import { HaredoChain, MessageCallback, setLoggers } from 'haredo';
import _ from 'lodash';
import {
NestRabbitTasksModuleSyncOptions,
NestRabbitTasksModuleAsyncOptions,
RabbitWorkerInterface,
} from './nest-rabbit-tasks.interfaces';
import { NestRabbitWorkerDynamic } from './nest-rabbit-worker.dynamic';
import { NestRabbitTasksExplorer } from './nest-rabbit-tasks.explorer';
/**
 * One entry yielded by the explorer: a worker instance paired with the queue
 * connection its messages should come from.
 */
interface QueueConnectionAndWorkerBindParams {
  /** Chain used to subscribe to the queue; `null` entries are skipped by the module. */
  queueConnection: HaredoChain | null;
  /** Worker whose `handleMessage` is subscribed to the queue. */
  worker: RabbitWorkerInterface<any>;
}
/**
 * Nest module that subscribes each worker returned by the
 * {@link NestRabbitTasksExplorer} to its queue connection when the module is
 * initialised.
 */
@ModuleDecorator({})
export class NestRabbitTasksModule implements OnModuleInit {
  /**
   * Builds the dynamic module from one or several synchronous option objects.
   *
   * @param options A single options object or an array of them.
   * @returns This module merged with whatever `NestRabbitWorkerDynamic.getSyncDynamics` produces.
   */
  public static registerSync(options: NestRabbitTasksModuleSyncOptions | NestRabbitTasksModuleSyncOptions[]): DynamicModule {
    return {
      module: NestRabbitTasksModule,
      ...NestRabbitWorkerDynamic.getSyncDynamics(options),
    };
  }

  /**
   * Builds the dynamic module from one or several asynchronous option objects.
   *
   * @param options A single options object or an array of them.
   * @returns This module merged with whatever `NestRabbitWorkerDynamic.getAsyncDynamics` produces.
   */
  public static registerAsync(options: NestRabbitTasksModuleAsyncOptions | NestRabbitTasksModuleAsyncOptions[]): DynamicModule {
    return {
      module: NestRabbitTasksModule,
      ...NestRabbitWorkerDynamic.getAsyncDynamics(options),
    };
  }

  public constructor(private readonly explorer: NestRabbitTasksExplorer, private readonly logger: Logger) {}

  /**
   * Hands the injected logger's `error`/`log`/`debug` methods to haredo's
   * `setLoggers`, then subscribes every explored worker to its queue.
   */
  public onModuleInit(): void {
    setLoggers({
      error: this.logger.error.bind(this.logger),
      info: this.logger.log.bind(this.logger),
      debug: this.logger.debug.bind(this.logger),
    });
    this.bindMessageFromQueueToMessageHandlerInWorker();
  }

  /**
   * Subscribes each worker's `handleMessage` to the matching queue connection.
   *
   * Entries without a connection are skipped silently, entries whose worker
   * has no callable `handleMessage` are logged and skipped. Subscription is
   * fire-and-forget: success and failure are only reported through the logger.
   */
  private bindMessageFromQueueToMessageHandlerInWorker(): void {
    this.explorer.explore().forEach(({ worker, queueConnection }: QueueConnectionAndWorkerBindParams) => {
      if (!queueConnection) {
        // The error was already reported earlier
        return;
      }

      const workerName = worker.constructor.name;
      if (typeof worker.handleMessage !== 'function') {
        this.logger.error(`The worker (${workerName}) have no "handleMessage" function.`);
        return;
      }

      // Bind to the worker instance: passing the bare method reference would
      // make `this` undefined inside `handleMessage` once the callback is invoked.
      const handler = worker.handleMessage.bind(worker) as MessageCallback<any>;

      queueConnection
        .subscribe(handler)
        .then(() => {
          // Read the queue defensively so that a missing queue in the chain state
          // cannot turn a successful subscription into a logged error.
          const queue = queueConnection.state.queue;
          const queueName = queue ? queue.name : '<unknown queue>';
          const msg = `Successfully listening on ${workerName}.handleMessage for ${queueName}`;
          this.logger.log(msg);
        })
        .catch((err: Error) => {
          this.logger.error(err.message, err.stack);
        });
    });
  }
}