From 713be45a4981351071858cf34ee50f34681e317d Mon Sep 17 00:00:00 2001 From: Ram Hershberg Date: Wed, 7 Feb 2024 13:02:29 +0200 Subject: [PATCH] Hooks never throw, and instead the callbacks can return `false` in order to cancel certain operations. --- src/modules/connection.js | 2 +- src/modules/consumer.js | 68 +++++++++++++++++---------- src/modules/hooks/base_hooks.js | 52 ++++++++++++++++++-- src/modules/hooks/connection_hooks.js | 4 +- src/modules/hooks/consumer_hooks.js | 5 +- src/modules/hooks/producer_hooks.js | 6 ++- src/modules/producer.js | 11 +++-- 7 files changed, 108 insertions(+), 40 deletions(-) diff --git a/src/modules/connection.js b/src/modules/connection.js index 6632fca..c34591e 100644 --- a/src/modules/connection.js +++ b/src/modules/connection.js @@ -10,7 +10,7 @@ class Connection { this._connectionPromise = null; // Promise of amqp connection this._channels = null; - this.hooks = new ConnectionHooks(config.hooks && config.hooks.connection); + this.hooks = new ConnectionHooks(config.hooks && config.hooks.connection, config.logger); this.startedAt = new Date().toISOString(); } diff --git a/src/modules/consumer.js b/src/modules/consumer.js index b775075..472d9a6 100644 --- a/src/modules/consumer.js +++ b/src/modules/consumer.js @@ -9,7 +9,10 @@ class Consumer { constructor(connection) { this._connection = connection; this._configuration = this._connection.config; - this.hooks = new ConsumerHooks(this._configuration.hooks && this._configuration.hooks.consumer); + this.hooks = new ConsumerHooks( + this._configuration.hooks && this._configuration.hooks.consumer, + this._connection.config.logger, + ); } set connection(value) { @@ -197,7 +200,15 @@ class Consumer { // receive message, parse it, execute callback, check if should answer, ack/reject message const body = parsers.in(msg); try { - await this.hooks.trigger(this, ConsumerHooks.beforeProcessMessageEvent, { queue, message: msg, content: body }); + const shouldContinue = await this.hooks.trigger(this, ConsumerHooks.beforeProcessMessageEvent, { + queue, + message: msg, + content: body, + }); + if (!shouldContinue) { + await this._rejectMessageAfterProcess(channel, queue, msg, body); + return; + } const res = await callback(body, msg.properties); await this.checkRpc(msg.properties, queue, res); } catch (error) { @@ -208,30 +219,7 @@ class Consumer { params: { queue, message: messageString }, }); - try { - channel.reject(msg, this._connection.config.requeue); - } catch (rejectError) { - await this.hooks.trigger(this, ConsumerHooks.afterProcessMessageEvent, { - queue, - message: msg, - content: body, - error, - rejectError, - }); - this._connection.config.logger.error({ - message: `${loggerAlias} Failed to reject message after processing failure on queue ${queue}: ${rejectError.message}`, - error: rejectError, - params: { queue }, - }); - return; - } - - await this.hooks.trigger(this, ConsumerHooks.afterProcessMessageEvent, { - queue, - message: msg, - content: body, - error, - }); + await this._rejectMessageAfterProcess(channel, queue, msg, body, error); return; } @@ -264,6 +252,34 @@ class Consumer { }); } } + + /** @private */ + async _rejectMessageAfterProcess(channel, queue, msg, parsedBody, error) { + try { + channel.reject(msg, this._connection.config.requeue); + } catch (rejectError) { + await this.hooks.trigger(this, ConsumerHooks.afterProcessMessageEvent, { + queue, + message: msg, + content: parsedBody, + error, + rejectError, + }); + this._connection.config.logger.error({ + message: `${loggerAlias} Failed to reject message after processing failure on queue ${queue}: ${rejectError.message}`, + error: rejectError, + params: { queue }, + }); + return; + } + + await this.hooks.trigger(this, ConsumerHooks.afterProcessMessageEvent, { + queue, + message: msg, + content: parsedBody, + error, + }); + } } /* eslint no-unused-expressions: "off" */ diff --git a/src/modules/hooks/base_hooks.js b/src/modules/hooks/base_hooks.js index 2bf817d..cc797a2 100644 --- a/src/modules/hooks/base_hooks.js +++ b/src/modules/hooks/base_hooks.js @@ -1,11 +1,16 @@ module.exports = class BaseHooks { - constructor() { + constructor(logger) { /** * @type {Map.>} A map between an event name to a set of callbacks registered for it. * Function shape varies between different events. * @private */ this._events = new Map(); + /** + * A logger for logging hook progress. + * @private + */ + this._logger = logger; } /** @@ -76,19 +81,60 @@ module.exports = class BaseHooks { * @param {*} source The class/object that triggered the event. Will be bound as the 'this' argument of the callbacks. * @param {string} eventName The name of the event to trigger. * @param {*} payload The event to pass to the registered callbacks as an argument. + * @returns {Promise.} false if any of the hooks returned false (===) to cancel the operation; true otherwise. * @public */ async trigger(source, eventName, payload) { const callbacks = this._getCallbacks(eventName); if (!callbacks) { - return; + return true; } + let shouldContinue = true; + const hookPromises = []; // This rule intends to restrict it for arrays, but this is a Set which doesn't have a '.map' function to use instead. // eslint-disable-next-line no-restricted-syntax for (const callback of callbacks) { - await callback.call(source, payload); + hookPromises.push( + // This is safe as I want shouldContinue to have the last value. + // eslint-disable-next-line no-loop-func + this._runHook(source, eventName, payload, callback).then((callbackResult) => { + if (callbackResult === false) { + shouldContinue = false; + } + }), + ); + } + await Promise.all(hookPromises); + + return shouldContinue; + } + + /** @private */ + async _runHook(source, eventName, payload, callback) { + const logParams = { hook: eventName, payload, callbackName: callback.name }; + try { + const callbackResult = await callback.call(source, payload); + if (callbackResult === false) { + this._logger.info({ + message: `arnav_mq:hooks A '${eventName}' hook returned false. Canceling further execution.`, + params: logParams, + }); + return false; + } + this._logger.debug({ + message: `arnav_mq:hooks A '${eventName}' finished execution.`, + params: logParams, + }); + } catch (error) { + this._logger.error({ + message: `arnav_mq:hooks Execution of '${eventName}' hook caused an error: ${error.message}`, + error, + params: logParams, + }); } + + return true; } /** @private */ diff --git a/src/modules/hooks/connection_hooks.js b/src/modules/hooks/connection_hooks.js index 2300bab..e7c590a 100644 --- a/src/modules/hooks/connection_hooks.js +++ b/src/modules/hooks/connection_hooks.js @@ -1,8 +1,8 @@ const BaseHooks = require('./base_hooks'); class ConnectionHooks extends BaseHooks { - constructor(hooks) { - super(); + constructor(hooks, logger) { + super(logger); if (!hooks) { return; diff --git a/src/modules/hooks/consumer_hooks.js b/src/modules/hooks/consumer_hooks.js index d14a578..68613d7 100644 --- a/src/modules/hooks/consumer_hooks.js +++ b/src/modules/hooks/consumer_hooks.js @@ -1,8 +1,8 @@ const BaseHooks = require('./base_hooks'); class ConsumerHooks extends BaseHooks { - constructor(hooks) { - super(); + constructor(hooks, logger) { + super(logger); if (!hooks) { return; @@ -27,6 +27,7 @@ class ConsumerHooks extends BaseHooks { * - queue - The queue or exchange to publish to. * - message - The raw amqplib message. * - content - The deserialized message content. + * The hook callback can return `false` in order to skip the message processing, rejecting it and jumping right to the "after process" hook. * @param {Function | Function[]} callback A callback or callbacks array to register. */ beforeProcessMessage(callback) { diff --git a/src/modules/hooks/producer_hooks.js b/src/modules/hooks/producer_hooks.js index 7c61631..848c3ae 100644 --- a/src/modules/hooks/producer_hooks.js +++ b/src/modules/hooks/producer_hooks.js @@ -1,8 +1,8 @@ const BaseHooks = require('./base_hooks'); class ProducerHooks extends BaseHooks { - constructor(hooks) { - super(); + constructor(hooks, logger) { + super(logger); if (!hooks) { return; @@ -23,6 +23,7 @@ class ProducerHooks extends BaseHooks { * - parsedMessage - The serialized message buffer * - properties - The publish properties and options. If a "routingKey" is specified, it serves as the queue while the "queue" option represents the exchange instead. Otherwise the default exchange is used. * - currentRetry - The current retry attempt number + * The hook callback can return `false` in order to cancel publication and jump right to the "after publish" hook. * @param {Function | Function[]} callback A callback or callbacks array to register. */ beforePublish(callback) { @@ -45,6 +46,7 @@ class ProducerHooks extends BaseHooks { * - result - The value return from publication. If rpc, will be the deserialized object. * - error - The error object in case the publication failed, or received an erroneous RPC response. * - shouldRetry - If received an error, 'true' if the publication will be retried (if retry configured). + * In case the hook callback was called with an error, it can return false in order to abort any further publish retries (if retry is configured). * @param {Function | Function[]} callback A callback or callbacks array to register. */ afterPublish(callback) { diff --git a/src/modules/producer.js b/src/modules/producer.js index 1f2fea2..78277ff 100644 --- a/src/modules/producer.js +++ b/src/modules/producer.js @@ -24,7 +24,7 @@ class Producer { constructor(connection) { this._connection = connection; const { hooks } = this._connection.config; - this.hooks = new ProducerHooks(hooks && hooks.producer); + this.hooks = new ProducerHooks(hooks && hooks.producer, this._connection.config.logger); /** * Map of rpc queues @@ -249,13 +249,16 @@ class Producer { utils.setCorrelationId(settings); } try { - await this.hooks.trigger(this, ProducerHooks.beforePublish, { + const shouldContinue = await this.hooks.trigger(this, ProducerHooks.beforePublish, { queue, message, parsedMessage, properties: settings, currentRetry: currentRetryNumber, }); + if (!shouldContinue) { + return null; + } const result = await this.checkRpc(queue, parsedMessage, settings); @@ -271,7 +274,7 @@ class Producer { return result; } catch (error) { const shouldRetry = this._shouldRetry(error, currentRetryNumber); - await this.hooks.trigger(this, ProducerHooks.afterPublish, { + const shouldContinue = await this.hooks.trigger(this, ProducerHooks.afterPublish, { queue, message, parsedMessage, @@ -280,7 +283,7 @@ class Producer { shouldRetry, error, }); - if (!shouldRetry) { + if (!shouldRetry || !shouldContinue) { throw error; }