Una librería de eventos para NestJS con soporte opcional para RabbitMQ que proporciona una interfaz simple y potente para el manejo de mensajería asíncrona.
- ✅ Integración nativa con NestJS
- ✅ Soporte opcional para RabbitMQ
- ✅ Configuración flexible (síncrona y asíncrona)
- ✅ Interfaz única y consistente (EventsService)
- ✅ Manejo de colas y exchanges
- ✅ Consumo de mensajes con acknowledgments
- ✅ Gestión automática de conexiones
- ✅ Logging integrado
- ✅ TypeScript con tipado completo
- ✅ Fácil migración entre proveedores de mensajería
npm install @ap-dev/nestjs-events
import { Module } from '@nestjs/common';
import { EventsModule } from '@ap-dev/nestjs-events';
/**
 * Root module that registers the events library without any options.
 */
@Module({
  // forRoot() with no arguments: no configuration means local logging only.
  imports: [EventsModule.forRoot()],
})
export class AppModule {}
import { Module } from '@nestjs/common';
import { EventsModule } from '@ap-dev/nestjs-events';
/**
 * Root module that registers the events library with a static `rabbitmq`
 * configuration.
 */
@Module({
  imports: [
    EventsModule.forRoot({
      // Broker URL and prefetch value are passed to the library as literals.
      rabbitmq: { url: 'amqp://localhost:5672', prefetch: 10 },
    }),
  ],
})
export class AppModule {}
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { EventsModule } from '@ap-dev/nestjs-events';
/**
 * Root module that builds the events configuration asynchronously from
 * values read through `ConfigService`.
 */
@Module({
  imports: [
    ConfigModule.forRoot(),
    // NOTE(review): `ConfigService` must be resolvable from inside
    // `EventsModule`; if it is not, `ConfigModule.forRoot({ isGlobal: true })`
    // may be required — confirm against the library's `forRootAsync` options.
    EventsModule.forRootAsync({
      useFactory: async (configService: ConfigService) => {
        // Environment variables are strings: `get<number>()` only changes the
        // static type and does not convert the value, so parse it explicitly.
        const rawPrefetch = configService.get<string>('RABBITMQ_PREFETCH', '10');
        const parsedPrefetch = Number.parseInt(rawPrefetch, 10);

        return {
          rabbitmq: {
            url: configService.get<string>('RABBITMQ_URL', 'amqp://localhost:5672'),
            // Fall back to the default when the variable is not a valid integer.
            prefetch: Number.isNaN(parsedPrefetch) ? 10 : parsedPrefetch,
          },
        };
      },
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}
import { Injectable } from '@nestjs/common';
import { EventsService } from '@ap-dev/nestjs-events';
/**
 * Example service showing the publishing side of `EventsService`.
 */
@Injectable()
export class MyService {
  constructor(private readonly eventsService: EventsService) {}

  /** Emits a `user.created` event (it is sent to RabbitMQ if that is configured). */
  async sendUserCreatedEvent(user: any): Promise<void> {
    await this.eventsService.emitEvent('user.created', user);
  }

  /** Sends the payload directly to the `notifications` queue. */
  async sendToSpecificQueue(data: any): Promise<void> {
    await this.eventsService.sendToQueue('notifications', data);
  }

  /** Publishes the payload to the `user-events` exchange with routing key `user.updated`. */
  async publishToExchange(data: any): Promise<void> {
    await this.eventsService.publishToExchange('user-events', 'user.updated', data);
  }

  /** Logs whether RabbitMQ is enabled and whether it is currently connected. */
  async checkConnection(): Promise<void> {
    const { eventsService } = this;
    const isEnabled = eventsService.isRabbitMQEnabled();
    const isConnected = eventsService.isConnected();

    console.log(`RabbitMQ enabled: ${isEnabled}, connected: ${isConnected}`);
  }
}
import { Injectable, OnModuleInit } from '@nestjs/common';
import { EventsService } from '@ap-dev/nestjs-events';
/**
 * Example service that declares the exchange, queue and binding used by the
 * other examples when the module starts.
 */
@Injectable()
export class InfrastructureService implements OnModuleInit {
  constructor(private readonly eventsService: EventsService) {}

  /** Runs the setup on module init, but only when RabbitMQ is enabled. */
  async onModuleInit(): Promise<void> {
    if (!this.eventsService.isRabbitMQEnabled()) {
      return;
    }
    await this.setupInfrastructure();
  }

  /** Creates the exchange, then the queue, then binds the queue to the exchange. */
  private async setupInfrastructure(): Promise<void> {
    const exchange = 'user-events';
    const queue = 'user-notifications';

    // Create the exchange.
    await this.eventsService.createExchange(exchange, { type: 'topic', durable: true });

    // Create the queue.
    await this.eventsService.createQueue(queue, {
      durable: true,
      arguments: { 'x-message-ttl': 60000 },
    });

    // Bind the queue to the exchange with a routing key.
    await this.eventsService.bindQueueToExchange(queue, exchange, 'user.*');
  }
}
import { Injectable } from '@nestjs/common';
import { EventsService } from '@ap-dev/nestjs-events';
/**
 * Example service showing the consuming side of `EventsService`.
 */
@Injectable()
export class ConsumerService {
  constructor(private readonly eventsService: EventsService) {}

  /**
   * Starts a consumer on the `user-notifications` queue with `noAck: false`.
   * Does nothing except log when RabbitMQ is not connected.
   */
  async startConsumer(): Promise<void> {
    const connected = this.eventsService.isConnected();
    if (!connected) {
      console.log('RabbitMQ not connected, skipping consumer setup');
      return;
    }

    // Consume messages.
    const tag = await this.eventsService.consume(
      'user-notifications',
      (message, ack, nack) => {
        try {
          console.log('Received:', message.content);
          ack(); // Confirm processing.
        } catch (err) {
          console.error('Error processing message:', err);
          nack(true); // Reject and requeue.
        }
      },
      { noAck: false },
    );

    console.log('Consumer started with tag:', tag);
  }
}
# .env
RABBITMQ_URL=amqp://user:password@localhost:5672
RABBITMQ_QUEUE=events
RABBITMQ_EXCHANGE=app-events
RABBITMQ_ROUTING_KEY=app
# docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
interface RabbitMQConfig {
url: string;
queue?: string;
exchange?: string;
routingKey?: string;
exchangeType?: 'direct' | 'topic' | 'fanout' | 'headers';
durable?: boolean;
persistent?: boolean;
prefetch?: number;
}
interface MessageOptions {
queue?: string;
exchange?: string;
routingKey?: string;
persistent?: boolean;
priority?: number;
expiration?: string;
headers?: Record<string, any>;
}
npm run build
npm test
npm publish
MIT