Skip to content

Commit

Permalink
elasticio-4100 Handle case of webhook - error handler - request-reply
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Kotenko authored and anton-kotenko committed Jul 1, 2020
1 parent 6ab87f1 commit e760f8d
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 229 deletions.
10 changes: 7 additions & 3 deletions lib/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,14 @@ class Amqp {
}
const errorPayload = JSON.stringify(payload);

let result = this.sendToExchange(settings.PUBLISH_MESSAGES_TO, settings.ERROR_ROUTING_KEY,
errorPayload, properties, throttle);
let result = this.sendToExchange(
settings.PUBLISH_MESSAGES_TO,
settings.ERROR_ROUTING_KEY,
errorPayload, properties,
throttle
);

if (headers.reply_to) {
if (!settings.NO_ERROR_REPLIES && headers.reply_to) {
log.debug('Sending error to %s', headers.reply_to);
const replyToOptions = _.cloneDeep(properties);
replyToOptions.headers[HEADER_ERROR_RESPONSE] = true;
Expand Down
111 changes: 65 additions & 46 deletions lib/settings.js
Original file line number Diff line number Diff line change
@@ -1,39 +1,8 @@
const _ = require('lodash');


const PREFIX = 'ELASTICIO_';

function readFrom(envVars) {
const result = {};

// required settings
const requiredAlways = [
'FLOW_ID',
'EXEC_ID',
'STEP_ID',
'CONTAINER_ID',
'WORKSPACE_ID',

'USER_ID',
'COMP_ID',
'FUNCTION',

'API_URI',
'API_USERNAME',
'API_KEY'
];

const requiredForMessageProcessing = [
'AMQP_URI',
'LISTEN_MESSAGES_ON',
'PUBLISH_MESSAGES_TO',

'DATA_ROUTING_KEY',
'ERROR_ROUTING_KEY',
'REBOUND_ROUTING_KEY',
'SNAPSHOT_ROUTING_KEY'
];

function getOptionalEnvVars(envVars) {
const optional = {
REBOUND_INITIAL_EXPIRATION: 15000,
REBOUND_LIMIT: 20,
Expand All @@ -53,9 +22,29 @@ function readFrom(envVars) {
AMQP_PUBLISH_MAX_RETRY_DELAY: 5 * 60 * 1000, // 5 mins
OUTGOING_MESSAGE_SIZE_LIMIT: 10485760,
NO_SELF_PASSTRHOUGH: false,
PROTOCOL_VERSION: 1
PROTOCOL_VERSION: 1,
NO_ERROR_REPLIES: false
};

const result = {};
_.forEach(optional, function readOptional(defaultValue, key) {
const envVarName = PREFIX + key;
if (typeof defaultValue === 'number' && envVars[envVarName]) {
result[key] = parseInt(envVars[envVarName]) || defaultValue;
} else if (typeof defaultValue === 'boolean') {
if (envVars[envVarName] === undefined) {
result[key] = defaultValue;
} else {
result[key] = (!envVars[envVarName] || envVars[envVarName] === 'false') ? false : true;
}
} else {
result[key] = envVars[envVarName] || defaultValue;
}
});
return result;
}

function getAdditionalVars(envVars) {
if (envVars.ELASTICIO_ADDITIONAL_VARS_FOR_HEADERS) {
const vars = {};
envVars.ELASTICIO_ADDITIONAL_VARS_FOR_HEADERS
Expand All @@ -65,32 +54,62 @@ function readFrom(envVars) {
const key = env.indexOf(PREFIX) === 0 ? env.slice(PREFIX.length) : env;
vars[key] = envVars[env];
});
result.additionalVars = vars;

return vars;
}
}

function getMandatoryEnvVars(envVars) {
// required settings
const requiredAlways = [
'FLOW_ID',
'EXEC_ID',
'STEP_ID',
'CONTAINER_ID',
'WORKSPACE_ID',

'USER_ID',
'COMP_ID',
'FUNCTION',

'API_URI',
'API_USERNAME',
'API_KEY'
];

const requiredForMessageProcessing = [
'AMQP_URI',
'LISTEN_MESSAGES_ON',
'PUBLISH_MESSAGES_TO',

'DATA_ROUTING_KEY',
'ERROR_ROUTING_KEY',
'REBOUND_ROUTING_KEY',
'SNAPSHOT_ROUTING_KEY'
];

const envVarsList = requiredAlways.slice(0);

if (!envVars.ELASTICIO_HOOK_SHUTDOWN) {
envVarsList.push(...requiredForMessageProcessing);
}

envVarsList.forEach(key => {
return envVarsList.reduce((result, key) => {
const envVarName = PREFIX + key;
if (!envVars[envVarName]) {
throw new Error(`${envVarName} is missing`);
}
result[key] = envVars[envVarName];
});

_.forEach(optional, function readOptional(defaultValue, key) {
const envVarName = PREFIX + key;
if (typeof defaultValue === 'number' && envVars[envVarName]) {
result[key] = parseInt(envVars[envVarName]) || defaultValue;
} else {
result[key] = envVars[envVarName] || defaultValue;
}
});
return result;
}, {});
}

return result;
function readFrom(envVars) {
return {
...getMandatoryEnvVars(envVars),
...getOptionalEnvVars(envVars),
additionalVars: getAdditionalVars(envVars)
};
}

exports.readFrom = readFrom;
188 changes: 125 additions & 63 deletions mocha_spec/unit/amqp.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,73 +12,78 @@ const encryptor = require('../../lib/encryptor.js');
const Amqp = require('../../lib/amqp.js').Amqp;

describe('AMQP', () => {
process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD = 'testCryptoPassword';
process.env.ELASTICIO_MESSAGE_CRYPTO_IV = 'iv=any16_symbols';

const envVars = {};
envVars.ELASTICIO_AMQP_URI = 'amqp://test2/test2';
envVars.ELASTICIO_AMQP_PUBLISH_RETRY_ATTEMPTS = 10;
envVars.ELASTICIO_AMQP_PUBLISH_MAX_RETRY_DELAY = 60 * 1000;

envVars.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003';
envVars.ELASTICIO_STEP_ID = 'step_1';
envVars.ELASTICIO_EXEC_ID = 'some-exec-id';
envVars.ELASTICIO_WORKSPACE_ID = '5559edd38968ec073600683';
envVars.ELASTICIO_CONTAINER_ID = 'dc1c8c3f-f9cb-49e1-a6b8-716af9e15948';

envVars.ELASTICIO_USER_ID = '5559edd38968ec0736000002';
envVars.ELASTICIO_COMP_ID = '5559edd38968ec0736000456';
envVars.ELASTICIO_FUNCTION = 'list';

envVars.ELASTICIO_LISTEN_MESSAGES_ON = '5559edd38968ec0736000003:step_1:1432205514864:messages';
envVars.ELASTICIO_PUBLISH_MESSAGES_TO = 'userexchange:5527f0ea43238e5d5f000001';
envVars.ELASTICIO_DATA_ROUTING_KEY = '5559edd38968ec0736000003:step_1:1432205514864:message';
envVars.ELASTICIO_ERROR_ROUTING_KEY = '5559edd38968ec0736000003:step_1:1432205514864:error';
envVars.ELASTICIO_REBOUND_ROUTING_KEY = '5559edd38968ec0736000003:step_1:1432205514864:rebound';
envVars.ELASTICIO_SNAPSHOT_ROUTING_KEY = '5559edd38968ec0736000003:step_1:1432205514864:snapshot';

envVars.ELASTICIO_API_URI = 'http://apihost.com';
envVars.ELASTICIO_API_USERNAME = 'test@test.com';
envVars.ELASTICIO_API_KEY = '5559edd';

const settings = Settings.readFrom(envVars);

const message = {
fields: {
consumerTag: 'abcde',
deliveryTag: 12345,
exchange: 'test',
routingKey: 'test.hello'
},
properties: {
contentType: 'application/json',
contentEncoding: 'utf8',
headers: {
taskId: 'task1234567890',
execId: 'exec1234567890',
reply_to: 'replyTo1234567890',
protocolVersion: 2
},
deliveryMode: undefined,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
expiration: undefined,
messageId: undefined,
timestamp: undefined,
type: undefined,
userId: undefined,
appId: undefined,
mandatory: true,
clusterId: ''
},
content: encryptor.encryptMessageContent({ content: 'Message content' })
};
let settings;
let message;
let sandbox;

beforeEach(() => {
sandbox = sinon.createSandbox();
sandbox.spy(encryptor, 'decryptMessageContent');

// specially for cipher.js
// TODO fix cipher.js to accept encryption settings
// though argument.
process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD = 'testCryptoPassword';
process.env.ELASTICIO_MESSAGE_CRYPTO_IV = 'iv=any16_symbols';
const envVars = {};
envVars.ELASTICIO_AMQP_URI = 'amqp://test2/test2';
envVars.ELASTICIO_AMQP_PUBLISH_RETRY_ATTEMPTS = 10;
envVars.ELASTICIO_AMQP_PUBLISH_MAX_RETRY_DELAY = 60 * 1000;

envVars.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003';
envVars.ELASTICIO_STEP_ID = 'step_1';
envVars.ELASTICIO_EXEC_ID = 'some-exec-id';
envVars.ELASTICIO_WORKSPACE_ID = '5559edd38968ec073600683';
envVars.ELASTICIO_CONTAINER_ID = 'dc1c8c3f-f9cb-49e1-a6b8-716af9e15948';

envVars.ELASTICIO_USER_ID = '5559edd38968ec0736000002';
envVars.ELASTICIO_COMP_ID = '5559edd38968ec0736000456';
envVars.ELASTICIO_FUNCTION = 'list';

envVars.ELASTICIO_LISTEN_MESSAGES_ON = '5559edd38968ec0736000003:step_1:1432205514864:messages';
envVars.ELASTICIO_PUBLISH_MESSAGES_TO = 'userexchange:5527f0ea43238e5d5f000001';
envVars.ELASTICIO_DATA_ROUTING_KEY = '5559edd38968ec0736000003:step_1:1432205514864:message';
envVars.ELASTICIO_ERROR_ROUTING_KEY = '5559edd38968ec0736000003:step_1:1432205514864:error';
envVars.ELASTICIO_REBOUND_ROUTING_KEY = '5559edd38968ec0736000003:step_1:1432205514864:rebound';
envVars.ELASTICIO_SNAPSHOT_ROUTING_KEY = '5559edd38968ec0736000003:step_1:1432205514864:snapshot';

envVars.ELASTICIO_API_URI = 'http://apihost.com';
envVars.ELASTICIO_API_USERNAME = 'test@test.com';
envVars.ELASTICIO_API_KEY = '5559edd';

settings = Settings.readFrom(envVars);

message = {
fields: {
consumerTag: 'abcde',
deliveryTag: 12345,
exchange: 'test',
routingKey: 'test.hello'
},
properties: {
contentType: 'application/json',
contentEncoding: 'utf8',
headers: {
taskId: 'task1234567890',
execId: 'exec1234567890',
reply_to: 'replyTo1234567890',
protocolVersion: 2
},
deliveryMode: undefined,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
expiration: undefined,
messageId: undefined,
timestamp: undefined,
type: undefined,
userId: undefined,
appId: undefined,
mandatory: true,
clusterId: ''
},
content: encryptor.encryptMessageContent({ content: 'Message content' })
};
});
afterEach(() => {
sandbox.restore();
Expand Down Expand Up @@ -385,7 +390,7 @@ describe('AMQP', () => {
it('Should throw error after ${settings.AMQP_PUBLISH_RETRY_ATTEMPTS} attempts to publish message',
async function test() {
this.timeout(20000); // eslint-disable-line
const retryCount = envVars.ELASTICIO_AMQP_PUBLISH_RETRY_ATTEMPTS;
const retryCount = settings.AMQP_PUBLISH_RETRY_ATTEMPTS;
const amqp = new Amqp(settings);
amqp.publishChannel = {
on: sandbox.stub(),
Expand Down Expand Up @@ -704,6 +709,63 @@ describe('AMQP', () => {
}
);
});
it('Should not send message to reply_to queue if NO_ERROR_REPLIES is set to true', async () => {
settings.NO_ERROR_REPLIES = true;
const expectedErrorPayload = {
error: {
name: 'Error',
message: 'Test error',
stack: sinon.match.string
},
errorInput: {
content: 'Message content'
}
};

const amqp = new Amqp(settings);
amqp.publishChannel = {
on: sandbox.stub(),
publish: sandbox.stub().callsFake((exchangeName, routingKey, payloadBuffer, options, cb) => {
cb(null, 'Success');
return true;
})
};
const messageId = uuid.v4();
const headers = {
taskId: 'task1234567890',
stepId: 'step_456',
reply_to: 'my-special-routing-key',
protocolVersion: 1,
messageId
};

await amqp.sendError(new Error('Test error'), headers, message);
expect(amqp.publishChannel.publish).to.have.been.calledOnce
.and.calledWith(
settings.PUBLISH_MESSAGES_TO,
'5559edd38968ec0736000003:step_1:1432205514864:error',
sinon.match(arg => {
const payload = JSON.parse(arg.toString());
payload.error = encryptor.decryptMessageContent(payload.error, 'base64');
payload.errorInput = encryptor.decryptMessageContent(payload.errorInput, 'base64');

return sinon.match(expectedErrorPayload).test(payload);
}),
{
contentType: 'application/json',
contentEncoding: 'utf8',
mandatory: true,
headers: {
messageId,
taskId: 'task1234567890',
stepId: 'step_456',
reply_to: 'my-special-routing-key',
protocolVersion: 1
}
}
);
});


it('Should not provide errorInput if errorInput was empty', async () => {
const amqp = new Amqp(settings);
Expand Down

0 comments on commit e760f8d

Please sign in to comment.