Skip to content

Commit

Permalink
feat(rabbitmq): optional direct reply-to
Browse files Browse the repository at this point in the history
fix #109
  • Loading branch information
WonderPanda committed Mar 26, 2020
1 parent 88c6295 commit 3b7625c
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 18 deletions.
73 changes: 73 additions & 0 deletions integration/rabbitmq/e2e/rpc-no-direct-reply.e2e-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import {
AmqpConnection,
RabbitMQModule,
RabbitRPC,
} from '@golevelup/nestjs-rabbitmq';
import { INestApplication, Injectable } from '@nestjs/common';
import { Test } from '@nestjs/testing';

const testHandler = jest.fn();

const prefix = 'testRpcNoDirectReply';
const exchange = prefix;
const routingKey = `${prefix}Route`;

@Injectable()
class RpcService {
@RabbitRPC({
exchange,
routingKey: [routingKey],
queue: `${prefix}Queue`,
})
handleSubscribe(message: object) {
testHandler(message);
return 'pong';
}
}

describe('Rabbit Direct Reply To', () => {
let app: INestApplication;
let amqpConnection: AmqpConnection;

const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`;

beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
providers: [RpcService],
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [
{
name: exchange,
type: 'topic',
},
],
uri,
connectionInitOptions: { wait: true, reject: true, timeout: 3000 },
enableDirectReplyTo: false,
}),
],
}).compile();

app = moduleFixture.createNestApplication();
amqpConnection = app.get<AmqpConnection>(AmqpConnection);
await app.init();
});

it('should not receive subscribe messages because register handlers is disabled', async done => {
await expect(
amqpConnection.request({
exchange,
routingKey,
payload: 'ping',
timeout: 2000,
}),
).rejects.toThrow();

setTimeout(() => {
expect(testHandler).not.toHaveBeenCalled();
done();
}, 50);
});
});
4 changes: 2 additions & 2 deletions integration/rabbitmq/e2e/subscribe-no-handlers.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SubscribeService {
}
}

describe('Rabbit Subscribe', () => {
describe('Rabbit Subscribe Without Register Handlers', () => {
let app: INestApplication;
let amqpConnection: AmqpConnection;

Expand Down Expand Up @@ -54,7 +54,7 @@ describe('Rabbit Subscribe', () => {
await app.init();
});

it('should receive subscribe messages and handle them', async done => {
it('should not receive subscribe messages because register handlers is disabled', async done => {
[routingKey1, routingKey2].forEach((x, i) =>
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);
Expand Down
22 changes: 13 additions & 9 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import { Logger } from '@nestjs/common';
import * as amqpcon from 'amqp-connection-manager';
import * as amqplib from 'amqplib';
import { interval, race, Subject, throwError, empty } from 'rxjs';
import { empty, interval, race, Subject, throwError } from 'rxjs';
import {
catchError,
filter,
first,
map,
take,
timeoutWith,
catchError
timeoutWith
} from 'rxjs/operators';
import * as uuid from 'uuid';
import * as amqpcon from 'amqp-connection-manager';
import {
ConnectionInitOptions,
MessageHandlerErrorBehavior,
MessageHandlerOptions,
RabbitMQConfig,
RequestOptions,
ConnectionInitOptions
RequestOptions
} from '../rabbitmq.interfaces';
import { Nack, RpcResponse, SubscribeResponse } from './handlerResponses';
import { Logger } from '@nestjs/common';

const DIRECT_REPLY_QUEUE = 'amq.rabbitmq.reply-to';

Expand All @@ -40,7 +40,8 @@ const defaultConfig = {
reject: true
},
connectionManagerOptions: {},
registerHandlers: true
registerHandlers: true,
enableDirectReplyTo: true
};

export class AmqpConnection {
Expand Down Expand Up @@ -152,7 +153,10 @@ export class AmqpConnection {
);

await channel.prefetch(this.config.prefetchCount);
await this.initDirectReplyQueue(channel);

if (this.config.enableDirectReplyTo) {
await this.initDirectReplyQueue(channel);
}

this.initialized.next();
}
Expand Down
3 changes: 2 additions & 1 deletion packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as amqplib from 'amqplib';
import * as amqpConnectionManager from 'amqp-connection-manager';
import * as amqplib from 'amqplib';

export interface RabbitMQExchangeConfig {
name: string;
Expand Down Expand Up @@ -64,6 +64,7 @@ export interface RabbitMQConfig {
connectionInitOptions?: ConnectionInitOptions;
connectionManagerOptions?: amqpConnectionManager.AmqpConnectionManagerOptions;
registerHandlers?: boolean;
enableDirectReplyTo?: boolean;
}

export type RabbitHandlerType = 'rpc' | 'subscribe';
Expand Down
22 changes: 16 additions & 6 deletions packages/rabbitmq/src/rabbitmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,22 @@ export class RabbitMQModule

const { exchange, routingKey, queue } = config;

this.logger.log(
`${discoveredMethod.parentClass.name}.${
discoveredMethod.methodName
} {${config.type}} -> ${exchange}::${routingKey}::${queue ||
'amqpgen'}`
);
const handlerDisplayName = `${discoveredMethod.parentClass.name}.${
discoveredMethod.methodName
} {${config.type}} -> ${exchange}::${routingKey}::${queue ||
'amqpgen'}`;

if (
config.type === 'rpc' &&
!this.amqpConnection.configuration.enableDirectReplyTo
) {
this.logger.warn(
`Direct Reply-To Functionality is disabled. RPC handler ${handlerDisplayName} will not be registered`
);
return;
}

this.logger.log(handlerDisplayName);

return config.type === 'rpc'
? this.amqpConnection.createRpc(handler, config)
Expand Down

0 comments on commit 3b7625c

Please sign in to comment.