Skip to content

Commit

Permalink
Merge 71408b0 into 1c41fd6
Browse files Browse the repository at this point in the history
  • Loading branch information
antoinegomez committed Feb 27, 2019
2 parents 1c41fd6 + 71408b0 commit 77ce5c9
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 108 deletions.
4 changes: 4 additions & 0 deletions README.md
Expand Up @@ -453,6 +453,10 @@ To set up your development environment:
[Back to top](#table-of-contents)

## Change History
* v1.7.0 (2019-02-27)
* Add method to cancel consuming queue
* Refactor consume queue to allow easier consume/cancel
* Add a QueueStore to fetch all the queues manager instances
* v1.6.2 (2018-11-22)
* Create DI with providers for queues and exchanges
* v1.6.1 (2018-11-14)
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@hapiness/rabbitmq",
"version": "1.6.2",
"version": "1.7.0",
"description": "Hapiness module for rabbitmq",
"main": "commonjs/index.js",
"types": "index.d.ts",
Expand Down
4 changes: 3 additions & 1 deletion src/module/extension/build-queues.ts
Expand Up @@ -10,6 +10,7 @@ import registerMessages from './register-messages';
import { MessageRouterInterface } from '../interfaces/message-router';
import { consumeQueue } from './consume-queue';
import { RabbitMQExt } from '../rabbitmq.extension';
import { QueueStore } from '../managers/queue-store';

export default function buildQueues(
modules: CoreModule[], connection: ConnectionManager, MessageRouter: Type<MessageRouterInterface>
Expand All @@ -30,7 +31,8 @@ export default function buildQueues(
.map(channel => ({ instance, metadata, channel, _module })) :
Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module }))
.flatMap(({ instance, metadata, channel, _module }) => {
const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data));
const queue = new QueueManager(channel, new QueueWrapper(instance, metadata));
QueueStore.getInstance().addQueue(queue);
const shouldAssert = typeof metadata.data.assert === 'boolean' ? metadata.data.assert : RabbitMQExt.getConfig().assert;
// Don't check queue if we assert it
const assertOrCheck$ = shouldAssert
Expand Down
4 changes: 2 additions & 2 deletions src/module/extension/consume-queue.ts
Expand Up @@ -7,8 +7,8 @@ const debug = require('debug')('hapiness:rabbitmq');

export function consumeQueue(queue: QueueManager, messageRouter: MessageRouterInterface): Observable<any> {
debug(`Creating dispatcher for queue ${queue.getName()}`);
return queue.consume(
(ch, message) => messageRouter.getDispatcher(ch, message))
queue.setDispatcher((ch, message, params) => messageRouter.getDispatcher(ch, message, params));
return queue.consume()
.catch(err => Observable.of(errorHandler(err)))
.do(() => debug('consumed'));
}
1 change: 1 addition & 0 deletions src/module/interfaces/index.ts
Expand Up @@ -10,6 +10,7 @@ export * from './message-result';
export * from './message-router';
export * from './message';
export * from './on-asserted';
export * from './queue-dispatcher-options';
export * from './queue-options';
export * from './queue';
export * from './rabbit-message';
4 changes: 3 additions & 1 deletion src/module/interfaces/message-router.ts
Expand Up @@ -4,6 +4,7 @@ import { RabbitMessage } from './rabbit-message';
import { MessageResult } from './message-result';
import { CoreModule } from '@hapiness/core';
import { MessageDecoratorInterface } from '../decorators';
import { QueueDispatcherOptions } from './queue-dispatcher-options';

export interface RegisterMessageOptions {
token: any;
Expand All @@ -13,5 +14,6 @@ export interface RegisterMessageOptions {

export interface MessageRouterInterface {
registerMessage({ token, module }: RegisterMessageOptions): Observable<any>;
getDispatcher(ch: ChannelInterface, message: RabbitMessage): Observable<() => Observable<MessageResult>>;
getDispatcher(ch: ChannelInterface, message: RabbitMessage, { queue }: QueueDispatcherOptions):
Observable<() => Observable<MessageResult>>;
}
5 changes: 5 additions & 0 deletions src/module/interfaces/queue-dispatcher-options.ts
@@ -0,0 +1,5 @@
import { QueueWrapper } from '../managers';

export interface QueueDispatcherOptions {
queue: QueueWrapper
};
1 change: 1 addition & 0 deletions src/module/managers/channel-manager.ts
Expand Up @@ -25,6 +25,7 @@ export class ChannelManager extends EventEmitter {
this._connectionManager.on('error', () => {
this._isConnected = false;
});
this.setMaxListeners(0);
}

get prefetch(): number {
Expand Down
17 changes: 9 additions & 8 deletions src/module/managers/index.ts
@@ -1,8 +1,9 @@
export * from './channel-manager';
export * from './channel-store';
export * from './connection-manager';
export * from './exchange-manager';
export * from './exchange-wrapper';
export * from './message-store';
export * from './queue-manager';
export * from './queue-wrapper';
export * from './channel-manager';
export * from './channel-store';
export * from './connection-manager';
export * from './exchange-manager';
export * from './exchange-wrapper';
export * from './message-store';
export * from './queue-manager';
export * from './queue-store';
export * from './queue-wrapper';
105 changes: 75 additions & 30 deletions src/module/managers/queue-manager.ts
@@ -1,9 +1,9 @@
import * as _pick from 'lodash.pick';
import { Observable } from 'rxjs';
import { Channel as ChannelInterface, Replies } from 'amqplib';
import { Channel as ChannelInterface, Replies, Options } from 'amqplib';
import { extractMetadataByDecorator, errorHandler } from '@hapiness/core';
import { sendMessage, decodeJSONContent } from '../message';
import { MessageResult, MessageOptions, RabbitMessage, QueueOptions, ConsumeOptions, QueueInterface, Bind } from '../interfaces';
import { MessageResult, MessageOptions, RabbitMessage, ConsumeOptions, QueueInterface, Bind, QueueDispatcherOptions } from '../interfaces';
import { ExchangeDecoratorInterface, ChannelManager } from '..';
import { QueueWrapper } from './queue-wrapper';
import { events } from '../events';
Expand All @@ -16,12 +16,17 @@ export class QueueManager {
private _ch: ChannelManager;
private _name: string;
private _queue: QueueInterface;
private _queueWrapper: QueueWrapper;
private _binds: Array<Bind>;
private _forceJsonDecode: boolean;
private _isAsserted: boolean;
private _options;
private _consuming = false;
private _consumerTag: string;
private _dispatcher: (ch: ChannelInterface, message: RabbitMessage, { queue }: QueueDispatcherOptions) =>
Observable<() => Observable<MessageResult>>;
private _options: Options.AssertQueue;

constructor(ch: ChannelManager, queue: QueueWrapper | QueueOptions) {
constructor(ch: ChannelManager, queue: QueueWrapper) {
this._ch = ch;

if (queue instanceof QueueWrapper) {
Expand All @@ -30,11 +35,7 @@ export class QueueManager {
this._binds = queue.getBinds();
this._options = _pick(queue.getAssertOptions(), QUEUE_OPTIONS);
this._forceJsonDecode = queue.getForceJsonDecode();
} else if (typeof queue === 'object') {
this._name = queue.name;
this._binds = queue.binds;
this._options = _pick(queue.options || {}, QUEUE_OPTIONS);
this._forceJsonDecode = queue.force_json_decode || false;
this._queueWrapper = queue;
} else {
throw new Error('Invalid queue parameter');
}
Expand All @@ -50,6 +51,14 @@ export class QueueManager {
return this._queue;
}

getOptions(): Options.AssertQueue {
return this._options;
}

getWrapper() {
return this._queueWrapper;
}

assert(): Observable<QueueManager> {
debug(`asserting queue ${this.getName()}...`);
return Observable.fromPromise(this._ch.getChannel().assertQueue(this.getName(), this._options)).map(_ => {
Expand All @@ -69,44 +78,67 @@ export class QueueManager {
return this._isAsserted;
}

setDispatcher(_dispatcher: (ch: ChannelInterface, message: RabbitMessage, { queue }: QueueDispatcherOptions) =>
Observable<() => Observable<MessageResult>>) {
if (typeof _dispatcher !== 'function') {
throw new Error(`dispatcher should be a function and not a ${typeof _dispatcher}`);
}

this._dispatcher = _dispatcher;
}

getDispatcher() {
return this._dispatcher;
}

consume(
_dispatcher?: (ch: ChannelInterface, message: RabbitMessage) => Observable<() => Observable<MessageResult>>,
options: ConsumeOptions = { decodeMessageContent: true, errorHandler: null, force_json_decode: false }
): Observable<Replies.Consume> {
debug(`consuming queue ${this.getName()}...`);
if (typeof options.decodeMessageContent !== 'boolean') {
options.decodeMessageContent = true;
}

let dispatcher = _dispatcher;
let defaultDispatch: (message: RabbitMessage, ch) => Observable<MessageResult>;
const dispatcher = this.getDispatcher();

// Reconsume queue when channel is reconnected
this._ch.on('reconnected', () => {
this._consume({ dispatcher, options });
});
return this._consume({ dispatcher, options });
}

private _consume({
dispatcher,
options
}): Observable<Replies.Consume> {
if (this._consuming) {
return Observable.of(null);
}

let _dispatcher = dispatcher;

let defaultDispatch: (message: RabbitMessage, ch, { queue }: QueueDispatcherOptions) => Observable<MessageResult>;

// If the @Queue does not ship an onMessage method we create a fallback
// that will ack the message
if (typeof this._queue['onMessage'] !== 'function') {
defaultDispatch = (message: RabbitMessage, ch): Observable<MessageResult> => {
defaultDispatch = (message: RabbitMessage, ch, { queue }: QueueDispatcherOptions): Observable<MessageResult> => {
// message not dispatched
debug('message not dispatched', message);
events.queueManager.emit('message_not_dispatched', message, ch);
return Observable.of({ ack: true });
};
} else {
// Otherwise the fallback of the dispatcher will be the queue onMessage function
defaultDispatch = this._queue['onMessage'].bind(this._queue);
}

if (typeof _dispatcher !== 'function') {
dispatcher = (ch: ChannelInterface, message: RabbitMessage) => Observable.of(() => defaultDispatch(message, ch));
if (typeof dispatcher !== 'function') {
_dispatcher = (ch: ChannelInterface, message: RabbitMessage) =>
Observable.of(() => defaultDispatch(message, ch, { queue: this._queueWrapper }));
}

// Reconsume queue when channel is reconnected
this._ch.on('reconnected', () => {
this._consume({ dispatcher, options, defaultDispatch });
});
return this._consume({ dispatcher, options, defaultDispatch });
}

private _consume({
dispatcher,
options,
defaultDispatch
}): Observable<Replies.Consume> {
const consumerChannel = this._ch.getChannel();
return Observable.fromPromise(
this._ch.getChannel().consume(this.getName(), message => {
Expand All @@ -121,10 +153,10 @@ export class QueueManager {

debug(`new message on queue ${this.getName()}`, _message.fields.deliveryTag);

return dispatcher(consumerChannel, _message).switchMap(dispatch => {
return _dispatcher(consumerChannel, _message, { queue: this._queueWrapper }).switchMap(dispatch => {
if (typeof dispatch !== 'function') {
debug('dispatcher did not returned a function, using defaultDispatcher');
return defaultDispatch(_message, this._ch);
debug('dispatcher did not returned a function, call default dispatch');
return defaultDispatch(_message, this._ch, { queue: this._queueWrapper });
} else {
return dispatch();
}
Expand All @@ -147,6 +179,8 @@ export class QueueManager {
}
})
)
.do(() => this._consuming = true)
.do(({ consumerTag }) => this._consumerTag = consumerTag)
.do(res => MessageStore.addConsumer(consumerChannel, res.consumerTag))
.do(res => {
// If channel is closed, has en error or is reconnected the consumerTag is not valid
Expand All @@ -158,6 +192,17 @@ export class QueueManager {
});
}

cancel() {
if (!this._consuming) {
return Observable.of(null);
}

return Observable
.fromPromise(this._ch.getChannel().cancel(this._consumerTag))
.do(() => MessageStore.removeConsumer(this._consumerTag))
.do(() => this._consuming = false);
}

handleMessageError(
message: RabbitMessage,
{ storeMessage, options, err, consumerChannel }:
Expand Down
41 changes: 41 additions & 0 deletions src/module/managers/queue-store.ts
@@ -0,0 +1,41 @@
import { QueueManager } from './queue-manager';

export class QueueStore {
private static instance: QueueStore;
private queues: QueueManager[] = [];

public static getInstance(): QueueStore {
if (!QueueStore.instance) {
QueueStore.instance = new QueueStore();
}

return QueueStore.instance;
}

addQueue(queue: QueueManager) {
this.queues.push(queue);

return this;
}

removeQueue(queue: QueueManager) {
const index = this.queues.indexOf(queue);

if (!index) {
return this;
}

this.queues.splice(index, 1);

return this;
}

removeAll() {
this.queues = [];
}

getQueues() {
return this.queues;
}
}

14 changes: 11 additions & 3 deletions src/module/managers/queue-wrapper.ts
@@ -1,20 +1,28 @@
import { QueueDecoratorInterface } from '../decorators';
import { Type } from '@hapiness/core';
import { Options } from 'amqplib';
import { QueueDecoratorInterface } from '../decorators';
import { QueueInterface, Bind } from '../interfaces';

export class QueueWrapper {
private _instance: QueueInterface;
private _meta: QueueDecoratorInterface;
private _token: Type<QueueInterface>;

constructor(instance: QueueInterface, meta: QueueDecoratorInterface) {
constructor(instance: QueueInterface, metadata: { token: Type<QueueInterface>, data: QueueDecoratorInterface }) {
this._instance = instance;
this._meta = meta;
const { data, token } = metadata;
this._meta = data;
this._token = token;
}

public getMeta(): QueueDecoratorInterface {
return this._meta;
}

public getToken(): Type<QueueInterface> {
return this._token;
}

public getAssertOptions(): Options.AssertExchange {
try {
return this.getMeta().options;
Expand Down
4 changes: 3 additions & 1 deletion test/unit/extension/init-extension.test.ts
Expand Up @@ -29,7 +29,8 @@ export class InitExtensionUnitTest {
this.ch = new ChannelManager(connection);
this.ch['ch'] = <any>new ChannelMock();
this.userQueue = new UserQueue();
this.queueWrapper = new QueueWrapper(this.userQueue, extractMetadataByDecorator(UserQueue, 'Queue'));
this.queueWrapper = new QueueWrapper(
this.userQueue, { token: UserQueue, data: extractMetadataByDecorator(UserQueue, 'Queue') });
this.messageRouter = new DefaultMessageRouter();
this.queue = new QueueManager(this.ch, this.queueWrapper);
unit.spy(this.userQueue, 'onMessage');
Expand Down Expand Up @@ -138,6 +139,7 @@ export class InitExtensionUnitTest {
@test('- Should test error on consumeQueue')
testConsumeError(done) {
const queueStub = {
setDispatcher() {},
getName: () => 'hello',
consume: () => Observable.throw(new Error('Cannot consume'))
};
Expand Down

0 comments on commit 77ce5c9

Please sign in to comment.