Skip to content

Commit

Permalink
Hooks never throw, and instead the callbacks can return false in or…
Browse files Browse the repository at this point in the history
…der to cancel certain operations.
  • Loading branch information
ramhr committed Feb 7, 2024
1 parent 342427a commit 713be45
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/modules/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Connection {

this._connectionPromise = null; // Promise of amqp connection
this._channels = null;
this.hooks = new ConnectionHooks(config.hooks && config.hooks.connection);
this.hooks = new ConnectionHooks(config.hooks && config.hooks.connection, config.logger);
this.startedAt = new Date().toISOString();
}

Expand Down
68 changes: 42 additions & 26 deletions src/modules/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ class Consumer {
constructor(connection) {
this._connection = connection;
this._configuration = this._connection.config;
this.hooks = new ConsumerHooks(this._configuration.hooks && this._configuration.hooks.consumer);
this.hooks = new ConsumerHooks(
this._configuration.hooks && this._configuration.hooks.consumer,
this._connection.config.logger,
);
}

set connection(value) {
Expand Down Expand Up @@ -197,7 +200,15 @@ class Consumer {
// receive message, parse it, execute callback, check if should answer, ack/reject message
const body = parsers.in(msg);
try {
await this.hooks.trigger(this, ConsumerHooks.beforeProcessMessageEvent, { queue, message: msg, content: body });
const shouldContinue = await this.hooks.trigger(this, ConsumerHooks.beforeProcessMessageEvent, {
queue,
message: msg,
content: body,
});
if (!shouldContinue) {
await this._rejectMessageAfterProcess(channel, queue, msg, body);
return;
}
const res = await callback(body, msg.properties);
await this.checkRpc(msg.properties, queue, res);
} catch (error) {
Expand All @@ -208,30 +219,7 @@ class Consumer {
params: { queue, message: messageString },
});

try {
channel.reject(msg, this._connection.config.requeue);
} catch (rejectError) {
await this.hooks.trigger(this, ConsumerHooks.afterProcessMessageEvent, {
queue,
message: msg,
content: body,
error,
rejectError,
});
this._connection.config.logger.error({
message: `${loggerAlias} Failed to reject message after processing failure on queue ${queue}: ${rejectError.message}`,
error: rejectError,
params: { queue },
});
return;
}

await this.hooks.trigger(this, ConsumerHooks.afterProcessMessageEvent, {
queue,
message: msg,
content: body,
error,
});
await this._rejectMessageAfterProcess(channel, queue, msg, body, error);
return;
}

Expand Down Expand Up @@ -264,6 +252,34 @@ class Consumer {
});
}
}

/** @private */
async _rejectMessageAfterProcess(channel, queue, msg, parsedBody, error) {
try {
channel.reject(msg, this._connection.config.requeue);
} catch (rejectError) {
await this.hooks.trigger(this, ConsumerHooks.afterProcessMessageEvent, {
queue,
message: msg,
content: parsedBody,
error,
rejectError,
});
this._connection.config.logger.error({
message: `${loggerAlias} Failed to reject message after processing failure on queue ${queue}: ${rejectError.message}`,
error: rejectError,
params: { queue },
});
return;
}

await this.hooks.trigger(this, ConsumerHooks.afterProcessMessageEvent, {
queue,
message: msg,
content: parsedBody,
error,
});
}
}

/* eslint no-unused-expressions: "off" */
Expand Down
52 changes: 49 additions & 3 deletions src/modules/hooks/base_hooks.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
module.exports = class BaseHooks {
constructor() {
constructor(logger) {
/**
* @type {Map.<string, Set.<Function>>} A map between an event name to a set of callbacks registered for it.
* Function shape varies between different events.
* @private
*/
this._events = new Map();
/**
* A logger for logging hook progress.
* @private
*/
this._logger = logger;
}

/**
Expand Down Expand Up @@ -76,19 +81,60 @@ module.exports = class BaseHooks {
* @param {*} source The class/object that triggered the event. Will be bound as the 'this' argument of the callbacks.
* @param {string} eventName The name of the event to trigger.
* @param {*} payload The event to pass to the registered callbacks as an argument.
* @returns {Promise.<boolean>} false if any of the hooks returned false (===) to cancel the operation; true otherwise.
* @public
*/
async trigger(source, eventName, payload) {
const callbacks = this._getCallbacks(eventName);
if (!callbacks) {
return;
return true;
}

let shouldContinue = true;
const hookPromises = [];
// This rule intends to restrict it for arrays, but this is a Set which doesn't have a '.map' function to use instead.
// eslint-disable-next-line no-restricted-syntax
for (const callback of callbacks) {
await callback.call(source, payload);
hookPromises.push(
// This is safe as I want shouldContinue to have the last value.
// eslint-disable-next-line no-loop-func
this._runHook(source, eventName, payload, callback).then((callbackResult) => {
if (callbackResult === false) {
shouldContinue = false;
}
}),
);
}
await Promise.all(hookPromises);

return shouldContinue;
}

/** @private */
async _runHook(source, eventName, payload, callback) {
const logParams = { hook: eventName, payload, callbackName: callback.name };
try {
const callbackResult = await callback.call(source, payload);
if (callbackResult === false) {
this._logger.info({
message: `arnav_mq:hooks A '${eventName}' hook returned false. Canceling further execution.`,
params: logParams,
});
return false;
}
this._logger.debug({
message: `arnav_mq:hooks A '${eventName}' finished execution.`,
params: logParams,
});
} catch (error) {
this._logger.error({
message: `arnav_mq:hooks Execution of '${eventName}' hook caused an error: ${error.message}`,
error,
params: logParams,
});
}

return true;
}

/** @private */
Expand Down
4 changes: 2 additions & 2 deletions src/modules/hooks/connection_hooks.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const BaseHooks = require('./base_hooks');

class ConnectionHooks extends BaseHooks {
constructor(hooks) {
super();
constructor(hooks, logger) {
super(logger);

if (!hooks) {
return;
Expand Down
5 changes: 3 additions & 2 deletions src/modules/hooks/consumer_hooks.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const BaseHooks = require('./base_hooks');

class ConsumerHooks extends BaseHooks {
constructor(hooks) {
super();
constructor(hooks, logger) {
super(logger);

if (!hooks) {
return;
Expand All @@ -27,6 +27,7 @@ class ConsumerHooks extends BaseHooks {
* - queue - The queue or exchange to publish to.
* - message - The raw amqplib message.
* - content - The deserialized message content.
* The hook callback can return `false` in order to skip the message processing, rejecting it and jumping right to the "after process" hook.
* @param {Function | Function[]} callback A callback or callbacks array to register.
*/
beforeProcessMessage(callback) {
Expand Down
6 changes: 4 additions & 2 deletions src/modules/hooks/producer_hooks.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const BaseHooks = require('./base_hooks');

class ProducerHooks extends BaseHooks {
constructor(hooks) {
super();
constructor(hooks, logger) {
super(logger);

if (!hooks) {
return;
Expand All @@ -23,6 +23,7 @@ class ProducerHooks extends BaseHooks {
* - parsedMessage - The serialized message buffer
* - properties - The publish properties and options. If a "routingKey" is specified, it serves as the queue while the "queue" option represents the exchange instead. Otherwise the default exchange is used.
* - currentRetry - The current retry attempt number
* The hook callback can return `false` in order to cancel publication and jump right to the "after publish" hook.
* @param {Function | Function[]} callback A callback or callbacks array to register.
*/
beforePublish(callback) {
Expand All @@ -45,6 +46,7 @@ class ProducerHooks extends BaseHooks {
* - result - The value return from publication. If rpc, will be the deserialized object.
* - error - The error object in case the publication failed, or received an erroneous RPC response.
* - shouldRetry - If received an error, 'true' if the publication will be retried (if retry configured).
* In case the hook callback was called with an error, it can return false in order to abort any further publish retries (if retry is configured).
* @param {Function | Function[]} callback A callback or callbacks array to register.
*/
afterPublish(callback) {
Expand Down
11 changes: 7 additions & 4 deletions src/modules/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Producer {
constructor(connection) {
this._connection = connection;
const { hooks } = this._connection.config;
this.hooks = new ProducerHooks(hooks && hooks.producer);
this.hooks = new ProducerHooks(hooks && hooks.producer, this._connection.config.logger);

/**
* Map of rpc queues
Expand Down Expand Up @@ -249,13 +249,16 @@ class Producer {
utils.setCorrelationId(settings);
}
try {
await this.hooks.trigger(this, ProducerHooks.beforePublish, {
const shouldContinue = await this.hooks.trigger(this, ProducerHooks.beforePublish, {
queue,
message,
parsedMessage,
properties: settings,
currentRetry: currentRetryNumber,
});
if (!shouldContinue) {
return null;
}

const result = await this.checkRpc(queue, parsedMessage, settings);

Expand All @@ -271,7 +274,7 @@ class Producer {
return result;
} catch (error) {
const shouldRetry = this._shouldRetry(error, currentRetryNumber);
await this.hooks.trigger(this, ProducerHooks.afterPublish, {
const shouldContinue = await this.hooks.trigger(this, ProducerHooks.afterPublish, {
queue,
message,
parsedMessage,
Expand All @@ -280,7 +283,7 @@ class Producer {
shouldRetry,
error,
});
if (!shouldRetry) {
if (!shouldRetry || !shouldContinue) {
throw error;
}

Expand Down

0 comments on commit 713be45

Please sign in to comment.