From e503616f24fe43ab5296b015bbc08e1264041561 Mon Sep 17 00:00:00 2001 From: MartianH Date: Tue, 9 Apr 2024 07:25:21 +0200 Subject: [PATCH] Fix issue #26 and #55 and add missing fields in type definition + quorum support (#56) * chore(package.json): bump version * feat(index.d.ts): Add `messageTtl`, `passive` (existing fields) and `type` (x-queue-type, for quorum support) + protocol definition for #26 * chore(docs): update docs t reflect changes * fix(integration): proper protocol (#26) and fix url encode error (#55) * chore(spec): update connection and queue options with new protocol and queue type field * refactor(connection.js): use `startsWith` instead of equal, just in case * feat(index.d.ts): Add `autoDelete` key to QueueOptions definition (it is documented and shown in examples) * feat(queue.js): Add proper support for quorum queues and silently omit incompatible fields * feat(queue.spec.js): Add test case for quorum queue type and check if call is valid * refactor(queue.js): Use ternary operation instead for omition to improve readability * feat(queue.spec.js): Add behavior test case for quorum queue type (not set) * chore(topology.md): update docs with queue type and classic queue deprecation warning * feat(connection.js): make sure `protocol` field is found and backwards compatible (#26) * docs(topology.md): Added known workaround for issue #23 using async/await example with explanation * feat(queue.js): add dead letter strategy argument (supports at least once since v3.10 for quorum) + update typing to reflect it * test(queue.spec.js): Check behavior for `x-dead-letter-strategy` and validate it is correct (only for quorum) * docs(topology.md): Add `deadLetterStrategy` and `overflow` to `addQueue` parameter description + explanation of choices * docs(topology.md): refactor example and formatting recent addition to fit norm * docs(topology.md): shorten example * docs(topology.md): update configure example to properly use promises * docs: finalized toplogy example and changes all outdated `var` variable to `const` * refactor(connection.js): Avoid duplicate error messages, use constant * test(queue.spec.js): Fix value for `x-dead-letter-strategy` and add similar test case for default options.type "classic" * feat(queue.js): Add queue version upport for classic queues + behavior test and doc entry * fix: autodelete queue in test has to be classic * chore: revert change a16ba0f91387cea8fc0a142a144eb6f4ce66b73a (bump version) * fix(amqp/connection.js): decode URI credentials in `parseUri` to avoid double escaping --------- Co-authored-by: Martial Habimana --- docs/connections.md | 4 +- docs/receiving.md | 8 +- docs/topology.md | 65 ++++++++++-- spec/behavior/queue.spec.js | 146 ++++++++++++++++++++++++++ spec/integration/configuration.js | 1 + spec/integration/typeSpecific.spec.js | 6 +- src/amqp/connection.js | 24 ++--- src/amqp/queue.js | 24 ++++- src/index.d.ts | 10 +- 9 files changed, 258 insertions(+), 30 deletions(-) diff --git a/docs/connections.md b/docs/connections.md index 68b2690..3c950a9 100644 --- a/docs/connections.md +++ b/docs/connections.md @@ -20,7 +20,7 @@ Options is a hash that can contain the following: | **host** | the IP address or DNS name of the RabbitMQ server. | `"localhost"` | | **port** | the TCP/IP port on which RabbitMQ is listening. | `5672` | | **vhost** | the named vhost to use in RabbitMQ. | `"%2f"` ~ `"/"` | -| **protocol** | the connection protocol to use. | `"amqp://"` | +| **protocol** | the connection protocol to use. | `"amqp"` | | **user** | the username used for authentication / authorization with this connection | `"guest"` | | **pass** | the password for the specified user. | `"guest"` | | **timeout** | how long to wait for a connection to be established in milliseconds. | `2000` | @@ -147,7 +147,7 @@ Your goal should be building systems resilient to failures (by allowing them to ```js // How to create a zombie -var rabbit = require( "foo-foo-mq" ); +const rabbit = require( "foo-foo-mq" ); rabbit.on( "unreachable", function() { rabbit.retry(); diff --git a/docs/receiving.md b/docs/receiving.md index b3e8e9c..3e716c8 100644 --- a/docs/receiving.md +++ b/docs/receiving.md @@ -42,7 +42,7 @@ If using the first form, the options hash can contain the following properties, In this example, any possible error is caught in an explicit try/catch: ```javascript -var handler = rabbit.handle( "company.project.messages.logEntry", function( message ) { +const handler = rabbit.handle( "company.project.messages.logEntry", function( message ) { try { // do something meaningful? console.log( message.body ); @@ -67,7 +67,7 @@ This example shows how to have foo-foo-mq wrap all handlers with a try catch tha // that nacks the message on an error rabbit.nackOnError(); -var handler = rabbit.handle( "company.project.messages.logEntry", function( message ) { +const handler = rabbit.handle( "company.project.messages.logEntry", function( message ) { console.log( message.body ); message.ack(); } ); @@ -83,7 +83,7 @@ rabbit.ignoreHandlerErrors(); Provide a strategy for handling errors to multiple handles or attach an error handler after the fact. ```javascript -var handler = rabbit.handle( "company.project.messages.logEntry", function( message ) { +const handler = rabbit.handle( "company.project.messages.logEntry", function( message ) { console.log( message.body ); message.ack(); } ); @@ -299,7 +299,7 @@ the encoding is passed in case the message was produced by another library using ### `rabbit.addSerializer( contentType, serializer )` ```javascript -var yaml = require( "js-yaml" ); +const yaml = require( "js-yaml" ); rabbit.addSerializer( "application/yaml", { deserialize: function( bytes, encoding ) { diff --git a/docs/topology.md b/docs/topology.md index d9578ac..e3533c3 100644 --- a/docs/topology.md +++ b/docs/topology.md @@ -10,7 +10,7 @@ If a disconnect takes place, foo-foo-mq will attempt to re-establish the connect This example shows most of the available options described above. ```javascript - var settings = { + const settings = { connection: { user: "guest", pass: "guest", @@ -38,12 +38,44 @@ This example shows most of the available options described above. ``` To establish a connection with all settings in place and ready to go call configure: + ```javascript - var rabbit = require( "foo-foo-mq" ); + const rabbit = require( "foo-foo-mq" ); + + rabbit.configure( settings ) + .then(() => { + // ready to go! + }).catch((err) => console.error(err)) +``` - rabbit.configure( settings ).done( function() { - // ready to go! - } ); +### Caveat + +It can happen that configure is called before RabbitMQ is running ([#23](https://github.com/Foo-Foo-MQ/foo-foo-mq/issues/23#issuecomment-921194756)), which can cause a consumer to be connected without subscribers. +Fortunately, it is easy to handle using simple retry logic. + +```javascript +const rabbit = require( "foo-foo-mq" ); +const { setTimeout } = require( "timers/promises" ); + +async function tryConfigure( + settings, + opts +) { + const retries = opts.retries || 10; + try { + await rabbit.configure(settings); + } catch (error) { + if (error === 'No endpoints could be reached' && retries > 0) { + if (opts.defer) await setTimeout(opts.defer); + await rabbit.shutdown(); + await rabbit.reset(); + await this.tryConfigure(settings, { ...opts, retries: retries - 1 }); + } else { + throw error; + } + } + return rabbit; +} ``` ## `rabbit.addExchange( exchangeName, exchangeType, [options], [connectionName] )` @@ -74,8 +106,13 @@ The call returns a promise that can be used to determine when the queue has been Options is a hash that can contain the following: +> **Warning:** Classic Mirrored Queues [are deprecated](https://www.rabbitmq.com/blog/2021/08/21/4.0-deprecation-announcements) and will no longer be supported [after v3.13](https://www.rabbitmq.com/blog/2024/03/11/rabbitmq-3.13.0-announcement#thats-a-wrap-for-3x). +> +> For quorum queues, [unsupported options](https://www.rabbitmq.com/docs/quorum-queues#feature-matrix) (`exclusive`, `autoDelete`, `maxPriority`) given to quorum queues will be *silently ignored*. + | option | type | description | default | |--:|:-:|:--|:-:| +| **type** | string | Set the queue type to either `classic` or `quorum` | `classic` | | **autoDelete** | boolean | delete when consumer count goes to 0 | | | **durable** | boolean | survive broker restarts | false | | **exclusive** | boolean | limits queue to the current connection only (danger) | false | @@ -85,15 +122,31 @@ Options is a hash that can contain the following: | **noBatch** | boolean | causes ack, nack & reject to take place immediately | false | | **noCacheKeys** | boolean | disable cache of matched routing keys to prevent unbounded memory growth | false | | **queueLimit** | 2^32 |max number of ready messages a queue can hold | | +| **queueVersion** | `1`, `2` | Sets queue version for classic queues original (CQv1) and new (CQv2) | | +| **overflow** | `drop-head`, `reject-publish` | Behavior when queue limit is reached, defaults to `drop-head`(discard oldest) | | | **messageTtl** | 2^32 |time in ms before a message expires on the queue | | | **expires** | 2^32 |time in ms before a queue with 0 consumers expires | | | **deadLetter** | string | the exchange to dead-letter messages to | | | **deadLetterRoutingKey** | string | the routing key to add to a dead-lettered message +| **deadLetterStrategy** | `at-least-once`, `at-most-once` | the dead letter strategy, defaults to `at-most-once`. | **maxPriority** | 2^8 | the highest priority this queue supports | | -| **unique** | `"hash", `"id", "consistent"` | creates a unique queue name by including the client id or hash in the name | | +| **unique** | `hash`, `id`, `consistent` | creates a unique queue name by including the client id or hash in the name | | | **poison** | boolean | indicates that this queue is specifically for poison / rejected messages| false | | **passive** | boolean | when `true` will not create the queueName specified | false | +### deadLetterStrategy + + * `at-least-once` - enables [At-Least-Once Dead Lettering +](https://www.rabbitmq.com/blog/2022/03/29/at-least-once-dead-lettering) (available since v3.10) for quorum queues, provided that: + * the [feature flag](https://www.rabbitmq.com/docs/feature-flags) `stream_queue` is enabled + * Option `overflow` is set to `reject-publish` + * `at-most-once` - the default strategy and backwards-compatible behavior + +### overflow + + * `drop-head` - default behavior, drops or dead-letter messages from the front of the queue (i.e. the oldest messages in the queue) + * `reject-publish` - the most recently published messages will be discarded. In addition, if publisher confirms are enabled, the publisher will be informed of the reject via a basic.nack message + ### unique The unique option has 3 different possible values, each with its own behavior: diff --git a/spec/behavior/queue.spec.js b/spec/behavior/queue.spec.js index 044f646..5c80a88 100644 --- a/spec/behavior/queue.spec.js +++ b/spec/behavior/queue.spec.js @@ -52,5 +52,151 @@ describe('AMQP Queue', function () { }); }); }); + + describe('when options.type is not set', function () { + it('sets `x-queue-type` to "classic"', () => { + const qType = 'classic'; + options.queueLimit = 1000; + options.maxPriority = 100; + return ampqQueue(options, topology, serializers) + .then((instance) => { + return instance.define(); + }) + .then(() => { + amqpChannelMock.assertQueue.calledWith( + options.uniqueName, + { ...options, arguments: { 'x-queue-type': qType } }); + }); + }); + }); + + describe('when options.type is "classic"', function () { + it('sets `x-queue-type` to "classic" and respects deprecated fields', () => { + options.type = 'classic'; + options.queueLimit = 1000; + options.autoDelete = 100; + options.maxPriority = 100; + return ampqQueue(options, topology, serializers) + .then((instance) => { + return instance.define(); + }) + .then(() => { + amqpChannelMock.assertQueue.calledWith( + options.uniqueName, + { + queueLimit: options.queueLimit, + arguments: { 'x-queue-type': options.type } + }); + }); + }); + + it('Omits `x-dead-letter-strategy` argument if given', () => { + options.type = 'classic'; + options.queueLimit = 1000; + options.maxPriority = 100; + options.deadLetterStrategy = 'at-least-once'; + return ampqQueue(options, topology, serializers) + .then((instance) => { + return instance.define(); + }) + .then(() => { + amqpChannelMock.assertQueue.calledWith( + options.uniqueName, + { + ...options, + arguments: { + 'x-queue-type': options.type + } + }); + }); + }); + + it('Sets `x-queue-version` argument if given', () => { + options.type = 'classic'; + options.queueLimit = 1000; + options.queueVersion = 2; + options.maxPriority = 100; + return ampqQueue(options, topology, serializers) + .then((instance) => { + return instance.define(); + }) + .then(() => { + amqpChannelMock.assertQueue.calledWith( + options.uniqueName, + { + ...options, + arguments: { + 'x-queue-type': options.type, + 'x-queue-version': options.queueVersion + } + }); + }); + }); + }); + + describe('when options.type is "quorum"', function () { + it('sets `x-queue-type` to "quorum" and omits incompatible fields', () => { + options.type = 'quorum'; + options.queueLimit = 1000; + options.autoDelete = true; + options.maxPriority = 100; + return ampqQueue(options, topology, serializers) + .then((instance) => { + return instance.define(); + }) + .then(() => { + amqpChannelMock.assertQueue.calledWith( + options.uniqueName, + { + queueLimit: options.queueLimit, + arguments: { 'x-queue-type': options.type } + }); + }); + }); + + it('sets `x-dead-letter-strategy` argument if given', () => { + options.type = 'quorum'; + options.queueLimit = 1000; + options.autoDelete = true; + options.maxPriority = 100; + options.deadLetterStrategy = 'at-least-once'; + return ampqQueue(options, topology, serializers) + .then((instance) => { + return instance.define(); + }) + .then(() => { + amqpChannelMock.assertQueue.calledWith( + options.uniqueName, + { + queueLimit: options.queueLimit, + arguments: { + 'x-queue-type': options.type, + 'x-dead-letter-strategy': options.deadLetterStrategy + } + }); + }); + }); + + it('Omits `x-queue-version` argument if given', () => { + options.type = 'classic'; + options.queueLimit = 1000; + options.queueVersion = 2; + options.maxPriority = 100; + return ampqQueue(options, topology, serializers) + .then((instance) => { + return instance.define(); + }) + .then(() => { + amqpChannelMock.assertQueue.calledWith( + options.uniqueName, + { + ...options, + arguments: { + 'x-queue-type': options.type + } + }); + }); + }); + }); }); }); diff --git a/spec/integration/configuration.js b/spec/integration/configuration.js index c4a45c6..3e2ac1b 100644 --- a/spec/integration/configuration.js +++ b/spec/integration/configuration.js @@ -1,5 +1,6 @@ module.exports = { connection: { + protocol: 'amqp', name: 'default', user: 'guest', pass: 'guest', diff --git a/spec/integration/typeSpecific.spec.js b/spec/integration/typeSpecific.spec.js index f61d676..c6a2c8e 100644 --- a/spec/integration/typeSpecific.spec.js +++ b/spec/integration/typeSpecific.spec.js @@ -24,13 +24,15 @@ describe('Type Handling On Any Queue', function () { name: 'rabbot-q.topic-1', autoDelete: true, subscribe: true, - deadletter: 'rabbot-ex.deadletter' + deadletter: 'rabbot-ex.deadletter', + type: 'classic' }, { name: 'rabbot-q.topic-2', autoDelete: true, subscribe: true, - deadletter: 'rabbot-ex.deadletter' + deadletter: 'rabbot-ex.deadletter', + type: 'classic' } ], bindings: [ diff --git a/src/amqp/connection.js b/src/amqp/connection.js index 9eeb62e..c9c7790 100644 --- a/src/amqp/connection.js +++ b/src/amqp/connection.js @@ -48,9 +48,7 @@ function getOption (opts, key, alt) { } function getUri (protocol, user, pass, server, port, vhost, heartbeat) { - return protocol + user + ':' + pass + - '@' + server + ':' + port + '/' + vhost + - '?heartbeat=' + heartbeat; + return `${protocol}://${user}:${pass}@${server}:${port}/${vhost}?heartbeat=${heartbeat}`; } function max (x, y) { @@ -62,9 +60,9 @@ function parseUri (uri) { const parsed = new url.URL(uri); const heartbeat = parsed.searchParams.get('heartbeat'); return { - useSSL: parsed.protocol === 'amqps:', - user: parsed.username, - pass: parsed.password, + useSSL: parsed.protocol.startsWith('amqps'), + user: decodeURIComponent(parsed.username), + pass: decodeURIComponent(parsed.password), host: parsed.hostname, port: parsed.port, vhost: parsed.pathname ? parsed.pathname.slice(1) : undefined, @@ -118,7 +116,7 @@ const Adapter = function (parameters) { const pfxPath = getOption(parameters, 'RABBIT_PFX') || getOption(parameters, 'pfxPath'); const useSSL = certPath || keyPath || passphrase || caPaths || pfxPath || parameters.useSSL; const portList = getOption(parameters, 'RABBIT_PORT') || getOption(parameters, 'port', (useSSL ? 5671 : 5672)); - this.protocol = getOption(parameters, 'RABBIT_PROTOCOL') || (useSSL ? 'amqps://' : 'amqp://'); + this.protocol = getOption(parameters, 'RABBIT_PROTOCOL') || getOption(parameters, 'protocol', undefined)?.replace(/:\/\/$/, '') || (useSSL ? 'amqps' : 'amqp'); this.ports = split(portList); this.options = { noDelay: true }; @@ -151,9 +149,10 @@ const Adapter = function (parameters) { Adapter.prototype.connect = function () { return new Promise(function (resolve, reject) { + const unreachable = 'No endpoints could be reached'; const attempted = []; const attempt = function () { - const nextUri = this.getNextUri(); + const [nextUri, serverHostname] = this.getNextUri(); log.info("Attempting connection to '%s' (%s)", this.name, nextUri); function onConnection (connection) { connection.uri = nextUri; @@ -168,16 +167,15 @@ Adapter.prototype.connect = function () { attempt(err); } else { log.info('Cannot connect to `%s` - all endpoints failed', this.name); - reject('No endpoints could be reached'); + reject(unreachable); } } if (attempted.indexOf(nextUri) < 0) { - const serverHostname = new url.URL(nextUri).hostname; amqp.connect(nextUri, Object.assign({ servername: serverHostname }, this.options)) .then(onConnection.bind(this), onConnectionError.bind(this)); } else { log.info('Cannot connect to `%s` - all endpoints failed', this.name); - reject('No endpoints could be reached'); + reject(unreachable); } }.bind(this); attempt(); @@ -195,8 +193,8 @@ Adapter.prototype.bumpIndex = function () { Adapter.prototype.getNextUri = function () { const server = this.getNext(this.servers); const port = this.getNext(this.ports); - const uri = getUri(this.protocol, this.user, escape(this.pass), server, port, this.vhost, this.heartbeat); - return uri; + const uri = getUri(this.protocol, encodeURIComponent(this.user), encodeURIComponent(this.pass), server, port, this.vhost, this.heartbeat); + return [uri, server]; }; Adapter.prototype.getNext = function (list) { diff --git a/src/amqp/queue.js b/src/amqp/queue.js index 8d09e92..5e4fd27 100644 --- a/src/amqp/queue.js +++ b/src/amqp/queue.js @@ -38,19 +38,39 @@ function aliasOptions (options, aliases, ...omit) { }, {}); } +function argOptions (options) { + const queueType = options.type || 'classic'; + const args = { + 'x-queue-type': queueType + }; + if (queueType === 'quorum' && options.deadLetterStrategy) { + args['x-dead-letter-strategy'] = options.deadLetterStrategy; + } else if (queueType === 'classic' && options.queueVersion) { + args['x-queue-version'] = options.queueVersion; + } + return args; +} + function define (channel, options, subscriber, connectionName) { + // Quorum queues dropped support for message prioritiy, exclusivity and non-durable queues + // See: https://www.rabbitmq.com/docs/quorum-queues#feature-matrix + const quorumIncompatible = ['exclusive', 'autoDelete', 'maxPriority']; + const optsFields = ['subscribe', 'limit', 'noBatch', 'unique', 'type', 'queueVersion']; + const omition = options.type === 'quorum' ? [...optsFields, ...quorumIncompatible] : optsFields; + const valid = aliasOptions(options, { queuelimit: 'maxLength', queueLimit: 'maxLength', deadletter: 'deadLetterExchange', deadLetter: 'deadLetterExchange', deadLetterRoutingKey: 'deadLetterRoutingKey' - }, 'subscribe', 'limit', 'noBatch', 'unique'); + }, ...omition); + valid.arguments = argOptions(options); + topLog.info("Declaring queue '%s' on connection '%s' with the options: %s", options.uniqueName, connectionName, JSON.stringify(options)); let queuePromise; - if (options.passive) { queuePromise = channel.checkQueue(options.uniqueName); } else { diff --git a/src/index.d.ts b/src/index.d.ts index 23ca707..7ef7797 100644 --- a/src/index.d.ts +++ b/src/index.d.ts @@ -198,7 +198,7 @@ declare namespace Broker { port?: number; server?: string | string[]; vhost?: string; - protocol?: string; + protocol?: "amqp" | "amqps"; user?: string; pass?: string; timeout?: number; @@ -230,8 +230,16 @@ declare namespace Broker { name: string; limit?: number; queueLimit?: number; + queueVersion?: 1 | 2; deadLetter?: string; + deadLetterRoutingKey?: string; + deadLetterStrategy?: "at-most-once" | "at-least-once"; subscribe?: boolean; + autoDelete?: boolean; + passive?: boolean; + messageTtl?: number; + type?: "classic" | "quorum"; + overflow?: "drop-head" | "reject-publish"; } export interface BindingOptions {