Skip to content

Commit

Permalink
feat(con-mgmt): init options,wiki,tests
Browse files Browse the repository at this point in the history
  • Loading branch information
azuker authored and WonderPanda committed Jan 12, 2020
1 parent ce79db6 commit 8ca3260
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 48 deletions.
12 changes: 6 additions & 6 deletions integration/rabbitmq/e2e/configuration.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`;
const amqplibUri = `${uri}?heartbeat=5`;

function returnUrl() {
return {
const configFactory = () => ({
uri,
};
}
connectionManager: { heartbeatIntervalInSeconds: 5 },
connectionInit: { wait: true, reject: true }
});

class RabbitConfig {
createModuleConfig(): RabbitMQConfig {
return returnUrl();
return configFactory();
}
}

Expand Down Expand Up @@ -49,7 +49,7 @@ describe('Module Configuration', () => {
app = await Test.createTestingModule({
imports: [
RabbitMQModule.forRootAsync(RabbitMQModule, {
useFactory: returnUrl,
useFactory: configFactory,
}),
],
}).compile();
Expand Down
1 change: 1 addition & 0 deletions integration/rabbitmq/e2e/nack-and-requeue.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ describe('Nack and Requeue', () => {
},
],
uri,
connectionInit: { wait: true, reject: true },
}),
}),
],
Expand Down
22 changes: 19 additions & 3 deletions integration/rabbitmq/e2e/rpc.e2e-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { AmqpConnection, RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { INestApplication } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { AppModule } from '../src/app.module';
import { RpcService } from '../src/rpc/rpc.service';

describe('Rabbit RPC', () => {
let app: INestApplication;
Expand All @@ -16,8 +16,24 @@ describe('Rabbit RPC', () => {
});

beforeAll(async () => {
const exchange = 'testRpcExhange';
const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`;

const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
providers: [RpcService],
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [
{
name: exchange,
type: 'topic',
},
],
uri,
connectionInit: { wait: true, reject: true },
}),
],
}).compile();

app = moduleFixture.createNestApplication();
Expand Down
1 change: 1 addition & 0 deletions integration/rabbitmq/e2e/subscribe.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ describe('Rabbit Subscribe', () => {
},
],
uri,
connectionInit: { wait: true, reject: true },
}),
],
}).compile();
Expand Down
7 changes: 3 additions & 4 deletions packages/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ NestJS offers an out of the box microservices experience with support for a vari

Some of the most notable missing functionality includes common messaging patterns like publish/subscribe and competing consumers.

## (Re)Connection
## Connection Management

This module leverages [`amqp-connection-manager`](https://github.com/benbria/node-amqp-connection-manager) package to persist related operations.
This module leverages [`amqp-connection-manager`](https://github.com/benbria/node-amqp-connection-manager) package to maintain connection resiliency.

Unavailability of a RabbitMQ broker still allows your application to bootstrap correctly and relevant channel setups take place whenever a connection can be established.

The same purpose applies when a connection is lost. In such cases, the module tries to reconnect and setup everything again once it is reconnected.
The same principle applies when a connection is lost. In such cases, the module tries to reconnect and set up everything again once it is reconnected.

## Usage

Expand Down
89 changes: 54 additions & 35 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const defaultConfig = {
defaultSubscribeErrorBehavior: MessageHandlerErrorBehavior.REQUEUE,
exchanges: [],
defaultRpcTimeout: 10000,
connectionInit: {},
connectionManager: {}
};

Expand Down Expand Up @@ -54,46 +55,64 @@ export class AmqpConnection {
return this._connection;
}

public async init(timeout = 3000): Promise<boolean> {
const connectAsync = new Promise<boolean>(async res => {
this._connection = amqpcon.connect(
Array.isArray(this.config.uri) ? this.config.uri : [this.config.uri],
this.config.connectionManager
);
public async init(): Promise<void> {
const {
wait = false,
timeout = 3000,
reject = false
} = this.config.connectionInit;

this.logger.log('Trying to connect to a RabbitMQ broker');
this._connection.on('connect', () => {
this.logger.log('Successfully connected to a RabbitMQ broker');
});

this._channel = this._connection.createChannel({
name: AmqpConnection.name
});

this._channel.on('connect', () => {
res(true);
this.logger.log('Successfully connected a RabbitMQ channel');
});
this._channel.on('error', (err, { name }) => {
this._rawChannel = undefined;
this.logger.log(
`Failed to setup a RabbitMQ channel - name: ${name} / error: ${
err.message
} ${err.stack}`
);
});
this._channel.on('close', () => {
this._rawChannel = undefined;
this.logger.log('Successfully closed a RabbitMQ channel');
});
if (!wait) {
return this.initCore();
}

const connectAsync = new Promise(res => this.initCore(() => res()));

const timeoutAsync = new Promise((res, rej) =>
setTimeout(
() => (reject ? rej('Failed to connect to a RabbitMQ broker') : res()),
timeout
)
);

return Promise.race<any>([connectAsync, timeoutAsync]);
}

await this._channel.addSetup(this.setupInitChannel.bind(this));
private async initCore(firstInitCallback?: () => void): Promise<void> {
this._connection = amqpcon.connect(
Array.isArray(this.config.uri) ? this.config.uri : [this.config.uri],
this.config.connectionManager
);

this.logger.log('Trying to connect to a RabbitMQ broker');
this._connection.on('connect', () => {
this.logger.log('Successfully connected to a RabbitMQ broker');
});

const timeoutAsync = new Promise<boolean>(res =>
setTimeout(() => res(false), timeout)
this._channel = this._connection.createChannel({
name: AmqpConnection.name
});

if (firstInitCallback) {
this._channel.once('connect', () => firstInitCallback());
}
this._channel.on('connect', () =>
this.logger.log('Successfully connected a RabbitMQ channel')
);
return Promise.race<boolean>([connectAsync, timeoutAsync]);
this._channel.on('error', (err, { name }) => {
this._rawChannel = undefined;
this.logger.log(
`Failed to setup a RabbitMQ channel - name: ${name} / error: ${
err.message
} ${err.stack}`
);
});
this._channel.on('close', () => {
this._rawChannel = undefined;
this.logger.log('Successfully closed a RabbitMQ channel');
});

await this._channel.addSetup(this.setupInitChannel.bind(this));
}

private async setupInitChannel(
Expand Down
7 changes: 7 additions & 0 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ export interface MessageHandlerOptions {
errorBehavior?: MessageHandlerErrorBehavior;
}

export interface ConnectionInitOptions {
wait?: boolean;
timeout?: number;
reject?: boolean;
}

export interface RabbitMQConfig {
uri: string | string[];
prefetchCount?: number;
Expand All @@ -54,6 +60,7 @@ export interface RabbitMQConfig {
defaultExchangeType?: string;
defaultRpcErrorBehavior?: MessageHandlerErrorBehavior;
defaultSubscribeErrorBehavior?: MessageHandlerErrorBehavior;
connectionInit?: ConnectionInitOptions;
connectionManager?: amqpConnectionManager.AmqpConnectionManagerOptions;
}

Expand Down

0 comments on commit 8ca3260

Please sign in to comment.