Skip to content

Commit

Permalink
fix: allow non-pubsub events publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxim Ciuchitu committed Oct 25, 2021
1 parent a360ed4 commit b24b829
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 38 deletions.
5 changes: 1 addition & 4 deletions src/CqrsModule.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import type { DynamicModule, OnApplicationBootstrap } from '@nestjs/common';
import { Logger, Module } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { CqrsModule as NestCqrsModule } from '@nestjs/cqrs';
import type { ICqrsModuleAsyncOptions, ICqrsModuleOptions } from './interface';
import { ConfigProvider, ConnectionProvider, LoggerProvider } from './provider';
import { CommandBus, Consumer, EventBus, ExplorerService, Producer, Publisher, PubSubReflector, QueryBus } from './service';
import { CommandBus, Consumer, EventBus, ExplorerService, Producer, PubSubReflector, QueryBus } from './service';
import { CQRS_MODULE_CONSUMER_OPTIONS, CQRS_MODULE_OPTIONS, DEFAULT_CONSUMER_OPTIONS } from './utils/configuration';

@Module({
Expand Down Expand Up @@ -46,7 +45,6 @@ export class CqrsModule implements OnApplicationBootstrap {
private readonly eventsBus: EventBus,
private readonly commandsBus: CommandBus,
private readonly queryBus: QueryBus,
private readonly moduleRef: ModuleRef,
) {}

static forRoot(options: ICqrsModuleOptions): DynamicModule {
Expand Down Expand Up @@ -84,7 +82,6 @@ export class CqrsModule implements OnApplicationBootstrap {
this.eventsBus.registerSagas(sagas);
this.eventsBus.register(events);

this.eventsBus.publisher = new Publisher(this.eventsBus.subject$, this.moduleRef.get(Producer));
await this.eventsBus.registerPubsubEvents(this.explorerService.pubsubEvents());
}
}
83 changes: 50 additions & 33 deletions src/service/EventBus.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { LoggerService, Type } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import type { IEventHandler } from '@nestjs/cqrs';
import type { IEvent, IEventHandler } from '@nestjs/cqrs';
import { EventBus as NestEventBus } from '@nestjs/cqrs';
import type { ConsumeMessage } from 'amqplib';
import { escapeRegExp, omit } from 'lodash';
Expand All @@ -13,32 +13,45 @@ import { LoggerProvider } from '../provider';
import { toEventName } from '../utils';
import { CommandBus } from './CommandBus';
import { Consumer } from './Consumer';
import { Producer } from './Producer';
import { Publisher } from './Publisher';
import { PubSubReflector } from './PubSubReflector';

@Injectable()
export class EventBus<EventBase extends AbstractPubsubEvent<any> = AbstractPubsubEvent<any>> extends NestEventBus<EventBase> {
export class EventBus extends NestEventBus<IEvent> {
private _pubSubPublisher: Publisher;

get publisher(): Publisher {
return this._pubSubPublisher;
}

set publisher(_pubSubPublisher: Publisher) {
this._pubSubPublisher = _pubSubPublisher;
}

constructor(
commandBus: CommandBus,
private readonly moduleRefs: ModuleRef,
private readonly consumer: Consumer,
private readonly reflector: PubSubReflector<EventBase>,
private readonly moduleRefs: ModuleRef,
private readonly reflector: PubSubReflector,
) {
super(commandBus, moduleRefs);
this.usePubSubPublisher();
}

async publish(event: EventBase): Promise<void> {
async publish<T extends IEvent>(event: T): Promise<void> {
return super.publish(event);
}

async publishAll(events: EventBase[]): Promise<void> {
async publishAll<T extends IEvent>(events: T[]): Promise<void> {
return super.publishAll(events);
}

async registerPubsubEvents(handlers: Type<AbstractPubsubHandler<EventBase>>[]): Promise<void> {
const handlersWithEvents: IHandlerWrapper<EventBase>[] = this.filterValidHandlersWithEvents(handlers);
async registerPubsubEvents(handlers: Type<AbstractPubsubHandler<AbstractPubsubEvent<any>>>[]): Promise<void> {
const handlersWithEvents: IHandlerWrapper[] = this.filterValidHandlersWithEvents(handlers);

for (const mappedHandler of handlersWithEvents) {
const { handler, autoAck = AutoAckEnum.ALWAYS_ACK }: IHandlerWrapper<EventBase> = mappedHandler;
const { handler, autoAck = AutoAckEnum.ALWAYS_ACK }: IHandlerWrapper = mappedHandler;

this.consumer.configureAutoAck(handler, autoAck);
this.consumer.addHandleCatch(handler);
Expand All @@ -52,22 +65,22 @@ export class EventBus<EventBase extends AbstractPubsubEvent<any> = AbstractPubsu
return LoggerProvider.logger;
}

protected registerPubsubHandler(handlerWrapper: IHandlerWrapper<EventBase>): void {
const { handler, eventWrappers }: IHandlerWrapper<EventBase> = handlerWrapper;
protected registerPubsubHandler(handlerWrapper: IHandlerWrapper): void {
const { handler, eventWrappers }: IHandlerWrapper = handlerWrapper;

const instance: IEventHandler<EventBase> | undefined = this.moduleRefs.get(handler, { strict: false });
const instance: IEventHandler<IEvent> | undefined = this.moduleRefs.get(handler, { strict: false });
if (!instance) {
this.logger().warn(`Could not get event handler "${handler.name}" instance`);
return;
}

eventWrappers.forEach((eventWrapper: IEventWrapper<EventBase>) => this.bind(instance, eventWrapper.event.name));
eventWrappers.forEach((eventWrapper: IEventWrapper) => this.bind(instance, eventWrapper.event.name));
}

protected async bindPubsubConsumer(handlerWrapper: IHandlerWrapper<EventBase>): Promise<void> {
const { handler, eventWrappers }: IHandlerWrapper<EventBase> = handlerWrapper;
protected async bindPubsubConsumer(handlerWrapper: IHandlerWrapper): Promise<void> {
const { handler, eventWrappers }: IHandlerWrapper = handlerWrapper;

const handlerInstance: AbstractPubsubHandler<EventBase> | undefined = this.moduleRefs.get(handler, { strict: false });
const handlerInstance: AbstractPubsubHandler<AbstractPubsubEvent<any>> | undefined = this.moduleRefs.get(handler, { strict: false });
if (!handlerInstance) {
this.logger().warn(`Could not get event handler "${handler.name}" instance`);
return;
Expand All @@ -80,8 +93,8 @@ export class EventBus<EventBase extends AbstractPubsubEvent<any> = AbstractPubsu
});
}

protected emitPubsubEvent(handlerWrapper: IHandlerWrapper<EventBase>, message: ConsumeMessage): void {
const { handler, eventWrappers }: IHandlerWrapper<EventBase> = handlerWrapper;
protected emitPubsubEvent(handlerWrapper: IHandlerWrapper, message: ConsumeMessage): void {
const { handler, eventWrappers }: IHandlerWrapper = handlerWrapper;
const typeProperty: string | unknown = message.properties.type;

const baseContext: Record<string, unknown> = { handler: handler.name, type: typeProperty };
Expand All @@ -93,13 +106,13 @@ export class EventBus<EventBase extends AbstractPubsubEvent<any> = AbstractPubsu
}

// try exact match at first
let matchedEventWrappers: IEventWrapper<EventBase>[] = eventWrappers.filter(
({ event, exchange }: IEventWrapper<EventBase>) => exchange === message.fields.exchange && toEventName(event.name) === typeProperty,
let matchedEventWrappers: IEventWrapper[] = eventWrappers.filter(
({ event, exchange }: IEventWrapper) => exchange === message.fields.exchange && toEventName(event.name) === typeProperty,
);

// fallback to binding pattern at second
if (!matchedEventWrappers.length) {
matchedEventWrappers = eventWrappers.filter((eventWrapper: IEventWrapper<EventBase>) => {
matchedEventWrappers = eventWrappers.filter((eventWrapper: IEventWrapper) => {
const bindingPattern: string = this.consumer.extractBindingPattern(eventWrapper);

return eventWrapper.exchange === message.fields.exchange && EventBus.checkTypeAgainstBinding(typeProperty, bindingPattern);
Expand All @@ -110,7 +123,7 @@ export class EventBus<EventBase extends AbstractPubsubEvent<any> = AbstractPubsu
this.logger().warn(
JSON.stringify({
...baseContext,
events: matchedEventWrappers.map((eventWrapper: IEventWrapper<EventBase>) => {
events: matchedEventWrappers.map((eventWrapper: IEventWrapper) => {
return {
name: eventWrapper.event.name,
bindingPattern: this.consumer.extractBindingPattern(eventWrapper),
Expand All @@ -123,38 +136,38 @@ export class EventBus<EventBase extends AbstractPubsubEvent<any> = AbstractPubsu
return;
}

const eventClasses = matchedEventWrappers.map((eventWrapper: IEventWrapper<EventBase>) => eventWrapper.event);
const eventClasses = matchedEventWrappers.map((eventWrapper: IEventWrapper) => eventWrapper.event);

const [firstEventClass, ...unused]: Type<EventBase>[] = eventClasses;
const [firstEventClass, ...unused]: Type<AbstractPubsubEvent<any>>[] = eventClasses;
if (unused.length) {
this.logger().warn(
JSON.stringify({
...baseContext,
used: firstEventClass.name,
unused: unused.map((event: Type<EventBase>) => event.name),
unused: unused.map((event: Type<AbstractPubsubEvent<any>>) => event.name),
message: "Handler's event" + ' intersection' + ' detected',
}),
);
}

const pubsubEvent: EventBase = new firstEventClass(JSON.parse(message.content.toString())).withMessage(message);
const pubsubEvent: AbstractPubsubEvent<any> = new firstEventClass(JSON.parse(message.content.toString())).withMessage(message);

this.subject$.next(pubsubEvent);
this.publisher.publishLocally(pubsubEvent);
}

private filterValidHandlersWithEvents(handlers: Type<AbstractPubsubHandler<EventBase>>[]): IHandlerWrapper<EventBase>[] {
const validHandlersWithEvents: IHandlerWrapper<EventBase>[] = [];
private filterValidHandlersWithEvents(handlers: Type<AbstractPubsubHandler<AbstractPubsubEvent<any>>>[]): IHandlerWrapper[] {
const validHandlersWithEvents: IHandlerWrapper[] = [];

handlers.forEach((handler: Type<AbstractPubsubHandler<EventBase>>) => {
const metadata: IPubsubEventHandlerMetadata<EventBase> | undefined = this.reflector.reflectHandlerMetadata(handler);
handlers.forEach((handler: Type<AbstractPubsubHandler<AbstractPubsubEvent<any>>>) => {
const metadata: IPubsubEventHandlerMetadata | undefined = this.reflector.reflectHandlerMetadata(handler);
if (!metadata) {
this.logger().error(`Event handler "${handler.name}" should be decorated with "${PubsubEventHandler.name}"`);
return;
}

const eventWrappers: IEventWrapper<EventBase>[] = [];
const eventWrappers: IEventWrapper[] = [];

metadata.events.forEach((event: Type<EventBase>) => {
metadata.events.forEach((event: Type<AbstractPubsubEvent<any>>) => {
const metadata: IPubsubEventOptions | undefined = this.reflector.reflectEventMetadata(event);
if (!metadata) {
this.logger().error(`Event "${event.name}" should be decorated with "${PubsubEvent.name}"`);
Expand All @@ -174,4 +187,8 @@ export class EventBus<EventBase extends AbstractPubsubEvent<any> = AbstractPubsu
// transform pattern to regexp to support "*" binding
return new RegExp(`^${escapeRegExp(bindingPattern).replace(/\\\*/g, '\\w*')}$`).test(typeProperty);
}

private usePubSubPublisher(): void {
this._pubSubPublisher = new Publisher(this.subject$, this.moduleRefs.get(Producer));
}
}
6 changes: 5 additions & 1 deletion src/service/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { AbstractPubsubEvent } from '../interface';
import { Producer } from './Producer';

@Injectable()
export class Publisher<EventBase extends IEvent> extends DefaultPubSub<EventBase> {
export class Publisher<EventBase extends IEvent = IEvent> extends DefaultPubSub<EventBase> {
constructor(subject$: Subject<EventBase>, private readonly producer: Producer) {
super(subject$);
}
Expand All @@ -17,6 +17,10 @@ export class Publisher<EventBase extends IEvent> extends DefaultPubSub<EventBase
return this.producer.produce(event);
}

return this.publishLocally(event);
}

publishLocally<T extends EventBase>(event: T): void {
return super.publish(event);
}
}

0 comments on commit b24b829

Please sign in to comment.