## Serverless Data Lake Ingestion with Kinesis

In this notebook we will walk through the steps required to use the Kinesis suite of tools as a Swiss Army knife for landing data in your Data Lake. We will simulate Apache logs from a web server that could be sent from the [Amazon Kinesis Agent](https://github.com/awslabs/amazon-kinesis-agent) to a Kinesis Data Stream. Once the logs have been sent to Kinesis, we can convert, transform, and persist both the raw and the processed logs in your Data Lake. Finally, we will show the real-time aspect of Kinesis by using an enhanced fan-out consumer with Lambda to send 500 errors found in the stream to Slack. The diagram below depicts the solution we will be creating.

![Kinesis Ingestion](../../docs/assets/images/kinesis-swiss-army.png)

You will need a Slack account and an incoming webhook to complete this workshop. Create a Slack account [here](https://slack.com/get-started). Once you have created the account, create a [Slack channel](https://get.slack.help/hc/en-us/articles/201402297-Create-a-channel) and add an [incoming webhook](https://get.slack.help/hc/en-us/articles/115005265063-Incoming-WebHooks-for-Slack) to it.
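
As a rough picture of what the alerting path does with the webhook, the sketch below decodes a batch of Kinesis records as they are delivered to Lambda, keeps the log lines whose status code is 500, and builds the JSON body that a Slack incoming webhook accepts. The Lambda function deployed by this workshop is defined in its CloudFormation template and may differ; the function names and the status-code regular expression here are assumptions, and the HTTP POST to the webhook is deliberately left out.

```python
import base64
import json
import re

# Assumption: the status code is the three-digit field right after the quoted request.
STATUS_RE = re.compile(r'" (\d{3}) ')


def find_server_errors(event):
    """Return the log lines in a Kinesis Lambda event whose status is 500."""
    errors = []
    for record in event['Records']:
        # Kinesis delivers each record payload to Lambda base64-encoded.
        line = base64.b64decode(record['kinesis']['data']).decode('utf-8').strip()
        match = STATUS_RE.search(line)
        if match and match.group(1) == '500':
            errors.append(line)
    return errors


def build_slack_payload(lines):
    """Build the JSON request body for a Slack incoming webhook."""
    return json.dumps({'text': '500 errors detected:\n' + '\n'.join(lines)})
```

Sending the alert is then a single HTTP POST of this body, with a `Content-Type: application/json` header, to the webhook URL.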

In [None]:
import boto3
import botocore
import json
import time
import project_path
import getpass

from lib import workshop

cfn = boto3.client('cloudformation')
logs = boto3.client('logs')
firehose = boto3.client('firehose')
s3 = boto3.client('s3')

# Region and account ID in which the resources will be created
session = boto3.session.Session()
region = session.region_name
account_id = boto3.client('sts').get_caller_identity().get('Account')

delivery_stream_name = 'dc-demo-firehose'
kdg_stack = 'kinesis-data-generator-cognito'
kdg_username = 'admin'

slack_web_hook = '{{SlackURL}}'

### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)

We will create an S3 bucket that will be used throughout the workshop for storing our data.

[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 documentation

In [None]:
bucket = workshop.create_bucket(region, session, 'demo-')
print(bucket)

### Enter password for the Kinesis Data Generator (KDG)

**Must contain only alphanumeric characters with at least one capital letter and one number.**

In [None]:
password = getpass.getpass()
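
The rule above can be checked locally before the stack is launched. The following is a minimal sketch: `is_valid_kdg_password` is a hypothetical helper that is not part of the workshop library, and it encodes only the rule stated above. The KDG template may enforce further requirements (for example a minimum length) that this check does not cover.

```python
import re

# Hypothetical helper: encodes only the stated rule
# (alphanumeric characters only, at least one capital letter and one digit).
_KDG_PASSWORD_RE = re.compile(r'(?=.*[A-Z])(?=.*[0-9])[A-Za-z0-9]+')


def is_valid_kdg_password(candidate):
    """Return True if candidate satisfies the stated KDG password rule."""
    return _KDG_PASSWORD_RE.fullmatch(candidate) is not None
```

In the notebook this could be used as `assert is_valid_kdg_password(password)` before calling `cfn.create_stack`, so that a rejected password surfaces immediately rather than as a failed stack.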

### [Launch Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)

***This step is optional if you have already launched the KDG in your account.***

The Amazon Kinesis Data Generator (KDG) makes it easy to send data to Kinesis Streams or Kinesis Firehose. Learn how to use the tool and create templates for your records.

In [None]:
cfn_kdg_template = 'https://s3-{0}.amazonaws.com/kinesis-helpers/cognito-setup.json'.format(region)
print(cfn_kdg_template)

response = cfn.create_stack(
    StackName=kdg_stack,
    TemplateURL=cfn_kdg_template,
    Capabilities=["CAPABILITY_NAMED_IAM"],
    Parameters=[
        {
            'ParameterKey': 'Password',
            'ParameterValue': password
        },
        {
            'ParameterKey': 'Username',
            'ParameterValue': kdg_username
        }
    ]
)

print(response)

In [None]:
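response = cfn.describe_stacks(
    StackName=kdg_stack
)
status = response['Stacks'][0]['StackStatus']

# Poll until the stack is created; fail fast instead of looping forever on a rollback.
while status != 'CREATE_COMPLETE':
    if 'FAILED' in status or 'ROLLBACK' in status:
        raise RuntimeError('Stack {0} ended in status {1}'.format(kdg_stack, status))
    print('Not yet complete.')
    time.sleep(30)
    response = cfn.describe_stacks(
        StackName=kdg_stack
    )
    status = response['Stacks'][0]['StackStatus']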
    
for output in response['Stacks'][0]['Outputs']:
    if (output['OutputKey'] == 'KinesisDataGeneratorUrl'):
        print(output['OutputValue'])

### Upload [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/GettingStarted.html) template

In the interest of time, we will use CloudFormation to launch many of the supporting resources that Kinesis Data Firehose needs to store the raw data, pre-process the incoming data, and store the curated data in S3.

In [None]:
demo_file = 'cfn/kinesis-swiss-army.yaml'
session.resource('s3').Bucket(bucket).Object(demo_file).upload_file(demo_file)

In [None]:
cfn_template = 'https://s3-{0}.amazonaws.com/{1}/{2}'.format(region, bucket, demo_file)
print(cfn_template)

stack_name = 'kinesis-swiss-army'
response = cfn.create_stack(
    StackName=stack_name,
    TemplateURL=cfn_template,
    Capabilities=["CAPABILITY_NAMED_IAM"],
    Parameters=[
        {
            'ParameterKey': 'SlackWebHookUrl',
            'ParameterValue': slack_web_hook
        }
    ]
)

print(response)

In [None]:
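response = cfn.describe_stacks(
    StackName=stack_name
)
status = response['Stacks'][0]['StackStatus']

# Poll until the stack is created; fail fast instead of looping forever on a rollback.
while status != 'CREATE_COMPLETE':
    if 'FAILED' in status or 'ROLLBACK' in status:
        raise RuntimeError('Stack {0} ended in status {1}'.format(stack_name, status))
    print('Not yet complete.')
    time.sleep(30)
    response = cfn.describe_stacks(
        StackName=stack_name
    )
    status = response['Stacks'][0]['StackStatus']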

for output in response['Stacks'][0]['Outputs']:
    if (output['OutputKey'] == 'FirehoseExecutionRole'):
        firehose_arn = output['OutputValue']
        print('Firehose Role Arn: {0}'.format(firehose_arn))
    if (output['OutputKey'] == 'LambdaPreProcessArn'):
        pre_processing_arn = output['OutputValue']
        print('Lambda Pre Process Arn: {0}'.format(pre_processing_arn))
    if (output['OutputKey'] == 'GlueDatabase'):
        database = output['OutputValue']
        print('Glue Database: {0}'.format(database))
    if (output['OutputKey'] == 'RawTable'):
        raw_table = output['OutputValue']
        print('Glue Raw Table: {0}'.format(raw_table))
    if (output['OutputKey'] == 'CuratedTable'):
        curated_table = output['OutputValue']
        print('Glue Curated Table: {0}'.format(curated_table))
    if (output['OutputKey'] == 'WeblogsBucket'):
        event_bucket = output['OutputValue']
        print('S3 Weblogs Bucket: {0}'.format(event_bucket))
    if (output['OutputKey'] == 'FirehoseLogGroup'):
        cloudwatch_logs_group_name = output['OutputValue']
        print('CloudWatch Logs Group: {0}'.format(cloudwatch_logs_group_name))
    if (output['OutputKey'] == 'KinesisEventStream'):
        event_stream_arn = output['OutputValue']
        print('Kinesis Event Stream: {0}'.format(event_stream_arn))        

### [Create the Kinesis Firehose we will use to send Apache Logs to our Data Lake](https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html)

Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), and Splunk. Kinesis Data Firehose is part of the Kinesis streaming data platform, along with Kinesis Data Streams, Kinesis Video Streams, and Amazon Kinesis Data Analytics. With Kinesis Data Firehose, you don't need to write applications or manage resources. You configure your data producers to send data to Kinesis Data Firehose, and it automatically delivers the data to the destination that you specified. You can also configure Kinesis Data Firehose to transform your data before delivering it.
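
To make the transformation step concrete, here is a minimal sketch of a Firehose data-transformation Lambda handler. It is not the pre-processing function deployed by the workshop's CloudFormation template (that code is not shown in this notebook); the regular expression, the field names, and the choice to emit one JSON document per line are assumptions for illustration. The record contract is the part to note: Firehose passes each record's `data` base64-encoded and expects a response entry with `recordId`, `result`, and `data` for every record it sent.

```python
import base64
import json
import re

# Assumed layout: Apache combined log format, as in the simulated logs.
LOG_PATTERN = re.compile(
    r'(?P<host>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) (?P<protocol>[^"]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+) '
    r'"(?P<referrer>[^"]*)" "(?P<agent>[^"]*)"'
)


def lambda_handler(event, context):
    """Convert each raw log line in the batch into a JSON document."""
    output = []
    for record in event['records']:
        line = base64.b64decode(record['data']).decode('utf-8').strip()
        match = LOG_PATTERN.match(line)
        if match is None:
            # Unparseable input is handed back unchanged and marked as failed.
            output.append({
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': record['data'],
            })
            continue
        fields = match.groupdict()
        fields['response'] = int(fields['response'])
        fields['bytes'] = int(fields['bytes'])
        payload = json.dumps(fields) + '\n'
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8'),
        })
    return {'records': output}
```

Emitting JSON matters in this workshop because the delivery stream configured later in the notebook deserializes the transformed records with `OpenXJsonSerDe` before writing Parquet.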

In this example, we will define custom S3 prefixes that control where the data lands in S3. This allows us to pre-create the partitions that will be cataloged in the Glue Data Catalog. For more information, see the [custom S3 prefixes documentation](https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html).
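
To illustrate how such a prefix resolves, the sketch below expands the `!{timestamp:...}` expressions for a given time. `expand_prefix` is a hypothetical helper written for this example; it handles only the four patterns used in this notebook (`yyyy`, `MM`, `dd`, `HH`) and assumes the timestamp is evaluated in UTC. Firehose performs this substitution itself, so nothing like this needs to run in the workshop.

```python
import re
from datetime import datetime, timezone

# Subset of the date patterns used by the prefixes in this notebook,
# mapped to their strftime equivalents.
_PATTERNS = {'yyyy': '%Y', 'MM': '%m', 'dd': '%d', 'HH': '%H'}


def expand_prefix(prefix, when):
    """Replace each !{timestamp:<pattern>} expression with its value for `when`."""
    def substitute(match):
        return when.strftime(_PATTERNS[match.group(1)])
    return re.sub(r'!\{timestamp:([A-Za-z]+)\}', substitute, prefix)


prefix = ('weblogs/processed/year=!{timestamp:yyyy}/month=!{timestamp:MM}/'
          'day=!{timestamp:dd}/hour=!{timestamp:HH}/')
arrival = datetime(2019, 6, 5, 9, 30, tzinfo=timezone.utc)
print(expand_prefix(prefix, arrival))
# weblogs/processed/year=2019/month=06/day=05/hour=09/
```

The `year=`, `month=`, `day=`, and `hour=` key names follow the Hive partitioning convention, which is why the resulting folders can be registered as table partitions.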

[firehose.create_delivery_stream](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/firehose.html#Firehose.Client.create_delivery_stream)

In [None]:
response = firehose.create_delivery_stream(
    DeliveryStreamName=delivery_stream_name,
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': event_stream_arn,
        'RoleARN': firehose_arn
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': firehose_arn,
        'BucketARN': 'arn:aws:s3:::' + event_bucket,
        'Prefix': 'weblogs/processed/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
        'ErrorOutputPrefix': 'weblogs/failed/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
        'BufferingHints': {
            'SizeInMBs': 128,
            'IntervalInSeconds': 60
        },
        'CompressionFormat': 'UNCOMPRESSED',
        'EncryptionConfiguration': {
            'NoEncryptionConfig': 'NoEncryption'
        },
        'CloudWatchLoggingOptions': {
            'Enabled': True,
            'LogGroupName': cloudwatch_logs_group_name,
            'LogStreamName': 'ingestion_stream'
        },
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [
                {
                    'Type': 'Lambda',
                    'Parameters': [
                        {
                            'ParameterName': 'LambdaArn',
                            'ParameterValue': '{0}:$LATEST'.format(pre_processing_arn)
                        },
                        {
                            'ParameterName': 'NumberOfRetries',
                            'ParameterValue': '1'
                        },
                        {
                            'ParameterName': 'RoleArn',
                            'ParameterValue': firehose_arn
                        },
                        {
                            'ParameterName': 'BufferSizeInMBs',
                            'ParameterValue': '3'
                        },
                        {
                            'ParameterName': 'BufferIntervalInSeconds',
                            'ParameterValue': '60'
                        }
                    ]
                }
            ]
        },
        'S3BackupMode': 'Enabled',
        'S3BackupConfiguration': {
            'RoleARN': firehose_arn,
            'BucketARN': 'arn:aws:s3:::' + event_bucket,
            'Prefix': 'weblogs/raw/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
            'ErrorOutputPrefix': 'weblogs/failed/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
            'BufferingHints': {
                'SizeInMBs': 128,
                'IntervalInSeconds': 60
            },
            'CompressionFormat': 'UNCOMPRESSED',
            'EncryptionConfiguration': {
                'NoEncryptionConfig': 'NoEncryption'
            },
            'CloudWatchLoggingOptions': {
                'Enabled': True,
                'LogGroupName': cloudwatch_logs_group_name,
                'LogStreamName': 'raw_stream'
            }
        },
        'DataFormatConversionConfiguration': {
            'SchemaConfiguration': {
                'RoleARN': firehose_arn,
                'DatabaseName': database,
                'TableName': curated_table,
                'Region': region,
                'VersionId': 'LATEST'
            },
            'InputFormatConfiguration': {
                'Deserializer': {
                    'OpenXJsonSerDe': {}
                }
            },
            'OutputFormatConfiguration': {
                'Serializer': {
                    'ParquetSerDe': {}
                }
            },
            'Enabled': True
        }
    }
)

print(response)

### Wait for the Kinesis Firehose to become `ACTIVE`
Creating the Kinesis Firehose delivery stream takes a short while. The cell below polls its status until it is no longer `CREATING`.

In [None]:
response = firehose.describe_delivery_stream(
    DeliveryStreamName=delivery_stream_name
)

status = response['DeliveryStreamDescription']['DeliveryStreamStatus']
print(status)

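# Poll until creation finishes (the status becomes ACTIVE or CREATING_FAILED).
while status == 'CREATING':
    time.sleep(30)
    response = firehose.describe_delivery_stream(
        DeliveryStreamName=delivery_stream_name
    )
    status = response['DeliveryStreamDescription']['DeliveryStreamStatus']
    print(status)

if status != 'ACTIVE':
    raise RuntimeError('Delivery stream ended in status {0}'.format(status))

print('Kinesis Firehose created.')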

In [None]:
response = cfn.describe_stacks(
    StackName=kdg_stack
)

for output in response['Stacks'][0]['Outputs']:
    if (output['OutputKey'] == 'KinesisDataGeneratorUrl'):
        print(output['OutputValue'])

We will send simulated Apache logs to the Kinesis Data Stream and land the data under the raw and processed prefixes in the data lake. Open the KDG at the URL printed above and use the following record template:

``` json
{{internet.ip}} - - [{{date.now("DD/MMM/YYYY:HH:mm:ss ZZ")}}] "{{random.weightedArrayElement({"weights":[0.6,0.1,0.1,0.2],"data":["GET","POST","DELETE","PUT"]})}} {{random.arrayElement(["/list","/wp-content","/wp-admin","/explore","/search/tag/list","/app/main/posts","/posts/posts/explore"])}} HTTP/1.1" {{random.weightedArrayElement({"weights": [0.9,0.04,0.02,0.04], "data":["200","404","500","301"]})}} {{random.number(10000)}} "-" "{{internet.userAgent}}"
```
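
For readers who want to inspect sample records without opening the KDG, the template can be approximated in plain Python. This is a rough sketch, not the KDG's implementation: `fake_log_line` is a hypothetical helper, the IP address is built from random octets, the user agent is a fixed placeholder string, and the timestamp is formatted in UTC.

```python
import random
from datetime import datetime, timezone

# Values and weights copied from the KDG template above.
METHODS = (['GET', 'POST', 'DELETE', 'PUT'], [0.6, 0.1, 0.1, 0.2])
PATHS = ['/list', '/wp-content', '/wp-admin', '/explore',
         '/search/tag/list', '/app/main/posts', '/posts/posts/explore']
STATUSES = (['200', '404', '500', '301'], [0.9, 0.04, 0.02, 0.04])


def fake_log_line(rng=random):
    """Build one Apache-style log line using the template's weights."""
    ip = '.'.join(str(rng.randint(1, 254)) for _ in range(4))
    now = datetime.now(timezone.utc).strftime('%d/%b/%Y:%H:%M:%S +0000')
    method = rng.choices(METHODS[0], weights=METHODS[1])[0]
    status = rng.choices(STATUSES[0], weights=STATUSES[1])[0]
    return '{0} - - [{1}] "{2} {3} HTTP/1.1" {4} {5} "-" "{6}"'.format(
        ip, now, method, rng.choice(PATHS), status,
        rng.randint(0, 10000), 'Mozilla/5.0 (placeholder)')


for _ in range(3):
    print(fake_log_line())
```

With these weights, roughly 2% of the generated records carry a 500 status, which is the case the Slack alerting path reacts to.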

## Cleanup

In [None]:
!aws s3 rb s3://$event_bucket --force

In [None]:
response = cfn.delete_stack(StackName=stack_name)

In [None]:
waiter = cfn.get_waiter('stack_delete_complete')
waiter.wait(
    StackName=stack_name
)

print('The wait is over for {0}'.format(stack_name))

In [None]:
response = firehose.delete_delivery_stream(
    DeliveryStreamName=delivery_stream_name
)

In [None]:
!aws s3 rb s3://$bucket --force