Skip to content

Commit

Permalink
Fix issue #26 and #55 and add missing fields in type definition + quo…
Browse files Browse the repository at this point in the history
…rum support (#56)

* chore(package.json): bump version

* feat(index.d.ts): Add `messageTtl`, `passive` (existing fields) and `type` (x-queue-type, for quorum support) + protocol definition for #26

* chore(docs): update docs t reflect changes

* fix(integration): proper protocol (#26) and fix url encode error (#55)

* chore(spec): update connection and queue options with new protocol and queue type field

* refactor(connection.js): use `startsWith` instead of equal, just in case

* feat(index.d.ts): Add `autoDelete` key to QueueOptions definition (it is documented and shown in examples)

* feat(queue.js): Add proper support for quorum queues and silently omit incompatible fields

* feat(queue.spec.js): Add test case for quorum queue type and check if call is valid

* refactor(queue.js): Use ternary operation instead for omition to improve readability

* feat(queue.spec.js): Add behavior test case for quorum queue type (not set)

* chore(topology.md): update docs with queue type and classic queue deprecation warning

* feat(connection.js): make sure `protocol` field is found and backwards compatible (#26)

* docs(topology.md): Added known workaround for issue #23 using async/await example with explanation

* feat(queue.js): add dead letter strategy argument (supports at least once since v3.10 for quorum) + update typing to reflect it

* test(queue.spec.js): Check behavior for `x-dead-letter-strategy` and validate it is correct (only for quorum)

* docs(topology.md): Add `deadLetterStrategy` and `overflow` to `addQueue` parameter description + explanation of choices

* docs(topology.md): refactor example and formatting recent addition to fit norm

* docs(topology.md): shorten example

* docs(topology.md): update configure example to properly use promises

* docs: finalized toplogy example and changes all outdated `var` variable to `const`

* refactor(connection.js): Avoid duplicate error messages, use constant

* test(queue.spec.js): Fix value for `x-dead-letter-strategy` and add similar test case for default options.type "classic"

* feat(queue.js): Add queue version upport for classic queues + behavior test and doc entry

* fix: autodelete queue in test has to be classic

* chore: revert change a16ba0f (bump version)

* fix(amqp/connection.js): decode URI credentials in `parseUri` to avoid double escaping

---------

Co-authored-by: Martial Habimana <martial@ringring.be>
  • Loading branch information
MartianH and Martial Habimana committed Apr 9, 2024
1 parent 98e631a commit e503616
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 30 deletions.
4 changes: 2 additions & 2 deletions docs/connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Options is a hash that can contain the following:
| **host** | the IP address or DNS name of the RabbitMQ server. | `"localhost"` |
| **port** | the TCP/IP port on which RabbitMQ is listening. | `5672` |
| **vhost** | the named vhost to use in RabbitMQ. | `"%2f"` ~ `"/"` |
| **protocol** | the connection protocol to use. | `"amqp://"` |
| **protocol** | the connection protocol to use. | `"amqp"` |
| **user** | the username used for authentication / authorization with this connection | `"guest"` |
| **pass** | the password for the specified user. | `"guest"` |
| **timeout** | how long to wait for a connection to be established in milliseconds. | `2000` |
Expand Down Expand Up @@ -147,7 +147,7 @@ Your goal should be building systems resilient to failures (by allowing them to

```js
// How to create a zombie
var rabbit = require( "foo-foo-mq" );
const rabbit = require( "foo-foo-mq" );

rabbit.on( "unreachable", function() {
rabbit.retry();
Expand Down
8 changes: 4 additions & 4 deletions docs/receiving.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ If using the first form, the options hash can contain the following properties,
In this example, any possible error is caught in an explicit try/catch:

```javascript
var handler = rabbit.handle( "company.project.messages.logEntry", function( message ) {
const handler = rabbit.handle( "company.project.messages.logEntry", function( message ) {
try {
// do something meaningful?
console.log( message.body );
Expand All @@ -67,7 +67,7 @@ This example shows how to have foo-foo-mq wrap all handlers with a try catch tha
// that nacks the message on an error
rabbit.nackOnError();

var handler = rabbit.handle( "company.project.messages.logEntry", function( message ) {
const handler = rabbit.handle( "company.project.messages.logEntry", function( message ) {
console.log( message.body );
message.ack();
} );
Expand All @@ -83,7 +83,7 @@ rabbit.ignoreHandlerErrors();
Provide a strategy for handling errors to multiple handles or attach an error handler after the fact.

```javascript
var handler = rabbit.handle( "company.project.messages.logEntry", function( message ) {
const handler = rabbit.handle( "company.project.messages.logEntry", function( message ) {
console.log( message.body );
message.ack();
} );
Expand Down Expand Up @@ -299,7 +299,7 @@ the encoding is passed in case the message was produced by another library using
### `rabbit.addSerializer( contentType, serializer )`

```javascript
var yaml = require( "js-yaml" );
const yaml = require( "js-yaml" );

rabbit.addSerializer( "application/yaml", {
deserialize: function( bytes, encoding ) {
Expand Down
65 changes: 59 additions & 6 deletions docs/topology.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ If a disconnect takes place, foo-foo-mq will attempt to re-establish the connect
This example shows most of the available options described above.
```javascript
var settings = {
const settings = {
connection: {
user: "guest",
pass: "guest",
Expand Down Expand Up @@ -38,12 +38,44 @@ This example shows most of the available options described above.
```

To establish a connection with all settings in place and ready to go call configure:

```javascript
var rabbit = require( "foo-foo-mq" );
const rabbit = require( "foo-foo-mq" );

rabbit.configure( settings )
.then(() => {
// ready to go!
}).catch((err) => console.error(err))
```

rabbit.configure( settings ).done( function() {
// ready to go!
} );
### Caveat

It can happen that configure is called before RabbitMQ is running ([#23](https://github.com/Foo-Foo-MQ/foo-foo-mq/issues/23#issuecomment-921194756)), which can cause a consumer to be connected without subscribers.
Fortunately, it is easy to handle using simple retry logic.

```javascript
const rabbit = require( "foo-foo-mq" );
const { setTimeout } = require( "timers/promises" );

async function tryConfigure(
settings,
opts
) {
const retries = opts.retries || 10;
try {
await rabbit.configure(settings);
} catch (error) {
if (error === 'No endpoints could be reached' && retries > 0) {
if (opts.defer) await setTimeout(opts.defer);
await rabbit.shutdown();
await rabbit.reset();
await this.tryConfigure(settings, { ...opts, retries: retries - 1 });
} else {
throw error;
}
}
return rabbit;
}
```

## `rabbit.addExchange( exchangeName, exchangeType, [options], [connectionName] )`
Expand Down Expand Up @@ -74,8 +106,13 @@ The call returns a promise that can be used to determine when the queue has been

Options is a hash that can contain the following:

> **Warning:** Classic Mirrored Queues [are deprecated](https://www.rabbitmq.com/blog/2021/08/21/4.0-deprecation-announcements) and will no longer be supported [after v3.13](https://www.rabbitmq.com/blog/2024/03/11/rabbitmq-3.13.0-announcement#thats-a-wrap-for-3x).
>
> For quorum queues, [unsupported options](https://www.rabbitmq.com/docs/quorum-queues#feature-matrix) (`exclusive`, `autoDelete`, `maxPriority`) given to quorum queues will be *silently ignored*.
| option | type | description | default |
|--:|:-:|:--|:-:|
| **type** | string | Set the queue type to either `classic` or `quorum` | `classic` |
| **autoDelete** | boolean | delete when consumer count goes to 0 | |
| **durable** | boolean | survive broker restarts | false |
| **exclusive** | boolean | limits queue to the current connection only (danger) | false |
Expand All @@ -85,15 +122,31 @@ Options is a hash that can contain the following:
| **noBatch** | boolean | causes ack, nack & reject to take place immediately | false |
| **noCacheKeys** | boolean | disable cache of matched routing keys to prevent unbounded memory growth | false |
| **queueLimit** | 2^32 |max number of ready messages a queue can hold | |
| **queueVersion** | `1`, `2` | Sets queue version for classic queues original (CQv1) and new (CQv2) | |
| **overflow** | `drop-head`, `reject-publish` | Behavior when queue limit is reached, defaults to `drop-head`(discard oldest) | |
| **messageTtl** | 2^32 |time in ms before a message expires on the queue | |
| **expires** | 2^32 |time in ms before a queue with 0 consumers expires | |
| **deadLetter** | string | the exchange to dead-letter messages to | |
| **deadLetterRoutingKey** | string | the routing key to add to a dead-lettered message
| **deadLetterStrategy** | `at-least-once`, `at-most-once` | the dead letter strategy, defaults to `at-most-once`.
| **maxPriority** | 2^8 | the highest priority this queue supports | |
| **unique** | `"hash", `"id", "consistent"` | creates a unique queue name by including the client id or hash in the name | |
| **unique** | `hash`, `id`, `consistent` | creates a unique queue name by including the client id or hash in the name | |
| **poison** | boolean | indicates that this queue is specifically for poison / rejected messages| false |
| **passive** | boolean | when `true` will not create the queueName specified | false |

### deadLetterStrategy

* `at-least-once` - enables [At-Least-Once Dead Lettering
](https://www.rabbitmq.com/blog/2022/03/29/at-least-once-dead-lettering) (available since v3.10) for quorum queues, provided that:
* the [feature flag](https://www.rabbitmq.com/docs/feature-flags) `stream_queue` is enabled
* Option `overflow` is set to `reject-publish`
* `at-most-once` - the default strategy and backwards-compatible behavior

### overflow

* `drop-head` - default behavior, drops or dead-letter messages from the front of the queue (i.e. the oldest messages in the queue)
* `reject-publish` - the most recently published messages will be discarded. In addition, if publisher confirms are enabled, the publisher will be informed of the reject via a basic.nack message

### unique

The unique option has 3 different possible values, each with its own behavior:
Expand Down
146 changes: 146 additions & 0 deletions spec/behavior/queue.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,151 @@ describe('AMQP Queue', function () {
});
});
});

describe('when options.type is not set', function () {
it('sets `x-queue-type` to "classic"', () => {
const qType = 'classic';
options.queueLimit = 1000;
options.maxPriority = 100;
return ampqQueue(options, topology, serializers)
.then((instance) => {
return instance.define();
})
.then(() => {
amqpChannelMock.assertQueue.calledWith(
options.uniqueName,
{ ...options, arguments: { 'x-queue-type': qType } });
});
});
});

describe('when options.type is "classic"', function () {
it('sets `x-queue-type` to "classic" and respects deprecated fields', () => {
options.type = 'classic';
options.queueLimit = 1000;
options.autoDelete = 100;
options.maxPriority = 100;
return ampqQueue(options, topology, serializers)
.then((instance) => {
return instance.define();
})
.then(() => {
amqpChannelMock.assertQueue.calledWith(
options.uniqueName,
{
queueLimit: options.queueLimit,
arguments: { 'x-queue-type': options.type }
});
});
});

it('Omits `x-dead-letter-strategy` argument if given', () => {
options.type = 'classic';
options.queueLimit = 1000;
options.maxPriority = 100;
options.deadLetterStrategy = 'at-least-once';
return ampqQueue(options, topology, serializers)
.then((instance) => {
return instance.define();
})
.then(() => {
amqpChannelMock.assertQueue.calledWith(
options.uniqueName,
{
...options,
arguments: {
'x-queue-type': options.type
}
});
});
});

it('Sets `x-queue-version` argument if given', () => {
options.type = 'classic';
options.queueLimit = 1000;
options.queueVersion = 2;
options.maxPriority = 100;
return ampqQueue(options, topology, serializers)
.then((instance) => {
return instance.define();
})
.then(() => {
amqpChannelMock.assertQueue.calledWith(
options.uniqueName,
{
...options,
arguments: {
'x-queue-type': options.type,
'x-queue-version': options.queueVersion
}
});
});
});
});

describe('when options.type is "quorum"', function () {
it('sets `x-queue-type` to "quorum" and omits incompatible fields', () => {
options.type = 'quorum';
options.queueLimit = 1000;
options.autoDelete = true;
options.maxPriority = 100;
return ampqQueue(options, topology, serializers)
.then((instance) => {
return instance.define();
})
.then(() => {
amqpChannelMock.assertQueue.calledWith(
options.uniqueName,
{
queueLimit: options.queueLimit,
arguments: { 'x-queue-type': options.type }
});
});
});

it('sets `x-dead-letter-strategy` argument if given', () => {
options.type = 'quorum';
options.queueLimit = 1000;
options.autoDelete = true;
options.maxPriority = 100;
options.deadLetterStrategy = 'at-least-once';
return ampqQueue(options, topology, serializers)
.then((instance) => {
return instance.define();
})
.then(() => {
amqpChannelMock.assertQueue.calledWith(
options.uniqueName,
{
queueLimit: options.queueLimit,
arguments: {
'x-queue-type': options.type,
'x-dead-letter-strategy': options.deadLetterStrategy
}
});
});
});

it('Omits `x-queue-version` argument if given', () => {
options.type = 'classic';
options.queueLimit = 1000;
options.queueVersion = 2;
options.maxPriority = 100;
return ampqQueue(options, topology, serializers)
.then((instance) => {
return instance.define();
})
.then(() => {
amqpChannelMock.assertQueue.calledWith(
options.uniqueName,
{
...options,
arguments: {
'x-queue-type': options.type
}
});
});
});
});
});
});
1 change: 1 addition & 0 deletions spec/integration/configuration.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module.exports = {
connection: {
protocol: 'amqp',
name: 'default',
user: 'guest',
pass: 'guest',
Expand Down
6 changes: 4 additions & 2 deletions spec/integration/typeSpecific.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ describe('Type Handling On Any Queue', function () {
name: 'rabbot-q.topic-1',
autoDelete: true,
subscribe: true,
deadletter: 'rabbot-ex.deadletter'
deadletter: 'rabbot-ex.deadletter',
type: 'classic'
},
{
name: 'rabbot-q.topic-2',
autoDelete: true,
subscribe: true,
deadletter: 'rabbot-ex.deadletter'
deadletter: 'rabbot-ex.deadletter',
type: 'classic'
}
],
bindings: [
Expand Down
Loading

0 comments on commit e503616

Please sign in to comment.