Skip to content

Commit

Permalink
Fix hook cancellation stuff.
Browse files Browse the repository at this point in the history
  • Loading branch information
ramhr committed Feb 7, 2024
1 parent d2bf89e commit f72bd00
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 14 deletions.
13 changes: 9 additions & 4 deletions src/modules/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class Consumer {
content: body,
});
if (!shouldContinue) {
await this._rejectMessageAfterProcess(channel, queue, msg, body);
await this._rejectMessageAfterProcess(channel, queue, msg, body, false);
return;
}
const res = await callback(body, msg.properties);
Expand All @@ -219,7 +219,7 @@ class Consumer {
params: { queue, message: messageString },
});

await this._rejectMessageAfterProcess(channel, queue, msg, body, error);
await this._rejectMessageAfterProcess(channel, queue, msg, body, this._connection.config.requeue, error);
return;
}

Expand Down Expand Up @@ -254,9 +254,14 @@ class Consumer {
}

/** @private */
async _rejectMessageAfterProcess(channel, queue, msg, parsedBody, error) {
async _rejectMessageAfterProcess(channel, queue, msg, parsedBody, requeue, error) {
try {
channel.reject(msg, this._connection.config.requeue);
channel.reject(msg, requeue);

if (!requeue) {
// If not requeued and message will be removed from the queue, return rpc error response if needed.
await this.checkRpc(msg.properties, queue, error instanceof Error ? { error } : undefined);
}
} catch (rejectError) {
await this.hooks.trigger(this, ConsumerHooks.afterProcessMessageEvent, {
queue,
Expand Down
4 changes: 2 additions & 2 deletions src/modules/hooks/base_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ module.exports = class BaseHooks {
return;
}

const callbacks = this._getCallbacks();
const callbacks = this._getCallbacks(event);
if (callbacks) {
return;
}
Expand All @@ -69,7 +69,7 @@ module.exports = class BaseHooks {
* @private
*/
_manyOff(event, callbacks) {
const registered = this._getCallbacks();
const registered = this._getCallbacks(event);
if (registered) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/modules/hooks/producer_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ProducerHooks extends BaseHooks {
* - result - The value return from publication. If rpc, will be the deserialized object.
* - error - The error object in case the publication failed, or received an erroneous RPC response.
* - shouldRetry - If received an error, 'true' if the publication will be retried (if retry configured).
* In case the hook callback was called with an error, it can return false in order to abort any further publish retries (if retry is configured).
* In case the hook callback was called with an error, it can return `false` in order to abort any further publish retries (if retry is configured).
* @param {Function | Function[]} callback A callback or callbacks array to register.
*/
afterPublish(callback) {
Expand Down
8 changes: 4 additions & 4 deletions src/modules/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,11 @@ class Producer {
properties: settings,
currentRetry: currentRetryNumber,
});
if (!shouldContinue) {
return null;
}

const result = await this.checkRpc(queue, parsedMessage, settings);
let result;
if (shouldContinue) {
result = await this.checkRpc(queue, parsedMessage, settings);
}

await this.hooks.trigger(this, ProducerHooks.afterPublish, {
queue,
Expand Down
6 changes: 3 additions & 3 deletions test/disconnect-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ describe('disconnections', function () {
});

it('calls event hooks on connecting', async () => {
const hookTestsQueue = 'disco:test:hooks';
const consumedAllPromise = pDefer();
await arnavmq.consumer.consume(queue, () => {
await arnavmq.consumer.consume(hookTestsQueue, () => {
consumedAllPromise.resolve();
});

Expand All @@ -91,8 +92,7 @@ describe('disconnections', function () {
arnavmq.hooks.connection.afterConnect(afterConnectHook);

await docker.connectNetwork();
await arnavmq.producer.produce(queue);

await arnavmq.producer.produce(hookTestsQueue);
await consumedAllPromise.promise;

sinon.assert.called(beforeConnectHook);
Expand Down

0 comments on commit f72bd00

Please sign in to comment.