Skip to content

Commit

Permalink
Fix message routing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
antoinegomez committed Oct 27, 2017
1 parent 1ef2240 commit a40db99
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 5 deletions.
11 changes: 8 additions & 3 deletions src/module/message-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class MessageRouter {
const score = this.classes
.map(_class => {
const meta = _class.data;
const checks = [];
let checks = [];

if (
extractMetadataByDecorator<QueueDecoratorInterface>(meta.queue, 'Queue').name === message.fields.exchange &&
Expand All @@ -67,8 +67,8 @@ export class MessageRouter {
} else if (message.fields.routingKey && meta.routingKey && meta.exchange) {
checks.push(
extractMetadataByDecorator<ExchangeDecoratorInterface>(meta.exchange, 'Exchange').name === message.fields.exchange
&& typeof meta.routingKey === 'string' && this._testValue(meta.routingKey, message.fields.routingKey)
);
checks.push(typeof meta.routingKey === 'string' && this._testValue(meta.routingKey, message.fields.routingKey));
}

let checkFilter = false;
Expand All @@ -78,9 +78,14 @@ export class MessageRouter {
checkFilter = !!entries.find(([key, value]) => {
return this._testValue(value, _get(message, key.split('.')));
});

if (checkFilter) {
checks.push(true);
} else {
checks = [];
}
}
}
checks.push(checkFilter);

return {
score: checks.filter(Boolean).length,
Expand Down
17 changes: 17 additions & 0 deletions test/fixtures/Messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ import { Observable } from 'rxjs';
import { AnotherQueue, WorkerQueue } from './Queues';
import { MessageInterface, RabbitMessage } from '../../src/module/interfaces';

@Message({
queue: AnotherQueue,
exchange: UserExchange,
routingKey: 'user',
filter: {
'content.action': 'edited'
}
})
export class UserEditedMessage implements MessageInterface {
constructor(private _mayo: MayonaiseService) {}

onMessage(message: RabbitMessage) {
this._mayo.eat();
return Observable.of({ ack: true });
}
}

@Message({
queue: AnotherQueue,
exchange: UserExchange,
Expand Down
22 changes: 20 additions & 2 deletions test/unit/message.router.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
GeneratePdf,
OrderCreatedMessage,
UserCreatedMessage,
UserEditedMessage,
UserDeletedMessage,
FallbackMessage,
PokemonsMessage
Expand Down Expand Up @@ -53,14 +54,31 @@ export class MessageRouterUnitTest {
const orderCreatedMessage = new OrderCreatedMessage();
const pokemonsMessage = new PokemonsMessage();
const fallbackMessage = new FallbackMessage();
const userEditedMessage = new UserEditedMessage(new MayonaiseService());

this.messageRouter.registerMessage(userDeletedMessage);
this.messageRouter.registerMessage(userEditedMessage);
this.messageRouter.registerMessage(fallbackMessage);
this.messageRouter.registerMessage(orderCreatedMessage);
this.messageRouter.registerMessage(generatePdfMessage);
this.messageRouter.registerMessage(userCreatedMessage);
this.messageRouter.registerMessage(pokemonsMessage);

const message_userEdited = generateMessage(
{ user_id: 4028, action: 'edited' }, { exchange: 'user.exchange', routingKey: 'user' }, false);
unit
.object(this.messageRouter.findClass(message_userEdited))
.isInstanceOf(UserEditedMessage)
.is(userEditedMessage);

const message_userExchangeRoutingKeyWithoutAction = generateMessage(
{ user_id: 4028 }, { exchange: 'user.exchange', routingKey: 'user' }, false);
unit.value(this.messageRouter.findClass(message_userExchangeRoutingKeyWithoutAction)).is(null);

const message_userExchangeWithoutRoutingKey = generateMessage(
{ user_id: 4028 }, { exchange: 'user.exchange' }, false);
unit.value(this.messageRouter.findClass(message_userExchangeWithoutRoutingKey)).is(null);

const message_userCreated = generateMessage({ user_id: 60936 }, { exchange: 'user.exchange', routingKey: 'user.created' }, false);
unit
.object(this.messageRouter.findClass(message_userCreated))
Expand Down Expand Up @@ -173,9 +191,9 @@ export class MessageRouterUnitTest {

Observable.forkJoin(pending).subscribe(_ => {
unit.number(this.messageRouter.getDispatcher['callCount']).is(5);
unit.number(this.messageRouter.findClass['callCount']).is(11);
unit.number(this.messageRouter.findClass['callCount']).is(14);
unit.array(this.messageRouter.registerMessage['firstCall'].args).is([userDeletedMessage]);
unit.number(this.messageRouter['_testValue']['callCount']).is(34);
unit.number(this.messageRouter['_testValue']['callCount']).is(56);
done();
});
}
Expand Down

0 comments on commit a40db99

Please sign in to comment.