From c66d7c475a905d8820d314cc3d801707ee5222bb Mon Sep 17 00:00:00 2001 From: Antoine Gomez Date: Fri, 24 Aug 2018 10:35:05 +0200 Subject: [PATCH 1/4] Refactor to allow custom MessageRouter --- README.md | 2 + package-lock.json | 2 +- package.json | 2 +- src/module/decorators.ts | 2 +- src/module/extension/build-exchange.ts | 23 ++ src/module/extension/build-queues.ts | 74 ++++ src/module/extension/consume-queue.ts | 14 + src/module/extension/get-channel.ts | 10 + src/module/extension/register-annotations.ts | 23 ++ src/module/extension/register-messages.ts | 21 ++ src/module/index.ts | 2 +- src/module/interfaces/consume-options.ts | 2 +- src/module/interfaces/message-router.ts | 17 + src/module/managers/queue-manager.ts | 2 +- src/module/message-router.ts | 31 +- src/module/message.ts | 2 +- src/module/rabbitmq.extension.ts | 23 +- src/module/register-annotations.ts | 148 -------- src/module/utils.ts | 12 +- test/integration/message.router.test.ts | 369 +++++++++++++++++++ test/mocks/ConnectionManager.ts | 2 +- test/mocks/ConnectionService.ts | 2 +- test/unit/extension/init-extension.test.ts | 25 +- test/unit/message.router.test.ts | 292 --------------- test/unit/services/channel.test.ts | 2 +- 25 files changed, 619 insertions(+), 485 deletions(-) create mode 100644 src/module/extension/build-exchange.ts create mode 100644 src/module/extension/build-queues.ts create mode 100644 src/module/extension/consume-queue.ts create mode 100644 src/module/extension/get-channel.ts create mode 100644 src/module/extension/register-annotations.ts create mode 100644 src/module/extension/register-messages.ts create mode 100644 src/module/interfaces/message-router.ts delete mode 100644 src/module/register-annotations.ts create mode 100644 test/integration/message.router.test.ts delete mode 100644 test/unit/message.router.test.ts diff --git a/README.md b/README.md index b2402bb..e0bb1cf 100644 --- a/README.md +++ b/README.md @@ -453,6 +453,8 @@ To set up your development environment: [Back to top](#table-of-contents) ## Change History +* v1.5.0 (2018-08-24) + * Add possibility to provide a custom MessageRouter * v1.4.3 (2018-08-20) * Emit RETRY_LIMIT_EXCEEDED error on ConnectionManager * v1.4.2 (2018-06-11) diff --git a/package-lock.json b/package-lock.json index f14a7b9..49f08ac 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "@hapiness/rabbitmq", - "version": "1.4.3", + "version": "1.5.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index db011a6..0a275ca 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@hapiness/rabbitmq", - "version": "1.4.3", + "version": "1.5.0", "description": "Hapiness module for rabbitmq", "main": "commonjs/index.js", "types": "index.d.ts", diff --git a/src/module/decorators.ts b/src/module/decorators.ts index 2dbd5a8..7a1c921 100644 --- a/src/module/decorators.ts +++ b/src/module/decorators.ts @@ -1,7 +1,7 @@ import { Type } from '@hapiness/core'; import { createDecorator, CoreDecorator } from '@hapiness/core'; import { Options } from 'amqplib'; -import { ExchangeType } from './interfaces/index'; +import { ExchangeType } from './interfaces'; export interface Bind { exchange: Type; diff --git a/src/module/extension/build-exchange.ts b/src/module/extension/build-exchange.ts new file mode 100644 index 0000000..6cc846e --- /dev/null +++ b/src/module/extension/build-exchange.ts @@ -0,0 +1,23 @@ +import { Observable } from 'rxjs/Observable'; +import { CoreModule, DependencyInjection } from '@hapiness/core'; +import { ConnectionManager } from '../managers/connection-manager'; +import { metadataFromDeclarations } from '../utils'; +import { ExchangeDecoratorInterface } from '../decorators'; +import { ExchangeManager } from '../managers/exchange-manager'; +import { ExchangeWrapper } from '../managers/exchange-wrapper'; + +export default function buildExchanges(modules: CoreModule[], connection: ConnectionManager): Observable { + return Observable.from(modules) + .filter(_module => !!_module) + .flatMap(_module => + metadataFromDeclarations(_module.declarations, 'Exchange') + .map(metadata => ({ metadata, _module })) + ) + .flatMap(({ metadata, _module }) => DependencyInjection.instantiateComponent(metadata.token, _module.di) + .map(instance => ({ instance, _module, metadata }))) + .flatMap(({ instance, _module, metadata }) => { + const exchange = new ExchangeManager(connection.defaultChannel, new ExchangeWrapper(instance, metadata.data)); + return exchange.assert(); + }) + .toArray(); +} diff --git a/src/module/extension/build-queues.ts b/src/module/extension/build-queues.ts new file mode 100644 index 0000000..b82e3e7 --- /dev/null +++ b/src/module/extension/build-queues.ts @@ -0,0 +1,74 @@ +import { CoreModule, DependencyInjection, extractMetadataByDecorator, Type } from '@hapiness/core'; +import { ConnectionManager } from '../managers/connection-manager'; +import { Observable } from 'rxjs/Observable'; +import { metadataFromDeclarations } from '../utils'; +import { QueueDecoratorInterface, ExchangeDecoratorInterface } from '../decorators'; +import { getChannel } from './get-channel'; +import { QueueManager } from '../managers/queue-manager'; +import { QueueWrapper } from '../managers/queue-wrapper'; +import registerMessages from './register-messages'; +import { MessageRouterInterface } from '../interfaces/message-router'; +import { consumeQueue } from './consume-queue'; + +export default function buildQueues( + modules: CoreModule[], connection: ConnectionManager, MessageRouter: Type +): Observable { + return ( + Observable.from(modules) + .filter(_module => !!_module) + .flatMap(_module => + metadataFromDeclarations(_module.declarations, 'Queue') + .map(metadata => ({ metadata, _module })) + ) + .flatMap(({ metadata, _module }) => + DependencyInjection.instantiateComponent(metadata.token, _module.di) + .map(instance => ({ instance, _module, metadata}))) + // Assert queue + .mergeMap(({ instance, _module, metadata }) => + metadata.data.channel ? + getChannel(connection, metadata.data.channel) + .map(channel => ({ instance, metadata, channel, _module })) : + Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module })) + .mergeMap(({ instance, metadata, channel, _module }) => { + const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data)); + return Observable.forkJoin(queue.assert(), Observable.of(metadata), Observable.of(_module)); + }) + // Bind queue + .mergeMap(([queue, metadata, _module]) => { + if (Array.isArray(metadata.data.binds)) { + return Observable.forkJoin( + metadata.data.binds.map(bind => { + if (Array.isArray(bind.pattern)) { + return Observable.forkJoin(bind.pattern.map(pattern => queue.bind( + extractMetadataByDecorator(bind.exchange, 'Exchange').name, pattern))); + } + + return queue.bind( + extractMetadataByDecorator(bind.exchange, 'Exchange').name, bind.pattern + ); + })).map(() => ({ queue, _module })); + } + + return Observable.of(({ queue, _module })); + }) + // Register messages related to queue + // Consume queue + // Dont consume queue if there are no messages or consume() method on queue + .mergeMap(({ queue, _module }) => { + const messageRouter = new MessageRouter(); + return registerMessages(modules, queue, messageRouter) + .defaultIfEmpty(null) + .filter(item => !!item) + .toArray() + .switchMap((registeredMessages) => { + const _queue = queue.getQueue(); + if (registeredMessages.length || typeof _queue['onMessage'] === 'function') { + return consumeQueue(queue, messageRouter); + } + + return Observable.of(null); + }); + }) + .toArray() + ); +} diff --git a/src/module/extension/consume-queue.ts b/src/module/extension/consume-queue.ts new file mode 100644 index 0000000..d133c56 --- /dev/null +++ b/src/module/extension/consume-queue.ts @@ -0,0 +1,14 @@ +import { QueueManager } from '../managers/queue-manager'; +import { Observable } from 'rxjs/Observable'; +import { errorHandler } from '@hapiness/core'; +import { MessageRouterInterface } from '../interfaces/message-router'; + +const debug = require('debug')('hapiness:rabbitmq'); + +export function consumeQueue(queue: QueueManager, messageRouter: MessageRouterInterface): Observable { + debug(`Creating dispatcher for queue ${queue.getName()}`); + return queue.consume( + (ch, message) => messageRouter.getDispatcher(ch, message)) + .catch(err => Observable.of(errorHandler(err))) + .do(() => debug('consumed')); +} diff --git a/src/module/extension/get-channel.ts b/src/module/extension/get-channel.ts new file mode 100644 index 0000000..03b282d --- /dev/null +++ b/src/module/extension/get-channel.ts @@ -0,0 +1,10 @@ +import { ConnectionManager } from '../managers/connection-manager'; +import { ChannelOptions } from '../decorators'; +import { Observable } from 'rxjs/Observable'; +import { ChannelManager } from '../managers/channel-manager'; + +export function getChannel(connection: ConnectionManager, channel: ChannelOptions): Observable { + return connection + .channelStore + .upsert(channel.key, { prefetch: channel.prefetch, global: channel.global }); +} diff --git a/src/module/extension/register-annotations.ts b/src/module/extension/register-annotations.ts new file mode 100644 index 0000000..86c1830 --- /dev/null +++ b/src/module/extension/register-annotations.ts @@ -0,0 +1,23 @@ +import { Observable } from 'rxjs'; +import { CoreModule, Type } from '@hapiness/core'; +import { ConnectionManager } from '../managers'; +import { getModules } from '../utils'; +import buildExchanges from './build-exchange'; +import buildQueues from './build-queues'; +import { MessageRouterInterface } from '../interfaces/message-router'; + +const debug = require('debug')('hapiness:rabbitmq'); + +export class RegisterAnnotations { + public static bootstrap(module: CoreModule, connection: ConnectionManager, MessageRouter: Type) { + debug('bootstrap extension'); + const modules = getModules(module); + return buildExchanges(modules, connection) + .toArray() + .flatMap(_ => buildQueues(modules, connection, MessageRouter)) + .toArray() + .flatMap(_ => { + return Observable.of(null); + }); + } +} diff --git a/src/module/extension/register-messages.ts b/src/module/extension/register-messages.ts new file mode 100644 index 0000000..5b05faa --- /dev/null +++ b/src/module/extension/register-messages.ts @@ -0,0 +1,21 @@ +import { CoreModule, extractMetadataByDecorator } from '@hapiness/core'; +import { Observable } from 'rxjs/Observable'; +import { QueueManager } from '../managers/queue-manager'; +import { metadataFromDeclarations } from '../utils'; +import { MessageDecoratorInterface, ExchangeDecoratorInterface } from '../decorators'; +import { MessageRouterInterface } from '../interfaces/message-router'; + +const debug = require('debug')('hapiness:rabbitmq'); + +export default function registerMessages(modules: CoreModule[], queue: QueueManager, messageRouter: MessageRouterInterface) { + debug('register messages'); + return Observable.from(modules) + .flatMap(_module => metadataFromDeclarations(_module.declarations, 'Message') + .filter(metadata => { + const { name } = extractMetadataByDecorator(metadata.data.queue, 'Queue'); + debug('filtering message for queue', name, queue.getName()); + return name === queue.getName(); + }) + .flatMap(_ => messageRouter.registerMessage({ token: _.token, module: _module, data: _.data })) + ); +} diff --git a/src/module/index.ts b/src/module/index.ts index 868d0e4..4122a20 100644 --- a/src/module/index.ts +++ b/src/module/index.ts @@ -7,5 +7,5 @@ export * from './message-router'; export * from './message'; export * from './rabbitmq.extension'; export * from './rabbitmq.module'; -export * from './register-annotations'; +export * from './extension/register-annotations'; export * from './utils'; diff --git a/src/module/interfaces/consume-options.ts b/src/module/interfaces/consume-options.ts index 26ebfba..afab62c 100644 --- a/src/module/interfaces/consume-options.ts +++ b/src/module/interfaces/consume-options.ts @@ -1,4 +1,4 @@ -import { RabbitMessage } from './index'; +import { RabbitMessage } from '.'; import { Channel } from 'amqplib'; export interface ConsumeOptions { diff --git a/src/module/interfaces/message-router.ts b/src/module/interfaces/message-router.ts new file mode 100644 index 0000000..0080d52 --- /dev/null +++ b/src/module/interfaces/message-router.ts @@ -0,0 +1,17 @@ +import { Channel as ChannelInterface } from 'amqplib' +import { Observable } from 'rxjs/Observable'; +import { RabbitMessage } from './rabbit-message'; +import { MessageResult } from './message-result'; +import { CoreModule } from '@hapiness/core'; +import { MessageDecoratorInterface } from '../decorators'; + +export interface RegisterMessageOptions { + token: any; + module: CoreModule; + data: MessageDecoratorInterface; +}; + +export interface MessageRouterInterface { + registerMessage({ token, module }: RegisterMessageOptions): Observable; + getDispatcher(ch: ChannelInterface, message: RabbitMessage): Observable<() => Observable>; +} diff --git a/src/module/managers/queue-manager.ts b/src/module/managers/queue-manager.ts index a53dde0..2ba9f26 100644 --- a/src/module/managers/queue-manager.ts +++ b/src/module/managers/queue-manager.ts @@ -5,7 +5,7 @@ import { extractMetadataByDecorator, errorHandler } from '@hapiness/core'; import { sendMessage, decodeJSONContent } from '../message'; import { MessageResult, MessageOptions, RabbitMessage, QueueOptions, ConsumeOptions, QueueInterface } from '../interfaces'; import { Bind } from '../decorators'; -import { ExchangeDecoratorInterface, ChannelManager } from '../index'; +import { ExchangeDecoratorInterface, ChannelManager } from '..'; import { QueueWrapper } from './queue-wrapper'; import { events } from '../events'; import { MessageStore, StoreMessage } from './message-store'; diff --git a/src/module/message-router.ts b/src/module/message-router.ts index 53a73a1..18050a5 100644 --- a/src/module/message-router.ts +++ b/src/module/message-router.ts @@ -1,13 +1,14 @@ import * as _get from 'lodash.get'; -import { extractMetadataByDecorator } from '@hapiness/core'; +import { extractMetadataByDecorator, DependencyInjection } from '@hapiness/core'; import { Channel as ChannelInterface } from 'amqplib'; import { Observable } from 'rxjs'; import { MessageResult, RabbitMessage, MessageInterface } from './interfaces'; import { MessageDecoratorInterface, QueueDecoratorInterface, ExchangeDecoratorInterface } from './decorators'; +import { MessageRouterInterface, RegisterMessageOptions } from './interfaces/message-router'; export type messageResult = Observable; -export class MessageRouter { +export class DefaultMessageRouter implements MessageRouterInterface { private classes: Array<{ messageClass: MessageInterface; data: MessageDecoratorInterface; @@ -17,23 +18,23 @@ export class MessageRouter { this.classes = []; } - registerMessage(messageClass: MessageInterface) { - const data = extractMetadataByDecorator(messageClass.constructor, 'Message'); - - if (!data || !data.queue) { - throw new Error('Cannot register a message class without a queue'); - } + registerMessage({ token, data, module }: RegisterMessageOptions): Observable { + return DependencyInjection.instantiateComponent(token, module.di).flatMap(messageClass => { + if (!data || !data.queue) { + return Observable.throw(new Error('Cannot register a message class without a queue')); + } - if (!data.exchange && !data.routingKey && (!data.filter || !Object.keys(data.filter).length) && !data.is_fallback) { - throw new Error(`Cannot register a message without an exchange or routingKey, - filter or set is_fallback to true use your queue onMessage method instead`); - } + if (!data.exchange && !data.routingKey && (!data.filter || !Object.keys(data.filter).length) && !data.is_fallback) { + return Observable.throw(new Error(`Cannot register a message without an exchange or routingKey, + filter or set is_fallback to true use your queue onMessage method instead`)); + } - this.classes.push({ messageClass, data }); - return messageClass; + this.classes.push({ messageClass, data }); + return Observable.of(messageClass); + }); } - getDispatcher(ch: ChannelInterface, message: RabbitMessage): Observable<() => messageResult> { + getDispatcher(ch: ChannelInterface, message: RabbitMessage): Observable<() => Observable> { // If empty message or not an object // returns and fake ACK if (!message || typeof message !== 'object') { diff --git a/src/module/message.ts b/src/module/message.ts index 6a6eb95..ef86e65 100644 --- a/src/module/message.ts +++ b/src/module/message.ts @@ -1,7 +1,7 @@ import { Channel as ChannelInterface } from 'amqplib'; import * as _has from 'lodash.has'; import * as _pick from 'lodash.pick'; -import { MessageOptions } from './interfaces/index'; +import { MessageOptions } from './interfaces'; import { events } from './events'; export function sendMessage(ch: ChannelInterface, message: any, options: MessageOptions = {}): boolean { diff --git a/src/module/rabbitmq.extension.ts b/src/module/rabbitmq.extension.ts index 2840fe1..daace67 100644 --- a/src/module/rabbitmq.extension.ts +++ b/src/module/rabbitmq.extension.ts @@ -15,10 +15,24 @@ import { RabbitMQConfig } from './interfaces/config'; import { MessageStore } from './managers/message-store'; +import { RegisterAnnotations } from './extension/register-annotations'; +import { DefaultMessageRouter } from './message-router'; + const debug = require('debug')('hapiness:rabbitmq'); export class RabbitMQExt implements OnExtensionLoad, OnModuleInstantiated, OnShutdown { - static ConnectionManager: typeof ConnectionManager; + static ConnectionManager: typeof ConnectionManager = ConnectionManager; + static MessageRouter = DefaultMessageRouter; + + public static setConnectionManager(_ConnectionManager: typeof ConnectionManager) { + this.ConnectionManager = _ConnectionManager; + return this; + } + + public static setMessageRouter(_MessageRouter) { + this.MessageRouter = _MessageRouter; + return this; + } public static setConfig(config: RabbitMQConfig): ExtensionWithConfig { return { @@ -60,13 +74,13 @@ export class RabbitMQExt implements OnExtensionLoad, OnModuleInstantiated, OnShu connection.on('error', () => { connection .connect() - .flatMap(() => RegisterAnnotations.bootstrap(module, connection)) + .flatMap(() => RegisterAnnotations.bootstrap(module, connection, RabbitMQExt.MessageRouter)) .subscribe(() => {}, err => { errorHandler(err); }); }); - return RegisterAnnotations.bootstrap(module, connection); + return RegisterAnnotations.bootstrap(module, connection, RabbitMQExt.MessageRouter); } onShutdown(module, connection: ConnectionManager) { @@ -84,7 +98,4 @@ export class RabbitMQExt implements OnExtensionLoad, OnModuleInstantiated, OnShu } } -RabbitMQExt.ConnectionManager = ConnectionManager; - -import { RegisterAnnotations } from './register-annotations'; diff --git a/src/module/register-annotations.ts b/src/module/register-annotations.ts deleted file mode 100644 index 70d846d..0000000 --- a/src/module/register-annotations.ts +++ /dev/null @@ -1,148 +0,0 @@ -import { Observable } from 'rxjs'; -import { Type, CoreModule, extractMetadataByDecorator, errorHandler, DependencyInjection } from '@hapiness/core'; -import { ConnectionManager, ChannelManager } from './managers'; -import { QueueDecoratorInterface, ExchangeDecoratorInterface, MessageDecoratorInterface, ChannelOptions } from './decorators'; -import { QueueManager } from './managers'; -import { ExchangeManager, ExchangeWrapper, QueueWrapper } from './managers'; -import { MessageRouter } from './message-router'; -import { MessageInterface } from './interfaces'; -import { getModules } from './utils'; - -const debug = require('debug')('hapiness:rabbitmq'); - -export class RegisterAnnotations { - public static bootstrap(module: CoreModule, connection: ConnectionManager) { - debug('bootstrap extension'); - const modules = getModules(module); - return RegisterAnnotations.buildExchanges(modules, connection) - .toArray() - .flatMap(_ => RegisterAnnotations.buildQueues(modules, connection)) - .toArray() - .flatMap(_ => { - return Observable.of(null); - }); - } - - public static getChannel(module: CoreModule, connection: ConnectionManager, channel: ChannelOptions): Observable { - return connection - .channelStore - .upsert(channel.key, { prefetch: channel.prefetch, global: channel.global }); - } - - public static buildExchanges(modules: CoreModule[], connection: ConnectionManager): Observable { - return Observable.from(modules) - .filter(_module => !!_module) - .flatMap(_module => - RegisterAnnotations.metadataFromDeclarations(_module.declarations, 'Exchange') - .map(metadata => ({ metadata, _module })) - ) - .flatMap(({ metadata, _module }) => DependencyInjection.instantiateComponent(metadata.token, _module.di) - .map(instance => ({ instance, _module, metadata }))) - .flatMap(({ instance, _module, metadata }) => { - const exchange = new ExchangeManager(connection.defaultChannel, new ExchangeWrapper(instance, metadata.data)); - return exchange.assert(); - }) - .toArray(); - } - - public static buildQueues(modules: CoreModule[], connection: ConnectionManager): Observable { - return ( - Observable.from(modules) - .filter(_module => !!_module) - .flatMap(_module => - RegisterAnnotations.metadataFromDeclarations(_module.declarations, 'Queue') - .map(metadata => ({ metadata, _module })) - ) - .flatMap(({ metadata, _module }) => - DependencyInjection.instantiateComponent(metadata.token, _module.di) - .map(instance => ({ instance, _module, metadata}))) - // Assert queue - .mergeMap(({ instance, _module, metadata }) => - metadata.data.channel ? - RegisterAnnotations - .getChannel(_module, connection, metadata.data.channel) - .map(channel => ({ instance, metadata, channel, _module })) : - Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module })) - .mergeMap(({ instance, metadata, channel, _module }) => { - const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data)); - return Observable.forkJoin(queue.assert(), Observable.of(metadata), Observable.of(_module)); - }) - // Bind queue - .mergeMap(([queue, metadata, _module]) => { - if (Array.isArray(metadata.data.binds)) { - return Observable.forkJoin( - metadata.data.binds.map(bind => { - if (Array.isArray(bind.pattern)) { - return Observable.forkJoin(bind.pattern.map(pattern => queue.bind( - extractMetadataByDecorator(bind.exchange, 'Exchange').name, pattern))); - } - - return queue.bind( - extractMetadataByDecorator(bind.exchange, 'Exchange').name, bind.pattern - ); - })).map(() => ({ queue, _module })); - } - - return Observable.of(({ queue, _module })); - }) - // Register messages related to queue - // Consume queue - // Dont consume queue if there are no messages or consume() method on queue - .mergeMap(({ queue, _module }) => { - const messageRouter = new MessageRouter(); - return RegisterAnnotations.registerMessages(modules, queue, messageRouter) - .defaultIfEmpty(null) - .filter(item => !!item) - .toArray() - .switchMap((registeredMessages) => { - const _queue = queue.getQueue(); - if (registeredMessages.length || typeof _queue['onMessage'] === 'function') { - return RegisterAnnotations.consumeQueue(queue, messageRouter); - } - - return Observable.of(null); - }); - }) - .toArray() - ); - } - - static consumeQueue(queue: QueueManager, messageRouter: MessageRouter): Observable { - debug(`Creating dispatcher for queue ${queue.getName()}`); - return queue.consume( - (ch, message) => messageRouter.getDispatcher(ch, message)) - .catch(err => Observable.of(errorHandler(err))) - .do(() => debug('consumed')); - } - - public static registerMessages(modules: CoreModule[], queue: QueueManager, messageRouter: MessageRouter) { - debug('register messages'); - return Observable.from(modules) - .flatMap(_module => RegisterAnnotations.metadataFromDeclarations(_module.declarations, 'Message') - .filter(metadata => { - const { name } = extractMetadataByDecorator(metadata.data.queue, 'Queue'); - debug('filtering message for queue', name, queue.getName()); - return name === queue.getName(); - }) - .flatMap(_ => DependencyInjection.instantiateComponent(_.token, _module.di).map(instance => ({ instance, _ }))) - .map(({ instance, _ }) => { - return messageRouter.registerMessage(instance); - })); - } - - /** - * Extract metadata filtered by queue - * from the declarations provided - * - * @param {Type} declarations - * @returns Array - */ - public static metadataFromDeclarations(declarations: Type[], decoratorName) { - return Observable.from([].concat(declarations)) - .filter(_ => !!_ && !!extractMetadataByDecorator(_, decoratorName)) - .map(_ => ({ - token: _, - data: extractMetadataByDecorator(_, decoratorName) - })); - } -} diff --git a/src/module/utils.ts b/src/module/utils.ts index f57e252..d2b54d2 100644 --- a/src/module/utils.ts +++ b/src/module/utils.ts @@ -1,4 +1,5 @@ -import { CoreModule } from '@hapiness/core'; +import { CoreModule, Type, extractMetadataByDecorator } from '@hapiness/core'; +import { Observable } from 'rxjs/Observable'; export const getModules = (module: CoreModule): CoreModule[] => { const lookup = (_module: CoreModule) => { @@ -13,3 +14,12 @@ export const getModules = (module: CoreModule): CoreModule[] => { }; return lookup(module); }; + +export function metadataFromDeclarations(declarations: Type[], decoratorName) { + return Observable.from([].concat(declarations)) + .filter(_ => !!_ && !!extractMetadataByDecorator(_, decoratorName)) + .map(_ => ({ + token: _, + data: extractMetadataByDecorator(_, decoratorName) + })); +} diff --git a/test/integration/message.router.test.ts b/test/integration/message.router.test.ts new file mode 100644 index 0000000..b2fceaf --- /dev/null +++ b/test/integration/message.router.test.ts @@ -0,0 +1,369 @@ +import { test, suite } from 'mocha-typescript'; +import * as unit from 'unit.js'; + +import { DefaultMessageRouter } from '../../src/module/message-router'; +import { Observable } from 'rxjs/Observable'; + +import { ChannelMock } from '../mocks/Channel'; +import { + GeneratePdf, + OrderCreatedMessage, + UserCreatedMessage, + UserEditedMessage, + UserDeletedMessage, + FallbackMessage, + FallbackMessageOk, + PokemonsMessage, + FooMessage, + UserCreatedActionMessage, + UserCreatedActionNotMatchedMessage, + BasketEditedMessage, + ProfileEditedMessage +} from '../fixtures/Messages'; +import { MayonaiseService } from '../fixtures/Services'; +import { generateMessage } from '../mocks/Message'; +import { extractMetadataByDecorator, Hapiness, HapinessModule, OnStart } from '@hapiness/core'; +import { MessageDecoratorInterface } from '../../src/module/decorators'; +import { RabbitMQModule, RabbitConnectionService, RabbitMQExt } from '../../src/module'; +import { ConnectionManagerMock } from '../mocks/ConnectionManager'; +import { Config } from '@hapiness/config'; + + +@suite('- Integration MessageRouter') +export class MessageRouterUnitTest { + private spyFindClass: any; + private ch: ChannelMock; + private messageRouter: DefaultMessageRouter; + + before() { + this.messageRouter = new DefaultMessageRouter(); + this.ch = new ChannelMock(); + unit.spy(this.messageRouter, 'getDispatcher'); + this.spyFindClass = unit.spy(this.messageRouter, 'findClass'); + unit.spy(this.messageRouter, 'registerMessage'); + unit.spy(this.messageRouter, '_testValue'); + } + + after() { + this.messageRouter = null; + this.ch = null; + } + + @test('- Init module') + testModule(done) { + @HapinessModule({ + version: '1.0.0-rc.7.0', + declarations: [ + GeneratePdf, + OrderCreatedMessage, + UserCreatedMessage, + UserEditedMessage, + UserDeletedMessage, + FallbackMessage, + FallbackMessageOk, + PokemonsMessage, + FooMessage, + UserCreatedActionMessage, + UserCreatedActionNotMatchedMessage, + BasketEditedMessage, + ProfileEditedMessage + ], + providers: [MayonaiseService], + exports: [], + imports: [RabbitMQModule] + }) + class RabbitMQModuleTest implements OnStart { + constructor(private _connectionService: RabbitConnectionService) {} + + onStart() { + unit.object(this._connectionService).isInstanceOf(RabbitConnectionService); + unit.object(this._connectionService.connectionManager); + unit.string(this._connectionService.connectionManager.uri).is('amqp://localhost:5672'); + unit.object(this._connectionService.connection); + done(); + } + + onError(err) {} + } + + Hapiness.bootstrap(RabbitMQModuleTest, [ + RabbitMQExt + .setConnectionManager(ConnectionManagerMock) + .setMessageRouter(DefaultMessageRouter) + .setConfig({ + connection: Config.get('rabbitmq') + }) + ]).catch(err => done(err)); + } + + @test('- Should test MessageRouter') + testNew() { + unit.object(this.messageRouter).isInstanceOf(DefaultMessageRouter); + unit.function(this.messageRouter.registerMessage); + unit.function(this.messageRouter.findClass); + unit.function(this.messageRouter.getDispatcher); + unit.function(this.messageRouter['_testValue']); + } + + @test('- Should test register/dispatch/find') + testRegister(done) { + function registerMessageOptionsWrap(token) { + return { + token, + data: extractMetadataByDecorator(token, 'Message'), + module: Hapiness['module'] + }; + } + + const userCreatedMessage = new UserCreatedMessage(new MayonaiseService()); + const userDeletedMessage = new UserDeletedMessage(); + const generatePdfMessage = new GeneratePdf(); + const orderCreatedMessage = new OrderCreatedMessage(); + const pokemonsMessage = new PokemonsMessage(); + const fallbackMessageOk = new FallbackMessageOk(); + const userEditedMessage = new UserEditedMessage(new MayonaiseService()); + const fooMessage = new FooMessage(); + const userCreatedActionMessage = new UserCreatedActionMessage(); + const basketEditedMessage = new BasketEditedMessage(); + const profileEditedMessage = new ProfileEditedMessage(); + + const registerMessagesObservable = []; + + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(UserDeletedMessage))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(BasketEditedMessage))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(ProfileEditedMessage))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(UserEditedMessage))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(FallbackMessageOk))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(OrderCreatedMessage))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(GeneratePdf))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(UserCreatedMessage))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(UserCreatedActionMessage))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(PokemonsMessage))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(FooMessage))); + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(UserCreatedActionNotMatchedMessage))); + + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(FallbackMessage)) + .flatMap(() => Observable.throw(new Error('Cannot be here'))) + .catch(err => { + if (err.message === 'Cannot be here') { + return Observable.throw(err); + } + + unit.object(err).isInstanceOf(Error) + .hasProperty('message', `Cannot register a message without an exchange or routingKey, + filter or set is_fallback to true use your queue onMessage method instead`); + return Observable.of(null); + })); + + registerMessagesObservable.push(this.messageRouter.registerMessage(registerMessageOptionsWrap(class InvalidMessage {})) + .flatMap(() => Observable.throw(new Error('Cannot be here'))) + .catch(err => { + if (err.message === 'Cannot be here') { + return err; + } + + unit.object(err).isInstanceOf(Error) + .hasProperty('message', 'Cannot register a message class without a queue'); + return Observable.of(null); + })); + + Observable.forkJoin(registerMessagesObservable) + .subscribe(() => { + const message_fooMessage = generateMessage({ user_id: 4028, action: 'test' }, { exchange: 'foo.exchange' }, false); + unit + .object(this.messageRouter.findClass(message_fooMessage)) + .isInstanceOf(FooMessage) + .is(fooMessage); + + const message_fooMessageRoutingKey = generateMessage({ user_id: 5678, action: 'test' }, + { exchange: 'foo.exchange', routingKey: 'bar' }, false); + unit + .value(this.messageRouter.findClass(message_fooMessageRoutingKey)) + .is(null); + + const message_basketEdited = generateMessage( + { basket_id: 8309, action: 'edited' }, { exchange: 'user.exchange', routingKey: 'basket' }, false); + unit + .object(this.messageRouter.findClass(message_basketEdited)) + .isInstanceOf(BasketEditedMessage) + .is(basketEditedMessage); + + const message_profileEdited_notFound = generateMessage( + { profile_id: 19237, action: 'edited' }, { exchange: 'user.exchange', routingKey: 'profile' }, false); + unit + .value(this.messageRouter.findClass(message_profileEdited_notFound)) + .is(null); + + const message_profileEdited = generateMessage( + { profile_id: 19237, action: 'edited', foo: 'bar' }, { exchange: 'user.exchange', routingKey: 'profile' }, false); + unit + .object(this.messageRouter.findClass(message_profileEdited)) + .isInstanceOf(ProfileEditedMessage) + .is(profileEditedMessage); + + const message_userEdited = generateMessage( + { user_id: 4028, action: 'edited' }, { exchange: 'user.exchange', routingKey: 'user' }, false); + unit + .object(this.messageRouter.findClass(message_userEdited)) + .isInstanceOf(UserEditedMessage) + .is(userEditedMessage); + + const message_userExchangeRoutingKeyWithoutAction = generateMessage( + { user_id: 4028 }, { exchange: 'user.exchange', routingKey: 'user' }, false); + unit.value(this.messageRouter.findClass(message_userExchangeRoutingKeyWithoutAction)).is(null); + + const message_userExchangeWithoutRoutingKey = generateMessage( + { user_id: 5678 }, { exchange: 'user.exchange' }, false); + unit.value(this.messageRouter.findClass(message_userExchangeWithoutRoutingKey)).is(null); + + const message_userCreated = + generateMessage({ user_id: 60936 }, { exchange: 'user.exchange', routingKey: 'user.created' }, false); + unit + .object(this.messageRouter.findClass(message_userCreated)) + .isInstanceOf(UserCreatedMessage) + .is(userCreatedMessage); + + const message_userCreatedAction = generateMessage({ user_id: 60936, action: 'special' }, + { exchange: 'user.exchange', routingKey: 'user.created' }, false); + unit + .object(this.messageRouter.findClass(message_userCreatedAction)) + .isInstanceOf(UserCreatedActionMessage) + .is(userCreatedActionMessage); + + const message_userDeleted = + generateMessage({ user_id: 2341 }, { exchange: 'user.exchange', routingKey: 'user.deleted' }, false); + unit + .object(this.messageRouter.findClass(message_userDeleted)) + .isInstanceOf(UserDeletedMessage) + .is(userDeletedMessage); + + const message_generatePdf = generateMessage( + { url: 'http://xxx.com/zzz.html', action: 'generate_pdf' }, + { routingKey: 'worker', exchange: '' }, + false + ); + unit + .object(this.messageRouter.findClass(message_generatePdf)) + .isInstanceOf(GeneratePdf) + .is(generatePdfMessage); + + const message_orderCreated = generateMessage( + { user_id: 3445, order_id: 238109 }, + { exchange: 'another.exchange', routingKey: 'order.created' } + ); + unit + .object(this.messageRouter.findClass(message_orderCreated)) + .isInstanceOf(OrderCreatedMessage) + .is(orderCreatedMessage); + + const message_fallback = generateMessage({ + reason: 'These are not the droid you are looking for' + }, { routingKey: 'test.fallback' }, false); + unit + .value(this.messageRouter.findClass(message_fallback)) + .is(fallbackMessageOk); + + const message_NotFound = generateMessage( + { reason: 'These are not the droid you are looking for' }, + { exchange: 'another.exchange' }, + false + ); + unit.value(this.messageRouter.findClass(message_NotFound)).is(null); + + const message_FindPokemon = generateMessage( + { action: 'pokemons_find', area: { lat: 0.234, long: 0.2345, radius: 5 } }, + { routingKey: 'another.queue' }, + false + ); + unit + .object(this.messageRouter.findClass(message_FindPokemon)) + .isInstanceOf(PokemonsMessage) + .is(pokemonsMessage); + + const message_FindPokemon_notFound = generateMessage( + { action: 'pokemons_find', area: { lat: 0.234, long: 0.2345, radius: 5 } }, + { routingKey: 'another.exchange' }, + false + ); + unit + .value(this.messageRouter.findClass(message_FindPokemon_notFound)) + .is(null); + + const pending = []; + pending.push( + this.messageRouter.getDispatcher(this.ch, message_orderCreated).switchMap(dispatcher => { + unit.function(dispatcher); + return dispatcher(); + }) + .map(messageResult => { + unit.object(messageResult).is({ reject: true }); + }) + ); + + pending.push( + this.messageRouter + .getDispatcher(this.ch, null) + .catch(err => { + unit + .object(err) + .isInstanceOf(Error) + .hasProperty('message', 'Invalid or empty message'); + return Observable.of('ok'); + }) + .map(_ => { + unit.value(_).is('ok'); + }) + ); + + pending.push( + this.messageRouter.getDispatcher(this.ch, message_generatePdf).switchMap(dispatcher => { + unit.function(dispatcher); + return dispatcher(); + }) + .map(messageResult => { + unit.object(messageResult).is({ reject: true, requeue: true }); + }) + ); + + pending.push( + this.messageRouter.getDispatcher(this.ch, message_NotFound).catch(err => { + unit + .object(err) + .isInstanceOf(Error) + .hasProperty('code', 'MESSAGE_CLASS_NOT_FOUND'); + return Observable.of(null); + }) + ); + + Observable.forkJoin(pending).subscribe(_ => { + unit.number(this.messageRouter.getDispatcher['callCount']).isGreaterThan(1); + unit.number(this.messageRouter.findClass['callCount']).isGreaterThan(15); + // unit.array(this.messageRouter.registerMessage['firstCall'].args).is([userDeletedMessage]); + unit.number(this.messageRouter['_testValue']['callCount']).isGreaterThan(50); + done(); + }); + }, err => done(err)); + } + + @test('- Should throw when invalid message class is provided to process() method') + testProcessInvalidMessageClass(done) { + this.spyFindClass.restore(); + const stub = unit.stub(this.messageRouter, 'findClass'); + stub.returns({}); + const obs = this.messageRouter.getDispatcher( + this.ch, + generateMessage({ error: 'invalid_message_class' }), + ); + obs.subscribe( + _ => done(new Error('Cannot be here')), + err => { + unit + .object(err) + .isInstanceOf(Error) + .hasProperty('message', 'Message class Object should implement onMessage() method'); + stub.restore(); + done(); + } + ); + } +} diff --git a/test/mocks/ConnectionManager.ts b/test/mocks/ConnectionManager.ts index c9939bd..64891c7 100644 --- a/test/mocks/ConnectionManager.ts +++ b/test/mocks/ConnectionManager.ts @@ -1,4 +1,4 @@ -import { ConnectionManager, ChannelManager } from '../../src/index'; +import { ConnectionManager, ChannelManager } from '../../src'; import { RabbitConnectionMock } from './RabbitConnection'; import { ChannelMock } from './Channel'; import { Observable } from 'rxjs'; diff --git a/test/mocks/ConnectionService.ts b/test/mocks/ConnectionService.ts index dddd84b..4d23679 100644 --- a/test/mocks/ConnectionService.ts +++ b/test/mocks/ConnectionService.ts @@ -1,4 +1,4 @@ -import { ConnectionManager } from '../../src/index'; +import { ConnectionManager } from '../../src'; import { ConnectionManagerMock } from './ConnectionManager'; import { Connection } from 'amqplib'; import { Injectable, Inject } from '@hapiness/core'; diff --git a/test/unit/extension/init-extension.test.ts b/test/unit/extension/init-extension.test.ts index 35c1f36..c6ea317 100644 --- a/test/unit/extension/init-extension.test.ts +++ b/test/unit/extension/init-extension.test.ts @@ -1,9 +1,9 @@ import { test, suite } from 'mocha-typescript'; import * as unit from 'unit.js'; -import { RegisterAnnotations } from '../../../src/module/register-annotations'; +import { RegisterAnnotations } from '../../../src/module/extension/register-annotations'; import { QueueManager, QueueWrapper, ChannelManager } from '../../../src/module/managers'; import { ChannelMock } from '../../mocks/Channel'; -import { MessageRouter } from '../../../src/module/message-router'; +import { DefaultMessageRouter } from '../../../src/module/message-router'; import { UserQueue } from '../../fixtures/Queues'; import { extractMetadataByDecorator } from '@hapiness/core'; import { generateMessage } from '../../mocks/Message'; @@ -11,6 +11,7 @@ import { Observable, Subscription } from 'rxjs'; import { ErrorObservable } from 'rxjs/observable/ErrorObservable'; import { RabbitMQExt } from '../../../src/module/rabbitmq.extension'; import { ConnectionManagerMock } from '../../mocks/ConnectionManager'; +import { consumeQueue } from '../../../src/module/extension/consume-queue'; // let errorHandler = require('@hapiness/core/core').errorHandler; @@ -18,7 +19,7 @@ import { ConnectionManagerMock } from '../../mocks/ConnectionManager'; export class InitExtensionUnitTest { private ch: ChannelManager; private queueWrapper: QueueWrapper; - private messageRouter: MessageRouter; + private messageRouter: DefaultMessageRouter; private queue: QueueManager; private userQueue; @@ -29,7 +30,7 @@ export class InitExtensionUnitTest { this.ch['ch'] = new ChannelMock(); this.userQueue = new UserQueue(); this.queueWrapper = new QueueWrapper(this.userQueue, extractMetadataByDecorator(UserQueue, 'Queue')); - this.messageRouter = new MessageRouter(); + this.messageRouter = new DefaultMessageRouter(); this.queue = new QueueManager(this.ch, this.queueWrapper); unit.spy(this.userQueue, 'onMessage'); done(); @@ -48,8 +49,8 @@ export class InitExtensionUnitTest { @test('- Should test consumeQueue when there is no message found') testConsumeQueue(done) { const spy = unit.spy(this.queue['_ch'].getChannel(), 'consume'); - unit.function(RegisterAnnotations.consumeQueue); - RegisterAnnotations.consumeQueue(this.queue, this.messageRouter); + unit.function(consumeQueue); + consumeQueue(this.queue, this.messageRouter); unit.object(spy['firstCall']); unit.array(spy['firstCall']['args']); unit.string(spy['firstCall']['args'][0]).is('user.queue'); @@ -62,18 +63,16 @@ export class InitExtensionUnitTest { @test('- Should test consumeQueue when queue.consume() returns error <>') testConsumeQueueSubscribeError(done) { - unit.function(RegisterAnnotations.consumeQueue); + unit.function(consumeQueue); unit.stub(this.queue['_ch'].getChannel(), 'consume').returns(Promise.reject(new Error('Cannot consume queue'))); - RegisterAnnotations - .consumeQueue(this.queue, this.messageRouter) - .subscribe(_ => done(), err => done(err)); + consumeQueue(this.queue, this.messageRouter).subscribe(_ => done(), err => done(err)); } @test('- Should test consumeQueue when there is an error other than message not found') testConsumeQueueError() { const spy = unit.spy(this.queue['_ch'].getChannel(), 'consume'); - unit.function(RegisterAnnotations.consumeQueue); - RegisterAnnotations.consumeQueue(this.queue, this.messageRouter); + unit.function(consumeQueue); + consumeQueue(this.queue, this.messageRouter); const stub = unit.stub(this.messageRouter, 'getDispatcher'); stub.returns(Observable.throw(new Error('Oops, something terrible happened!'))); const message = generateMessage({ foo: 'bar' }, { exchange: 'user.queue' }); @@ -136,7 +135,7 @@ export class InitExtensionUnitTest { getName: () => 'hello', consume: () => Observable.throw(new Error('Cannot consume')) }; - RegisterAnnotations.consumeQueue(queueStub, {}) + consumeQueue(queueStub, {}) .subscribe((_) => done(), err => done(err)); } } diff --git a/test/unit/message.router.test.ts b/test/unit/message.router.test.ts deleted file mode 100644 index a02abdb..0000000 --- a/test/unit/message.router.test.ts +++ /dev/null @@ -1,292 +0,0 @@ -import { test, suite } from 'mocha-typescript'; -import * as unit from 'unit.js'; - -import { MessageRouter } from '../../src/module/message-router'; -import { Observable } from 'rxjs/Observable'; - -import { ChannelMock } from '../mocks/Channel'; -import { - GeneratePdf, - OrderCreatedMessage, - UserCreatedMessage, - UserEditedMessage, - UserDeletedMessage, - FallbackMessage, - FallbackMessageOk, - PokemonsMessage, - FooMessage, - UserCreatedActionMessage, - UserCreatedActionNotMatchedMessage, - BasketEditedMessage, - ProfileEditedMessage -} from '../fixtures/Messages'; -import { MayonaiseService } from '../fixtures/Services'; -import { generateMessage } from '../mocks/Message'; - -@suite('- Unit MessageRouter') -export class MessageRouterUnitTest { - private spyFindClass: any; - private ch: ChannelMock; - private messageRouter: MessageRouter; - - before() { - this.messageRouter = new MessageRouter(); - this.ch = new ChannelMock(); - unit.spy(this.messageRouter, 'getDispatcher'); - this.spyFindClass = unit.spy(this.messageRouter, 'findClass'); - unit.spy(this.messageRouter, 'registerMessage'); - unit.spy(this.messageRouter, '_testValue'); - } - - after() { - this.messageRouter = null; - this.ch = null; - } - - @test('- Should test MessageRouter') - testNew() { - unit.object(this.messageRouter).isInstanceOf(MessageRouter); - unit.function(this.messageRouter.registerMessage); - unit.function(this.messageRouter.findClass); - unit.function(this.messageRouter.getDispatcher); - unit.function(this.messageRouter['_testValue']); - } - - @test('- Should test register/dispatch/find') - testRegister(done) { - const userCreatedMessage = new UserCreatedMessage(new MayonaiseService()); - const userDeletedMessage = new UserDeletedMessage(); - const generatePdfMessage = new GeneratePdf(); - const orderCreatedMessage = new OrderCreatedMessage(); - const pokemonsMessage = new PokemonsMessage(); - const fallbackMessage = new FallbackMessage(); - const fallbackMessageOk = new FallbackMessageOk(); - const userEditedMessage = new UserEditedMessage(new MayonaiseService()); - const fooMessage = new FooMessage(); - const userCreatedActionMessage = new UserCreatedActionMessage(); - const basketEditedMessage = new BasketEditedMessage(); - const profileEditedMessage = new ProfileEditedMessage(); - - this.messageRouter.registerMessage(userDeletedMessage); - this.messageRouter.registerMessage(basketEditedMessage); - this.messageRouter.registerMessage(profileEditedMessage); - this.messageRouter.registerMessage(userEditedMessage); - this.messageRouter.registerMessage(fallbackMessageOk); - this.messageRouter.registerMessage(orderCreatedMessage); - this.messageRouter.registerMessage(generatePdfMessage); - this.messageRouter.registerMessage(userCreatedMessage); - this.messageRouter.registerMessage(userCreatedActionMessage); - this.messageRouter.registerMessage(pokemonsMessage); - this.messageRouter.registerMessage(fooMessage); - this.messageRouter.registerMessage(new UserCreatedActionNotMatchedMessage()); - - unit - .exception(_ => { - unit.when('Invalid message', this.messageRouter.registerMessage(fallbackMessage)); - }) - .isInstanceOf(Error) - .hasProperty('message', `Cannot register a message without an exchange or routingKey, - filter or set is_fallback to true use your queue onMessage method instead`); - - unit - .exception(_ => { - unit.when('Invalid message', this.messageRouter.registerMessage(new (class InvalidMessage {})())); - }) - .isInstanceOf(Error) - .hasProperty('message', 'Cannot register a message class without a queue'); - - const message_fooMessage = generateMessage({ user_id: 4028, action: 'test' }, { exchange: 'foo.exchange' }, false); - unit - .object(this.messageRouter.findClass(message_fooMessage)) - .isInstanceOf(FooMessage) - .is(fooMessage); - - const message_fooMessageRoutingKey = generateMessage({ user_id: 5678, action: 'test' }, - { exchange: 'foo.exchange', routingKey: 'bar' }, false); - unit - .value(this.messageRouter.findClass(message_fooMessageRoutingKey)) - .is(null); - - const message_basketEdited = generateMessage( - { basket_id: 8309, action: 'edited' }, { exchange: 'user.exchange', routingKey: 'basket' }, false); - unit - .object(this.messageRouter.findClass(message_basketEdited)) - .isInstanceOf(BasketEditedMessage) - .is(basketEditedMessage); - - const message_profileEdited_notFound = generateMessage( - { profile_id: 19237, action: 'edited' }, { exchange: 'user.exchange', routingKey: 'profile' }, false); - unit - .value(this.messageRouter.findClass(message_profileEdited_notFound)) - .is(null); - - const message_profileEdited = generateMessage( - { profile_id: 19237, action: 'edited', foo: 'bar' }, { exchange: 'user.exchange', routingKey: 'profile' }, false); - unit - .object(this.messageRouter.findClass(message_profileEdited)) - .isInstanceOf(ProfileEditedMessage) - .is(profileEditedMessage); - - const message_userEdited = generateMessage( - { user_id: 4028, action: 'edited' }, { exchange: 'user.exchange', routingKey: 'user' }, false); - unit - .object(this.messageRouter.findClass(message_userEdited)) - .isInstanceOf(UserEditedMessage) - .is(userEditedMessage); - - const message_userExchangeRoutingKeyWithoutAction = generateMessage( - { user_id: 4028 }, { exchange: 'user.exchange', routingKey: 'user' }, false); - unit.value(this.messageRouter.findClass(message_userExchangeRoutingKeyWithoutAction)).is(null); - - const message_userExchangeWithoutRoutingKey = generateMessage( - { user_id: 5678 }, { exchange: 'user.exchange' }, false); - unit.value(this.messageRouter.findClass(message_userExchangeWithoutRoutingKey)).is(null); - - const message_userCreated = generateMessage({ user_id: 60936 }, { exchange: 'user.exchange', routingKey: 'user.created' }, false); - unit - .object(this.messageRouter.findClass(message_userCreated)) - .isInstanceOf(UserCreatedMessage) - .is(userCreatedMessage); - - const message_userCreatedAction = generateMessage({ user_id: 60936, action: 'special' }, - { exchange: 'user.exchange', routingKey: 'user.created' }, false); - unit - .object(this.messageRouter.findClass(message_userCreatedAction)) - .isInstanceOf(UserCreatedActionMessage) - .is(userCreatedActionMessage); - - const message_userDeleted = generateMessage({ user_id: 2341 }, { exchange: 'user.exchange', routingKey: 'user.deleted' }, false); - unit - .object(this.messageRouter.findClass(message_userDeleted)) - .isInstanceOf(UserDeletedMessage) - .is(userDeletedMessage); - - const message_generatePdf = generateMessage( - { url: 'http://xxx.com/zzz.html', action: 'generate_pdf' }, - { routingKey: 'worker', exchange: '' }, - false - ); - unit - .object(this.messageRouter.findClass(message_generatePdf)) - .isInstanceOf(GeneratePdf) - .is(generatePdfMessage); - - const message_orderCreated = generateMessage( - { user_id: 3445, order_id: 238109 }, - { exchange: 'another.exchange', routingKey: 'order.created' } - ); - unit - .object(this.messageRouter.findClass(message_orderCreated)) - .isInstanceOf(OrderCreatedMessage) - .is(orderCreatedMessage); - - const message_fallback = generateMessage({ - reason: 'These are not the droid you are looking for' - }, { routingKey: 'test.fallback' }, false); - unit - .value(this.messageRouter.findClass(message_fallback)) - .is(fallbackMessageOk); - - const message_NotFound = generateMessage( - { reason: 'These are not the droid you are looking for' }, - { exchange: 'another.exchange' }, - false - ); - unit.value(this.messageRouter.findClass(message_NotFound)).is(null); - - const message_FindPokemon = generateMessage( - { action: 'pokemons_find', area: { lat: 0.234, long: 0.2345, radius: 5 } }, - { routingKey: 'another.queue' }, - false - ); - unit - .object(this.messageRouter.findClass(message_FindPokemon)) - .isInstanceOf(PokemonsMessage) - .is(pokemonsMessage); - - const message_FindPokemon_notFound = generateMessage( - { action: 'pokemons_find', area: { lat: 0.234, long: 0.2345, radius: 5 } }, - { routingKey: 'another.exchange' }, - false - ); - unit - .value(this.messageRouter.findClass(message_FindPokemon_notFound)) - .is(null); - - const pending = []; - pending.push( - this.messageRouter.getDispatcher(this.ch, message_orderCreated).switchMap(dispatcher => { - unit.function(dispatcher); - return dispatcher(); - }) - .map(messageResult => { - unit.object(messageResult).is({ reject: true }); - }) - ); - - pending.push( - this.messageRouter - .getDispatcher(this.ch, null) - .catch(err => { - unit - .object(err) - .isInstanceOf(Error) - .hasProperty('message', 'Invalid or empty message'); - return Observable.of('ok'); - }) - .map(_ => { - unit.value(_).is('ok'); - }) - ); - - pending.push( - this.messageRouter.getDispatcher(this.ch, message_generatePdf).switchMap(dispatcher => { - unit.function(dispatcher); - return dispatcher(); - }) - .map(messageResult => { - unit.object(messageResult).is({ reject: true, requeue: true }); - }) - ); - - pending.push( - this.messageRouter.getDispatcher(this.ch, message_NotFound).catch(err => { - unit - .object(err) - .isInstanceOf(Error) - .hasProperty('code', 'MESSAGE_CLASS_NOT_FOUND'); - return Observable.of(null); - }) - ); - - Observable.forkJoin(pending).subscribe(_ => { - unit.number(this.messageRouter.getDispatcher['callCount']).isGreaterThan(1); - unit.number(this.messageRouter.findClass['callCount']).isGreaterThan(15); - unit.array(this.messageRouter.registerMessage['firstCall'].args).is([userDeletedMessage]); - unit.number(this.messageRouter['_testValue']['callCount']).isGreaterThan(50); - done(); - }); - } - - @test('- Should throw when invalid message class is provided to process() method') - testProcessInvalidMessageClass(done) { - this.spyFindClass.restore(); - const stub = unit.stub(this.messageRouter, 'findClass'); - stub.returns({}); - const obs = this.messageRouter.getDispatcher( - this.ch, - generateMessage({ error: 'invalid_message_class' }), - ); - obs.subscribe( - _ => done(new Error('Cannot be here')), - err => { - unit - .object(err) - .isInstanceOf(Error) - .hasProperty('message', 'Message class Object should implement onMessage() method'); - stub.restore(); - done(); - } - ); - } -} diff --git a/test/unit/services/channel.test.ts b/test/unit/services/channel.test.ts index 1d1588a..3ae99eb 100644 --- a/test/unit/services/channel.test.ts +++ b/test/unit/services/channel.test.ts @@ -3,7 +3,7 @@ import * as unit from 'unit.js'; import { Observable } from 'rxjs/Observable'; import { RabbitConnectionService, ChannelService } from '../../../src/module/services'; -import { ChannelManager } from '../../../src/index'; +import { ChannelManager } from '../../../src'; import { ConnectionManagerMock } from '../../mocks/ConnectionManager'; import { ChannelMock } from '../../mocks/Channel'; From e4f17cd90e448f5fa8f8d6c5ca24edf26fcd5988 Mon Sep 17 00:00:00 2001 From: Antoine Gomez Date: Wed, 29 Aug 2018 10:13:39 +0200 Subject: [PATCH 2/4] Update interfaces --- package-lock.json | 2 +- package.json | 2 +- src/module/decorators.ts | 60 --------------------- src/module/decorators/exchange.decorator.ts | 18 +++++++ src/module/decorators/index.ts | 3 ++ src/module/decorators/message.decorator.ts | 21 ++++++++ src/module/decorators/queue.decorator.ts | 21 ++++++++ src/module/extension/get-channel.ts | 2 +- src/module/extension/index.ts | 6 +++ src/module/index.ts | 22 ++++---- src/module/interfaces/bind.ts | 6 +++ src/module/interfaces/channel-options.ts | 5 ++ src/module/interfaces/exchange-type.ts | 2 +- src/module/interfaces/index.ts | 27 +++++----- src/module/interfaces/queue-options.ts | 2 +- src/module/managers/queue-manager.ts | 3 +- src/module/managers/queue-wrapper.ts | 4 +- src/module/services/message.service.ts | 6 +-- tools/files.json | 2 +- tools/packaging.ts | 2 +- tsconfig.build.json | 2 +- 21 files changed, 120 insertions(+), 98 deletions(-) delete mode 100644 src/module/decorators.ts create mode 100644 src/module/decorators/exchange.decorator.ts create mode 100644 src/module/decorators/index.ts create mode 100644 src/module/decorators/message.decorator.ts create mode 100644 src/module/decorators/queue.decorator.ts create mode 100644 src/module/extension/index.ts create mode 100644 src/module/interfaces/bind.ts create mode 100644 src/module/interfaces/channel-options.ts diff --git a/package-lock.json b/package-lock.json index 49f08ac..26d39f3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "@hapiness/rabbitmq", - "version": "1.5.0", + "version": "1.5.0-alpha2", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 0a275ca..d733bf5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@hapiness/rabbitmq", - "version": "1.5.0", + "version": "1.5.0-alpha2", "description": "Hapiness module for rabbitmq", "main": "commonjs/index.js", "types": "index.d.ts", diff --git a/src/module/decorators.ts b/src/module/decorators.ts deleted file mode 100644 index 7a1c921..0000000 --- a/src/module/decorators.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { Type } from '@hapiness/core'; -import { createDecorator, CoreDecorator } from '@hapiness/core'; -import { Options } from 'amqplib'; -import { ExchangeType } from './interfaces'; - -export interface Bind { - exchange: Type; - pattern?: string | string[]; -} - -export interface ChannelOptions { - key: string; - prefetch?: number; - global?: boolean; -} - -export interface QueueDecoratorInterface { - name: string; - binds?: Array; - options?: Options.AssertQueue; - channel?: ChannelOptions; - force_json_decode?: boolean; -} -export const Queue: CoreDecorator = createDecorator('Queue', { - name: undefined, - binds: undefined, - options: undefined, - channel: undefined, - force_json_decode: false -}); - -export interface ExchangeDecoratorInterface { - name: string; - type: ExchangeType; - options?: Options.AssertExchange; - channel?: ChannelOptions; -} -export const Exchange: CoreDecorator = createDecorator('Exchange', { - name: undefined, - type: undefined, - options: undefined, - channel: undefined -}); - -export interface MessageDecoratorInterface { - queue: Type; - exchange?: Type; - routingKey?: string | RegExp; - filter?: { - [key: string]: string | RegExp; - }; - is_fallback?: boolean; -} -export const Message: CoreDecorator = createDecorator('Message', { - queue: undefined, - exchange: undefined, - routingKey: undefined, - filter: undefined, - is_fallback: false -}); diff --git a/src/module/decorators/exchange.decorator.ts b/src/module/decorators/exchange.decorator.ts new file mode 100644 index 0000000..ce23031 --- /dev/null +++ b/src/module/decorators/exchange.decorator.ts @@ -0,0 +1,18 @@ +import { createDecorator, CoreDecorator, Type } from '@hapiness/core'; +import { Options } from 'amqplib'; +import { ExchangeType, ChannelOptions } from '../interfaces'; + +export interface ExchangeDecoratorInterface { + name: string; + type: ExchangeType; + options?: Options.AssertExchange; + channel?: ChannelOptions; + providers?: Array | any>; +} +export const Exchange: CoreDecorator = createDecorator('Exchange', { + name: undefined, + type: undefined, + options: undefined, + channel: undefined, + providers: [], +}); diff --git a/src/module/decorators/index.ts b/src/module/decorators/index.ts new file mode 100644 index 0000000..198db2e --- /dev/null +++ b/src/module/decorators/index.ts @@ -0,0 +1,3 @@ +export * from './exchange.decorator'; +export * from './message.decorator'; +export * from './queue.decorator'; diff --git a/src/module/decorators/message.decorator.ts b/src/module/decorators/message.decorator.ts new file mode 100644 index 0000000..d7bf696 --- /dev/null +++ b/src/module/decorators/message.decorator.ts @@ -0,0 +1,21 @@ +import { Type } from '@hapiness/core'; +import { createDecorator, CoreDecorator } from '@hapiness/core'; + +export interface MessageDecoratorInterface { + queue: Type; + exchange?: Type; + routingKey?: string | RegExp; + filter?: { + [key: string]: string | RegExp; + }; + is_fallback?: boolean; + providers?: Array | any>; +} +export const Message: CoreDecorator = createDecorator('Message', { + queue: undefined, + exchange: undefined, + routingKey: undefined, + filter: undefined, + is_fallback: false, + providers: [], +}); diff --git a/src/module/decorators/queue.decorator.ts b/src/module/decorators/queue.decorator.ts new file mode 100644 index 0000000..2822999 --- /dev/null +++ b/src/module/decorators/queue.decorator.ts @@ -0,0 +1,21 @@ +import { Type } from '@hapiness/core'; +import { createDecorator, CoreDecorator } from '@hapiness/core'; +import { Options } from 'amqplib'; +import { Bind, ChannelOptions } from '../interfaces'; + +export interface QueueDecoratorInterface { + name: string; + binds?: Array; + options?: Options.AssertQueue; + channel?: ChannelOptions; + force_json_decode?: boolean; + providers?: Array | any>; +} +export const Queue: CoreDecorator = createDecorator('Queue', { + name: undefined, + binds: undefined, + options: undefined, + channel: undefined, + force_json_decode: false, + providers: [], +}); diff --git a/src/module/extension/get-channel.ts b/src/module/extension/get-channel.ts index 03b282d..3854e2b 100644 --- a/src/module/extension/get-channel.ts +++ b/src/module/extension/get-channel.ts @@ -1,7 +1,7 @@ import { ConnectionManager } from '../managers/connection-manager'; -import { ChannelOptions } from '../decorators'; import { Observable } from 'rxjs/Observable'; import { ChannelManager } from '../managers/channel-manager'; +import { ChannelOptions } from '../interfaces'; export function getChannel(connection: ConnectionManager, channel: ChannelOptions): Observable { return connection diff --git a/src/module/extension/index.ts b/src/module/extension/index.ts new file mode 100644 index 0000000..59d139a --- /dev/null +++ b/src/module/extension/index.ts @@ -0,0 +1,6 @@ +export * from './build-exchange'; +export * from './build-queues'; +export * from './consume-queue'; +export * from './get-channel'; +export * from './register-annotations'; +export * from './register-messages'; diff --git a/src/module/index.ts b/src/module/index.ts index 4122a20..b5d5822 100644 --- a/src/module/index.ts +++ b/src/module/index.ts @@ -1,11 +1,11 @@ -export * from './interfaces'; -export * from './managers'; -export * from './services'; -export * from './decorators'; -export * from './events'; -export * from './message-router'; -export * from './message'; -export * from './rabbitmq.extension'; -export * from './rabbitmq.module'; -export * from './extension/register-annotations'; -export * from './utils'; +export * from './interfaces'; +export * from './extension'; +export * from './managers'; +export * from './decorators'; +export * from './events'; +export * from './services'; +export * from './message-router'; +export * from './message'; +export * from './rabbitmq.extension'; +export * from './rabbitmq.module'; +export * from './utils'; diff --git a/src/module/interfaces/bind.ts b/src/module/interfaces/bind.ts new file mode 100644 index 0000000..28acece --- /dev/null +++ b/src/module/interfaces/bind.ts @@ -0,0 +1,6 @@ +import { Type } from '@hapiness/core'; + +export interface Bind { + exchange: Type; + pattern?: string | string[]; +}; diff --git a/src/module/interfaces/channel-options.ts b/src/module/interfaces/channel-options.ts new file mode 100644 index 0000000..8042cf3 --- /dev/null +++ b/src/module/interfaces/channel-options.ts @@ -0,0 +1,5 @@ +export interface ChannelOptions { + key: string; + prefetch?: number; + global?: boolean; +}; diff --git a/src/module/interfaces/exchange-type.ts b/src/module/interfaces/exchange-type.ts index 442272b..022e325 100644 --- a/src/module/interfaces/exchange-type.ts +++ b/src/module/interfaces/exchange-type.ts @@ -2,4 +2,4 @@ export enum ExchangeType { Direct = 'direct', Topic = 'topic', Fanout = 'fanout' -} +}; diff --git a/src/module/interfaces/index.ts b/src/module/interfaces/index.ts index 4fbe0f2..0dfe1f8 100644 --- a/src/module/interfaces/index.ts +++ b/src/module/interfaces/index.ts @@ -1,12 +1,15 @@ -export * from './config'; -export * from './consume-options'; -export * from './create-channel.options'; -export * from './exchange-type'; -export * from './exchange'; -export * from './message-options'; -export * from './message-result'; -export * from './message'; -export * from './on-asserted'; -export * from './queue-options'; -export * from './queue'; -export * from './rabbit-message'; +export * from './bind'; +export * from './channel-options'; +export * from './config'; +export * from './consume-options'; +export * from './create-channel.options'; +export * from './exchange-type'; +export * from './exchange'; +export * from './message-options'; +export * from './message-result'; +export * from './message-router'; +export * from './message'; +export * from './on-asserted'; +export * from './queue-options'; +export * from './queue'; +export * from './rabbit-message'; diff --git a/src/module/interfaces/queue-options.ts b/src/module/interfaces/queue-options.ts index bb632df..e8ef05d 100644 --- a/src/module/interfaces/queue-options.ts +++ b/src/module/interfaces/queue-options.ts @@ -1,5 +1,5 @@ import { Options } from 'amqplib'; -import { Bind } from '../decorators'; +import { Bind } from './bind'; export interface QueueOptions { name: string; diff --git a/src/module/managers/queue-manager.ts b/src/module/managers/queue-manager.ts index 2ba9f26..09c794a 100644 --- a/src/module/managers/queue-manager.ts +++ b/src/module/managers/queue-manager.ts @@ -3,8 +3,7 @@ import { Observable } from 'rxjs'; import { Channel as ChannelInterface, Replies } from 'amqplib'; import { extractMetadataByDecorator, errorHandler } from '@hapiness/core'; import { sendMessage, decodeJSONContent } from '../message'; -import { MessageResult, MessageOptions, RabbitMessage, QueueOptions, ConsumeOptions, QueueInterface } from '../interfaces'; -import { Bind } from '../decorators'; +import { MessageResult, MessageOptions, RabbitMessage, QueueOptions, ConsumeOptions, QueueInterface, Bind } from '../interfaces'; import { ExchangeDecoratorInterface, ChannelManager } from '..'; import { QueueWrapper } from './queue-wrapper'; import { events } from '../events'; diff --git a/src/module/managers/queue-wrapper.ts b/src/module/managers/queue-wrapper.ts index f926f84..a2f446b 100644 --- a/src/module/managers/queue-wrapper.ts +++ b/src/module/managers/queue-wrapper.ts @@ -1,6 +1,6 @@ -import { QueueDecoratorInterface, Bind } from '../decorators'; +import { QueueDecoratorInterface } from '../decorators'; import { Options } from 'amqplib'; -import { QueueInterface } from '../interfaces'; +import { QueueInterface, Bind } from '../interfaces'; export class QueueWrapper { private _instance: QueueInterface; diff --git a/src/module/services/message.service.ts b/src/module/services/message.service.ts index 7521e00..5412a71 100644 --- a/src/module/services/message.service.ts +++ b/src/module/services/message.service.ts @@ -1,4 +1,4 @@ -import { Injectable, extractMetadataByDecorator } from '@hapiness/core'; +import { Injectable, extractMetadataByDecorator, Type } from '@hapiness/core'; import { ChannelService } from './channel.service'; import { MessageOptions, QueueInterface, ExchangeInterface } from '../interfaces'; import { sendMessage } from '../message'; @@ -16,7 +16,7 @@ export class MessageService { return this._channelService.connectionManager.isConnected(); } - sendToQueue(message, queue: typeof QueueInterface | string, options?: MessageOptions): boolean { + sendToQueue(message, queue: Type | string, options?: MessageOptions): boolean { const ch = this._channelService.getChannel(); const _options: MessageOptions = Object.assign({}, options); _options.queue = typeof queue === 'string' ? queue : extractMetadataByDecorator(queue, 'Queue').name; @@ -24,7 +24,7 @@ export class MessageService { return this.send(message, _options, ch); } - publish(message, exchange: typeof ExchangeInterface | string, options?: MessageOptions): boolean { + publish(message, exchange: Type | string, options?: MessageOptions): boolean { const ch = this._channelService.getChannel(); const _options: MessageOptions = Object.assign({}, options); _options.exchange = typeof exchange === 'string' ? diff --git a/tools/files.json b/tools/files.json index 841ca5a..14d472c 100644 --- a/tools/files.json +++ b/tools/files.json @@ -2,4 +2,4 @@ { "name":"README.md" }, { "name":"LICENSE.md" }, { "name":"package.json" } -] \ No newline at end of file +] diff --git a/tools/packaging.ts b/tools/packaging.ts index e76daab..72a9d9a 100644 --- a/tools/packaging.ts +++ b/tools/packaging.ts @@ -98,7 +98,7 @@ class Packaging { // function to write JSON const writeJson = (dest: string, data: any): Observable => { return > Observable.create((observer) => { - fs.outputJson(dest, data, (error) => { + fs.outputJson(dest, data, { spaces: 2 }, (error) => { if (error) { return observer.error(error); } diff --git a/tsconfig.build.json b/tsconfig.build.json index 35d3609..ccb76a1 100644 --- a/tsconfig.build.json +++ b/tsconfig.build.json @@ -6,5 +6,5 @@ "declarationDir": "./dist", "types": ["node"] }, - "exclude": ["node_modules", "dist", "test", "tools"] + "exclude": ["node_modules", "dist", "test", "tools", "sample"] } From d865048f587f1620d8362d05a5219cecffbeb7ac Mon Sep 17 00:00:00 2001 From: Julien Fauville Date: Wed, 29 Aug 2018 13:49:53 +0200 Subject: [PATCH 3/4] Update package.json --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index d733bf5..0a275ca 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@hapiness/rabbitmq", - "version": "1.5.0-alpha2", + "version": "1.5.0", "description": "Hapiness module for rabbitmq", "main": "commonjs/index.js", "types": "index.d.ts", From e7819d20666f4f4f37aea72a147b6e2cff21251b Mon Sep 17 00:00:00 2001 From: Julien Fauville Date: Wed, 29 Aug 2018 14:02:39 +0200 Subject: [PATCH 4/4] Update build-queues.ts --- src/module/extension/build-queues.ts | 108 +++++++++++++-------------- 1 file changed, 53 insertions(+), 55 deletions(-) diff --git a/src/module/extension/build-queues.ts b/src/module/extension/build-queues.ts index b82e3e7..c6671e0 100644 --- a/src/module/extension/build-queues.ts +++ b/src/module/extension/build-queues.ts @@ -13,62 +13,60 @@ import { consumeQueue } from './consume-queue'; export default function buildQueues( modules: CoreModule[], connection: ConnectionManager, MessageRouter: Type ): Observable { - return ( - Observable.from(modules) - .filter(_module => !!_module) - .flatMap(_module => - metadataFromDeclarations(_module.declarations, 'Queue') - .map(metadata => ({ metadata, _module })) - ) - .flatMap(({ metadata, _module }) => - DependencyInjection.instantiateComponent(metadata.token, _module.di) - .map(instance => ({ instance, _module, metadata}))) - // Assert queue - .mergeMap(({ instance, _module, metadata }) => - metadata.data.channel ? - getChannel(connection, metadata.data.channel) - .map(channel => ({ instance, metadata, channel, _module })) : - Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module })) - .mergeMap(({ instance, metadata, channel, _module }) => { - const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data)); - return Observable.forkJoin(queue.assert(), Observable.of(metadata), Observable.of(_module)); - }) - // Bind queue - .mergeMap(([queue, metadata, _module]) => { - if (Array.isArray(metadata.data.binds)) { - return Observable.forkJoin( - metadata.data.binds.map(bind => { - if (Array.isArray(bind.pattern)) { - return Observable.forkJoin(bind.pattern.map(pattern => queue.bind( - extractMetadataByDecorator(bind.exchange, 'Exchange').name, pattern))); - } + return Observable.from(modules) + .filter(_module => !!_module) + .flatMap(_module => + metadataFromDeclarations(_module.declarations, 'Queue') + .map(metadata => ({ metadata, _module })) + ) + .flatMap(({ metadata, _module }) => + DependencyInjection.instantiateComponent(metadata.token, _module.di) + .map(instance => ({ instance, _module, metadata}))) + // Assert queue + .mergeMap(({ instance, _module, metadata }) => + metadata.data.channel ? + getChannel(connection, metadata.data.channel) + .map(channel => ({ instance, metadata, channel, _module })) : + Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module })) + .mergeMap(({ instance, metadata, channel, _module }) => { + const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data)); + return Observable.forkJoin(queue.assert(), Observable.of(metadata), Observable.of(_module)); + }) + // Bind queue + .mergeMap(([queue, metadata, _module]) => { + if (Array.isArray(metadata.data.binds)) { + return Observable.forkJoin( + metadata.data.binds.map(bind => { + if (Array.isArray(bind.pattern)) { + return Observable.forkJoin(bind.pattern.map(pattern => queue.bind( + extractMetadataByDecorator(bind.exchange, 'Exchange').name, pattern))); + } - return queue.bind( - extractMetadataByDecorator(bind.exchange, 'Exchange').name, bind.pattern - ); - })).map(() => ({ queue, _module })); - } + return queue.bind( + extractMetadataByDecorator(bind.exchange, 'Exchange').name, bind.pattern + ); + })).map(() => ({ queue, _module })); + } - return Observable.of(({ queue, _module })); - }) - // Register messages related to queue - // Consume queue - // Dont consume queue if there are no messages or consume() method on queue - .mergeMap(({ queue, _module }) => { - const messageRouter = new MessageRouter(); - return registerMessages(modules, queue, messageRouter) - .defaultIfEmpty(null) - .filter(item => !!item) - .toArray() - .switchMap((registeredMessages) => { - const _queue = queue.getQueue(); - if (registeredMessages.length || typeof _queue['onMessage'] === 'function') { - return consumeQueue(queue, messageRouter); - } + return Observable.of(({ queue, _module })); + }) + // Register messages related to queue + // Consume queue + // Dont consume queue if there are no messages or consume() method on queue + .mergeMap(({ queue, _module }) => { + const messageRouter = new MessageRouter(); + return registerMessages(modules, queue, messageRouter) + .defaultIfEmpty(null) + .filter(item => !!item) + .toArray() + .switchMap((registeredMessages) => { + const _queue = queue.getQueue(); + if (registeredMessages.length || typeof _queue['onMessage'] === 'function') { + return consumeQueue(queue, messageRouter); + } - return Observable.of(null); - }); - }) - .toArray() - ); + return Observable.of(null); + }); + }) + .toArray(); }