Skip to content

Commit

Permalink
feat(rabbitmq): error behaviour for replying error
Browse files Browse the repository at this point in the history
  • Loading branch information
perf2711 authored and WonderPanda committed Apr 16, 2020
1 parent 29b9683 commit e438a2a
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 31 deletions.
119 changes: 89 additions & 30 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import {
first,
map,
take,
timeoutWith
timeoutWith,
} from 'rxjs/operators';
import * as uuid from 'uuid';
import {
ConnectionInitOptions,
MessageHandlerErrorBehavior,
MessageHandlerOptions,
RabbitMQConfig,
RequestOptions
RequestOptions,
} from '../rabbitmq.interfaces';
import { Nack, RpcResponse, SubscribeResponse } from './handlerResponses';

Expand All @@ -37,11 +37,11 @@ const defaultConfig = {
connectionInitOptions: {
wait: true,
timeout: 5000,
reject: true
reject: true,
},
connectionManagerOptions: {},
registerHandlers: true,
enableDirectReplyTo: true
enableDirectReplyTo: true,
};

export class AmqpConnection {
Expand Down Expand Up @@ -84,7 +84,7 @@ export class AmqpConnection {
public async init(): Promise<void> {
const options: Required<ConnectionInitOptions> = {
...defaultConfig.connectionInitOptions,
...this.config.connectionInitOptions
...this.config.connectionInitOptions,
};
const { wait, timeout: timeoutInterval, reject } = options;

Expand All @@ -102,7 +102,7 @@ export class AmqpConnection {
)
)
),
catchError(err => (reject ? throwError(err) : empty()))
catchError((err) => (reject ? throwError(err) : empty()))
)
.toPromise<any>();
}
Expand All @@ -121,7 +121,7 @@ export class AmqpConnection {
});

this._managedChannel = this._managedConnection.createChannel({
name: AmqpConnection.name
name: AmqpConnection.name,
});

this._managedChannel.on('connect', () =>
Expand All @@ -136,15 +136,15 @@ export class AmqpConnection {
this.logger.log('Successfully closed a RabbitMQ channel')
);

await this._managedChannel.addSetup(c => this.setupInitChannel(c));
await this._managedChannel.addSetup((c) => this.setupInitChannel(c));
}

private async setupInitChannel(
channel: amqplib.ConfirmChannel
): Promise<void> {
this._channel = channel;

this.config.exchanges.forEach(async x =>
this.config.exchanges.forEach(async (x) =>
channel.assertExchange(
x.name,
x.type || this.config.defaultExchangeType,
Expand All @@ -165,20 +165,20 @@ export class AmqpConnection {
// Set up a consumer on the Direct Reply-To queue to facilitate RPC functionality
await channel.consume(
DIRECT_REPLY_QUEUE,
async msg => {
async (msg) => {
if (msg == null) {
return;
}

const correlationMessage: CorrelationMessage = {
correlationId: msg.properties.correlationId.toString(),
message: JSON.parse(msg.content.toString())
message: JSON.parse(msg.content.toString()),
};

this.messageSubject.next(correlationMessage);
},
{
noAck: true
noAck: true,
}
);
}
Expand All @@ -191,14 +191,14 @@ export class AmqpConnection {
const payload = requestOptions.payload || {};

const response$ = this.messageSubject.pipe(
filter(x => x.correlationId === correlationId),
map(x => x.message as T),
filter((x) => x.correlationId === correlationId),
map((x) => x.message as T),
first()
);

this.publish(requestOptions.exchange, requestOptions.routingKey, payload, {
replyTo: DIRECT_REPLY_QUEUE,
correlationId
correlationId,
});

const timeout$ = interval(timeout).pipe(
Expand All @@ -215,19 +215,19 @@ export class AmqpConnection {

public async createSubscriber<T>(
handler: (
msg: T,
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
) => Promise<SubscribeResponse>,
msgOptions: MessageHandlerOptions
) {
return this._managedChannel.addSetup(channel =>
return this._managedChannel.addSetup((channel) =>
this.setupSubscriberChannel<T>(handler, msgOptions, channel)
);
}

private async setupSubscriberChannel<T>(
handler: (
msg: T,
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
) => Promise<SubscribeResponse>,
msgOptions: MessageHandlerOptions,
Expand All @@ -243,17 +243,16 @@ export class AmqpConnection {
const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey];

await Promise.all(
routingKeys.map(x => channel.bindQueue(queue, exchange, x))
routingKeys.map((x) => channel.bindQueue(queue, exchange, x))
);

await channel.consume(queue, async msg => {
await channel.consume(queue, async (msg) => {
try {
if (msg == null) {
throw new Error('Received null message');
}

const message = JSON.parse(msg.content.toString()) as T;
const response = await handler(message, msg);
const response = await this.handleMessage(handler, msg);
if (response instanceof Nack) {
channel.nack(msg, false, response.requeue);
return;
Expand Down Expand Up @@ -282,6 +281,10 @@ export class AmqpConnection {
channel.nack(msg, false, true);
break;
}
case MessageHandlerErrorBehavior.REPLYERRORANDACK: {
this.handleReplyAndAckError(channel, msg, e);
break;
}
default:
channel.nack(msg, false, false);
}
Expand All @@ -292,19 +295,19 @@ export class AmqpConnection {

public async createRpc<T, U>(
handler: (
msg: T,
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
) => Promise<RpcResponse<U>>,
rpcOptions: MessageHandlerOptions
) {
return this._managedChannel.addSetup(channel =>
return this._managedChannel.addSetup((channel) =>
this.setupRpcChannel<T, U>(handler, rpcOptions, channel)
);
}

public async setupRpcChannel<T, U>(
handler: (
msg: T,
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
) => Promise<RpcResponse<U>>,
rpcOptions: MessageHandlerOptions,
Expand All @@ -320,18 +323,16 @@ export class AmqpConnection {
const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey];

await Promise.all(
routingKeys.map(x => channel.bindQueue(queue, exchange, x))
routingKeys.map((x) => channel.bindQueue(queue, exchange, x))
);

await channel.consume(queue, async msg => {
await channel.consume(queue, async (msg) => {
try {
if (msg == null) {
throw new Error('Received null message');
}

const message = JSON.parse(msg.content.toString()) as T;
const response = await handler(message, msg);

const response = await this.handleMessage(handler, msg);
if (response instanceof Nack) {
channel.nack(msg, false, response.requeue);
return;
Expand All @@ -357,6 +358,10 @@ export class AmqpConnection {
channel.nack(msg, false, true);
break;
}
case MessageHandlerErrorBehavior.REPLYERRORANDACK: {
this.handleReplyAndAckError(channel, msg, e);
break;
}
default:
channel.nack(msg, false, false);
}
Expand All @@ -383,4 +388,58 @@ export class AmqpConnection {
options
);
}

private handleReplyAndAckError(
channel: amqplib.Channel,
msg: amqplib.ConsumeMessage,
error: any
) {
try {
const { replyTo, correlationId } = msg.properties;
if (replyTo) {
this.publishError('', replyTo, error, { correlationId });
} else {
channel.nack(msg, false, false);
}
} catch {
channel.nack(msg, false, true);
}
}

private publishError(
exchange: string,
routingKey: string,
error: any,
options?: amqplib.Options.Publish
) {
if (error instanceof Error) {
error = {
name: error.name,
message: error.message,
stack: error.stack,
};
}

this.publish(exchange, routingKey, error, options);
}

private handleMessage<T, U>(
handler: (
msg: T | undefined,
rawMessage?: amqplib.ConsumeMessage
) => Promise<U>,
msg: amqplib.ConsumeMessage
) {
let message: T | undefined = undefined;
if (msg.content) {
try {
message = JSON.parse(msg.content.toString()) as T;
} catch {
// Let handler handle parsing error, it has the raw message anyway
message = undefined;
}
}

return handler(message, msg);
}
}
8 changes: 7 additions & 1 deletion packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ export interface QueueOptions {
export enum MessageHandlerErrorBehavior {
ACK,
NACK,
REQUEUE
REQUEUE,
/**
* If an exception occurs while handling the message, the error will be serialized and published on the `replyTo` queue.
* If `replyTo` is not provided, the message will be NACKed without requeueing.
* If publish fails, message will be NACKed and requeued.
*/
REPLYERRORANDACK,
}

export interface MessageHandlerOptions {
Expand Down

0 comments on commit e438a2a

Please sign in to comment.