Skip to content

Commit

Permalink
fix(channel closed): If channel is closed and we try to send a messag…
Browse files Browse the repository at this point in the history
…e, emit an error on the connection and throw the error
  • Loading branch information
Sebastien Ritz committed Dec 13, 2019
1 parent 8aa9121 commit 60f3032
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7.10.0
10
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@hapiness/rabbitmq",
"version": "1.7.0",
"version": "1.7.1",
"description": "Hapiness module for rabbitmq",
"main": "commonjs/index.js",
"types": "index.d.ts",
Expand Down
36 changes: 34 additions & 2 deletions src/module/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,27 @@ import * as _has from 'lodash.has';
import * as _pick from 'lodash.pick';
import { MessageOptions } from './interfaces';
import { events } from './events';
import { ConnectionManager } from './managers';

function handleChannelClosedError(cb, ...options) {
try {
cb(...options);
} catch (err) {
if (err.message && err.message.toLowerCase().includes('channel closed')) {
// Add the code for this error, so it can be handled later
(<any>err).code = 'CHANNEL_CLOSED_ERROR';

// Emit the error on the connection object
(<ConnectionManager>(events.connection)).emit('error', err);
}

throw err;
}
}

export function sendMessage(ch: ChannelInterface, message: any, options: MessageOptions = {}): boolean {
let { json } = Object.assign({ json: true }, options);

const allowedOptions = [
'expiration',
'userId',
Expand All @@ -25,6 +43,7 @@ export function sendMessage(ch: ChannelInterface, message: any, options: Message
'type',
'appId'
];

const publishOptions = _pick(options, allowedOptions);

if (!ch) {
Expand Down Expand Up @@ -62,15 +81,28 @@ export function sendMessage(ch: ChannelInterface, message: any, options: Message
queue: options.queue,
content: message
});
return ch.sendToQueue(options.queue, encodedMessage, publishOptions);

return handleChannelClosedError(
ch.sendToQueue,
options.queue,
encodedMessage,
publishOptions
);
} else if (options.exchange) {
events.message.emit('sent', {
publishOptions,
exchange: options.exchange,
routingKey: options.routingKey,
content: message
});
return ch.publish(options.exchange, options.routingKey, encodedMessage, publishOptions);

return handleChannelClosedError(
ch.publish,
options.exchange,
options.routingKey,
encodedMessage,
publishOptions
);
} else {
throw new Error('Specify a queue or an exchange');
}
Expand Down
2 changes: 2 additions & 0 deletions src/module/services/message.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import { Channel } from 'amqplib';

@Injectable()
export class MessageService {

private _sendMessage;

constructor(private _channelService: ChannelService) {
this._sendMessage = sendMessage;
}
Expand Down

0 comments on commit 60f3032

Please sign in to comment.