Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions test/base-queue-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ describe('Test baseQueueHandler', function () {
DemoHandler.prototype.afterDlq = originalAfterDlq;
});

it('should add string to dlq because afterDlq throws error', async function () {
sandbox.useFakeTimers();
it('should add raw buffer to dlq when afterDlq throws to prevent double-encoding', async function () {
const handler = new DemoHandler(this.name, rabbit, {
retries: 2,
retryDelay: 100
retryDelay: 10
});
await handler.created;
handler.handle = sandbox.spy(() => {
throw new Error('test error');
});
Expand All @@ -198,18 +198,28 @@ describe('Test baseQueueHandler', function () {
throw new Error('test error');
}
}));

const dlqMessages: any[] = [];
await rabbit.subscribe(this.name + '_dlq', (msg, ack) => {
dlqMessages.push({ event: JSON.parse(msg.content.toString()), content: msg.content });
ack(null);
});

await rabbit.publish(this.name, { test: 'data' }, { correlationId: '3' });
const publish = (handler.rabbit.publish = sandbox.spy(handler.rabbit, 'publish'));
sandbox.clock.tick(100);
await rabbit.connected;
sandbox.clock.tick(100);
await rabbit.connected;
sandbox.clock.tick(100);
sandbox.clock.restore();
await new Promise(resolve => setTimeout(resolve, 400));
const publish = sandbox.spy(handler.rabbit, 'publish');
await new Promise(resolve => setTimeout(resolve, 300));

Comment thread
Dimitris-Ilias marked this conversation as resolved.
afterDlq.calledOnce.should.be.true();
publish.calledTwice.should.be.true();
publish.args[publish.callCount - 1][1].should.eql('{"test":"data"}');
const fallbackPayload = publish.args[publish.callCount - 1][1];
Buffer.isBuffer(fallbackPayload).should.be.true();
fallbackPayload.toString().should.eql('{"test":"data"}');

// TODO: This flow also produces duplicate messages on the DLQ,
// to be investigated and handled on the next PR
dlqMessages.length.should.equal(2);
dlqMessages[0].event.should.eql({ test: 'data' });
dlqMessages[1].event.should.eql({ test: 'data' });
Comment thread
Dimitris-Ilias marked this conversation as resolved.
});

it('should add to dlq after x retries and get error response even if afterDlq throws error', async function () {
Expand Down
7 changes: 7 additions & 0 deletions test/encode-decode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ describe('encode-decode', function() {
it('supports text', function() {
encode('foo', 'application/text').should.eql(Buffer.from('foo'));
});

it('returns Buffer unchanged without re-encoding as JSON', function() {
const buf = Buffer.from('{"foo":"bar"}');
const result = encode(buf);
result.should.equal(buf);
result.toString().should.equal('{"foo":"bar"}');
});
});

describe('decode', function() {
Expand Down
6 changes: 3 additions & 3 deletions ts/base-queue-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,16 @@ abstract class BaseQueueHandler {
}

async addToDLQ(retries, msg: amqp.Message, ack) {
const correlationId = this.getCorrelationId(msg);
try {
const correlationId = this.getCorrelationId(msg);
const event = decode(msg);
this.logger.warn(`[${correlationId}] Adding to dlq: ${this.dlqName} after ${retries} retries`);
await this.rabbit.publish(this.dlqName, event, msg.properties);
const response = await this.afterDlq({ msg, event });
ack(msg.properties.headers.errors.message, response);
} catch (err) {
this.logger.error(err);
await this.rabbit.publish(this.dlqName, msg.content.toString(), msg.properties);
Comment thread
nikosd23 marked this conversation as resolved.
this.logger.error(`[${correlationId}] Failed to add to dlq: ${this.dlqName}`, err);
await this.rabbit.publish(this.dlqName, msg.content, msg.properties);
ack(err.message, null);
}
}
Expand Down
3 changes: 3 additions & 0 deletions ts/encode-decode.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
export function encode(message: Buffer | string | Object = '', contentType = 'application/json') {
if (Buffer.isBuffer(message)) {
Comment thread
nikosd23 marked this conversation as resolved.
return message;
}
if (contentType === 'application/json') {
return Buffer.from(JSON.stringify(message));
}
Comment thread
Dimitris-Ilias marked this conversation as resolved.
Expand Down