Skip to content

Commit

Permalink
Test hook cancellations.
Browse files Browse the repository at this point in the history
  • Loading branch information
ramhr committed Feb 7, 2024
1 parent d2bf89e commit 9fd4a8d
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 11 deletions.
13 changes: 9 additions & 4 deletions src/modules/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class Consumer {
content: body,
});
if (!shouldContinue) {
await this._rejectMessageAfterProcess(channel, queue, msg, body);
await this._rejectMessageAfterProcess(channel, queue, msg, body, false);
return;
}
const res = await callback(body, msg.properties);
Expand All @@ -219,7 +219,7 @@ class Consumer {
params: { queue, message: messageString },
});

await this._rejectMessageAfterProcess(channel, queue, msg, body, error);
await this._rejectMessageAfterProcess(channel, queue, msg, body, this._connection.config.requeue, error);
return;
}

Expand Down Expand Up @@ -254,9 +254,14 @@ class Consumer {
}

/** @private */
async _rejectMessageAfterProcess(channel, queue, msg, parsedBody, error) {
async _rejectMessageAfterProcess(channel, queue, msg, parsedBody, requeue, error) {
try {
channel.reject(msg, this._connection.config.requeue);
channel.reject(msg, requeue);

if (!requeue) {
// If not requeued and message will be removed from the queue, return rpc error response if needed.
await this.checkRpc(msg.properties, queue, error instanceof Error ? { error } : undefined);
}
} catch (rejectError) {
await this.hooks.trigger(this, ConsumerHooks.afterProcessMessageEvent, {
queue,
Expand Down
4 changes: 2 additions & 2 deletions src/modules/hooks/base_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ module.exports = class BaseHooks {
return;
}

const callbacks = this._getCallbacks();
const callbacks = this._getCallbacks(event);
if (callbacks) {
return;
}
Expand All @@ -69,7 +69,7 @@ module.exports = class BaseHooks {
* @private
*/
_manyOff(event, callbacks) {
const registered = this._getCallbacks();
const registered = this._getCallbacks(event);
if (registered) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/modules/hooks/producer_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ProducerHooks extends BaseHooks {
* - result - The value return from publication. If rpc, will be the deserialized object.
* - error - The error object in case the publication failed, or received an erroneous RPC response.
* - shouldRetry - If received an error, 'true' if the publication will be retried (if retry configured).
* In case the hook callback was called with an error, it can return false in order to abort any further publish retries (if retry is configured).
* In case the hook callback was called with an error, it can return `false` in order to abort any further publish retries (if retry is configured).
* @param {Function | Function[]} callback A callback or callbacks array to register.
*/
afterPublish(callback) {
Expand Down
8 changes: 4 additions & 4 deletions src/modules/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,11 @@ class Producer {
properties: settings,
currentRetry: currentRetryNumber,
});
if (!shouldContinue) {
return null;
}

const result = await this.checkRpc(queue, parsedMessage, settings);
let result;
if (shouldContinue) {
result = await this.checkRpc(queue, parsedMessage, settings);
}

await this.hooks.trigger(this, ProducerHooks.afterPublish, {
queue,
Expand Down

0 comments on commit 9fd4a8d

Please sign in to comment.