Skip to content

Commit

Permalink
feat(*): Adding log processer lambda to firehose
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-cottone committed Jul 29, 2018
1 parent c6f706a commit 205b1a0
Show file tree
Hide file tree
Showing 8 changed files with 438 additions and 18 deletions.
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@
]
},
"dependencies": {
"lodash": "^4.17.10"
"fs-extra": "7.0.0",
"lodash": "4.17.10"
},
"devDependencies": {
"@semantic-release/changelog": "3.0.0",
"@semantic-release/git": "7.0.1",
"@semantic-release/github": "5.0.1",
"@types/fs-extra": "5.0.4",
"@types/lodash": "4.14.115",
"@types/node": "^10.5.4",
"@types/node": "10.5.4",
"rimraf": "2.6.2",
"semantic-release": "15.8.1",
"tslint": "5.11.0",
Expand Down
5 changes: 5 additions & 0 deletions src/formatters/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ export function formatIamFirehoseRole(opts: IFormatterOpts): any {
service,
'es-logs-cw-policy',
];
properties.Policies[3].PolicyName['Fn::Join'][1] = [
stage,
service,
'es-logs-lambda-policy',
];
properties.RoleName['Fn::Join'][1] = [
service,
stage,
Expand Down
76 changes: 69 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import _ from 'lodash';
import fs from 'fs-extra';
import path from 'path';

import { formatFirehose, formatIamFirehoseRole, formatLogGroup } from './formatters';

Expand All @@ -8,6 +10,7 @@ const cloudwatchLogStreamEsTemplate = require('../templates/cloudwatch-log-strea
const cloudwatchLogStreamS3Template = require('../templates/cloudwatch-log-stream-s3.json');
const firehoseTemplate = require('../templates/firehose.json');
const iamCloudwatchTemplate = require('../templates/iam-cloudwatch.json');
const iamLambdaTemplate = require('../templates/iam-lambda.json');
const iamFirehoseTemplate = require('../templates/iam-firehose.json');
const s3Template = require('../templates/s3.json');
// tslint:enable:no-var-requires
Expand All @@ -18,6 +21,8 @@ class ServerlessEsLogsPlugin {
private serverless: any;
private options: { [name: string]: any };
private custom: { [name: string]: any };
private logProcesserDir: string = '_es-logs';
private logProcesserName: string = 'esLogsProcesser';

constructor(serverless: any, options: { [name: string]: any }) {
this.serverless = serverless;
Expand All @@ -26,24 +31,34 @@ class ServerlessEsLogsPlugin {
this.custom = serverless.service.custom || {};
this.hooks = {
'after:package:initialize': this.afterPackageInitialize.bind(this),
'aws:package:finalize:mergeCustomProviderResources': this.mergeResources.bind(this),
'after:package:createDeploymentArtifacts': this.afterPackageCreateDeploymentArtifacts.bind(this),
'aws:package:finalize:mergeCustomProviderResources': this.mergeCustomProviderResources.bind(this),
};
}

private afterPackageCreateDeploymentArtifacts(): void {
this.serverless.cli.log('ServerlessEsLogsPlugin.afterPackageCreateDeploymentArtifacts()');
this.cleanupFiles();
}

private afterPackageInitialize(): void {
this.serverless.cli.log('ServerlessEsLogsPlugin.afterPackageInitialize()');
this.options.stage = this.options.stage
this.options.stage = this.options.stage
|| this.serverless.service.provider.stage
|| (this.serverless.service.defaults && this.serverless.service.defaults.stage)
|| 'dev';
this.options.region = this.options.region
|| this.serverless.service.provider.region
|| (this.serverless.service.defaults && this.serverless.service.defaults.region)
|| 'us-east-1';

// Add log processing lambda
// TODO: Find the right lifecycle method for this
this.addLogProcesser();
}

private mergeResources(): void {
this.serverless.cli.log('ServerlessEsLogsPlugin.mergeResources()');
private mergeCustomProviderResources(): void {
this.serverless.cli.log('ServerlessEsLogsPlugin.mergeCustomProviderResources()');
const { stage, region } = this.options;
const template = this.serverless.service.provider.compiledCloudFormationTemplate;
const formatOpts = {
Expand All @@ -55,10 +70,10 @@ class ServerlessEsLogsPlugin {
stage,
};

// Add cloudwatch subscriptions to existing functions
// Add cloudwatch subscriptions to firehose for functions' log groups
this.addCloudwatchSubscriptions();

// Add resources for firehose -> elasticsearch
// Add custom resources for firehose -> elasticsearch
_.merge(template.Resources, s3Template);
_.merge(template.Resources, formatLogGroup({
...formatOpts,
Expand All @@ -67,6 +82,7 @@ class ServerlessEsLogsPlugin {
_.merge(template.Resources, cloudwatchLogStreamEsTemplate);
_.merge(template.Resources, cloudwatchLogStreamS3Template);
_.merge(template.Resources, iamCloudwatchTemplate);
_.merge(template.Resources, iamLambdaTemplate);
_.merge(template.Resources, formatIamFirehoseRole({
...formatOpts,
template: iamFirehoseTemplate,
Expand All @@ -76,14 +92,19 @@ class ServerlessEsLogsPlugin {
template: firehoseTemplate,
}));

// console.log(JSON.stringify(template, null, 2));
// Patch log processer lambda role
this.patchLogProcesserRole();
}

private addCloudwatchSubscriptions(): void {
const template = this.serverless.service.provider.compiledCloudFormationTemplate;
const subscriptionsTemplate: { [name: string]: any } = {};
const functions = this.serverless.service.getAllFunctions();
functions.forEach((name: string) => {
if (name === this.logProcesserName) {
return;
}

const normalizedFunctionName = this.provider.naming.getNormalizedFunctionName(name);
const logicalId = `${normalizedFunctionName}SubscriptionFilter`;
const logGroupLogicalId = `${normalizedFunctionName}LogGroup`;
Expand Down Expand Up @@ -114,6 +135,47 @@ class ServerlessEsLogsPlugin {
});
_.merge(template.Resources, subscriptionsTemplate);
}

private addLogProcesser(): void {
const dirPath = path.join(this.serverless.config.servicePath, this.logProcesserDir);
const filePath = path.join(dirPath, 'index.js');
const handler = `${this.logProcesserDir}/index.handler`;
const name = `${this.serverless.service.service}-${this.options.stage}-es-logs-plugin`;
fs.ensureDirSync(dirPath);
fs.copySync(path.resolve(__dirname, '../templates/logProcesser.js'), filePath);
this.serverless.service.functions[this.logProcesserName] = {
description: 'Serverless ES Logs Plugin',
handler,
events: [],
memorySize: 512,
name,
runtime: 'nodejs8.10',
package: {
individually: true,
exclude: ['**'],
include: [this.logProcesserDir + '/**'],
},
timeout: 60,
};
}

private patchLogProcesserRole(): void {
const normalizedFunctionName = this.provider.naming.getNormalizedFunctionName(this.logProcesserName);
const templateKey = `${normalizedFunctionName}LambdaFunction`;
const template = this.serverless.service.provider.compiledCloudFormationTemplate;
template.Resources[templateKey].DependsOn.push('ServerlessEsLogsLambdaIAMRole');
template.Resources[templateKey].Properties.Role = {
'Fn::GetAtt': [
'ServerlessEsLogsLambdaIAMRole',
'Arn',
],
};
}

private cleanupFiles(): void {
const dirPath = path.join(this.serverless.config.servicePath, this.logProcesserDir);
fs.removeSync(dirPath);
}
}

export = ServerlessEsLogsPlugin;
47 changes: 42 additions & 5 deletions templates/firehose.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"ServerlessEsLogsLogGroup",
"ServerlessEsLogsEsLogStream",
"ServerlessEsLogsS3LogStream",
"ServerlessEsLogsIAMRole"
"ServerlessEsLogsIAMRole",
"EsLogsProcesserLambdaFunction"
],
"Properties": {
"DeliveryStreamName": "",
Expand All @@ -24,7 +25,7 @@
"IndexRotationPeriod": "OneDay",
"BufferingHints": {
"IntervalInSeconds": 60,
"SizeInMBs": 5
"SizeInMBs": 3
},
"RetryOptions": {
"DurationInSeconds": 60
Expand All @@ -45,7 +46,7 @@
},
"BufferingHints": {
"IntervalInSeconds": 60,
"SizeInMBs": 5
"SizeInMBs": 3
},
"CompressionFormat": "UNCOMPRESSED",
"EncryptionConfiguration": {
Expand All @@ -62,8 +63,44 @@
}
},
"ProcessingConfiguration": {
"Enabled": false,
"Processors": []
"Enabled": true,
"Processors": [
{
"Type": "Lambda",
"Parameters": [
{
"ParameterName": "LambdaArn",
"ParameterValue": {
"Fn::GetAtt": [
"EsLogsProcesserLambdaFunction",
"Arn"
]
}
},
{
"ParameterName": "NumberOfRetries",
"ParameterValue": "3"
},
{
"ParameterName": "RoleArn",
"ParameterValue": {
"Fn::GetAtt": [
"ServerlessEsLogsIAMRole",
"Arn"
]
}
},
{
"ParameterName": "BufferSizeInMBs",
"ParameterValue": "3"
},
{
"ParameterName": "BufferIntervalInSeconds",
"ParameterValue": "60"
}
]
}
]
},
"CloudWatchLoggingOptions": {
"Enabled": true,
Expand Down
2 changes: 1 addition & 1 deletion templates/iam-firehose.json
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
"Resource": [
{
"Fn::GetAtt": [
"ServerlessEsLogsLogGroup",
"EsLogsProcesserLambdaFunction",
"Arn"
]
}
Expand Down
42 changes: 42 additions & 0 deletions templates/iam-lambda.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"ServerlessEsLogsLambdaIAMRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
},
"Action": [
"sts:AssumeRole"
]
}
]
},
"Policies": [
{
"PolicyName": "lambda-to-firehose-policy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"firehose:*"
],
"Resource": {
"Fn::Sub": "arn:aws:firehose:${AWS::Region}:${AWS::AccountId}:deliverystream/*"
}
}
]
}
}
]
}
}
}

0 comments on commit 205b1a0

Please sign in to comment.