diff --git a/test/base-queue-handler.test.ts b/test/base-queue-handler.test.ts index d1d6528..e2c37d2 100644 --- a/test/base-queue-handler.test.ts +++ b/test/base-queue-handler.test.ts @@ -183,12 +183,12 @@ describe('Test baseQueueHandler', function () { DemoHandler.prototype.afterDlq = originalAfterDlq; }); - it('should add string to dlq because afterDlq throws error', async function () { - sandbox.useFakeTimers(); + it('should add raw buffer to dlq when afterDlq throws to prevent double-encoding', async function () { const handler = new DemoHandler(this.name, rabbit, { retries: 2, - retryDelay: 100 + retryDelay: 10 }); + await handler.created; handler.handle = sandbox.spy(() => { throw new Error('test error'); }); @@ -198,18 +198,28 @@ describe('Test baseQueueHandler', function () { throw new Error('test error'); } })); + + const dlqMessages: any[] = []; + await rabbit.subscribe(this.name + '_dlq', (msg, ack) => { + dlqMessages.push({ event: JSON.parse(msg.content.toString()), content: msg.content }); + ack(null); + }); + await rabbit.publish(this.name, { test: 'data' }, { correlationId: '3' }); - const publish = (handler.rabbit.publish = sandbox.spy(handler.rabbit, 'publish')); - sandbox.clock.tick(100); - await rabbit.connected; - sandbox.clock.tick(100); - await rabbit.connected; - sandbox.clock.tick(100); - sandbox.clock.restore(); - await new Promise(resolve => setTimeout(resolve, 400)); + const publish = sandbox.spy(handler.rabbit, 'publish'); + await new Promise(resolve => setTimeout(resolve, 300)); + afterDlq.calledOnce.should.be.true(); publish.calledTwice.should.be.true(); - publish.args[publish.callCount - 1][1].should.eql('{"test":"data"}'); + const fallbackPayload = publish.args[publish.callCount - 1][1]; + Buffer.isBuffer(fallbackPayload).should.be.true(); + fallbackPayload.toString().should.eql('{"test":"data"}'); + + // TODO: This flow also produces duplicate messages on the DLQ, + // to be investigated and handled on the next PR + dlqMessages.length.should.equal(2); + dlqMessages[0].event.should.eql({ test: 'data' }); + dlqMessages[1].event.should.eql({ test: 'data' }); }); it('should add to dlq after x retries and get error response even if afterDlq throws error', async function () { diff --git a/test/encode-decode.test.ts b/test/encode-decode.test.ts index 5c3c457..8553d8b 100644 --- a/test/encode-decode.test.ts +++ b/test/encode-decode.test.ts @@ -14,6 +14,13 @@ describe('encode-decode', function() { it('supports text', function() { encode('foo', 'application/text').should.eql(Buffer.from('foo')); }); + + it('returns Buffer unchanged without re-encoding as JSON', function() { + const buf = Buffer.from('{"foo":"bar"}'); + const result = encode(buf); + result.should.equal(buf); + result.toString().should.equal('{"foo":"bar"}'); + }); }); describe('decode', function() { diff --git a/ts/base-queue-handler.ts b/ts/base-queue-handler.ts index acdce54..68c4fcd 100644 --- a/ts/base-queue-handler.ts +++ b/ts/base-queue-handler.ts @@ -182,16 +182,16 @@ abstract class BaseQueueHandler { } async addToDLQ(retries, msg: amqp.Message, ack) { + const correlationId = this.getCorrelationId(msg); try { - const correlationId = this.getCorrelationId(msg); const event = decode(msg); this.logger.warn(`[${correlationId}] Adding to dlq: ${this.dlqName} after ${retries} retries`); await this.rabbit.publish(this.dlqName, event, msg.properties); const response = await this.afterDlq({ msg, event }); ack(msg.properties.headers.errors.message, response); } catch (err) { - this.logger.error(err); - await this.rabbit.publish(this.dlqName, msg.content.toString(), msg.properties); + this.logger.error(`[${correlationId}] Failed to add to dlq: ${this.dlqName}`, err); + await this.rabbit.publish(this.dlqName, msg.content, msg.properties); ack(err.message, null); } } diff --git a/ts/encode-decode.ts b/ts/encode-decode.ts index d1430b3..8b8c8f4 100644 --- a/ts/encode-decode.ts +++ b/ts/encode-decode.ts @@ -1,4 +1,7 @@ export function encode(message: Buffer | string | Object = '', contentType = 'application/json') { + if (Buffer.isBuffer(message)) { + return message; + } if (contentType === 'application/json') { return Buffer.from(JSON.stringify(message)); }