Skip to content

Commit

Permalink
Add hooks before and after certain operations. (#67)
Browse files Browse the repository at this point in the history
* Add hooks before and after certain operations.

This is with the intention of using these hooks for registering instrumentation wrappers for publish and subscribe flows.

* Test hooks.

* Documentation.

* 0.16.0

* run formatting

* Change some tests.

* Add documentation and change object to map.

* Hooks never throw, and instead the callbacks can return `false` in order to cancel certain operations.

* Clean `getCorrelationId`.

* Test and fix hook cancellation stuff.

* Remove unneeded `await`.

* Read 'logger' from it's own file instead directly from config.

* Freeze configuration object.

* Remove hooks from configuration object.
Since the configuration object changes every time the connection is constructed, it doesn't make sense to have the hooks there.

* Clean logger initialization.

* Allow wrapping the consumer callback inside the hook.

Required in order to pass instrumentation context into the callback.

* Remove flow interruption option through hooks.

* Initialize connection before invoking producer hooks.
This makes sure the `beforePublish` hook is never called before the connection hooks.

* Fix renaming hook I missed.

* Fix unregister hooks.

* Trigger after try-catch where applicable to simplify code.
  • Loading branch information
ramhr committed Mar 6, 2024
1 parent 452be98 commit 5fd183a
Show file tree
Hide file tree
Showing 17 changed files with 818 additions and 85 deletions.
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,42 @@ You can send publish commands with routing keys (thanks to @nekrasoft)
arnavmq.publish('queue:name', { message: 'content' }, { routingKey: 'my-routing-key' });
```

## Hooks

You can register callbacks to be invoked on certain events:

```javascript
const arnavmq = require('arnavmq')({
host: 'amqp://localhost',
// Can pass hooks directly on connection configuration.
hooks: {
connection: {
beforeConnect: () => {
/*...*/
},
},
},
});

arnavmq.hooks.connection.afterConnect(({ connection, config }) => {
console.log('Connected to ' + config.host);
});

arnavmq.hooks.producer.beforeProduce(({ properties /*... other parameters ...*/ }) => {
// Message properties and other options objects can be changed, for example to set a message id:
properties.messageId = randomUUID();
});

// Can register a single callback at a time or multiple callbacks at once.
arnavmq.hooks.consumer.afterProcessMessage([afterProcessCallback1, afterProcessCallback2]);
```

For full details of the available hooks and callback signatures, check the documentation on the files:

- [Connection](src/modules/hooks/connection_hooks.js)
- [Consumer](src/modules/hooks/consumer_hooks.js)
- [Producer](src/modules/hooks/producer_hooks.js)

## Config

You can specify a config object, properties and default values are:
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "arnavmq",
"version": "0.15.2",
"version": "0.16.0",
"description": "ArnavMQ is a RabbitMQ wrapper",
"keywords": [
"rabbitmq",
Expand Down Expand Up @@ -48,7 +48,7 @@
"mocha": "^10.0.0",
"nyc": "^15.1.0",
"prettier": "^3.0.0",
"sinon": "^17.0.0"
"sinon": "^17.0.1"
},
"engines": {
"node": ">=14"
Expand Down
17 changes: 7 additions & 10 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const uuid = require('uuid');
const utils = require('./modules/utils');
const connection = require('./modules/connection');
const { setLogger } = require('./modules/logger');

/* eslint global-require: "off" */
module.exports = (config) => {
Expand Down Expand Up @@ -30,15 +30,6 @@ module.exports = (config) => {
// generate a hostname so we can track this connection on the broker (rabbitmq management plugin)
hostname: process.env.HOSTNAME || process.env.USER || uuid.v4(),

/**
* A logger object with a log function for each of the log levels ("debug", "info", "warn", or "error").
* Each log function receives one parameter containing a log event with the following fields:
* * message - A string message describing the event. Always present.
* * error - An 'Error' object in case one is present.
* * params - An optional object containing extra parameters that can provide extra context for the event.
*/
logger: utils.emptyLogger,

...config,
};

Expand All @@ -47,5 +38,11 @@ module.exports = (config) => {
}

configuration.prefetch = parseInt(configuration.prefetch, 10) || 0;

setLogger(configuration.logger);
delete configuration.logger;

Object.freeze(configuration);

return require('./modules/arnavmq')(connection(configuration));
};
7 changes: 7 additions & 0 deletions src/modules/arnavmq.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ module.exports = (connection) => {
publish: instance.publish.bind(instance),
};

const hooks = {
connection: instance.connection.hooks,
consumer: instance.consumer.hooks,
producer: instance.producer.hooks,
};

return {
connection: instance.connection,
consume: consumer.consume,
Expand All @@ -67,5 +73,6 @@ module.exports = (connection) => {
publish: producer.publish,
consumer,
producer,
hooks,
};
};
8 changes: 5 additions & 3 deletions src/modules/channels.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const { logger } = require('./logger');

const DEFAULT_CHANNEL = 'DEFAULT_CHANNEL';

class ChannelAlreadyExistsError extends Error {
Expand Down Expand Up @@ -75,7 +77,7 @@ class Channels {
this._channels.delete(key);
});
channel.on('error', (error) => {
this._config.logger.error({
logger.error({
message: `Got channel error [${error.message}] for [${key}]`,
error,
});
Expand All @@ -84,7 +86,7 @@ class Channels {
return channel;
} catch (error) {
this._channels.delete(key);
this._config.logger.error({
logger.error({
message: `Failed to create channel for [${key}] - [${error.message}]`,
error,
});
Expand All @@ -94,7 +96,7 @@ class Channels {
try {
await channel.close();
} catch (closeError) {
this._config.logger.error({
logger.error({
message: `Failed to cleanup channel after failed initialization for [${key}] - [${closeError.message}]`,
error: closeError,
});
Expand Down
31 changes: 19 additions & 12 deletions src/modules/connection.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
const assert = require('assert');
const amqp = require('amqplib');
const { Channels } = require('./channels');
const { ConnectionHooks } = require('./hooks');
const packageVersion = require('../../package.json').version;
const { logger } = require('./logger');

/**
* Log errors from connection/channel error events.
* @param {Error} error
*/
function onConnectionError(error) {
logger.error({
message: error.message,
error,
});
}

class Connection {
constructor(config) {
this._config = config;

this._connectionPromise = null; // Promise of amqp connection
this._channels = null;
this.hooks = new ConnectionHooks();
this.startedAt = new Date().toISOString();
}

Expand All @@ -27,6 +41,7 @@ class Connection {

async _connect() {
try {
await this.hooks.trigger(this, ConnectionHooks.beforeConnectEvent, { config: this._config });
const connection = await amqp.connect(this._config.host, {
clientProperties: {
hostname: this._config.hostname,
Expand All @@ -42,27 +57,19 @@ class Connection {
this._connectionPromise = null;
this._channels = null;
});
connection.on('error', this._onError.bind(this));
connection.on('error', onConnectionError);

await this.hooks.trigger(this, ConnectionHooks.afterConnectEvent, { config: this._config, connection });

return connection;
} catch (error) {
await this.hooks.trigger(this, ConnectionHooks.afterConnectEvent, { config: this._config, error });
this._connectionPromise = null;
this._channels = null;
throw error;
}
}

/**
* Log errors from connection/channel error events.
* @param {Error} error
*/
_onError(error) {
this._config.logger.error({
message: error.message,
error,
});
}

async getChannel(queue, config) {
await this.getConnection();
return await this._channels.get(queue, config);
Expand Down
Loading

0 comments on commit 5fd183a

Please sign in to comment.