Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rabbitmq): add support for multiple connections #411

Merged
2 changes: 2 additions & 0 deletions integration/rabbitmq/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { ControllerDiscoveryModule } from './controller-discovery/controller-discovery.module';
import { NamedConnectionModule } from './named-connection/named-connection.module';
import { RpcService } from './rpc/rpc.service';

const rabbitHost =
Expand All @@ -25,6 +26,7 @@ const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:${rabbitPort}`;
}),
}),
ControllerDiscoveryModule,
NamedConnectionModule,
],
controllers: [AppController],
providers: [RpcService],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const CONNECTION_NAME = 'test-connection';
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import {
AmqpConnectionManager,
MessageHandlerErrorBehavior,
RabbitPayload,
RabbitRPC,
} from '@golevelup/nestjs-rabbitmq';
import {
Controller,
Get,
UseGuards,
UseInterceptors,
UsePipes,
} from '@nestjs/common';
import { ReplyErrorCallback } from '../rpc/reply.error.callback';
import { TransformInterceptor } from '../transform.interceptor';
import { RpcException } from '../rpc/rpc-exception';
import { DenyGuard } from '../deny.guard';
import { ValidationPipe } from '../validation.pipe';
import { CONNECTION_NAME } from './named-connection.constants';

@Controller('named-connection')
export class NamedConnectionController {
constructor(private readonly amqpConnectionManager: AmqpConnectionManager) {}

get amqpConnection() {
return this.amqpConnectionManager.getConnection(CONNECTION_NAME);
}

@Get()
getHello(): string {
return 'Hello World!';
}

@Get('rpc')
async getRpc() {
return this.amqpConnection.request({
exchange: 'exchange3',
routingKey: 'rpc-2',
});
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'rpc-2',
exchange: 'exchange2',
queue: 'rpc-2',
})
rpc(message: object) {
return {
echo: message,
};
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'intercepted-rpc-2',
exchange: 'exchange2',
queue: 'intercepted-rpc-2',
})
@UseInterceptors(TransformInterceptor)
interceptedRpc() {
return {
message: 42,
};
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'piped-rpc-2',
exchange: 'exchange3',
queue: 'piped-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
@UsePipes(ValidationPipe)
pipedRpc(@RabbitPayload() message: number) {
return {
message,
};
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'piped-param-rpc-2',
exchange: 'exchange3',
queue: 'piped-param-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
// eslint-disable-next-line sonarjs/no-identical-functions
pipedParamRpc(@RabbitPayload(ValidationPipe) message: number) {
return {
message,
};
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'guarded-rpc-2',
exchange: 'exchange3',
queue: 'guarded-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
@UseGuards(DenyGuard)
guardedRpc() {
return {
message: 'success',
};
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'error-reply-rpc-2',
exchange: 'exchange3',
queue: 'error-reply-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
errorReplyRpc(message: object) {
throw new RpcException(message);
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'non-json-rpc-2',
exchange: 'exchange3',
queue: 'non-json-rpc-2',
allowNonJsonMessages: true,
})
nonJsonRpc(nonJsonMessage: any) {
return {
echo: nonJsonMessage,
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';
import { CONNECTION_NAME } from './named-connection.constants';
import { NamedConnectionController } from './named-connection.controller';

const rabbitHost =
process.env.NODE_ENV === 'ci' ? process.env.RABBITMQ_HOST : 'localhost';
const rabbitPort =
process.env.NODE_ENV === 'ci' ? process.env.RABBITMQ_PORT : '5672';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:${rabbitPort}`;

@Module({
imports: [
RabbitMQModule.forRootAsync(RabbitMQModule, {
useFactory: () => ({
name: CONNECTION_NAME,
exchanges: [
{
name: 'exchange3',
type: 'topic',
},
],
uri,
connectionInitOptions: { wait: true, reject: true, timeout: 3000 },
}),
}),
],
controllers: [NamedConnectionController],
providers: [NamedConnectionController],
})
export class NamedConnectionModule {}
14 changes: 11 additions & 3 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export interface CorrelationMessage {
}

const defaultConfig = {
name: 'default',
prefetchCount: 10,
defaultExchangeType: 'topic',
defaultRpcErrorBehavior: MessageHandlerErrorBehavior.REQUEUE,
Expand Down Expand Up @@ -143,7 +144,9 @@ export class AmqpConnection {
}

private async initCore(): Promise<void> {
this.logger.log('Trying to connect to a RabbitMQ broker');
this.logger.log(
`Trying to connect to RabbitMQ broker (${this.config.name})`
);

this._managedConnection = connect(
Array.isArray(this.config.uri) ? this.config.uri : [this.config.uri],
Expand All @@ -152,11 +155,16 @@ export class AmqpConnection {

this._managedConnection.on('connect', ({ connection }) => {
this._connection = connection;
this.logger.log('Successfully connected to a RabbitMQ broker');
this.logger.log(
`Successfully connected to RabbitMQ broker (${this.config.name})`
);
});

this._managedConnection.on('disconnect', ({ err }) => {
this.logger.error('Disconnected from RabbitMQ broker', err?.stack);
this.logger.error(
`Disconnected from RabbitMQ broker (${this.config.name})`,
err?.stack
);
});

const defaultChannel: { name: string; config: RabbitMQChannelConfig } = {
Expand Down
19 changes: 19 additions & 0 deletions packages/rabbitmq/src/amqp/connectionManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { AmqpConnection } from './connection';

export class AmqpConnectionManager {
andrewda marked this conversation as resolved.
Show resolved Hide resolved
private connections: AmqpConnection[] = [];

addConnection(connection: AmqpConnection) {
this.connections.push(connection);
}

getConnection(name: string) {
return this.connections.find(
(connection) => connection.configuration.name === name
);
}

getConnections() {
return this.connections;
}
}
1 change: 1 addition & 0 deletions packages/rabbitmq/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './amqp/connection';
export * from './amqp/connectionManager';
export * from './amqp/errorBehaviors';
export * from './amqp/handlerResponses';
export * from './rabbitmq.constants';
Expand Down
2 changes: 2 additions & 0 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export interface QueueOptions {
}

export interface MessageHandlerOptions {
connection?: string;
exchange?: string;
routingKey?: string | string[];
queue?: string;
Expand Down Expand Up @@ -80,6 +81,7 @@ export interface ConnectionInitOptions {
export type RabbitMQChannels = Record<string, RabbitMQChannelConfig>;

export interface RabbitMQConfig {
name?: string;
uri: string | string[];
/**
* Now specifies the default prefetch count for all channels.
Expand Down
Loading