From 73cad9a80ccca304723e6a073070b9431f8b7268 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 19 Apr 2023 14:22:00 -0400 Subject: [PATCH] fix: permanently fail any ack/modAck/nack that fails once under exactly-once delivery --- src/lease-manager.ts | 5 ++-- src/subscriber.ts | 45 ++++++++++++++++++++++++++++--- test/lease-manager.ts | 30 +++++++++++++++++++++ test/subscriber.ts | 63 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 138 insertions(+), 5 deletions(-) diff --git a/src/lease-manager.ts b/src/lease-manager.ts index eae9d6f60..084f4b759 100644 --- a/src/lease-manager.ts +++ b/src/lease-manager.ts @@ -15,7 +15,7 @@ */ import {EventEmitter} from 'events'; -import {Message, Subscriber} from './subscriber'; +import {AckError, Message, Subscriber} from './subscriber'; import {defaultOptions} from './default-options'; export interface FlowControlOptions { @@ -258,9 +258,10 @@ export class LeaseManager extends EventEmitter { if (lifespan < this._options.maxExtensionMinutes!) { if (this._subscriber.isExactlyOnceDelivery) { - message.modAckWithResponse(deadline).catch(() => { + message.modAckWithResponse(deadline).catch(e => { // In the case of a permanent failure (temporary failures are retried), // we need to stop trying to lease-manage the message. + message.ackFailed(e as AckError); this.remove(message); }); } else { diff --git a/src/subscriber.ts b/src/subscriber.ts index b9b3d468c..fe5defc8e 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -103,6 +103,8 @@ export class Message { private _handled: boolean; private _length: number; private _subscriber: Subscriber; + private _ackFailed?: AckError; + /** * @hideconstructor * @@ -194,6 +196,16 @@ export class Message { return this._length; } + /** + * Sets this message's exactly once delivery acks to permanent failure. This is + * meant for internal library use only. + * + * @private + */ + ackFailed(error: AckError): void { + this._ackFailed = error; + } + /** * Acknowledges the message. * @@ -228,9 +240,18 @@ export class Message { return AckResponses.Success; } + if (this._ackFailed) { + throw this._ackFailed; + } + if (!this._handled) { this._handled = true; - return await this._subscriber.ackWithResponse(this); + try { + return await this._subscriber.ackWithResponse(this); + } catch (e) { + this.ackFailed(e as AckError); + throw e; + } } else { return AckResponses.Invalid; } @@ -261,8 +282,17 @@ export class Message { return AckResponses.Success; } + if (this._ackFailed) { + throw this._ackFailed; + } + if (!this._handled) { - return await this._subscriber.modAckWithResponse(this, deadline); + try { + return await this._subscriber.modAckWithResponse(this, deadline); + } catch (e) { + this.ackFailed(e as AckError); + throw e; + } } else { return AckResponses.Invalid; } @@ -303,9 +333,18 @@ export class Message { return AckResponses.Success; } + if (this._ackFailed) { + throw this._ackFailed; + } + if (!this._handled) { this._handled = true; - return await this._subscriber.nackWithResponse(this); + try { + return await this._subscriber.nackWithResponse(this); + } catch (e) { + this.ackFailed(e as AckError); + throw e; + } } else { return AckResponses.Invalid; } diff --git a/test/lease-manager.ts b/test/lease-manager.ts index eb70a9eba..2ceb17cf7 100644 --- a/test/lease-manager.ts +++ b/test/lease-manager.ts @@ -54,6 +54,10 @@ class FakeMessage { this.received = Date.now(); } modAck(): void {} + async modAckWithResponse(): Promise { + return AckResponses.Success; + } + ackFailed() {} } interface LeaseManagerInternals { @@ -308,6 +312,32 @@ describe('LeaseManager', () => { assert.strictEqual(deadline, subscriber.ackDeadline); }); + it('should remove and ackFailed any messages that fail to ack', done => { + (subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true; + + leaseManager.setOptions({ + maxExtensionMinutes: 600, + }); + + const goodMessage = new FakeMessage(); + + const removeStub = sandbox.stub(leaseManager, 'remove'); + const mawrStub = sandbox + .stub(goodMessage, 'modAckWithResponse') + .rejects(new AckError(AckResponses.Invalid)); + const failed = sandbox.stub(goodMessage, 'ackFailed'); + + removeStub.callsFake(() => { + assert.strictEqual(mawrStub.callCount, 1); + assert.strictEqual(removeStub.callCount, 1); + assert.strictEqual(failed.callCount, 1); + done(); + }); + + leaseManager.add(goodMessage as {} as Message); + clock.tick(halfway * 2 + 1); + }); + it('should continuously extend the deadlines', () => { const message = new FakeMessage(); // eslint-disable-next-line @typescript-eslint/no-explicit-any diff --git a/test/subscriber.ts b/test/subscriber.ts index 414344a02..cab00e286 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -1048,6 +1048,27 @@ describe('Subscriber', () => { assert.strictEqual(msg, message); }); + it('should ack the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'ackWithResponse'); + + stub.resolves(s.AckResponses.Success); + const response = await message.ackWithResponse(); + assert.strictEqual(response, s.AckResponses.Success); + }); + + it('should fail to ack the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'ackWithResponse'); + + stub.rejects(new s.AckError(s.AckResponses.Invalid)); + await assert.rejects(message.ackWithResponse()); + + // Should cache the result also. + await assert.rejects(message.ackWithResponse()); + assert.strictEqual(stub.callCount, 1); + }); + it('should not ack the message if its been handled', () => { const stub = sandbox.stub(subscriber, 'ack'); @@ -1070,6 +1091,27 @@ describe('Subscriber', () => { assert.strictEqual(deadline, fakeDeadline); }); + it('should modAck the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'modAckWithResponse'); + + stub.resolves(s.AckResponses.Success); + const response = await message.modAckWithResponse(0); + assert.strictEqual(response, s.AckResponses.Success); + }); + + it('should fail to modAck the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'modAckWithResponse'); + + stub.rejects(new s.AckError(s.AckResponses.Invalid)); + await assert.rejects(message.modAckWithResponse(0)); + + // Should cache the result also. + await assert.rejects(message.modAckWithResponse(0)); + assert.strictEqual(stub.callCount, 1); + }); + it('should not modAck the message if its been handled', () => { const deadline = 10; const stub = sandbox.stub(subscriber, 'modAck'); @@ -1092,6 +1134,27 @@ describe('Subscriber', () => { assert.strictEqual(delay, 0); }); + it('should nack the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'nackWithResponse'); + + stub.resolves(s.AckResponses.Success); + const response = await message.nackWithResponse(); + assert.strictEqual(response, s.AckResponses.Success); + }); + + it('should fail to nack the message with response', async () => { + subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true}; + const stub = sandbox.stub(subscriber, 'nackWithResponse'); + + stub.rejects(new s.AckError(s.AckResponses.Invalid)); + await assert.rejects(message.nackWithResponse()); + + // Should cache the result also. + await assert.rejects(message.nackWithResponse()); + assert.strictEqual(stub.callCount, 1); + }); + it('should not nack the message if its been handled', () => { const stub = sandbox.stub(subscriber, 'modAck');