Skip to content

Commit

Permalink
Minor refactor to handle weighted payloads, add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcasalboni committed Jan 3, 2022
1 parent 62096b6 commit 441c47a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 15 deletions.
26 changes: 20 additions & 6 deletions lambda/utils.js
Expand Up @@ -284,6 +284,7 @@ module.exports.invokeLambda = (lambdaARN, alias, payload) => {
* Fetch the body of an S3 object, given an S3 path such as s3://BUCKET/KEY
*/
module.exports.fetchPayloadFromS3 = async(s3Path) => {
console.log('Fetch payload from S3', s3Path);

if (typeof s3Path !== 'string' || s3Path.indexOf('s3://') === -1) {
throw new Error('Invalid S3 path, not a string in the format s3://BUCKET/KEY');
Expand All @@ -299,27 +300,40 @@ module.exports.fetchPayloadFromS3 = async(s3Path) => {
throw new Error(`Invalid S3 path: "${s3Path}" (bucket: ${bucket}, key: ${key})`);
}

const s3 = new AWS.S3();
const data = await utils._fetchS3Object(bucket, key);

try {
// try to parse into JSON object
return JSON.parse(data);
} catch (_) {
// otherwise return as is
return data;
}


};

module.exports._fetchS3Object = async(bucket, key) => {
const s3 = new AWS.S3();
try {
const data = await s3.getObject({
const response = await s3.getObject({
Bucket: bucket,
Key: key,
}).promise();
return data.Body.toString('utf-8');
return response.Body.toString('utf-8');
} catch (err) {
if (err.statusCode === 403) {
throw new Error(
`Permission denied when trying to read ${s3Path}. ` +
`Permission denied when trying to read s3://${bucket}/${key}. ` +
'You might need to re-deploy the app with the correct payloadS3Bucket parameter.',
);
} else if (err.statusCode === 404) {
throw new Error(
`The S3 object ${s3Path} does not exist. ` +
`The object s3://${bucket}/${key} does not exist. ` +
'Make sure you are trying to access an existing object in the correct bucket.',
);
} else {
throw new Error(`Unknown error when trying to read ${s3Path}. ${err.message}`);
throw new Error(`Unknown error when trying to read s3://${bucket}/${key}. ${err.message}`);
}
}
};
Expand Down
48 changes: 42 additions & 6 deletions test/unit/test-lambda.js
Expand Up @@ -71,7 +71,8 @@ var getLambdaAliasStub,
invokeLambdaProcessorStub,
deleteLambdaAliasStub,
waitForFunctionUpdateStub,
getLambdaArchitectureStub;
getLambdaArchitectureStub,
fetchPayloadFromS3Stub;

/** unit tests below **/

Expand Down Expand Up @@ -382,10 +383,11 @@ describe('Lambda Functions', async() => {
return Math.floor(Math.random() * 2) === 0 ? 'x86_64' : 'arm64';
});

sandBox.stub(utils, 'fetchPayloadFromS3')
fetchPayloadFromS3Stub && fetchPayloadFromS3Stub.restore();
fetchPayloadFromS3Stub = sandBox.stub(utils, 'fetchPayloadFromS3')
.callsFake(async(_arn, _alias, payload) => {
fetchPayloadFromS3Counter++;
return '{"ValueFromS3": "OK"}';
return {ValueFromS3: 'OK'};
});
});

Expand Down Expand Up @@ -1018,11 +1020,11 @@ describe('Lambda Functions', async() => {
});
expect(fetchPayloadFromS3Counter).to.be(1);
for (let payload of invokeLambdaPayloads){
expect(payload).to.be('{"ValueFromS3": "OK"}');
expect(payload).to.be('{"ValueFromS3":"OK"}');
}
});

it('should fetch payload from S3 if boty payload and payloadS3 are given', async() => {
it('should fetch payload from S3 if both payload and payloadS3 are given', async() => {

await invokeForSuccess(handler, {
value: '128',
Expand All @@ -1035,10 +1037,44 @@ describe('Lambda Functions', async() => {
});
expect(fetchPayloadFromS3Counter).to.be(1);
for (let payload of invokeLambdaPayloads){
expect(payload).to.be('{"ValueFromS3": "OK"}');
expect(payload).to.be('{"ValueFromS3":"OK"}');
}
});

it('should generate weighted payload from S3', async() => {
fetchPayloadFromS3Stub && fetchPayloadFromS3Stub.restore();
fetchPayloadFromS3Stub = sandBox.stub(utils, 'fetchPayloadFromS3')
.callsFake(async(_arn, _alias, payload) => {
fetchPayloadFromS3Counter++;
return [
{ payload: {test: 'A'}, weight: 10 },
{ payload: {test: 'B'}, weight: 30 },
{ payload: {test: 'C'}, weight: 60 },
];
});

await invokeForSuccess(handler, {
value: '128',
input: {
lambdaARN: 'arnOK',
num: 10,
payloadS3: 's3://my-bucket/my-key.json',
},
});

expect(fetchPayloadFromS3Counter).to.be(1);
expect(invokeLambdaPayloads.length).to.be(10);
const counters = {
A: 0, B: 0, C: 0,
};
invokeLambdaPayloads.forEach(payload => {
counters[JSON.parse(payload).test] += 1;
});
expect(counters.A).to.be(1);
expect(counters.B).to.be(3);
expect(counters.C).to.be(6);
});

});

describe('analyzer', () => {
Expand Down
32 changes: 29 additions & 3 deletions test/unit/test-utils.js
Expand Up @@ -721,9 +721,8 @@ describe('Lambda Utils', () => {

it('should fetch the object from S3 if valid URI', async() => {
const payload = await utils.fetchPayloadFromS3('s3://my-bucket/my-key.json');
expect(payload).to.be.a('string');
expect(payload).to.contain('OK');
expect(JSON.parse(payload).Value).to.be('OK');
expect(payload).to.be.an('object');
expect(payload.Value).to.be('OK');
});

const invalidURIs = [
Expand Down Expand Up @@ -792,6 +791,33 @@ describe('Lambda Utils', () => {
}
});

const validJson = [
'{"value": "ok"}',
'[1, 2, 3]',
'[{"value": "ok"}, {"value2": "ok2"}]',
];

validJson.forEach(async(str) => {
it('should parse string if valid json - ' + str, async() => {
AWS.remock('S3', 'getObject', (params, callback) => {
callback(null, {Body: str});
});

const payload = await utils.fetchPayloadFromS3('s3://bucket/key.json');
expect(payload).to.be.an('object');
});
});

it('should return string if invalid json', async() => {
AWS.remock('S3', 'getObject', (params, callback) => {
callback(null, {Body: 'just a string'});
});

const payload = await utils.fetchPayloadFromS3('s3://bucket/key.json');
expect(payload).to.be.a('string');
expect(payload).to.equal('just a string');
});

});

});

0 comments on commit 441c47a

Please sign in to comment.