Skip to content

Commit

Permalink
feat(rabbitmq): add ability to bind handlers to multiple exchange keys
Browse files Browse the repository at this point in the history
fix #79
  • Loading branch information
WonderPanda committed Dec 19, 2019
1 parent a0b57c1 commit dd131fe
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 48 deletions.
14 changes: 9 additions & 5 deletions integration/rabbitmq/e2e/subscribe.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import { Test } from '@nestjs/testing';
const testHandler = jest.fn();

const exchange = 'testSubscribeExhange';
const routingKey = 'testSubscribeRoute';
const routingKey1 = 'testSubscribeRoute1';
const routingKey2 = 'testSubscribeRoute2';
const testMessage = {
messageProp: 42,
};
Expand All @@ -18,7 +19,7 @@ const testMessage = {
class SubscribeService {
@RabbitSubscribe({
exchange,
routingKey,
routingKey: [routingKey1, routingKey2],
queue: 'subscribeQueue',
})
handleSubscribe(message: object) {
Expand Down Expand Up @@ -55,11 +56,14 @@ describe('Rabbit Subscribe', () => {
});

it('should receive subscribe messages and handle them', async done => {
amqpConnection.publish(exchange, routingKey, testMessage);
[routingKey1, routingKey2].forEach((x, i) =>
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);

setTimeout(() => {
expect(testHandler).toHaveBeenCalledTimes(1);
expect(testHandler).toHaveBeenCalledWith(testMessage);
expect(testHandler).toHaveBeenCalledTimes(2);
expect(testHandler).toHaveBeenCalledWith(`testMessage-0`);
expect(testHandler).toHaveBeenCalledWith(`testMessage-1`);
done();
}, 50);
});
Expand Down
14 changes: 7 additions & 7 deletions integration/rabbitmq/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
"test:e2e": "jest --config ../../jest-e2e.json"
},
"dependencies": {
"@nestjs/common": "6.9.0",
"@nestjs/core": "6.9.0",
"@nestjs/microservices": "6.9.0",
"@nestjs/platform-express": "^6.9.0",
"@nestjs/websockets": "6.9.0",
"@nestjs/common": "6.10.12",
"@nestjs/core": "6.10.12",
"@nestjs/microservices": "6.10.12",
"@nestjs/platform-express": "^6.10.12",
"@nestjs/websockets": "6.10.12",
"reflect-metadata": "0.1.13",
"rimraf": "2.6.3",
"rxjs": "6.4.0"
"rxjs": "6.5.3"
},
"devDependencies": {
"@nestjs/testing": "6.9.0",
"@nestjs/testing": "6.10.12",
"@types/express": "4.16.1",
"@types/jest": "24.0.11",
"@types/node": "11.11.6",
Expand Down
66 changes: 33 additions & 33 deletions integration/rabbitmq/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -276,55 +276,55 @@
"@types/istanbul-lib-coverage" "^1.1.0"
"@types/yargs" "^12.0.9"

"@nestjs/common@6.9.0":
version "6.9.0"
resolved "https://registry.yarnpkg.com/@nestjs/common/-/common-6.9.0.tgz#00323078ff7e8585df4921af0f4f75dfde873866"
integrity sha512-dIRiKob3SkXA2JoV3/Li3vThmZAh2yEH2znM3DRXxcA+twLzM7sJNxAuVsqaVUNwZ89Piepu66fm2BFcKNlJ6A==
"@nestjs/common@6.10.12":
version "6.10.12"
resolved "https://registry.yarnpkg.com/@nestjs/common/-/common-6.10.12.tgz#3ac48b5ec305243802320c5c53480fab3a5c1820"
integrity sha512-UWQ3HqMDVAf85HNZAgahT0Ig7/4WGI1jE+UvZlNMIrLcI8y98LHwbS8fwHUxdRYW9L7NEDTYn8oBL6lS+z4OWA==
dependencies:
axios "0.19.0"
cli-color "2.0.0"
uuid "3.3.3"

"@nestjs/core@6.9.0":
version "6.9.0"
resolved "https://registry.yarnpkg.com/@nestjs/core/-/core-6.9.0.tgz#cf4ceb19b699f838f2dc5bc46797f6e357f2cfc8"
integrity sha512-DvzhJQAs8dXzlJ1Ru6YPHQM+7ODQ2Ity5Ad6OU1YHqQp2GJEwa+6uzCVL/37zu7i/FhUfgLCZLcwHhofRPcgkg==
"@nestjs/core@6.10.12":
version "6.10.12"
resolved "https://registry.yarnpkg.com/@nestjs/core/-/core-6.10.12.tgz#546363837e3cf9a4467e24c4cd8902e9a492668b"
integrity sha512-pZ+KK7BQ9OH6ZSQoakbodFTuoUqa14WDJ/d1GQZbDpkGGOv26rmitjHmuBRMkttkHHsSvLtU1Y2e4vtMFevDcA==
dependencies:
"@nuxtjs/opencollective" "0.2.2"
fast-safe-stringify "2.0.7"
iterare "1.2.0"
object-hash "2.0.0"
object-hash "2.0.1"
uuid "3.3.3"

"@nestjs/microservices@6.9.0":
version "6.9.0"
resolved "https://registry.yarnpkg.com/@nestjs/microservices/-/microservices-6.9.0.tgz#b8d43d628c5a71db650ebf76243ef9132ff6450b"
integrity sha512-HxnqnJFkXP0vuywzFrO/hhgAAi73hw0lg3yWQdxBKUsAwlP1my9tMmVxo4hr80ilLiTzgjWnoRuAus//bFytLg==
"@nestjs/microservices@6.10.12":
version "6.10.12"
resolved "https://registry.yarnpkg.com/@nestjs/microservices/-/microservices-6.10.12.tgz#88cefaa363e4b20919a3391087e982da52ad02bf"
integrity sha512-I7eYZpeVNCt2LM6PmlVWWVRvoOeJz00jABcLo+KKBnlo2pIWCH7W84wZslSWSJPi2q11j6tWvfAtUdR29pX53Q==
dependencies:
iterare "1.2.0"
json-socket "0.3.0"

"@nestjs/platform-express@^6.9.0":
version "6.9.0"
resolved "https://registry.yarnpkg.com/@nestjs/platform-express/-/platform-express-6.9.0.tgz#4d517ce7ba12c96daa27bde159fb0d915391f71a"
integrity sha512-r44oHpTmUUW488BQh3v9ZxABs+xhlnTAb/SWO4rAI4MLTskJ0qpgJ6IPKCUgphiecl50A5O3x9kWJ3tYDWuy/w==
"@nestjs/platform-express@^6.10.12":
version "6.10.12"
resolved "https://registry.yarnpkg.com/@nestjs/platform-express/-/platform-express-6.10.12.tgz#e8c3189a5b3761c05ec0bbc40db0573b5070e9cc"
integrity sha512-XNW2tBIF229OQtZ+rcBLGcqh7QM2C3et7zry/af+nWNecKCiORyDNAQBAi8r6n6BtQJUv4Po7Wts5CGJz0Nldw==
dependencies:
body-parser "1.19.0"
cors "2.8.5"
express "4.17.1"
multer "1.4.2"

"@nestjs/testing@6.9.0":
version "6.9.0"
resolved "https://registry.yarnpkg.com/@nestjs/testing/-/testing-6.9.0.tgz#7d806c53555d1f2a89cd061dc7e476c04441fe89"
integrity sha512-vCxUiu5XnPhaQ3RFHQmj04mnQvFHPGAyTEds/2/EpXCPmRbnqPl67NrA1xDTI8GX8sy+yg4DHMPrT7IjQwZfzg==
"@nestjs/testing@6.10.12":
version "6.10.12"
resolved "https://registry.yarnpkg.com/@nestjs/testing/-/testing-6.10.12.tgz#146c90bf0c8b2575e5eaa11ec86007147a063cf9"
integrity sha512-dHCv7FFnZingfUeuQKO3ybbC/kWyrP2W/i7SSn04+ntvKDzd9cysNgEH1YCdUjH2HeADy21H0CK2Q8OqY8ROTA==
dependencies:
optional "0.1.4"

"@nestjs/websockets@6.9.0":
version "6.9.0"
resolved "https://registry.yarnpkg.com/@nestjs/websockets/-/websockets-6.9.0.tgz#9ad17a34e3a3405ba4fbda49ab7e118c90ce2c44"
integrity sha512-6dsB4V2ovdDFSF80wTDb6C28KJ6gNpbag0q/e4rReFc+uIOPvm1PVFQt6Q2g/0OZfhnwVXjBGRNgtCF0pXDZCQ==
"@nestjs/websockets@6.10.12":
version "6.10.12"
resolved "https://registry.yarnpkg.com/@nestjs/websockets/-/websockets-6.10.12.tgz#18b2171d193f522cb3457a52d8f26ec1adf084c0"
integrity sha512-E9CmcuEB/3xZym9uXAINHUdCwGkpJEXPwWVxWJ9kxKxlDPU+mM25ox1HL+hnAjX70GjIe6PLjf/8V8SXCCiPdg==
dependencies:
iterare "1.2.0"

Expand Down Expand Up @@ -3473,10 +3473,10 @@ object-copy@^0.1.0:
define-property "^0.2.5"
kind-of "^3.0.3"

object-hash@2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/object-hash/-/object-hash-2.0.0.tgz#7c4cc341eb8b53367312a7c546142f00c9e0ea20"
integrity sha512-I7zGBH0rDKwVGeGZpZoFaDhIwvJa3l1CZE+8VchylXbInNiCj7sxxea9P5dTM4ftKR5//nrqxrdeGSTWL2VpBA==
object-hash@2.0.1:
version "2.0.1"
resolved "https://registry.yarnpkg.com/object-hash/-/object-hash-2.0.1.tgz#cef18a0c940cc60aa27965ecf49b782cbf101d96"
integrity sha512-HgcGMooY4JC2PBt9sdUdJ6PMzpin+YtY3r/7wg0uTifP+HJWW8rammseSEHuyt0UeShI183UGssCJqm1bJR7QA==

object-keys@^1.0.12:
version "1.1.0"
Expand Down Expand Up @@ -4042,10 +4042,10 @@ rsvp@^4.8.4:
resolved "https://registry.yarnpkg.com/rsvp/-/rsvp-4.8.4.tgz#b50e6b34583f3dd89329a2f23a8a2be072845911"
integrity sha512-6FomvYPfs+Jy9TfXmBpBuMWNH94SgCsZmJKcanySzgNNP6LjWxBvyLTa9KaMfDDM5oxRfrKDB0r/qeRsLwnBfA==

rxjs@6.4.0:
version "6.4.0"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-6.4.0.tgz#f3bb0fe7bda7fb69deac0c16f17b50b0b8790504"
integrity sha512-Z9Yfa11F6B9Sg/BK9MnqnQ+aQYicPLtilXBp2yUtDt2JRCE0h26d33EnfO3ZxoNxG0T92OUucP3Ct7cpfkdFfw==
rxjs@6.5.3:
version "6.5.3"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-6.5.3.tgz#510e26317f4db91a7eb1de77d9dd9ba0a4899a3a"
integrity sha512-wuYsAYYFdWTAnAaPoKGNhfpWwKZbJW+HgAJ+mImp+Epl7BG8oNWBCTyRM8gba9k4lk8BgWdoYm21Mo/RYhhbgA==
dependencies:
tslib "^1.9.0"

Expand Down
12 changes: 10 additions & 2 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ export class AmqpConnection {
msgOptions.queueOptions || undefined
);

await this.channel.bindQueue(queue, exchange, routingKey);
const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey];

await Promise.all(
routingKeys.map(x => this.channel.bindQueue(queue, exchange, x))
);

await this.channel.consume(queue, async msg => {
try {
Expand Down Expand Up @@ -166,7 +170,11 @@ export class AmqpConnection {
rpcOptions.queueOptions || undefined
);

await this.channel.bindQueue(queue, exchange, routingKey);
const routingKeys = Array.isArray(routingKey) ? routingKey : [routingKey];

await Promise.all(
routingKeys.map(x => this.channel.bindQueue(queue, exchange, x))
);

await this.channel.consume(queue, async msg => {
try {
Expand Down
2 changes: 1 addition & 1 deletion packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export enum MessageHandlerErrorBehavior {

export interface MessageHandlerOptions {
exchange: string;
routingKey: string;
routingKey: string | string[];
queue?: string;
queueOptions?: QueueOptions;
errorBehavior?: MessageHandlerErrorBehavior;
Expand Down

0 comments on commit dd131fe

Please sign in to comment.