diff --git a/CHANGELOG.md b/CHANGELOG.md index d45530f..9480c49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 2.7.0 (September 15, 2022) + +* Add AMQP_PERSISTENT_MESSAGES configuration env var to enable persistent delivery mode. + ## 2.6.29 (July 14, 2022) * Enabled keep-alive for global HTTPS agent ([#6359](https://github.com/elasticio/elasticio/issues/6359)) diff --git a/lib/amqp.js b/lib/amqp.js index b958bd7..6395d20 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -330,6 +330,8 @@ class Amqp { if (iteration) { options.headers.retry = iteration; } + // AMQP_PERSISTENT_MESSAGES is false by default if not specified by env var + options.persistent = this.settings.AMQP_PERSISTENT_MESSAGES; log.debug('Current memory usage: %s Mb', process.memoryUsage().heapUsed / 1048576); log.trace('Pushing to exchange=%s, routingKey=%s, messageSize=%d, options=%j, iteration=%d', diff --git a/lib/settings.js b/lib/settings.js index 56f2b15..78d0b4d 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -20,6 +20,7 @@ function getOptionalEnvVars(envVars) { AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms AMQP_PUBLISH_RETRY_ATTEMPTS: Infinity, AMQP_PUBLISH_MAX_RETRY_DELAY: 5 * 60 * 1000, // 5 mins + AMQP_PERSISTENT_MESSAGES: false, OUTGOING_MESSAGE_SIZE_LIMIT: 10485760, NO_SELF_PASSTRHOUGH: false, PROTOCOL_VERSION: 1, diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index f373e3b..a392abe 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -139,7 +139,7 @@ describe('Integration Test', () => { expect(properties).to.deep.equal({ contentType: 'application/json', contentEncoding: 'utf8', - deliveryMode: undefined, + deliveryMode: 1, priority: undefined, correlationId: undefined, replyTo: undefined, @@ -221,7 +221,7 @@ describe('Integration Test', () => { expect(properties).to.deep.equal({ contentType: 'application/json', contentEncoding: 'utf8', - deliveryMode: undefined, + deliveryMode: 1, priority: undefined, correlationId: undefined, replyTo: undefined, @@ -320,7 +320,7 @@ describe('Integration Test', () => { expect(properties).to.deep.eql({ contentType: 'application/json', contentEncoding: 'utf8', - deliveryMode: undefined, + deliveryMode: 1, priority: undefined, correlationId: undefined, replyTo: undefined, @@ -416,7 +416,7 @@ describe('Integration Test', () => { expect(properties).to.deep.eql({ contentType: 'application/json', contentEncoding: 'utf8', - deliveryMode: undefined, + deliveryMode: 1, priority: undefined, correlationId: undefined, replyTo: undefined, @@ -512,7 +512,7 @@ describe('Integration Test', () => { expect(properties).to.deep.eql({ contentType: 'application/json', contentEncoding: 'utf8', - deliveryMode: undefined, + deliveryMode: 1, priority: undefined, correlationId: undefined, replyTo: undefined, @@ -584,7 +584,7 @@ describe('Integration Test', () => { expect(properties).to.deep.equal({ contentType: 'application/json', contentEncoding: 'utf8', - deliveryMode: undefined, + deliveryMode: 1, priority: undefined, correlationId: undefined, replyTo: undefined, diff --git a/mocha_spec/unit/amqp.spec.js b/mocha_spec/unit/amqp.spec.js index 17f11e7..80ca208 100644 --- a/mocha_spec/unit/amqp.spec.js +++ b/mocha_spec/unit/amqp.spec.js @@ -76,6 +76,7 @@ describe('AMQP', () => { userId: undefined, appId: undefined, mandatory: true, + persistent: false, clusterId: '' }, content: encryptor.encryptMessageContent({ content: 'Message content' }) @@ -131,6 +132,107 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, + headers: { + taskId: 'task1234567890', + stepId: 'step_456', + protocolVersion: 1, + messageId + } + }, + sinon.match.func + ); + }); + + it('Should publish with persistence', async () => { + const persistentSetting = { ...settings, AMQP_PERSISTENT_MESSAGES: true }; + const amqp = new Amqp(persistentSetting); + amqp.publishChannel = { + on: sandbox.stub(), + publish: sandbox.stub().callsFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }) + }; + const messageId = uuid.v4(); + const headers = { + taskId: 'task1234567890', + stepId: 'step_456', + messageId + }; + + await amqp.sendData({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }, headers); + expect(amqp.publishChannel.publish).to.have.been.calledOnce.and.calledWith( + settings.PUBLISH_MESSAGES_TO, + settings.DATA_ROUTING_KEY, + sinon.match(buf => { + const payload = encryptor.decryptMessageContent(buf, 'base64'); + return sinon.match({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }).test(payload); + }), + { + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + persistent: true, + headers: { + taskId: 'task1234567890', + stepId: 'step_456', + protocolVersion: 1, + messageId + } + }, + sinon.match.func + ); + }); + it('Should publish without persistence by default', async () => { + const amqp = new Amqp(settings); + amqp.publishChannel = { + on: sandbox.stub(), + publish: sandbox.stub().callsFake((exchangeName, routingKey, payloadBuffer, options, cb) => { + cb(null, 'Success'); + return true; + }) + }; + const messageId = uuid.v4(); + const headers = { + taskId: 'task1234567890', + stepId: 'step_456', + messageId + }; + + await amqp.sendData({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }, headers); + expect(amqp.publishChannel.publish).to.have.been.calledOnce.and.calledWith( + settings.PUBLISH_MESSAGES_TO, + settings.DATA_ROUTING_KEY, + sinon.match(buf => { + const payload = encryptor.decryptMessageContent(buf, 'base64'); + return sinon.match({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }).test(payload); + }), + { + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + persistent: false, headers: { taskId: 'task1234567890', stepId: 'step_456', @@ -181,6 +283,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { taskId: 'task1234567890', stepId: 'step_456', @@ -238,6 +341,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { taskId: 'task1234567890', stepId: 'step_456', @@ -304,6 +408,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { ...headers, protocolVersion: settings.PROTOCOL_VERSION } }, sinon.match.func @@ -372,6 +477,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { taskId: 'task1234567890', stepId: 'step_456', @@ -432,6 +538,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { taskId: 'task1234567890', stepId: 'step_456', @@ -480,6 +587,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { taskId: 'task1234567890', stepId: 'step_456', @@ -514,6 +622,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { taskId: 'task1234567890', stepId: 'step_456' @@ -568,6 +677,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { taskId: 'task1234567890', stepId: 'step_456', @@ -619,6 +729,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { taskId: 'task1234567890', stepId: 'step_456', @@ -675,6 +786,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { messageId, taskId: 'task1234567890', @@ -695,6 +807,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { messageId, 'taskId': 'task1234567890', @@ -752,6 +865,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { messageId, taskId: 'task1234567890', @@ -802,6 +916,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { taskId: 'task1234567890', stepId: 'step_456', @@ -849,6 +964,7 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + persistent: false, headers: { messageId, taskId: 'task1234567890', @@ -966,6 +1082,7 @@ describe('AMQP', () => { contentEncoding: 'utf8', contentType: 'application/json', mandatory: true, + persistent: false, headers: { end: sinon.match.number, execId: 'exec1234567890', @@ -1028,6 +1145,7 @@ describe('AMQP', () => { userId: undefined, appId: undefined, mandatory: true, + persistent: false, clusterId: '' }, content: encryptor.encryptMessageContent( diff --git a/package.json b/package.json index 57f2919..41e541d 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.6.29", + "version": "2.7.0", "main": "run.js", "scripts": { "audit": "better-npm-audit audit --level high --production",