Skip to content

Commit

Permalink
Added parsed records to afterHook
Browse files Browse the repository at this point in the history
  • Loading branch information
AntonBazhal committed Mar 2, 2017
1 parent b33ef10 commit bba12b8
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 138 deletions.
47 changes: 29 additions & 18 deletions lib/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ module.exports = function(options) {

utils.validate(event, schemas.EVENT, { allowUnknown: true });

const actions = event.Records.reduce((acc, record) => {
const parsedEvent = event.Records.reduce((acc, record) => {
try {
const parsedRecord = marshaler.toJS({
newDoc: { M: record.dynamodb.NewImage || {} },
oldDoc: { M: record.dynamodb.OldImage || {} },
keys: { M: record.dynamodb.Keys }
NewImage: { M: record.dynamodb.NewImage || {} },
OldImage: { M: record.dynamodb.OldImage || {} },
Keys: { M: record.dynamodb.Keys }
});

const id = options.idField
? utils.assembleField(parsedRecord, options.idField, separator)
: utils.assembleField(parsedRecord, _.keys(parsedRecord.keys), separator);
: utils.assembleField(parsedRecord, _.keys(parsedRecord.Keys), separator);

const actionDescriptionObj = {
_index: options.index
Expand All @@ -53,26 +53,28 @@ module.exports = function(options) {
}

const doc = options.pickFields
? _.pick(parsedRecord.newDoc, options.pickFields)
: parsedRecord.newDoc;
? _.pick(parsedRecord.NewImage, options.pickFields)
: parsedRecord.NewImage;

switch (record.eventName) {
case 'INSERT':
case 'MODIFY':
acc.push({ index: actionDescriptionObj });
acc.push(doc);
acc.actions.push({ index: actionDescriptionObj });
acc.actions.push(doc);
break;

case 'REMOVE':
if (_.has(actionDescriptionObj, 'version')) {
actionDescriptionObj.version++;
}
acc.push({ delete: actionDescriptionObj });
acc.actions.push({ delete: actionDescriptionObj });
break;

default:
throw new errors.UnknownEventNameError(record);
}

acc.records.push(_.merge({}, record, { dynamodb: parsedRecord }));
} catch (err) {
if (options.recordErrorHook) {
options.recordErrorHook(event, context, err);
Expand All @@ -82,23 +84,32 @@ module.exports = function(options) {
}

return acc;
}, []);
}, { actions: [], records: [] });

if (actions.length === 0) {
if (parsedEvent.actions.length === 0) {
return {
took: 0,
errors: false,
items: []
esResult: {
took: 0,
errors: false,
items: []
},
parsedRecords: { Records: parsedEvent.records }
};
}

return esclient.bulk({ body: actions });
return esclient.bulk({ body: parsedEvent.actions })
.then(esResult => {
return {
esResult,
parsedRecords: { Records: parsedEvent.records }
};
});
})
.then(result => {
if (options.afterHook) {
options.afterHook(event, context, result);
options.afterHook(event, context, result.esResult, result.parsedRecords);
}
return result;
return result.esResult;
})
.catch(err => {
if (options.errorHook) {
Expand Down
2 changes: 1 addition & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module.exports = {
},

getField(parsedRecord, path) {
const value = [parsedRecord.keys, parsedRecord.newDoc, parsedRecord.oldDoc]
const value = [parsedRecord.Keys, parsedRecord.NewImage, parsedRecord.OldImage]
.reduce((acc, entry) => {
return acc === undefined
? _.get(entry, path)
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dynamo2es-lambda",
"version": "1.0.2",
"version": "1.0.3",
"description": "Configurable AWS Lambda handler to index documents from DynamoDB Streams in Amazon Elasticsearch",
"repository": {
"type": "git",
Expand Down

0 comments on commit bba12b8

Please sign in to comment.