Skip to content

Commit

Permalink
Merge pull request #42 from fourTheorem/kinesis-data-streams
Browse files Browse the repository at this point in the history
Kinesis data streams
  • Loading branch information
eoinsha committed May 21, 2021
2 parents ea2f974 + 04b6700 commit f32e873
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 24 deletions.
38 changes: 37 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ SLIC Watch provides a CloudWatch Dashboard and Alarms for:
2. API Gateway
3. Step Functions
4. DynamoDB Tables
5. Kinesis Data Streams (available for Lambda consumers, more coming soon)
5. Kinesis Data Streams
6. SQS Queues (Coming soon)

Currently, SLIC Watch is available as a Serverless Framework plugin.
Expand Down Expand Up @@ -95,6 +95,28 @@ custom:
Threshold: 0
SystemErrors:
Threshold: 0
Kinesis:
GetRecords.IteratorAgeMilliseconds:
Statistic: Maximum
Threshold: 10000
ReadProvisionedThroughputExceeded:
Statistic: Maximum
Threshold: 0
WriteProvisionedThroughputExceeded:
Statistic: Maximum
Threshold: 0
PutRecord.Success:
ComparisonOperator: LessThanThreshold
Statistic: Average
Threshold: 1
PutRecords.Success:
ComparisonOperator: LessThanThreshold
Statistic: Average
Threshold: 1
GetRecords.Success:
ComparisonOperator: LessThanThreshold
Statistic: Average
Threshold: 1

dashboard:
timeRange:
Expand Down Expand Up @@ -143,6 +165,20 @@ custom:
Statistic: ['Sum']
WriteThrottleEvents:
Statistic: ['Sum']
Kinesis:
# Kinesis Data Streams
GetRecords.IteratorAgeMilliseconds:
Statistic: ['Maximum']
ReadProvisionedThroughputExceeded:
Statistic: ['Sum']
WriteProvisionedThroughputExceeded:
Statistic: ['Sum']
PutRecord.Success:
Statistic: ['Average']
PutRecords.Success:
Statistic: ['Average']
GetRecords.Success:
Statistic: ['Average']

```

Expand Down
80 changes: 80 additions & 0 deletions serverless-plugin/alarms-kinesis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
'use strict'

const { makeResourceName } = require('./util')

const kinesisAlarmTypes = {
StreamIteratorAge: 'GetRecords.IteratorAgeMilliseconds',
StreamReadThroughput: 'ReadProvisionedThroughputExceeded',
StreamWriteThroughput: 'WriteProvisionedThroughputExceeded',
StreamPutRecordSuccess: 'PutRecord.Success',
StreamPutRecordsSuccess: 'PutRecords.Success',
StreamGetRecordsSuccess: 'GetRecords.Success'
}

/**
* @param {object} kinesisAlarmConfig The fully resolved alarm configuration for Kinesis Data Streams
*/
module.exports = function KinesisAlarms (kinesisAlarmConfig, context) {
return {
createKinesisAlarms
}

/**
* Add all required Kinesis Data Stream alarms to the provided CloudFormation template
* based on the resources found within
*
* @param {CloudFormationTemplate} cfTemplate A CloudFormation template object
*/
function createKinesisAlarms (cfTemplate) {
const streamResources = cfTemplate.getResourcesByType(
'AWS::Kinesis::Stream'
)

for (const [streamResourceName, streamResource] of Object.entries(streamResources)) {
for (const [type, metric] of Object.entries(kinesisAlarmTypes)) {
if (kinesisAlarmConfig[metric].enabled) {
const alarm = createStreamAlarm(
streamResourceName,
streamResource,
type,
metric,
kinesisAlarmConfig[metric]
)
cfTemplate.addResource(alarm.resourceName, alarm.resource)
}
}
}
}

function createStreamAlarm (streamResourceName, streamResource, type, metric, config) {
const streamName = streamResource.Properties.Name // TODO: Allow for Ref usage in resource names (see #14)
const threshold = config.Threshold
const metricProperties = {
Dimensions: [{ Name: 'StreamName', Value: streamName }],
MetricName: metric,
Namespace: 'AWS/Kinesis',
Period: config.Period,
Statistic: config.Statistic,
ExtendedStatistic: config.ExtendedStatistic
}

const resource = {
Type: 'AWS::CloudWatch::Alarm',
Properties: {
ActionsEnabled: true,
AlarmActions: [context.topicArn],
AlarmName: `${type}_${streamName}`,
AlarmDescription: `Kinesis ${config.Statistic} ${metric} for ${streamName} breaches ${threshold} milliseconds`,
EvaluationPeriods: 1,
ComparisonOperator: config.ComparisonOperator,
Threshold: config.Threshold,
TreatMissingData: config.TreatMissingData,
...metricProperties
}
}
return {
resourceName: makeResourceName('kinesis', streamName, type),
resource
}
}
}
6 changes: 5 additions & 1 deletion serverless-plugin/alarms.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ const lambdaAlarms = require('./alarms-lambda')
const apiGatewayAlarms = require('./alarms-api-gateway')
const stepFunctionAlarms = require('./alarms-step-functions')
const dynamoDbAlarms = require('./alarms-dynamodb')
const kinesisAlarms = require('./alarms-kinesis')

module.exports = function alarms (serverless, alarmConfig, context) {
const {
Lambda: lambdaConfig,
ApiGateway: apiGwConfig,
States: sfConfig,
DynamoDB: dynamoDbConfig
DynamoDB: dynamoDbConfig,
Kinesis: kinesisConfig
} = cascade(alarmConfig)
const { createLambdaAlarms } = lambdaAlarms(lambdaConfig, context)
const { createApiGatewayAlarms } = apiGatewayAlarms(apiGwConfig, context)
const { createStatesAlarms } = stepFunctionAlarms(sfConfig, context)
const { createDynamoDbAlarms } = dynamoDbAlarms(dynamoDbConfig, context)
const { createKinesisAlarms } = kinesisAlarms(kinesisConfig, context)

return {
addAlarms
Expand All @@ -35,6 +38,7 @@ module.exports = function alarms (serverless, alarmConfig, context) {
apiGwConfig.enabled && createApiGatewayAlarms(cfTemplate)
sfConfig.enabled && createStatesAlarms(cfTemplate)
dynamoDbConfig.enabled && createDynamoDbAlarms(cfTemplate)
kinesisConfig.enabled && createKinesisAlarms(cfTemplate)
}
}
}
6 changes: 4 additions & 2 deletions serverless-plugin/config-schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ const supportedAlarms = {
Lambda: ['Errors', 'ThrottlesPc', 'DurationPc', 'Invocations', 'IteratorAge'],
ApiGateway: ['5XXError', '4XXError', 'Latency'],
States: ['ExecutionsThrottled', 'ExecutionsFailed', 'ExecutionsTimedOut'],
DynamoDB: ['ReadThrottleEvents', 'WriteThrottleEvents', 'UserErrors', 'SystemErrors']
DynamoDB: ['ReadThrottleEvents', 'WriteThrottleEvents', 'UserErrors', 'SystemErrors'],
Kinesis: ['GetRecords.IteratorAgeMilliseconds', 'ReadProvisionedThroughputExceeded', 'WriteProvisionedThroughputExceeded', 'PutRecord.Success', 'PutRecords.Success', 'GetRecords.Success']
}

const supportedWidgets = {
Lambda: ['Errors', 'Throttles', 'Duration', 'Invocations', 'ConcurrentExecutions', 'IteratorAge'],
ApiGateway: ['5XXError', '4XXError', 'Latency', 'Count'],
States: ['ExecutionsThrottled', 'ExecutionsFailed', 'ExecutionsTimedOut'],
DynamoDB: ['ReadThrottleEvents', 'WriteThrottleEvents']
DynamoDB: ['ReadThrottleEvents', 'WriteThrottleEvents'],
Kinesis: ['GetRecords.IteratorAgeMilliseconds', 'ReadProvisionedThroughputExceeded', 'WriteProvisionedThroughputExceeded', 'PutRecord.Success', 'PutRecords.Success', 'GetRecords.Success']
}

const commonAlarmProperties = {
Expand Down
74 changes: 60 additions & 14 deletions serverless-plugin/dashboard.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
Lambda: lambdaDashConfig,
ApiGateway: apiGwDashConfig,
States: sfDashConfig,
DynamoDB: dynamoDbDashConfig
DynamoDB: dynamoDbDashConfig,
Kinesis: kinesisDashConfig
}
} = cascade(dashboardConfig)

Expand All @@ -38,6 +39,9 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
const tableResources = cfTemplate.getResourcesByType(
'AWS::DynamoDB::Table'
)
const streamResources = cfTemplate.getResourcesByType(
'AWS::Kinesis::Stream'
)

const eventSourceMappingFunctions = cfTemplate.getEventSourceMappingFunctions()
const apiWidgets = createApiWidgets(apiResources)
Expand All @@ -47,11 +51,14 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
lambdaResources,
Object.keys(eventSourceMappingFunctions)
)
const streamWidgets = createStreamWidgets(streamResources)

const positionedWidgets = layOutWidgets([
...apiWidgets,
...stateMachineWidgets,
...dynamoDbWidgets,
...lambdaWidgets
...lambdaWidgets,
...streamWidgets
])
const dash = { start: timeRange.start, end: timeRange.end, widgets: positionedWidgets }
const dashboardResource = {
Expand All @@ -69,9 +76,9 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
*
* @param {string} title The metric title
* @param {Array.<object>} metrics The metric definitions to render
* @param {Object} Cascaded metric configuration
* @param {Object} Cascaded widget/metric configuration
*/
function createMetricWidget (title, metricDefs, metricConfig) {
function createMetricWidget (title, metricDefs, config) {
const metrics = metricDefs.map(
({ namespace, metric, dimensions, stat }) => [
namespace,
Expand All @@ -91,18 +98,18 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
title,
view: 'timeSeries',
region: context.region,
period: metricConfig.metricPeriod
period: config.metricPeriod
},
width: metricConfig.width,
height: metricConfig.height
width: config.width,
height: config.height
}
}

/**
* Create a set of CloudWatch Dashboard widgets for the Lambda
* CloudFormation resources provided
*
* @param {object} functionResources Object with of CloudFormation Lambda Function resources by resource name
* @param {object} functionResources Object with CloudFormation Lambda Function resources by resource name
* @param {Array.<string>} eventSourceMappingFunctionResourceNames Names of Lambda function resources that are linked to EventSourceMappings
*/
function createLambdaWidgets (
Expand All @@ -111,7 +118,7 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
) {
const lambdaWidgets = []
if (Object.keys(functionResources).length > 0) {
for (const [metric, metricConfig] of getConfiguredMetrics(lambdaDashConfig)) {
for (const [metric, metricConfig] of Object.entries(getConfiguredMetrics(lambdaDashConfig))) {
if (metric !== 'IteratorAge') {
for (const stat of metricConfig.Statistic) {
const metricStatWidget = createMetricWidget(
Expand All @@ -133,7 +140,7 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
const functionName = functionResources[funcResName].Properties.FunctionName
const stats = metricConfig.Statistic
const iteratorAgeWidget = createMetricWidget(
`IteratorAge ${functionName} ${stats.join(',')}`,
`Lambda IteratorAge ${functionName} ${stats.join(',')}`,
stats.map(stat => ({
namespace: 'AWS/Lambda',
metric: 'IteratorAge',
Expand Down Expand Up @@ -167,7 +174,7 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
extractedConfig[metric] = metricConfig
}
}
return Object.entries(extractedConfig)
return extractedConfig
}

/**
Expand All @@ -180,7 +187,7 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
const apiWidgets = []
for (const res of Object.values(apiResources)) {
const apiName = res.Properties.Name // TODO: Allow for Ref usage in resource names
for (const [metric, metricConfig] of getConfiguredMetrics(apiGwDashConfig)) {
for (const [metric, metricConfig] of Object.entries(getConfiguredMetrics(apiGwDashConfig))) {
const metricStatWidget = createMetricWidget(
`${metric} API ${apiName}`,
Object.values(metricConfig.Statistic).map((stat) => ({
Expand Down Expand Up @@ -210,7 +217,7 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
for (const res of Object.values(smResources)) {
const smName = res.Properties.StateMachineName // TODO: Allow for Ref usage in resource names (see #14)
const widgetMetrics = []
for (const [metric, metricConfig] of getConfiguredMetrics(sfDashConfig)) {
for (const [metric, metricConfig] of Object.entries(getConfiguredMetrics(sfDashConfig))) {
for (const stat of metricConfig.Statistic) {
widgetMetrics.push({
namespace: 'AWS/States',
Expand Down Expand Up @@ -241,7 +248,7 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
const ddbWidgets = []
for (const res of Object.values(tableResources)) {
const tableName = res.Properties.TableName
for (const [metric, metricConfig] of getConfiguredMetrics(dynamoDbDashConfig)) {
for (const [metric, metricConfig] of Object.entries(getConfiguredMetrics(dynamoDbDashConfig))) {
ddbWidgets.push(createMetricWidget(
`${metric} Table ${tableName}`,
Object.values(metricConfig.Statistic).map((stat) => ({
Expand Down Expand Up @@ -275,6 +282,45 @@ module.exports = function dashboard (serverless, dashboardConfig, context) {
return ddbWidgets
}

/**
* Create a set of CloudWatch Dashboard widgets for the Kinesis Data Stream CloudFormation resources provided
*
* @param {object} streamResources Object with CloudFormation Kinesis Data Stream resources by resource name
*/
function createStreamWidgets (streamResources) {
const streamWidgets = []

for (const streamResource of Object.values(streamResources)) {
const metricGroups = {
IteratorAge: ['GetRecords.IteratorAgeMilliseconds'],
'Get/Put Success': ['PutRecord.Success', 'PutRecords.Success', 'GetRecords.Success'],
'Provisioned Throughput': ['ReadProvisionedThroughputExceeded', 'WriteProvisionedThroughputExceeded']
}
const streamName = streamResource.Properties.Name
const metricConfigs = getConfiguredMetrics(kinesisDashConfig)
for (const [group, metrics] of Object.entries(metricGroups)) {
const widgetMetrics = []
for (const metric of metrics) {
const metricConfig = metricConfigs[metric]
for (const stat of metricConfig.Statistic) {
widgetMetrics.push({
namespace: 'AWS/Kinesis',
metric,
dimensions: { StreamName: streamName },
stat
})
}
}
streamWidgets.push(createMetricWidget(
`${group} ${streamName} Kinesis`,
widgetMetrics,
kinesisDashConfig
))
}
}
return streamWidgets
}

/**
* Set the location and dimension properties of each provided widget
*
Expand Down
Loading

0 comments on commit f32e873

Please sign in to comment.