Skip to content

Commit

Permalink
fix: also reject Promises on failure, and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
feywind committed Jun 16, 2022
1 parent c9d539b commit 918a64d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ export class AckQueue extends MessageQueue {
responsePromise?.resolve(AckResponses.Success);
});
} catch (e) {
batch.forEach(({responsePromise}) => {
responsePromise?.reject(e);
});
throw new BatchError(e as ServiceError, ackIds, 'acknowledge');
}
}
Expand Down Expand Up @@ -286,6 +289,9 @@ export class ModAckQueue extends MessageQueue {
responsePromise?.resolve(AckResponses.Success);
});
} catch (e) {
batch.forEach(({responsePromise}) => {
responsePromise?.reject(e);
});
throw new BatchError(e as ServiceError, ackIds, 'modifyAckDeadline');
}
});
Expand Down
62 changes: 62 additions & 0 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,24 @@ describe('MessageQueues', () => {
clock.tick(delay);
assert.strictEqual(stub.callCount, 1);
});

it('should return a Promise that resolves when the ack is sent', async () => {
const clock = sandbox.useFakeTimers();
const delay = 1000;
messageQueue.setOptions({maxMilliseconds: delay});

sandbox
.stub(messageQueue, '_sendBatch')
.callsFake((batch: messageTypes.QueuedMessages) => {
batch.forEach(m => {
m.responsePromise?.resolve();
});
});

const completion = messageQueue.add(new FakeMessage() as Message);
clock.tick(delay);
await completion;
});
});

describe('flush', () => {
Expand Down Expand Up @@ -391,6 +409,26 @@ describe('MessageQueues', () => {
messages.forEach(message => ackQueue.add(message as Message));
ackQueue.flush();
});

it('should appropriately resolve result promises', async () => {
const stub = sandbox.stub(subscriber.client, 'acknowledge').resolves();

const message = new FakeMessage() as Message;
const completion = ackQueue.add(message);
await ackQueue.flush();
assert.strictEqual(stub.callCount, 1);
await completion;
});

it('should appropriately reject result promises', async () => {
const stub = sandbox.stub(subscriber.client, 'acknowledge').resolves();

const message = new FakeMessage() as Message;
const completion = ackQueue.add(message);
await ackQueue.flush();
assert.strictEqual(stub.callCount, 1);
await completion;
});
});

describe('ModAckQueue', () => {
Expand Down Expand Up @@ -516,5 +554,29 @@ describe('MessageQueues', () => {
messages.forEach(message => modAckQueue.add(message as Message));
modAckQueue.flush();
});

it('should appropriately resolve result promises', async () => {
const stub = sandbox
.stub(subscriber.client, 'modifyAckDeadline')
.resolves();

const message = new FakeMessage() as Message;
const completion = modAckQueue.add(message);
await modAckQueue.flush();
assert.strictEqual(stub.callCount, 1);
await completion;
});

it('should appropriately reject result promises', async () => {
const stub = sandbox
.stub(subscriber.client, 'modifyAckDeadline')
.resolves();

const message = new FakeMessage() as Message;
const completion = modAckQueue.add(message);
await modAckQueue.flush();
assert.strictEqual(stub.callCount, 1);
await completion;
});
});
});

0 comments on commit 918a64d

Please sign in to comment.