Skip to content

Commit

Permalink
#4950 fixed rebound message headers (#164)
Browse files Browse the repository at this point in the history
* #4950 fixed rebound message headers

* #4950 added changelog
  • Loading branch information
zuker committed Dec 1, 2020
1 parent ff1884a commit e79b225
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 101 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.6.20 (December 1, 2020)

* Fixed rebound message headers ([#4950](https://github.com/elasticio/elasticio/issues/4950))

## 2.6.19 (November 23, 2020)

* Separate connections for consuming and publishing
Expand Down
49 changes: 22 additions & 27 deletions lib/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -513,20 +513,20 @@ class Amqp {
return result;
}

async sendRebound(reboundError, originalMessage, headers) {
// TODO: inconsistency
// rebound message should be
// a) repacked to currently used protocol version
// b) passed as is
// not as it's done now: send rebound but take protocolVersion header from current context
// TODO double think about: why we need headers as argument here?
// seems that rebound should be published with same headers as received
// seems that answer is error, when rebound limit is exceeded.
const properties = this._createPropsFromHeaders(headers);
const settings = this.settings;

async sendRebound(reboundError, originalMessage) {
const { settings } = this;
let { properties: { headers } } = originalMessage;
headers = {
...headers,
end: new Date().getTime(),
reboundReason: reboundError.message
};
log.trace('Rebound message');
const reboundIteration = getReboundIteration(originalMessage.properties.headers.reboundIteration);
let reboundIteration = 1;

if (headers.reboundIteration && typeof headers.reboundIteration === 'number') {
reboundIteration = headers.reboundIteration + 1;
}

if (reboundIteration > settings.REBOUND_LIMIT) {
return this.sendError(
Expand All @@ -535,8 +535,15 @@ class Amqp {
originalMessage
);
} else {
properties.expiration = getExpiration(reboundIteration);
properties.headers.reboundIteration = reboundIteration;
const properties = {
...originalMessage.properties,
headers: {
...headers,
// retry in 15 sec, 30 sec, 1 min, 2 min, 4 min, 8 min, etc.
expiration: Math.pow(2, reboundIteration - 1) * settings.REBOUND_INITIAL_EXPIRATION,
reboundIteration
}
};

return this.sendToExchange(
settings.PUBLISH_MESSAGES_TO,
Expand All @@ -545,18 +552,6 @@ class Amqp {
properties
);
}

function getReboundIteration(previousIteration) {
if (previousIteration && typeof previousIteration === 'number') {
return previousIteration + 1;
}
return 1;
}

// retry in 15 sec, 30 sec, 1 min, 2 min, 4 min, 8 min, etc.
function getExpiration(iteration) {
return Math.pow(2, iteration - 1) * settings.REBOUND_INITIAL_EXPIRATION;
}
}

async sendSnapshot(data, headers, throttle) {
Expand Down
5 changes: 1 addition & 4 deletions lib/sailor.js
Original file line number Diff line number Diff line change
Expand Up @@ -487,16 +487,13 @@ class Sailor {
}

async function onRebound(err) {
const headers = _.clone(outgoingMessageHeaders);
err = formatError(err);
logger.trace({
err,
messagesCount: that.messagesCount,
messageProcessingTime: Date.now() - timeStart
}, 'processMessage emit rebound');
headers.end = new Date().getTime();
headers.reboundReason = err.message;
return that.amqpConnection.sendRebound(err, message, headers);
return that.amqpConnection.sendRebound(err, message);
}

async function onSnapshot(data) {
Expand Down
95 changes: 27 additions & 68 deletions mocha_spec/unit/amqp.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -869,19 +869,9 @@ describe('AMQP', () => {
return true;
})
};
const messageId = uuid.v4();
const headers = {
execId: 'exec1234567890',
taskId: 'task1234567890',
stepId: 'step_1',
compId: 'comp1',
function: 'list',
start: '1432815685034',
protocolVersion: 2,
messageId
};

await amqp.sendRebound(new Error('Rebound error'), message, headers);
const reboundError = new Error('Rebound error');
await amqp.sendRebound(reboundError, message);
expect(amqp.publishChannel.publish).to.have.been.calledOnce.and.calledWith(
settings.PUBLISH_MESSAGES_TO,
settings.REBOUND_ROUTING_KEY,
Expand All @@ -891,20 +881,13 @@ describe('AMQP', () => {
return true;
}),
{
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
expiration: 15000,
...message.properties,
headers: {
execId: 'exec1234567890',
taskId: 'task1234567890',
stepId: 'step_1',
compId: 'comp1',
function: 'list',
start: '1432815685034',
reboundIteration: 1,
protocolVersion: 2,
messageId
...message.properties.headers,
end: sinon.match.number,
reboundReason: reboundError.message,
expiration: settings.REBOUND_INITIAL_EXPIRATION,
reboundIteration: 1
}
},
sinon.match.func
Expand All @@ -920,22 +903,12 @@ describe('AMQP', () => {
return true;
})
};
const messageId = uuid.v4();
const headers = {
execId: 'exec1234567890',
taskId: 'task1234567890',
stepId: 'step_1',
compId: 'comp1',
function: 'list',
start: '1432815685034',
protocolVersion: 2,
messageId
};

const clonedMessage = _.cloneDeep(message);
clonedMessage.properties.headers.reboundIteration = 2;

await amqp.sendRebound(new Error('Rebound error'), clonedMessage, headers);
const reboundError = new Error('Rebound error');
await amqp.sendRebound(reboundError, clonedMessage);
expect(amqp.publishChannel.publish).to.have.been.calledOnce.and.calledWith(
settings.PUBLISH_MESSAGES_TO,
settings.REBOUND_ROUTING_KEY,
Expand All @@ -945,20 +918,13 @@ describe('AMQP', () => {
return true;
}),
{
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
expiration: 60000,
...message.properties,
headers: {
execId: 'exec1234567890',
taskId: 'task1234567890',
stepId: 'step_1',
compId: 'comp1',
function: 'list',
start: '1432815685034',
reboundIteration: 3,
protocolVersion: 2,
messageId
...message.properties.headers,
end: sinon.match.number,
reboundReason: reboundError.message,
expiration: 60000,
reboundIteration: 3
}
},
sinon.match.func
Expand All @@ -975,21 +941,14 @@ describe('AMQP', () => {
})
};
const messageId = uuid.v4();
const headers = {
execId: 'exec1234567890',
taskId: 'task1234567890',
stepId: 'step_1',
compId: 'comp1',
function: 'list',
start: '1432815685034',
protocolVersion: 2,
messageId
};

const clonedMessage = _.cloneDeep(message);
clonedMessage.properties.headers.reboundIteration = 100;
delete clonedMessage.properties.headers.reply_to;
clonedMessage.properties.headers.messageId = messageId;

await amqp.sendRebound(new Error('Rebound error'), clonedMessage, headers);
const reboundError = new Error('Rebound error');
await amqp.sendRebound(reboundError, clonedMessage);
expect(amqp.publishChannel.publish).to.have.been.calledOnce.and.calledWith(
settings.PUBLISH_MESSAGES_TO,
settings.ERROR_ROUTING_KEY,
Expand All @@ -1003,18 +962,18 @@ describe('AMQP', () => {
return true;
}),
{
contentType: 'application/json',

contentEncoding: 'utf8',
contentType: 'application/json',
mandatory: true,
headers: {
end: sinon.match.number,
execId: 'exec1234567890',
taskId: 'task1234567890',
stepId: 'step_1',
compId: 'comp1',
function: 'list',
start: '1432815685034',
messageId,
protocolVersion: 2,
messageId
reboundIteration: 100,
reboundReason: reboundError.message,
taskId: 'task1234567890'
}
},
sinon.match.func
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "elasticio-sailor-nodejs",
"description": "The official elastic.io library for bootstrapping and executing for Node.js connectors",
"version": "2.6.19",
"version": "2.6.20",
"main": "run.js",
"scripts": {
"lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js",
Expand Down

0 comments on commit e79b225

Please sign in to comment.