Skip to content

Commit

Permalink
fix: permanently fail any ack/modAck/nack that fails once under exact…
Browse files Browse the repository at this point in the history
…ly-once delivery
  • Loading branch information
feywind committed Apr 19, 2023
1 parent 73937c1 commit 73cad9a
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 5 deletions.
5 changes: 3 additions & 2 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import {EventEmitter} from 'events';
import {Message, Subscriber} from './subscriber';
import {AckError, Message, Subscriber} from './subscriber';
import {defaultOptions} from './default-options';

export interface FlowControlOptions {
Expand Down Expand Up @@ -258,9 +258,10 @@ export class LeaseManager extends EventEmitter {

if (lifespan < this._options.maxExtensionMinutes!) {
if (this._subscriber.isExactlyOnceDelivery) {
message.modAckWithResponse(deadline).catch(() => {
message.modAckWithResponse(deadline).catch(e => {
// In the case of a permanent failure (temporary failures are retried),
// we need to stop trying to lease-manage the message.
message.ackFailed(e as AckError);
this.remove(message);
});
} else {
Expand Down
45 changes: 42 additions & 3 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ export class Message {
private _handled: boolean;
private _length: number;
private _subscriber: Subscriber;
private _ackFailed?: AckError;

/**
* @hideconstructor
*
Expand Down Expand Up @@ -194,6 +196,16 @@ export class Message {
return this._length;
}

/**
* Sets this message's exactly once delivery acks to permanent failure. This is
* meant for internal library use only.
*
* @private
*/
ackFailed(error: AckError): void {
this._ackFailed = error;
}

/**
* Acknowledges the message.
*
Expand Down Expand Up @@ -228,9 +240,18 @@ export class Message {
return AckResponses.Success;
}

if (this._ackFailed) {
throw this._ackFailed;
}

if (!this._handled) {
this._handled = true;
return await this._subscriber.ackWithResponse(this);
try {
return await this._subscriber.ackWithResponse(this);
} catch (e) {
this.ackFailed(e as AckError);
throw e;
}
} else {
return AckResponses.Invalid;
}
Expand Down Expand Up @@ -261,8 +282,17 @@ export class Message {
return AckResponses.Success;
}

if (this._ackFailed) {
throw this._ackFailed;
}

if (!this._handled) {
return await this._subscriber.modAckWithResponse(this, deadline);
try {
return await this._subscriber.modAckWithResponse(this, deadline);
} catch (e) {
this.ackFailed(e as AckError);
throw e;
}
} else {
return AckResponses.Invalid;
}
Expand Down Expand Up @@ -303,9 +333,18 @@ export class Message {
return AckResponses.Success;
}

if (this._ackFailed) {
throw this._ackFailed;
}

if (!this._handled) {
this._handled = true;
return await this._subscriber.nackWithResponse(this);
try {
return await this._subscriber.nackWithResponse(this);
} catch (e) {
this.ackFailed(e as AckError);
throw e;
}
} else {
return AckResponses.Invalid;
}
Expand Down
30 changes: 30 additions & 0 deletions test/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class FakeMessage {
this.received = Date.now();
}
modAck(): void {}
async modAckWithResponse(): Promise<AckResponse> {
return AckResponses.Success;
}
ackFailed() {}
}

interface LeaseManagerInternals {
Expand Down Expand Up @@ -308,6 +312,32 @@ describe('LeaseManager', () => {
assert.strictEqual(deadline, subscriber.ackDeadline);
});

it('should remove and ackFailed any messages that fail to ack', done => {
(subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true;

leaseManager.setOptions({
maxExtensionMinutes: 600,
});

const goodMessage = new FakeMessage();

const removeStub = sandbox.stub(leaseManager, 'remove');
const mawrStub = sandbox
.stub(goodMessage, 'modAckWithResponse')
.rejects(new AckError(AckResponses.Invalid));
const failed = sandbox.stub(goodMessage, 'ackFailed');

removeStub.callsFake(() => {
assert.strictEqual(mawrStub.callCount, 1);
assert.strictEqual(removeStub.callCount, 1);
assert.strictEqual(failed.callCount, 1);
done();
});

leaseManager.add(goodMessage as {} as Message);
clock.tick(halfway * 2 + 1);
});

it('should continuously extend the deadlines', () => {
const message = new FakeMessage();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down
63 changes: 63 additions & 0 deletions test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,27 @@ describe('Subscriber', () => {
assert.strictEqual(msg, message);
});

it('should ack the message with response', async () => {
subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true};
const stub = sandbox.stub(subscriber, 'ackWithResponse');

stub.resolves(s.AckResponses.Success);
const response = await message.ackWithResponse();
assert.strictEqual(response, s.AckResponses.Success);
});

it('should fail to ack the message with response', async () => {
subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true};
const stub = sandbox.stub(subscriber, 'ackWithResponse');

stub.rejects(new s.AckError(s.AckResponses.Invalid));
await assert.rejects(message.ackWithResponse());

// Should cache the result also.
await assert.rejects(message.ackWithResponse());
assert.strictEqual(stub.callCount, 1);
});

it('should not ack the message if its been handled', () => {
const stub = sandbox.stub(subscriber, 'ack');

Expand All @@ -1070,6 +1091,27 @@ describe('Subscriber', () => {
assert.strictEqual(deadline, fakeDeadline);
});

it('should modAck the message with response', async () => {
subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true};
const stub = sandbox.stub(subscriber, 'modAckWithResponse');

stub.resolves(s.AckResponses.Success);
const response = await message.modAckWithResponse(0);
assert.strictEqual(response, s.AckResponses.Success);
});

it('should fail to modAck the message with response', async () => {
subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true};
const stub = sandbox.stub(subscriber, 'modAckWithResponse');

stub.rejects(new s.AckError(s.AckResponses.Invalid));
await assert.rejects(message.modAckWithResponse(0));

// Should cache the result also.
await assert.rejects(message.modAckWithResponse(0));
assert.strictEqual(stub.callCount, 1);
});

it('should not modAck the message if its been handled', () => {
const deadline = 10;
const stub = sandbox.stub(subscriber, 'modAck');
Expand All @@ -1092,6 +1134,27 @@ describe('Subscriber', () => {
assert.strictEqual(delay, 0);
});

it('should nack the message with response', async () => {
subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true};
const stub = sandbox.stub(subscriber, 'nackWithResponse');

stub.resolves(s.AckResponses.Success);
const response = await message.nackWithResponse();
assert.strictEqual(response, s.AckResponses.Success);
});

it('should fail to nack the message with response', async () => {
subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true};
const stub = sandbox.stub(subscriber, 'nackWithResponse');

stub.rejects(new s.AckError(s.AckResponses.Invalid));
await assert.rejects(message.nackWithResponse());

// Should cache the result also.
await assert.rejects(message.nackWithResponse());
assert.strictEqual(stub.callCount, 1);
});

it('should not nack the message if its been handled', () => {
const stub = sandbox.stub(subscriber, 'modAck');

Expand Down

0 comments on commit 73cad9a

Please sign in to comment.