/
kinesis_lambda_es.js
71 lines (62 loc) · 2.09 KB
/
kinesis_lambda_es.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/*
* Sample node.js code for AWS Lambda to upload the JSON documents
* pushed from Kinesis to Amazon Elasticsearch.
*
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: MIT-0
*/
/* == Imports == */
var AWS = require('aws-sdk');
var path = require('path');
/* == Globals == */
var esDomain = {
region: 'us-east-1',
endpoint: 'my-domain-search-endpoint',
index: 'myindex',
doctype: 'mytype'
};
var endpoint = new AWS.Endpoint(esDomain.endpoint);
/*
* The AWS credentials are picked up from the environment.
* They belong to the IAM role assigned to the Lambda function.
* Since the ES requests are signed using these credentials,
* make sure to apply a policy that allows ES domain operations
* to the role.
*/
var creds = new AWS.EnvironmentCredentials('AWS');
/* Lambda "main": Execution begins here */
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, ' '));
event.Records.forEach(function(record) {
var jsonDoc = new Buffer(record.kinesis.data, 'base64');
postToES(jsonDoc.toString(), context);
});
}
/*
* Post the given document to Elasticsearch
*/
function postToES(doc, context) {
var req = new AWS.HttpRequest(endpoint);
req.method = 'POST';
req.path = path.join('/', esDomain.index, esDomain.doctype);
req.region = esDomain.region;
req.headers['presigned-expires'] = false;
req.headers['Host'] = endpoint.host;
req.body = doc;
var signer = new AWS.Signers.V4(req , 'es'); // es: service code
signer.addAuthorization(creds, new Date());
var send = new AWS.NodeHttpClient();
send.handleRequest(req, null, function(httpResp) {
var respBody = '';
httpResp.on('data', function (chunk) {
respBody += chunk;
});
httpResp.on('end', function (chunk) {
console.log('Response: ' + respBody);
context.succeed('Lambda added document ' + doc);
});
}, function(err) {
console.log('Error: ' + err);
context.fail('Lambda failed with error ' + err);
});
}