Skip to content

Commit

Permalink
Merge 1a08e42 into b0051eb
Browse files Browse the repository at this point in the history
  • Loading branch information
xmaIIoc committed Dec 13, 2019
2 parents b0051eb + 1a08e42 commit 86bae57
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .nvmrc
@@ -1 +1 @@
7.10.0
10
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -453,6 +453,8 @@ To set up your development environment:
[Back to top](#table-of-contents)

## Change History
* v1.7.1 (2019-12-13)
* Handle channel closed error when sending a message to add a custom code on the thrown error
* v1.7.0 (2019-02-27)
* Add method to cancel consuming queue
* Refactor consume queue to allow easier consume/cancel
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@hapiness/rabbitmq",
"version": "1.7.0",
"version": "1.7.1",
"description": "Hapiness module for rabbitmq",
"main": "commonjs/index.js",
"types": "index.d.ts",
Expand Down
36 changes: 34 additions & 2 deletions src/module/message.ts
Expand Up @@ -3,9 +3,27 @@ import * as _has from 'lodash.has';
import * as _pick from 'lodash.pick';
import { MessageOptions } from './interfaces';
import { events } from './events';
import { ConnectionManager } from './managers';

function handleChannelClosedError(cb, ...options): boolean {
try {
return cb(...options);
} catch (err) {
if (err.message && err.message.toLowerCase().includes('channel closed')) {
// Add the code for this error, so it can be handled later
(<any>err).code = 'CHANNEL_CLOSED_ERROR';

// Emit the error on the connection object
(<ConnectionManager>(events.connection)).emit('error', err);
}

throw err;
}
}

export function sendMessage(ch: ChannelInterface, message: any, options: MessageOptions = {}): boolean {
let { json } = Object.assign({ json: true }, options);

const allowedOptions = [
'expiration',
'userId',
Expand All @@ -25,6 +43,7 @@ export function sendMessage(ch: ChannelInterface, message: any, options: Message
'type',
'appId'
];

const publishOptions = _pick(options, allowedOptions);

if (!ch) {
Expand Down Expand Up @@ -62,15 +81,28 @@ export function sendMessage(ch: ChannelInterface, message: any, options: Message
queue: options.queue,
content: message
});
return ch.sendToQueue(options.queue, encodedMessage, publishOptions);

return handleChannelClosedError(
ch.sendToQueue,
options.queue,
encodedMessage,
publishOptions
);
} else if (options.exchange) {
events.message.emit('sent', {
publishOptions,
exchange: options.exchange,
routingKey: options.routingKey,
content: message
});
return ch.publish(options.exchange, options.routingKey, encodedMessage, publishOptions);

return handleChannelClosedError(
ch.publish,
options.exchange,
options.routingKey,
encodedMessage,
publishOptions
);
} else {
throw new Error('Specify a queue or an exchange');
}
Expand Down
2 changes: 2 additions & 0 deletions src/module/services/message.service.ts
Expand Up @@ -7,7 +7,9 @@ import { Channel } from 'amqplib';

@Injectable()
export class MessageService {

private _sendMessage;

constructor(private _channelService: ChannelService) {
this._sendMessage = sendMessage;
}
Expand Down

0 comments on commit 86bae57

Please sign in to comment.