# Provision AWS Resources to allow Kinesis Firehose to Stream Data to S3 Bucket

In [1]:
# Load the lab_black extension (presumably auto-formats cells with Black — TODO confirm).
%load_ext lab_black
# Load IPython's autoreload extension so edits to imported modules are picked up
# without restarting the kernel.
%load_ext autoreload
# Autoreload mode 2: reload modules before executing each cell.
%autoreload 2

In [2]:
import json
import os
from glob import glob
from time import sleep
from typing import Dict

import boto3
from dotenv import find_dotenv, load_dotenv

In [3]:
%aimport src.s3.buckets
import src.s3.buckets as s3b

%aimport src.cw.cloudwatch_logs
import src.cw.cloudwatch_logs as cwlogs

%aimport src.iam.iam_roles
import src.iam.iam_roles as iamr

%aimport src.firehose.kinesis_firehose
import src.firehose.kinesis_firehose as knsfire

%aimport src.ec2.ec2_instances_sec_groups
import src.ec2.ec2_instances_sec_groups as ec2h

%aimport src.keypairs.ssh_keypairs
import src.keypairs.ssh_keypairs as ssh_keys

%aimport src.ansible.playbook_utils
import src.ansible.playbook_utils as pbu

In [4]:
# Locate the nearest .env file, then load its variables into the process environment.
dotenv_fpath = find_dotenv()
load_dotenv(dotenv_fpath)

True

In [5]:
# Region used for every resource below; None when AWS_REGION is not set.
aws_region = os.environ.get("AWS_REGION")

## About

In this notebook, the following AWS resources will be provisioned:
- S3 bucket
- CloudWatch Logging group
- CloudWatch Logging stream
- IAM role
- Kinesis Firehose Delivery Stream
- EC2 security group (allowing inbound SSH traffic)
- SSH key pair
- EC2 instance

### Pre-Requisites
1. The following environment variables should be set with the user's AWS credentials ([1](https://docs.aws.amazon.com/sdk-for-php/v3/developer-guide/guide_credentials_environment.html), [2](https://docs.aws.amazon.com/sdk-for-php/v3/developer-guide/guide_credentials_profiles.html))
   - `AWS_ACCESS_KEY_ID`
   - `AWS_SECRET_KEY`
   - `AWS_REGION`

   These credentials must be associated with a user group whose users have been granted programmatic access to AWS resources. In order to configure this for an IAM user group, see the documentation [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_users_create.html#id_users_create_console).

### Notes
1. All resources will be created in the same AWS region (specified by the environment variable `AWS_REGION`).

## User Inputs

In [6]:
# ---- S3 ----
s3_bucket_name = "sagemakertestwillz3s"

# ---- IAM role (assumed by Kinesis Firehose) ----
iam_role_path = "/"
iam_role_name = "kinesis-firehose-role"
iam_role_description = "IAM Role to be assumed by Kinesis Firehose"
# Trust policy: lets the Firehose service principal assume the role.
iam_role_trust_policy = dict(
    Version="2012-10-17",
    Statement=[
        dict(
            Sid="",
            Effect="Allow",
            Principal=dict(Service="firehose.amazonaws.com"),
            Action="sts:AssumeRole",
        )
    ],
)

# ---- IAM policy (Firehose -> S3) ----
iam_firehose_s3_policy_name = "mypolicy"
iam_firehose_s3_policy_description = "IAM Policy Granting Firehose Access to S3"
iam_firehose_s3_policy_tags = [dict(Key="Name", Value="firehose_access_s3")]

# ---- Kinesis Firehose delivery stream ----
firehose_stream_name = "twitter_delivery_stream"
stream_s3_destination_prefix = "datasets/twitter/kinesis-demo/"

# ---- CloudWatch Logging (group name is derived from the stream name) ----
cw_logs_group_name = "kinesisfirehose_" + firehose_stream_name

# ---- EC2 security group ----
sg_group_tags = [dict(Key="Name", Value="allow-inbound-ssh")]

# ---- SSH key pair ----
key_fname = "aws_ec2_key"
keypair_name = "ec2-key-pair"

# ---- EC2 instance ----
ec2_instance_image_id = "ami-0cc00ed857256d2b4"
ec2_instance_type = "t2.micro"
ec2_instance_tags_list = [dict(Key="Name", Value="my-ec2-instance")]

# ---- Ansible inventory ----
ansible_inventory_host_vars_fpath = "inventories/production/host_vars/ec2host"

In [7]:
# IAM role granting Kinesis Firehose access to S3
# Look up the AWS account ID of the caller; it is embedded in the ARNs below.
sts_client = boto3.client("sts")
account_id = sts_client.get_caller_identity().get("Account")

# ARNs of the resources the policy grants access to.
bucket_arn = "arn:aws:s3:::" + s3_bucket_name
kinesis_stream_arn = (
    f"arn:aws:kinesis:{aws_region}:{account_id}:stream/{firehose_stream_name}"
)
log_stream_arn = (
    f"arn:aws:logs:{aws_region}:{account_id}"
    f":log-group:{cw_logs_group_name}:log-stream:{firehose_stream_name}"
)

# IAM policy document attached to the Firehose role: S3 object access on the
# bucket, read access on the Kinesis stream ARN, and permission to put
# CloudWatch log events on the log stream.
iam_firehose_s3_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
            ],
            # Both the bucket itself and every object inside it.
            "Resource": [bucket_arn, bucket_arn + "/*"],
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
            ],
            "Resource": kinesis_stream_arn,
        },
        {
            "Effect": "Allow",
            "Action": ["logs:PutLogEvents"],
            "Resource": [log_stream_arn],
        },
    ],
}

## Create the S3 Bucket

In [None]:
%%time
# Create the S3 bucket named `s3_bucket_name` in `aws_region` through the
# project helper, then display the helper's response.
s3_bucket_creation_response = s3b.create_s3_bucket(s3_bucket_name, aws_region)
s3_bucket_creation_response

## Create CloudWatch Log Group and Stream

In [None]:
%%time
# Create the CloudWatch log group and stream. The Firehose stream name is
# passed as the second argument; the policy document above uses the same name
# as the log-stream name in its ARN.
# The helper returns two responses; only the log-group response is displayed.
cw_log_creation_response, cw_stream_creation_response = cwlogs.create_cw_logs_group_stream(
    cw_logs_group_name, firehose_stream_name, aws_region
)
cw_log_creation_response

## IAM

### Create IAM Role to be Assumed by Firehose

In [None]:
%%time
# Create the IAM role with the trust policy defined in the User Inputs cell.
# The response is kept because the Firehose creation cell later reads
# iam_role_creation_response["Role"]["Arn"] from it.
iam_role_creation_response = iamr.create_iam_role(
    iam_role_path,
    iam_role_name,
    iam_role_description,
    iam_role_trust_policy,
    aws_region,
)
iam_role_creation_response

### Create IAM Firehose-S3 Policy

In [None]:
%%time
# Create the IAM policy from the policy document built earlier.
# NOTE: unlike create_iam_role, this helper takes the region as its FIRST
# positional argument.
iam_policy_creation_response = iamr.create_iam_policy(
    aws_region,
    iam_firehose_s3_policy_name,
    iam_firehose_s3_policy_document,
    iam_firehose_s3_policy_description,
    iam_firehose_s3_policy_tags,
)
iam_policy_creation_response

### Get IAM Firehose-S3 Policy

In [12]:
%%time
iam_firehose_s3_policy_list = iamr.get_iam_policies(aws_region, attached=False)
[
    [
        iam_firehose_s3_policy["PolicyName"],
        iam_firehose_s3_policy["CreateDate"].strftime("%Y-%m-%d %H:%M:%S"),
    ]
    for iam_firehose_s3_policy in iam_firehose_s3_policy_list
    if iam_firehose_s3_policy_name in iam_firehose_s3_policy["PolicyName"]
]

CPU times: user 35.8 ms, sys: 0 ns, total: 35.8 ms
Wall time: 209 ms


[['mypolicy', '2021-12-27 02:25:07']]

### Attach IAM Firehose-S3 Policy to IAM Role

In [None]:
%%time
policy_attachment_response = iamr.attach_iam_policy_to_role(
    iam_role_name, aws_region, iam_firehose_s3_policy_list[0]["Arn"]
)
policy_attachment_response

**Note**
1. The creation of this IAM role must be completed before the role can be used. After running the commands above to create the role and attach its policy, it is necessary to wait a few seconds before the newly created role can be assumed by Kinesis Firehose. For this reason, we will now pause before proceeding to create the Kinesis Firehose delivery stream.

In [14]:
# Pause for 30 seconds so the newly created IAM role can be assumed by
# Firehose before the delivery stream is created in the next section.
sleep(30)

## Create the Kinesis Firehose Delivery Stream

In [None]:
%%time
# Create the Firehose delivery stream that writes to the S3 bucket.
# The role ARN is read from the response of the role-creation cell, so that
# cell must have been run successfully in this kernel session.
# NOTE(review): the trailing positional arguments 5 and 60 are presumably the
# S3 buffering size (MB) and buffering interval (seconds) — TODO confirm
# against create_kinesis_firehose_stream's signature.
kinesis_firehose_stream_creation_response = knsfire.create_kinesis_firehose_stream(
    firehose_stream_name,
    iam_role_creation_response["Role"]["Arn"],
    s3_bucket_name,
    stream_s3_destination_prefix,
    cw_logs_group_name,
    iam_role_name,
    iam_role_path,
    aws_region,
    5,
    60,
)
kinesis_firehose_stream_creation_response

In [None]:
# Display the helper's description of the delivery stream that was just created.
knsfire.describe_kinesis_firehose_stream(firehose_stream_name, aws_region)

## EC2

### Create EC2 Security Group to Allow SSH Traffic on Port 22

In [None]:
%%time
# Create the security group used for SSH access. The literal group name
# "allow-inbound-ssh" equals the Name tag value in `sg_group_tags`, which the
# later attachment cell uses to find this group.
ec2h.create_security_group(
    "allow-inbound-ssh",
    "Allow inbound SSH traffic",
    aws_region,
    sg_group_tags,
)

### Create SSH Key Pair

Create a local SSH Key Pair file

In [None]:
# Create the EC2 key pair `keypair_name`, passing "/tmp" and `key_fname` to the
# helper; the next cell shows the resulting /tmp/<key_fname>.pem file.
# NOTE(review): /tmp is a shared directory — consider a private location for
# the key file and confirm the permissions the helper sets on it.
ssh_key_creation_response = ssh_keys.create_key_pair(
    keypair_name,
    aws_region,
    "/tmp",
    key_fname,
    [{"Key": "Name", "Value": "my-ssh-key-pair"}],
)
ssh_key_creation_response

Show the created keypair file

In [19]:
# List the .pem file(s) under /tmp whose name starts with `key_fname`.
glob(f"/tmp/{key_fname}*.pem")

['/tmp/aws_ec2_key.pem']

### Create EC2 Instance

In [None]:
%%time
# Launch the EC2 instance with the configured AMI, instance type, key pair and
# tags. No security group is passed here; the SSH security group is attached
# to the instance in a later cell.
created_instance_response = ec2h.create_instance(
    image_id=ec2_instance_image_id,
    instance_type=ec2_instance_type,
    keypair_name=keypair_name,
    region=aws_region,
    tags_list=ec2_instance_tags_list,
)
created_instance_response

### Attach EC2 Security Group to EC2 Instance To Grant SSH Access to Instance

In [None]:
%%time
sg_filter = dict(Filters=[{"Name": "tag:Name", "Values": ["allow-inbound-ssh"]}])
ec2_instance_filter = dict(Filters=[{"Name": "tag:Name", "Values": ["my-ec2-instance"]}])
ec2h.attach_sg_to_ec2_instance(sg_filter, ec2_instance_filter, 0, -1, aws_region)

### Set EC2 Public IP Address in Ansible Inventory

Get attributes of the newly created EC2 instance

In [None]:
%%time
ec2_instance_filter = dict(
    Filters=[{"Name": "tag:Name", "Values": ["my-ec2-instance"]}]
)
ec2_instances_list = ec2h.list_ec2_instances_by_filter(aws_region, ec2_instance_filter)
ec2_instances_list

This has to be done since, on initial provisioning of the EC2 host (step 2.), the only version of Python installed on the instance is Python 2.7. After Python 3 is installed in step 2., we need to notify Ansible to use Python 3 instead. This is done in this step.

In [None]:
# Update the Ansible inventory host-vars file with the address of the last
# instance in `ec2_instances_list` ([-1]).
# NOTE(review): the helper name mentions an IP, but the instance's public DNS
# name is what gets passed — confirm this is what the inventory expects.
pbu.replace_inventory_host_ip(
    ansible_inventory_host_vars_fpath,
    ec2_instances_list[-1]["public_dns_name"],
)