Skip to content

Commit

Permalink
Merge d865048 into 35e134d
Browse files Browse the repository at this point in the history
  • Loading branch information
antoinegomez committed Aug 29, 2018
2 parents 35e134d + d865048 commit 50b17b1
Show file tree
Hide file tree
Showing 40 changed files with 734 additions and 578 deletions.
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -453,6 +453,8 @@ To set up your development environment:
[Back to top](#table-of-contents)

## Change History
* v1.5.0 (2018-08-24)
* Add possibility to provide a custom MessageRouter
* v1.4.3 (2018-08-20)
* Emit RETRY_LIMIT_EXCEEDED error on ConnectionManager
* v1.4.2 (2018-06-11)
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@hapiness/rabbitmq",
"version": "1.4.3",
"version": "1.5.0",
"description": "Hapiness module for rabbitmq",
"main": "commonjs/index.js",
"types": "index.d.ts",
Expand Down
60 changes: 0 additions & 60 deletions src/module/decorators.ts

This file was deleted.

18 changes: 18 additions & 0 deletions src/module/decorators/exchange.decorator.ts
@@ -0,0 +1,18 @@
import { createDecorator, CoreDecorator, Type } from '@hapiness/core';
import { Options } from 'amqplib';
import { ExchangeType, ChannelOptions } from '../interfaces';

export interface ExchangeDecoratorInterface {
name: string;
type: ExchangeType;
options?: Options.AssertExchange;
channel?: ChannelOptions;
providers?: Array<Type<any> | any>;
}
export const Exchange: CoreDecorator<ExchangeDecoratorInterface> = createDecorator<ExchangeDecoratorInterface>('Exchange', {
name: undefined,
type: undefined,
options: undefined,
channel: undefined,
providers: [],
});
3 changes: 3 additions & 0 deletions src/module/decorators/index.ts
@@ -0,0 +1,3 @@
export * from './exchange.decorator';
export * from './message.decorator';
export * from './queue.decorator';
21 changes: 21 additions & 0 deletions src/module/decorators/message.decorator.ts
@@ -0,0 +1,21 @@
import { Type } from '@hapiness/core';
import { createDecorator, CoreDecorator } from '@hapiness/core';

export interface MessageDecoratorInterface {
queue: Type<any>;
exchange?: Type<any>;
routingKey?: string | RegExp;
filter?: {
[key: string]: string | RegExp;
};
is_fallback?: boolean;
providers?: Array<Type<any> | any>;
}
export const Message: CoreDecorator<MessageDecoratorInterface> = createDecorator<MessageDecoratorInterface>('Message', {
queue: undefined,
exchange: undefined,
routingKey: undefined,
filter: undefined,
is_fallback: false,
providers: [],
});
21 changes: 21 additions & 0 deletions src/module/decorators/queue.decorator.ts
@@ -0,0 +1,21 @@
import { Type } from '@hapiness/core';
import { createDecorator, CoreDecorator } from '@hapiness/core';
import { Options } from 'amqplib';
import { Bind, ChannelOptions } from '../interfaces';

export interface QueueDecoratorInterface {
name: string;
binds?: Array<Bind>;
options?: Options.AssertQueue;
channel?: ChannelOptions;
force_json_decode?: boolean;
providers?: Array<Type<any> | any>;
}
export const Queue: CoreDecorator<QueueDecoratorInterface> = createDecorator<QueueDecoratorInterface>('Queue', {
name: undefined,
binds: undefined,
options: undefined,
channel: undefined,
force_json_decode: false,
providers: [],
});
23 changes: 23 additions & 0 deletions src/module/extension/build-exchange.ts
@@ -0,0 +1,23 @@
import { Observable } from 'rxjs/Observable';
import { CoreModule, DependencyInjection } from '@hapiness/core';
import { ConnectionManager } from '../managers/connection-manager';
import { metadataFromDeclarations } from '../utils';
import { ExchangeDecoratorInterface } from '../decorators';
import { ExchangeManager } from '../managers/exchange-manager';
import { ExchangeWrapper } from '../managers/exchange-wrapper';

export default function buildExchanges(modules: CoreModule[], connection: ConnectionManager): Observable<any> {
return Observable.from(modules)
.filter(_module => !!_module)
.flatMap(_module =>
metadataFromDeclarations<ExchangeDecoratorInterface>(_module.declarations, 'Exchange')
.map(metadata => ({ metadata, _module }))
)
.flatMap(({ metadata, _module }) => DependencyInjection.instantiateComponent(metadata.token, _module.di)
.map(instance => ({ instance, _module, metadata })))
.flatMap(({ instance, _module, metadata }) => {
const exchange = new ExchangeManager(connection.defaultChannel, new ExchangeWrapper(instance, metadata.data));
return exchange.assert();
})
.toArray();
}
74 changes: 74 additions & 0 deletions src/module/extension/build-queues.ts
@@ -0,0 +1,74 @@
import { CoreModule, DependencyInjection, extractMetadataByDecorator, Type } from '@hapiness/core';
import { ConnectionManager } from '../managers/connection-manager';
import { Observable } from 'rxjs/Observable';
import { metadataFromDeclarations } from '../utils';
import { QueueDecoratorInterface, ExchangeDecoratorInterface } from '../decorators';
import { getChannel } from './get-channel';
import { QueueManager } from '../managers/queue-manager';
import { QueueWrapper } from '../managers/queue-wrapper';
import registerMessages from './register-messages';
import { MessageRouterInterface } from '../interfaces/message-router';
import { consumeQueue } from './consume-queue';

export default function buildQueues(
modules: CoreModule[], connection: ConnectionManager, MessageRouter: Type<MessageRouterInterface>
): Observable<any> {
return (
Observable.from(modules)
.filter(_module => !!_module)
.flatMap(_module =>
metadataFromDeclarations<QueueDecoratorInterface>(_module.declarations, 'Queue')
.map(metadata => ({ metadata, _module }))
)
.flatMap(({ metadata, _module }) =>
DependencyInjection.instantiateComponent(metadata.token, _module.di)
.map(instance => ({ instance, _module, metadata})))
// Assert queue
.mergeMap(({ instance, _module, metadata }) =>
metadata.data.channel ?
getChannel(connection, metadata.data.channel)
.map(channel => ({ instance, metadata, channel, _module })) :
Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module }))
.mergeMap(({ instance, metadata, channel, _module }) => {
const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data));
return Observable.forkJoin(queue.assert(), Observable.of(metadata), Observable.of(_module));
})
// Bind queue
.mergeMap(([queue, metadata, _module]) => {
if (Array.isArray(metadata.data.binds)) {
return Observable.forkJoin(
metadata.data.binds.map(bind => {
if (Array.isArray(bind.pattern)) {
return Observable.forkJoin(bind.pattern.map(pattern => queue.bind(
extractMetadataByDecorator<ExchangeDecoratorInterface>(bind.exchange, 'Exchange').name, pattern)));
}

return queue.bind(
extractMetadataByDecorator<ExchangeDecoratorInterface>(bind.exchange, 'Exchange').name, bind.pattern
);
})).map(() => ({ queue, _module }));
}

return Observable.of(({ queue, _module }));
})
// Register messages related to queue
// Consume queue
// Dont consume queue if there are no messages or consume() method on queue
.mergeMap(({ queue, _module }) => {
const messageRouter = new MessageRouter();
return registerMessages(modules, queue, messageRouter)
.defaultIfEmpty(null)
.filter(item => !!item)
.toArray()
.switchMap((registeredMessages) => {
const _queue = queue.getQueue();
if (registeredMessages.length || typeof _queue['onMessage'] === 'function') {
return consumeQueue(queue, messageRouter);
}

return Observable.of(null);
});
})
.toArray()
);
}
14 changes: 14 additions & 0 deletions src/module/extension/consume-queue.ts
@@ -0,0 +1,14 @@
import { QueueManager } from '../managers/queue-manager';
import { Observable } from 'rxjs/Observable';
import { errorHandler } from '@hapiness/core';
import { MessageRouterInterface } from '../interfaces/message-router';

const debug = require('debug')('hapiness:rabbitmq');

export function consumeQueue(queue: QueueManager, messageRouter: MessageRouterInterface): Observable<any> {
debug(`Creating dispatcher for queue ${queue.getName()}`);
return queue.consume(
(ch, message) => messageRouter.getDispatcher(ch, message))
.catch(err => Observable.of(errorHandler(err)))
.do(() => debug('consumed'));
}
10 changes: 10 additions & 0 deletions src/module/extension/get-channel.ts
@@ -0,0 +1,10 @@
import { ConnectionManager } from '../managers/connection-manager';
import { Observable } from 'rxjs/Observable';
import { ChannelManager } from '../managers/channel-manager';
import { ChannelOptions } from '../interfaces';

export function getChannel(connection: ConnectionManager, channel: ChannelOptions): Observable<ChannelManager> {
return connection
.channelStore
.upsert(channel.key, { prefetch: channel.prefetch, global: channel.global });
}
6 changes: 6 additions & 0 deletions src/module/extension/index.ts
@@ -0,0 +1,6 @@
export * from './build-exchange';
export * from './build-queues';
export * from './consume-queue';
export * from './get-channel';
export * from './register-annotations';
export * from './register-messages';
23 changes: 23 additions & 0 deletions src/module/extension/register-annotations.ts
@@ -0,0 +1,23 @@
import { Observable } from 'rxjs';
import { CoreModule, Type } from '@hapiness/core';
import { ConnectionManager } from '../managers';
import { getModules } from '../utils';
import buildExchanges from './build-exchange';
import buildQueues from './build-queues';
import { MessageRouterInterface } from '../interfaces/message-router';

const debug = require('debug')('hapiness:rabbitmq');

export class RegisterAnnotations {
public static bootstrap(module: CoreModule, connection: ConnectionManager, MessageRouter: Type<MessageRouterInterface>) {
debug('bootstrap extension');
const modules = getModules(module);
return buildExchanges(modules, connection)
.toArray()
.flatMap(_ => buildQueues(modules, connection, MessageRouter))
.toArray()
.flatMap(_ => {
return Observable.of(null);
});
}
}
21 changes: 21 additions & 0 deletions src/module/extension/register-messages.ts
@@ -0,0 +1,21 @@
import { CoreModule, extractMetadataByDecorator } from '@hapiness/core';
import { Observable } from 'rxjs/Observable';
import { QueueManager } from '../managers/queue-manager';
import { metadataFromDeclarations } from '../utils';
import { MessageDecoratorInterface, ExchangeDecoratorInterface } from '../decorators';
import { MessageRouterInterface } from '../interfaces/message-router';

const debug = require('debug')('hapiness:rabbitmq');

export default function registerMessages(modules: CoreModule[], queue: QueueManager, messageRouter: MessageRouterInterface) {
debug('register messages');
return Observable.from(modules)
.flatMap(_module => metadataFromDeclarations<MessageDecoratorInterface>(_module.declarations, 'Message')
.filter(metadata => {
const { name } = extractMetadataByDecorator<ExchangeDecoratorInterface>(metadata.data.queue, 'Queue');
debug('filtering message for queue', name, queue.getName());
return name === queue.getName();
})
.flatMap(_ => messageRouter.registerMessage({ token: _.token, module: _module, data: _.data }))
);
}
22 changes: 11 additions & 11 deletions src/module/index.ts
@@ -1,11 +1,11 @@
export * from './interfaces';
export * from './managers';
export * from './services';
export * from './decorators';
export * from './events';
export * from './message-router';
export * from './message';
export * from './rabbitmq.extension';
export * from './rabbitmq.module';
export * from './register-annotations';
export * from './utils';
export * from './interfaces';
export * from './extension';
export * from './managers';
export * from './decorators';
export * from './events';
export * from './services';
export * from './message-router';
export * from './message';
export * from './rabbitmq.extension';
export * from './rabbitmq.module';
export * from './utils';
6 changes: 6 additions & 0 deletions src/module/interfaces/bind.ts
@@ -0,0 +1,6 @@
import { Type } from '@hapiness/core';

export interface Bind {
exchange: Type<any>;
pattern?: string | string[];
};
5 changes: 5 additions & 0 deletions src/module/interfaces/channel-options.ts
@@ -0,0 +1,5 @@
export interface ChannelOptions {
key: string;
prefetch?: number;
global?: boolean;
};
2 changes: 1 addition & 1 deletion src/module/interfaces/consume-options.ts
@@ -1,4 +1,4 @@
import { RabbitMessage } from './index';
import { RabbitMessage } from '.';
import { Channel } from 'amqplib';

export interface ConsumeOptions {
Expand Down
2 changes: 1 addition & 1 deletion src/module/interfaces/exchange-type.ts
Expand Up @@ -2,4 +2,4 @@ export enum ExchangeType {
Direct = 'direct',
Topic = 'topic',
Fanout = 'fanout'
}
};

0 comments on commit 50b17b1

Please sign in to comment.