Skip to content

Commit

Permalink
#6247 - add AMQP_PERSISTENT_MESSAGES support (#197)
Browse files Browse the repository at this point in the history
* #6247 - add AMQP_PERSISTENT_MESSAGES support

* #6247 - change to bool value

* #6247 - simplify
  • Loading branch information
ShkarupaNick committed Sep 20, 2022
1 parent a1a79db commit 2734348
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.7.0 (September 15, 2022)

* Add AMQP_PERSISTENT_MESSAGES configuration env var to enable persistent delivery mode.

## 2.6.29 (July 14, 2022)

* Enabled keep-alive for global HTTPS agent ([#6359](https://github.com/elasticio/elasticio/issues/6359))
Expand Down
2 changes: 2 additions & 0 deletions lib/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ class Amqp {
if (iteration) {
options.headers.retry = iteration;
}
// AMQP_PERSISTENT_MESSAGES is false by default if not specified by env var
options.persistent = this.settings.AMQP_PERSISTENT_MESSAGES;

log.debug('Current memory usage: %s Mb', process.memoryUsage().heapUsed / 1048576);
log.trace('Pushing to exchange=%s, routingKey=%s, messageSize=%d, options=%j, iteration=%d',
Expand Down
1 change: 1 addition & 0 deletions lib/settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ function getOptionalEnvVars(envVars) {
AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms
AMQP_PUBLISH_RETRY_ATTEMPTS: Infinity,
AMQP_PUBLISH_MAX_RETRY_DELAY: 5 * 60 * 1000, // 5 mins
AMQP_PERSISTENT_MESSAGES: false,
OUTGOING_MESSAGE_SIZE_LIMIT: 10485760,
NO_SELF_PASSTRHOUGH: false,
PROTOCOL_VERSION: 1,
Expand Down
12 changes: 6 additions & 6 deletions mocha_spec/run.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ describe('Integration Test', () => {
expect(properties).to.deep.equal({
contentType: 'application/json',
contentEncoding: 'utf8',
deliveryMode: undefined,
deliveryMode: 1,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
Expand Down Expand Up @@ -221,7 +221,7 @@ describe('Integration Test', () => {
expect(properties).to.deep.equal({
contentType: 'application/json',
contentEncoding: 'utf8',
deliveryMode: undefined,
deliveryMode: 1,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
Expand Down Expand Up @@ -320,7 +320,7 @@ describe('Integration Test', () => {
expect(properties).to.deep.eql({
contentType: 'application/json',
contentEncoding: 'utf8',
deliveryMode: undefined,
deliveryMode: 1,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
Expand Down Expand Up @@ -416,7 +416,7 @@ describe('Integration Test', () => {
expect(properties).to.deep.eql({
contentType: 'application/json',
contentEncoding: 'utf8',
deliveryMode: undefined,
deliveryMode: 1,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
Expand Down Expand Up @@ -512,7 +512,7 @@ describe('Integration Test', () => {
expect(properties).to.deep.eql({
contentType: 'application/json',
contentEncoding: 'utf8',
deliveryMode: undefined,
deliveryMode: 1,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
Expand Down Expand Up @@ -584,7 +584,7 @@ describe('Integration Test', () => {
expect(properties).to.deep.equal({
contentType: 'application/json',
contentEncoding: 'utf8',
deliveryMode: undefined,
deliveryMode: 1,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
Expand Down
118 changes: 118 additions & 0 deletions mocha_spec/unit/amqp.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ describe('AMQP', () => {
userId: undefined,
appId: undefined,
mandatory: true,
persistent: false,
clusterId: ''
},
content: encryptor.encryptMessageContent({ content: 'Message content' })
Expand Down Expand Up @@ -131,6 +132,107 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
protocolVersion: 1,
messageId
}
},
sinon.match.func
);
});

it('Should publish with persistence', async () => {
const persistentSetting = { ...settings, AMQP_PERSISTENT_MESSAGES: true };
const amqp = new Amqp(persistentSetting);
amqp.publishChannel = {
on: sandbox.stub(),
publish: sandbox.stub().callsFake((exchangeName, routingKey, payloadBuffer, options, cb) => {
cb(null, 'Success');
return true;
})
};
const messageId = uuid.v4();
const headers = {
taskId: 'task1234567890',
stepId: 'step_456',
messageId
};

await amqp.sendData({
headers: {
'some-other-header': 'headerValue'
},
body: 'Message content'
}, headers);
expect(amqp.publishChannel.publish).to.have.been.calledOnce.and.calledWith(
settings.PUBLISH_MESSAGES_TO,
settings.DATA_ROUTING_KEY,
sinon.match(buf => {
const payload = encryptor.decryptMessageContent(buf, 'base64');
return sinon.match({
headers: {
'some-other-header': 'headerValue'
},
body: 'Message content'
}).test(payload);
}),
{
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: true,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
protocolVersion: 1,
messageId
}
},
sinon.match.func
);
});
it('Should publish without persistence by default', async () => {
const amqp = new Amqp(settings);
amqp.publishChannel = {
on: sandbox.stub(),
publish: sandbox.stub().callsFake((exchangeName, routingKey, payloadBuffer, options, cb) => {
cb(null, 'Success');
return true;
})
};
const messageId = uuid.v4();
const headers = {
taskId: 'task1234567890',
stepId: 'step_456',
messageId
};

await amqp.sendData({
headers: {
'some-other-header': 'headerValue'
},
body: 'Message content'
}, headers);
expect(amqp.publishChannel.publish).to.have.been.calledOnce.and.calledWith(
settings.PUBLISH_MESSAGES_TO,
settings.DATA_ROUTING_KEY,
sinon.match(buf => {
const payload = encryptor.decryptMessageContent(buf, 'base64');
return sinon.match({
headers: {
'some-other-header': 'headerValue'
},
body: 'Message content'
}).test(payload);
}),
{
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
Expand Down Expand Up @@ -181,6 +283,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
Expand Down Expand Up @@ -238,6 +341,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
Expand Down Expand Up @@ -304,6 +408,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: { ...headers, protocolVersion: settings.PROTOCOL_VERSION }
},
sinon.match.func
Expand Down Expand Up @@ -372,6 +477,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
Expand Down Expand Up @@ -432,6 +538,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
Expand Down Expand Up @@ -480,6 +587,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
Expand Down Expand Up @@ -514,6 +622,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456'
Expand Down Expand Up @@ -568,6 +677,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
Expand Down Expand Up @@ -619,6 +729,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
Expand Down Expand Up @@ -675,6 +786,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
messageId,
taskId: 'task1234567890',
Expand All @@ -695,6 +807,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
messageId,
'taskId': 'task1234567890',
Expand Down Expand Up @@ -752,6 +865,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
messageId,
taskId: 'task1234567890',
Expand Down Expand Up @@ -802,6 +916,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
taskId: 'task1234567890',
stepId: 'step_456',
Expand Down Expand Up @@ -849,6 +964,7 @@ describe('AMQP', () => {
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
persistent: false,
headers: {
messageId,
taskId: 'task1234567890',
Expand Down Expand Up @@ -966,6 +1082,7 @@ describe('AMQP', () => {
contentEncoding: 'utf8',
contentType: 'application/json',
mandatory: true,
persistent: false,
headers: {
end: sinon.match.number,
execId: 'exec1234567890',
Expand Down Expand Up @@ -1028,6 +1145,7 @@ describe('AMQP', () => {
userId: undefined,
appId: undefined,
mandatory: true,
persistent: false,
clusterId: ''
},
content: encryptor.encryptMessageContent(
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "elasticio-sailor-nodejs",
"description": "The official elastic.io library for bootstrapping and executing for Node.js connectors",
"version": "2.6.29",
"version": "2.7.0",
"main": "run.js",
"scripts": {
"audit": "better-npm-audit audit --level high --production",
Expand Down

0 comments on commit 2734348

Please sign in to comment.