Skip to content

Commit

Permalink
Merge ab92678 into d57a2c8
Browse files Browse the repository at this point in the history
  • Loading branch information
nikksan committed Aug 3, 2020
2 parents d57a2c8 + ab92678 commit d7c19f0
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@luckbox/mq-clients",
"version": "1.0.8",
"version": "1.0.9",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
Expand Down
14 changes: 6 additions & 8 deletions src/rabbitmq/RabbitMQClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class RabbitMQClient implements MQClient {
async subscribe(namespace: string, callback: CallbackFunc) {
assert(this.connection, 'You must connect() first!');

await this.saveSubscription(namespace, callback);
await this.saveSubscription(this.isExchangeInDirectType() ? this.exchangeConfig.name : namespace, callback);
await this.createExchange(namespace);
await this.createQueueAndBindItToExchange(namespace);
}
Expand Down Expand Up @@ -158,14 +158,12 @@ class RabbitMQClient implements MQClient {
}

private saveSubscription(namespace: string, callback: CallbackFunc) {
if (!this.subscriptions.has(namespace)) {
this.subscriptions.set(namespace, []);
}
const callbacks = this.subscriptions.has(namespace)
? this.subscriptions.get(namespace)
: [];
callbacks.push(callback);

this.subscriptions.set(namespace, [
...this.subscriptions.get(namespace),
callback,
]);
this.subscriptions.set(namespace, callbacks);
}

private async createExchange(exchange: string) {
Expand Down
14 changes: 10 additions & 4 deletions tests/RabbitMQClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,22 @@ describe('RabbitMQClient', () => {
});

it('should invoke the callback passed to subscribe when a message is received', async () => {
const directModeConfiguredClient = new RabbitMQClient(createConstructorParams({
exchange: {
type: 'direct',
name: 'my-exchange',
},
}));
const mockQueue = createMockQueue();
mockChannel.assertQueue.mockResolvedValueOnce(mockQueue);

await client.connect();
await directModeConfiguredClient.connect();

const mockCallback = jest.fn();
const dummyData = { bar: 'foo' };
await client.subscribe('my-namespace', mockCallback);
await directModeConfiguredClient.subscribe('my-namespace', mockCallback);

triggerMockChannelConsumer('my-namespace', mockQueue.queue, dummyData);
triggerMockChannelConsumer('my-exchange', mockQueue.queue, dummyData);
await sleep(testRetryTimeout);

expect(mockCallback).toHaveBeenCalledWith(dummyData);
Expand Down Expand Up @@ -317,7 +323,7 @@ describe('RabbitMQClient', () => {
expect(mockCallback).toHaveBeenCalledWith(dummyData);
});

it('should not invoke the callback passed to subscribe when a maformed message is received after a reconnect', async () => {
it('should not invoke the callback passed to subscribe when a malformed message is received after a reconnect', async () => {
await client.connect();

const mockCallback = jest.fn();
Expand Down

0 comments on commit d7c19f0

Please sign in to comment.