Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rabbitmq): add support for multiple connections #411

Merged
2 changes: 2 additions & 0 deletions integration/rabbitmq/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { ControllerDiscoveryModule } from './controller-discovery/controller-discovery.module';
import { NamedConnectionModule } from './named-connection/named-connection.module';
import { RpcService } from './rpc/rpc.service';

const rabbitHost =
Expand All @@ -25,6 +26,7 @@ const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:${rabbitPort}`;
}),
}),
ControllerDiscoveryModule,
NamedConnectionModule,
],
controllers: [AppController],
providers: [RpcService],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const CONNECTION_NAME = 'test-connection';
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import {
AmqpConnectionManager,
MessageHandlerErrorBehavior,
RabbitPayload,
RabbitRPC,
} from '@golevelup/nestjs-rabbitmq';
import {
Controller,
Get,
UseGuards,
UseInterceptors,
UsePipes,
} from '@nestjs/common';
import { ReplyErrorCallback } from '../rpc/reply.error.callback';
import { TransformInterceptor } from '../transform.interceptor';
import { RpcException } from '../rpc/rpc-exception';
import { DenyGuard } from '../deny.guard';
import { ValidationPipe } from '../validation.pipe';
import { CONNECTION_NAME } from './named-connection.constants';

@Controller('named-connection')
export class NamedConnectionController {
constructor(private readonly amqpConnectionManager: AmqpConnectionManager) {}

get amqpConnection() {
return this.amqpConnectionManager.getConnection(CONNECTION_NAME);
}

@Get()
getHello(): string {
return 'Hello World!';
}

@Get('rpc')
async getRpc() {
return this.amqpConnection.request({
exchange: 'exchange3',
routingKey: 'rpc-2',
});
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'rpc-2',
exchange: 'exchange2',
queue: 'rpc-2',
})
rpc(message: object) {
return {
echo: message,
};
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'intercepted-rpc-2',
exchange: 'exchange2',
queue: 'intercepted-rpc-2',
})
@UseInterceptors(TransformInterceptor)
interceptedRpc() {
return {
message: 42,
};
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'piped-rpc-2',
exchange: 'exchange3',
queue: 'piped-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
@UsePipes(ValidationPipe)
pipedRpc(@RabbitPayload() message: number) {
return {
message,
};
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'piped-param-rpc-2',
exchange: 'exchange3',
queue: 'piped-param-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
// eslint-disable-next-line sonarjs/no-identical-functions
pipedParamRpc(@RabbitPayload(ValidationPipe) message: number) {
return {
message,
};
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'guarded-rpc-2',
exchange: 'exchange3',
queue: 'guarded-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
@UseGuards(DenyGuard)
guardedRpc() {
return {
message: 'success',
};
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'error-reply-rpc-2',
exchange: 'exchange3',
queue: 'error-reply-rpc-2',
errorBehavior: MessageHandlerErrorBehavior.ACK,
errorHandler: ReplyErrorCallback,
})
errorReplyRpc(message: object) {
throw new RpcException(message);
}

@RabbitRPC({
connection: CONNECTION_NAME,
routingKey: 'non-json-rpc-2',
exchange: 'exchange3',
queue: 'non-json-rpc-2',
allowNonJsonMessages: true,
})
nonJsonRpc(nonJsonMessage: any) {
return {
echo: nonJsonMessage,
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';
import { CONNECTION_NAME } from './named-connection.constants';
import { NamedConnectionController } from './named-connection.controller';

const rabbitHost =
process.env.NODE_ENV === 'ci' ? process.env.RABBITMQ_HOST : 'localhost';
const rabbitPort =
process.env.NODE_ENV === 'ci' ? process.env.RABBITMQ_PORT : '5672';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:${rabbitPort}`;

@Module({
imports: [
RabbitMQModule.forRootAsync(RabbitMQModule, {
useFactory: () => ({
name: CONNECTION_NAME,
exchanges: [
{
name: 'exchange3',
type: 'topic',
},
],
uri,
connectionInitOptions: { wait: true, reject: true, timeout: 3000 },
}),
}),
],
controllers: [NamedConnectionController],
providers: [NamedConnectionController],
})
export class NamedConnectionModule {}
1 change: 1 addition & 0 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export interface CorrelationMessage {
}

const defaultConfig = {
name: 'default',
prefetchCount: 10,
defaultExchangeType: 'topic',
defaultRpcErrorBehavior: MessageHandlerErrorBehavior.REQUEUE,
Expand Down
19 changes: 19 additions & 0 deletions packages/rabbitmq/src/amqp/connectionManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { AmqpConnection } from './connection';

export class AmqpConnectionManager {
andrewda marked this conversation as resolved.
Show resolved Hide resolved
private connections: AmqpConnection[] = [];

addConnection(connection: AmqpConnection) {
this.connections.push(connection);
}

getConnection(name: string) {
return this.connections.find(
(connection) => connection.configuration.name === name
);
}

getConnections() {
return this.connections;
}
}
1 change: 1 addition & 0 deletions packages/rabbitmq/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './amqp/connection';
export * from './amqp/connectionManager';
export * from './amqp/errorBehaviors';
export * from './amqp/handlerResponses';
export * from './rabbitmq.constants';
Expand Down
2 changes: 2 additions & 0 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export interface QueueOptions {
}

export interface MessageHandlerOptions {
connection?: string;
exchange?: string;
routingKey?: string | string[];
queue?: string;
Expand Down Expand Up @@ -80,6 +81,7 @@ export interface ConnectionInitOptions {
export type RabbitMQChannels = Record<string, RabbitMQChannelConfig>;

export interface RabbitMQConfig {
name?: string;
uri: string | string[];
/**
* Now specifies the default prefetch count for all channels.
Expand Down
Loading