Skip to content

Commit

Permalink
Update build-queues.ts
Browse files Browse the repository at this point in the history
  • Loading branch information
juneil committed Aug 29, 2018
1 parent d865048 commit e7819d2
Showing 1 changed file with 53 additions and 55 deletions.
108 changes: 53 additions & 55 deletions src/module/extension/build-queues.ts
Expand Up @@ -13,62 +13,60 @@ import { consumeQueue } from './consume-queue';
export default function buildQueues(
modules: CoreModule[], connection: ConnectionManager, MessageRouter: Type<MessageRouterInterface>
): Observable<any> {
return (
Observable.from(modules)
.filter(_module => !!_module)
.flatMap(_module =>
metadataFromDeclarations<QueueDecoratorInterface>(_module.declarations, 'Queue')
.map(metadata => ({ metadata, _module }))
)
.flatMap(({ metadata, _module }) =>
DependencyInjection.instantiateComponent(metadata.token, _module.di)
.map(instance => ({ instance, _module, metadata})))
// Assert queue
.mergeMap(({ instance, _module, metadata }) =>
metadata.data.channel ?
getChannel(connection, metadata.data.channel)
.map(channel => ({ instance, metadata, channel, _module })) :
Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module }))
.mergeMap(({ instance, metadata, channel, _module }) => {
const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data));
return Observable.forkJoin(queue.assert(), Observable.of(metadata), Observable.of(_module));
})
// Bind queue
.mergeMap(([queue, metadata, _module]) => {
if (Array.isArray(metadata.data.binds)) {
return Observable.forkJoin(
metadata.data.binds.map(bind => {
if (Array.isArray(bind.pattern)) {
return Observable.forkJoin(bind.pattern.map(pattern => queue.bind(
extractMetadataByDecorator<ExchangeDecoratorInterface>(bind.exchange, 'Exchange').name, pattern)));
}
return Observable.from(modules)
.filter(_module => !!_module)
.flatMap(_module =>
metadataFromDeclarations<QueueDecoratorInterface>(_module.declarations, 'Queue')
.map(metadata => ({ metadata, _module }))
)
.flatMap(({ metadata, _module }) =>
DependencyInjection.instantiateComponent(metadata.token, _module.di)
.map(instance => ({ instance, _module, metadata})))
// Assert queue
.mergeMap(({ instance, _module, metadata }) =>
metadata.data.channel ?
getChannel(connection, metadata.data.channel)
.map(channel => ({ instance, metadata, channel, _module })) :
Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module }))
.mergeMap(({ instance, metadata, channel, _module }) => {
const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data));
return Observable.forkJoin(queue.assert(), Observable.of(metadata), Observable.of(_module));
})
// Bind queue
.mergeMap(([queue, metadata, _module]) => {
if (Array.isArray(metadata.data.binds)) {
return Observable.forkJoin(
metadata.data.binds.map(bind => {
if (Array.isArray(bind.pattern)) {
return Observable.forkJoin(bind.pattern.map(pattern => queue.bind(
extractMetadataByDecorator<ExchangeDecoratorInterface>(bind.exchange, 'Exchange').name, pattern)));
}

return queue.bind(
extractMetadataByDecorator<ExchangeDecoratorInterface>(bind.exchange, 'Exchange').name, bind.pattern
);
})).map(() => ({ queue, _module }));
}
return queue.bind(
extractMetadataByDecorator<ExchangeDecoratorInterface>(bind.exchange, 'Exchange').name, bind.pattern
);
})).map(() => ({ queue, _module }));
}

return Observable.of(({ queue, _module }));
})
// Register messages related to queue
// Consume queue
// Dont consume queue if there are no messages or consume() method on queue
.mergeMap(({ queue, _module }) => {
const messageRouter = new MessageRouter();
return registerMessages(modules, queue, messageRouter)
.defaultIfEmpty(null)
.filter(item => !!item)
.toArray()
.switchMap((registeredMessages) => {
const _queue = queue.getQueue();
if (registeredMessages.length || typeof _queue['onMessage'] === 'function') {
return consumeQueue(queue, messageRouter);
}
return Observable.of(({ queue, _module }));
})
// Register messages related to queue
// Consume queue
// Dont consume queue if there are no messages or consume() method on queue
.mergeMap(({ queue, _module }) => {
const messageRouter = new MessageRouter();
return registerMessages(modules, queue, messageRouter)
.defaultIfEmpty(null)
.filter(item => !!item)
.toArray()
.switchMap((registeredMessages) => {
const _queue = queue.getQueue();
if (registeredMessages.length || typeof _queue['onMessage'] === 'function') {
return consumeQueue(queue, messageRouter);
}

return Observable.of(null);
});
})
.toArray()
);
return Observable.of(null);
});
})
.toArray();
}

0 comments on commit e7819d2

Please sign in to comment.