Skip to content

Commit

Permalink
Fixing bug 2859 (#128)
Browse files Browse the repository at this point in the history
* Publish fail for the message leads to fails of all subsequent pending messages

* fix PR comments

* change version to 2.6.1-dev1

* fix error in publish function

* version bump
  • Loading branch information
VitaliyKovalchuk committed Jan 15, 2020
1 parent 7a7d9bd commit 95a667e
Show file tree
Hide file tree
Showing 3 changed files with 471 additions and 261 deletions.
29 changes: 19 additions & 10 deletions lib/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class Amqp {
} catch (err) {
log.error(err,
'Error occurred while parsing message #%j payload',
message.fields.deliveryTag,
message.fields.deliveryTag
);
return self.reject(message);
}
Expand Down Expand Up @@ -125,26 +125,35 @@ class Amqp {

async publishMessage(exchangeName, routingKey, payloadBuffer, options, iteration) {
const settings = this.settings;
const result = this.publishChannel.publish(exchangeName, routingKey, payloadBuffer, options);
if (!result) {
log.warn('Buffer full when publishing a message to '
+ 'exchange=%s with routingKey=%s', exchangeName, routingKey);
const publishChannel = this.publishChannel;

function promisifiedPublish(exchangeName, routingKey, payloadBuffer, options) {
let result;
return new Promise((resolve, reject) => {
result = publishChannel.publish(exchangeName, routingKey, payloadBuffer, options, (err, ok) => {
err ? reject(err) : resolve(ok);
});
})
.then(() => result);
}

try {
await this.publishChannel.waitForConfirms();
const result = await promisifiedPublish(exchangeName, routingKey, payloadBuffer, options);
if (!result) {
log.warn('Buffer full when publishing a message to '
+ 'exchange=%s with routingKey=%s', exchangeName, routingKey);
}
return result;
} catch (error) {
log.error('Failed on publishing message to queue');
await new Promise(resolve => { setTimeout(resolve, settings.AMQP_PUBLISH_RETRY_DELAY); });
log.error(error, 'Failed on publishing message to queue');
await new Promise(resolve => setTimeout(resolve, settings.AMQP_PUBLISH_RETRY_DELAY));
iteration += 1;
if (iteration < settings.AMQP_PUBLISH_RETRY_ATTEMPTS) {
return await this.publishMessage(exchangeName, routingKey, payloadBuffer, options, iteration);
} else {
throw new Error(`Failed on publishing ${options.headers.messageId} message to MQ: ` + error);
}
}

return result;
}

async prepareMessageAndSendToExchange(data, properties, routingKey, throttle) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "elasticio-sailor-nodejs",
"description": "The official elastic.io library for bootstrapping and executing for Node.js connectors",
"version": "2.6.0",
"version": "2.6.1",
"main": "run.js",
"scripts": {
"lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js",
Expand Down
Loading

0 comments on commit 95a667e

Please sign in to comment.