# Data Engineering Project 3: Data Warehouse

The purpose of this notebook is to create the AWS resources required to complete this project:
- Redshift cluster
- IAM role
- Security group (inbound rule)

Furthermore, it will serve as a sandbox to explore the data being loaded and to run queries against the Redshift cluster to verify that data have been loaded successfully.

## Import required libraries

```python
import pandas as pd
import boto3
import json
import configparser

def to_dataframe(col, val):
    """Display two parallel lists (parameter names and values) as a two-column table."""
    return pd.DataFrame({"Param": col, "Value": val})
```

## Step 1: Prerequisites and initialization

- Create an IAM user with programmatic access
- For demo purposes, assign this user the `AdministratorAccess` policy
    - **Note**: In production, grant only the minimum privileges required. Since this is a demo, we can ignore this for now.
- Take note of the access key ID and secret access key
- Add this information to the file `dwh.cfg`
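For reference, here is a minimal sketch of the layout `dwh.cfg` is expected to have. The section and key names mirror the `config.get(...)` calls in this notebook. Values in angle brackets are placeholders. The non-secret cluster values are copied from the output of the next cell. The S3 paths and region are assumptions based on the `udacity-dend` bucket used later, stored here without surrounding quotes. The snippet renders the file to a string instead of writing it to disk, so an existing `dwh.cfg` is not overwritten.

```python
import configparser
import io

# Hypothetical dwh.cfg template: section/key names follow the config.get(...) calls
# in this notebook; values in <angle brackets> are placeholders to fill in.
template = configparser.ConfigParser()
template.optionxform = str  # keep upper-case key names when writing
template["AWS"] = {"KEY": "<access-key-id>", "SECRET": "<secret-access-key>"}
template["CLUSTER"] = {
    "HOST": "<endpoint-printed-in-step-3>",
    "DB_CLUSTER_IDENTIFIER": "myredshiftcluster",
    "DB_CLUSTER_TYPE": "multi-node",
    "DB_NODE_TYPE": "dc2.large",
    "DB_NUM_NODES": "2",
    "DB_NAME": "dev",
    "DB_USER": "awsuser",
    "DB_PASSWORD": "<choose-a-password>",
    "DB_PORT": "5439",
}
template["IAM_ROLE"] = {"IAM_ROLE_NAME": "myRedshiftRole", "IAM_ROLE_ARN": "<arn-printed-in-step-2>"}
template["S3"] = {  # assumed values based on the udacity-dend bucket used below
    "BUCKET_REGION": "us-west-2",
    "LOG_DATA": "s3://udacity-dend/log_data",
    "LOG_JSONPATH": "s3://udacity-dend/log_json_path.json",
    "SONG_DATA": "s3://udacity-dend/song_data",
}

# Render to a string instead of a file so an existing dwh.cfg is never overwritten.
buffer = io.StringIO()
template.write(buffer)
cfg_text = buffer.getvalue()
print(cfg_text)

# A default ConfigParser (as used in the next cell) looks keys up case-insensitively.
check = configparser.ConfigParser()
check.read_string(cfg_text)
print(check.get("CLUSTER", "DB_PORT"))  # 5439
```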

### Load dwh.cfg

```python
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# AWS key and secret: remove them from dwh.cfg before sharing the project files!
AWS_KEY                  = config.get("AWS","KEY")
AWS_SECRET               = config.get("AWS","SECRET")

DB_CLUSTER_IDENTIFIER    = config.get("CLUSTER","DB_CLUSTER_IDENTIFIER")
DB_CLUSTER_TYPE          = config.get("CLUSTER","DB_CLUSTER_TYPE")
DB_NODE_TYPE             = config.get("CLUSTER","DB_NODE_TYPE")
DB_NUM_NODES             = config.get("CLUSTER", "DB_NUM_NODES")

DB_NAME                  = config.get("CLUSTER","DB_NAME")
DB_USER                  = config.get("CLUSTER","DB_USER")
DB_PASSWORD              = config.get("CLUSTER","DB_PASSWORD")
DB_PORT                  = config.get("CLUSTER","DB_PORT")

IAM_ROLE_NAME            = config.get("IAM_ROLE", "IAM_ROLE_NAME")
IAM_ROLE_ARN             = config.get("IAM_ROLE", "IAM_ROLE_ARN")

S3_BUCKET_REGION         = config.get("S3", "BUCKET_REGION")
S3_LOG_DATA              = config.get("S3", "LOG_DATA")
S3_LOG_JSONPATH          = config.get("S3", "LOG_JSONPATH")
S3_SONG_DATA             = config.get("S3", "SONG_DATA")

column = ["DB_CLUSTER_IDENTIFIER", "DB_CLUSTER_TYPE", "DB_NODE_TYPE", "DB_NUM_NODES", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT", "IAM_ROLE_NAME", "IAM_ROLE_ARN", "S3_BUCKET_REGION", "S3_LOG_DATA", "S3_LOG_JSONPATH", "S3_SONG_DATA"]
value = [DB_CLUSTER_IDENTIFIER, DB_CLUSTER_TYPE, DB_NODE_TYPE, DB_NUM_NODES, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT, IAM_ROLE_NAME, IAM_ROLE_ARN, S3_BUCKET_REGION, S3_LOG_DATA, S3_LOG_JSONPATH, S3_SONG_DATA]

to_dataframe(column, value)
```

| | Param | Value |
|---|---|---|
| 0 | DB_CLUSTER_IDENTIFIER | myredshiftcluster |
| 1 | DB_CLUSTER_TYPE | multi-node |
| 2 | DB_NODE_TYPE | dc2.large |
| 3 | DB_NUM_NODES | 2 |
| 4 | DB_NAME | dev |
| 5 | DB_USER | awsuser |
| 6 | DB_PASSWORD | Passw0rd |
| 7 | DB_PORT | 5439 |
| 8 | IAM_ROLE_NAME | myRedshiftRole |
| 9 | IAM_ROLE_ARN | arn:aws:iam::864885010001:role/myRedshiftRole |
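The table above shows `DB_PASSWORD` in plain text, and that text is saved with the notebook. The following sketch defines a hypothetical helper, `mask_secrets`, that blanks out sensitive values before they are passed to `to_dataframe`. The default list of secret parameter names is an assumption; adjust it to your configuration.

```python
def mask_secrets(params, values, secret_params=("DB_PASSWORD", "AWS_KEY", "AWS_SECRET")):
    """Return a copy of `values` in which entries whose parameter name is secret are masked.

    Hypothetical helper: `params` and `values` are parallel lists like `column` and `value` above.
    """
    return ["********" if name in secret_params else val
            for name, val in zip(params, values)]

# Usage with the lists from the previous cell:
# to_dataframe(column, mask_secrets(column, value))
print(mask_secrets(["DB_USER", "DB_PASSWORD"], ["awsuser", "example-password"]))
# ['awsuser', '********']
```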


### Initialize Boto3 resources (EC2, S3) and clients (IAM, Redshift)

```python
# EC2 and S3 use the higher-level resource API; IAM and Redshift use low-level clients.
# All of them share the same region and credentials.
aws_settings = dict(region_name="us-west-2",
                    aws_access_key_id=AWS_KEY,
                    aws_secret_access_key=AWS_SECRET)

ec2 = boto3.resource('ec2', **aws_settings)
s3 = boto3.resource('s3', **aws_settings)
iam = boto3.client('iam', **aws_settings)
redshift = boto3.client('redshift', **aws_settings)
```

### Get source file information

```python
s3_bucket = "udacity-dend"
s3_prefix_song_data = "song_data"
s3_prefix_log_data = "log_data"

bucket_object = s3.Bucket(s3_bucket)

# Count the keys under each prefix (the collection pages through the listing automatically).
num_log_data = sum(1 for _ in bucket_object.objects.filter(Prefix=s3_prefix_log_data))
num_song_data = sum(1 for _ in bucket_object.objects.filter(Prefix=s3_prefix_song_data))

print("Number of log data: ", num_log_data)
print("Number of song data: ", num_song_data)
```

```
Number of log data:  31
Number of song data:  14897
```
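The resource API used above pages through `ListObjectsV2` results transparently. The following sketch uses the low-level S3 client instead, which makes the pagination explicit: it sums the `KeyCount` reported for each page. `count_objects` is a hypothetical helper. A prefix such as `log_data` matches every key that starts with that string, including any zero-byte "folder" placeholder keys if the bucket has them.

```python
def count_objects(s3_client, bucket, prefix):
    """Count the keys under `prefix` by paging through ListObjectsV2 results.

    `s3_client` is expected to be a boto3 S3 client (boto3.client("s3", ...)).
    """
    paginator = s3_client.get_paginator("list_objects_v2")
    total = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        total += page.get("KeyCount", 0)  # number of keys returned in this page
    return total

# Usage (requires AWS credentials):
# s3_client = boto3.client("s3", region_name="us-west-2",
#                          aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
# print(count_objects(s3_client, "udacity-dend", "log_data"))
```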


### Print S3_LOG_JSONPATH

```python
s3_object = s3.Object(s3_bucket, 'log_json_path.json')
file_contents = s3_object.get()["Body"].read()

print(file_contents.decode("utf-8"))
```

```json
{
    "jsonpaths": [
        "$['artist']",
        "$['auth']",
        "$['firstName']",
        "$['gender']",
        "$['itemInSession']",
        "$['lastName']",
        "$['length']",
        "$['level']",
        "$['location']",
        "$['method']",
        "$['page']",
        "$['registration']",
        "$['sessionId']",
        "$['song']",
        "$['status']",
        "$['ts']",
        "$['userAgent']",
        "$['userId']"
    ]
}
```
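When `COPY` loads JSON with a JSONPaths file, each expression in the `jsonpaths` array fills the target column at the same position. The array order must therefore match the column order of the staging table. The following is a simplified, pure-Python sketch of that positional mapping. It understands only the `$['key']` form used in this file, and the sample event is hypothetical, not taken from the dataset. Redshift loads an element that a path does not find as NULL; the sketch mimics this with `None`.

```python
import re

# Field order copied from the log_json_path.json file printed above.
FIELDS = ["artist", "auth", "firstName", "gender", "itemInSession", "lastName",
          "length", "level", "location", "method", "page", "registration",
          "sessionId", "song", "status", "ts", "userAgent", "userId"]
jsonpaths_doc = {"jsonpaths": [f"$['{name}']" for name in FIELDS]}

PATH_RE = re.compile(r"\$\['(.+)'\]")  # only the $['key'] form used in this file

def extract_row(event, jsonpaths):
    """Simplified model of COPY ... FORMAT AS JSON '<jsonpaths file>':
    the n-th expression fills the n-th column; a missing element becomes None (NULL)."""
    row = []
    for path in jsonpaths["jsonpaths"]:
        match = PATH_RE.fullmatch(path)
        if match is None:
            raise ValueError(f"unsupported JSONPath expression: {path}")
        row.append(event.get(match.group(1)))
    return tuple(row)

# Hypothetical log event (not from the dataset); most fields are deliberately missing.
# With the downloaded file you could pass json.loads(file_contents) instead of jsonpaths_doc.
event = {"artist": "Some Artist", "page": "NextSong", "song": "Some Song",
         "ts": 1500000000000, "userId": "42"}
row = extract_row(event, jsonpaths_doc)
print(len(row), row[0], row[13], row[-1])  # 18 Some Artist Some Song 42
```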


## Step 2: Create IAM role

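This step creates an IAM role that grants the Redshift service read-only access to S3 through the `AmazonS3ReadOnlyAccess` managed policy. If the role already exists (for example, when the notebook is rerun), `create_role` fails with `EntityAlreadyExists`. The error is printed and can be ignored.

Copy the printed ARN into `dwh.cfg` as the value of **IAM_ROLE_ARN**.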

```python
try:
    dwhRole = iam.create_role(
        Path='/',
        RoleName=IAM_ROLE_NAME,
        Description="Allow Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                'Effect': 'Allow',
                'Principal': {'Service': 'redshift.amazonaws.com'}}],
            'Version': '2012-10-17'})
    )
except Exception as e:
    print(e)

# Attach policy to role
try:
    iam.attach_role_policy(RoleName=IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
except Exception as e:
    print(e)

# Print IAM Role ARN
iam_role = iam.get_role(RoleName=IAM_ROLE_NAME)
print("IAM_ROLE_ARN:", iam_role['Role']['Arn'])
```

```
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name myRedshiftRole already exists.
IAM_ROLE_ARN: arn:aws:iam::864885010001:role/myRedshiftRole
```
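The cell above catches every exception. As a result, a genuine failure such as missing IAM permissions is only printed, and execution carries on. The following sketch is more targeted: it re-raises everything except the `EntityAlreadyExists` error seen in the output. It reads the error code from the `response` attribute that botocore's `ClientError` carries. `error_code` and `create_role_if_missing` are hypothetical helper names.

```python
import json

def error_code(exc):
    """Return the AWS error code of a botocore ClientError (e.g. "EntityAlreadyExists"), else None."""
    response = getattr(exc, "response", None) or {}
    return response.get("Error", {}).get("Code")

def create_role_if_missing(iam_client, role_name, service="redshift.amazonaws.com"):
    """Create a role that `service` may assume; return False if it already existed."""
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{"Action": "sts:AssumeRole", "Effect": "Allow",
                       "Principal": {"Service": service}}],
    }
    try:
        iam_client.create_role(
            Path="/",
            RoleName=role_name,
            Description="Allow Redshift clusters to call AWS services on your behalf.",
            AssumeRolePolicyDocument=json.dumps(trust_policy),
        )
        return True
    except Exception as exc:
        if error_code(exc) == "EntityAlreadyExists":
            return False  # rerunning the notebook: the role is already there
        raise  # anything else is a real problem and should stop the notebook

# Usage: created = create_role_if_missing(iam, IAM_ROLE_NAME)
```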


## Step 3: Create Redshift cluster

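More information about creating a Redshift cluster with Boto3 can be found in the `create_cluster` documentation:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster

Before running this step, make sure that **IAM_ROLE_ARN** is set in `dwh.cfg`. If you added the ARN after running Step 1, rerun the *Load dwh.cfg* cell so that the `IAM_ROLE_ARN` variable is up to date.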

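```python
try:
    response = redshift.create_cluster(
        ClusterIdentifier=DB_CLUSTER_IDENTIFIER,
        ClusterType=DB_CLUSTER_TYPE,
        NodeType=DB_NODE_TYPE,
        NumberOfNodes=int(DB_NUM_NODES),  # config values are strings; the API expects an int
        DBName=DB_NAME,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,
        Port=int(DB_PORT),                # keep in sync with the security group rule in Step 4
        IamRoles=[IAM_ROLE_ARN]           # lets the cluster assume the S3 read-only role
    )
except Exception as e:
    print(e)
```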
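`dwh.cfg` stores every value as a string, but `create_cluster` expects integers for `NumberOfNodes` and `Port`. According to the Boto3 documentation linked above, `NumberOfNodes` is required when `ClusterType` is `multi-node`, and a single-node cluster can omit it. The following sketch centralises those conversions in a hypothetical helper, `cluster_kwargs`.

```python
def cluster_kwargs(identifier, cluster_type, node_type, num_nodes, db_name,
                   user, password, port, iam_role_arn):
    """Build keyword arguments for redshift.create_cluster from dwh.cfg strings."""
    kwargs = {
        "ClusterIdentifier": identifier,
        "ClusterType": cluster_type,
        "NodeType": node_type,
        "DBName": db_name,
        "MasterUsername": user,
        "MasterUserPassword": password,
        "Port": int(port),  # config values are strings; the API expects an integer
        "IamRoles": [iam_role_arn],
    }
    if cluster_type == "multi-node":
        # NumberOfNodes is required for multi-node clusters; single-node clusters omit it.
        kwargs["NumberOfNodes"] = int(num_nodes)
    return kwargs

# Usage:
# redshift.create_cluster(**cluster_kwargs(DB_CLUSTER_IDENTIFIER, DB_CLUSTER_TYPE, DB_NODE_TYPE,
#                                          DB_NUM_NODES, DB_NAME, DB_USER, DB_PASSWORD,
#                                          DB_PORT, IAM_ROLE_ARN))
```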

### Check cluster provisioning status

Run this cell and wait until the status changes to `available`; the loop keeps polling on its own. Take note of the printed cluster URL and add it to `dwh.cfg` as the value of **HOST**.

```python
import time

cluster_description = redshift.describe_clusters(ClusterIdentifier=DB_CLUSTER_IDENTIFIER)["Clusters"][0]

# Poll until the cluster is ready, without hammering the DescribeClusters API.
while cluster_description["ClusterStatus"] != 'available':
    time.sleep(10)
    cluster_description = redshift.describe_clusters(ClusterIdentifier=DB_CLUSTER_IDENTIFIER)["Clusters"][0]
    print("\rCluster Status:", cluster_description["ClusterStatus"], end="")

print("\rCluster Status:", cluster_description["ClusterStatus"])
print("Cluster URL:", cluster_description["Endpoint"]["Address"])
```

```
Cluster Status: available
Cluster URL: myredshiftcluster.crqg5tfvx6hp.us-west-2.redshift.amazonaws.com
```
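Boto3 also ships a built-in Redshift waiter, `cluster_available`, that does this polling with a configurable delay and attempt limit. Instead of looping forever, it raises `botocore.exceptions.WaiterError` if the cluster never becomes available. The following sketch wraps the waiter in a hypothetical helper, `wait_for_cluster`. The 30-second delay and 40 attempts are arbitrary choices.

```python
def wait_for_cluster(redshift_client, cluster_id, delay=30, max_attempts=40):
    """Block until the cluster is available, then return its endpoint address.

    `redshift_client` is a boto3 Redshift client; the waiter raises
    botocore.exceptions.WaiterError once max_attempts is exhausted.
    """
    waiter = redshift_client.get_waiter("cluster_available")
    waiter.wait(
        ClusterIdentifier=cluster_id,
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )
    cluster = redshift_client.describe_clusters(ClusterIdentifier=cluster_id)["Clusters"][0]
    return cluster["Endpoint"]["Address"]

# Usage: HOST = wait_for_cluster(redshift, DB_CLUSTER_IDENTIFIER)
```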


## Step 4: Modify security group

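Allow inbound TCP connections to the Redshift port (`DB_PORT`, 5439) from any IPv4 address (`0.0.0.0/0`). This exposes the cluster endpoint to the whole internet, which is acceptable only for a short-lived demo.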

```python
try:
    # Use the VPC security group that is actually attached to the cluster,
    # rather than whichever group happens to be listed first in the VPC.
    sg_id = cluster_description['VpcSecurityGroups'][0]['VpcSecurityGroupId']
    cluster_sg = ec2.SecurityGroup(sg_id)

    cluster_sg.authorize_ingress(
        CidrIp='0.0.0.0/0',   # any IPv4 address; narrow this outside a demo
        IpProtocol='tcp',
        FromPort=int(DB_PORT),
        ToPort=int(DB_PORT)
    )
    print("Security group modified!")
except Exception as e:
    print(e)
```

```
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists
```
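Opening port 5439 to `0.0.0.0/0` is convenient for a demo, but it exposes the endpoint to the whole internet. The following sketch builds a narrower rule using the `IpPermissions` form of `authorize_ingress`, which also accepts a per-range description. `redshift_ingress` is a hypothetical helper, and refusing a `/0` range is a design choice of this sketch. `203.0.113.10/32` is a documentation-only address standing in for your own public IP.

```python
import ipaddress

def redshift_ingress(port, cidr, description="Redshift access"):
    """Build an IpPermissions list allowing TCP traffic on `port` from `cidr` only."""
    network = ipaddress.ip_network(cidr)  # raises ValueError for an invalid network
    if network.prefixlen == 0:
        raise ValueError("refusing to open the port to every address")
    return [{
        "IpProtocol": "tcp",
        "FromPort": int(port),
        "ToPort": int(port),
        "IpRanges": [{"CidrIp": str(network), "Description": description}],
    }]

# Usage, where sg is the ec2.SecurityGroup resource used in the cell above
# (203.0.113.10 is a placeholder from the documentation address range):
# sg.authorize_ingress(IpPermissions=redshift_ingress(DB_PORT, "203.0.113.10/32"))
```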


## Step 5: Verify connection to Redshift

Make sure that we can connect to the cluster.

```python
%load_ext sql
# the %sql / %%sql magics come from the ipython-sql package
```

Create the tables and load the data by running the project scripts:

```python
!python create_tables.py
!python etl.py
```
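`etl.py` is not reproduced in this notebook. For orientation, the following sketch shows the kind of `COPY` statement such a loader typically issues against the S3 settings in `dwh.cfg`. `FORMAT AS JSON` takes either `'auto'`, which matches JSON object keys to column names, or the S3 path of a JSONPaths file such as `S3_LOG_JSONPATH`. The helper and the staging table names are hypothetical. The sketch also assumes the config values are stored without surrounding quotes, since it adds its own.

```python
def build_copy_sql(table, s3_path, iam_role_arn, region, json_option="auto"):
    """Return a Redshift COPY statement that loads the JSON files under `s3_path` into `table`.

    Assumes s3_path / json_option are bare strings (no quotes) as read from dwh.cfg.
    """
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        f"REGION '{region}'\n"
        f"FORMAT AS JSON '{json_option}';"
    )

# Hypothetical staging tables; the real names live in the project's SQL scripts.
# build_copy_sql("staging_events", S3_LOG_DATA, IAM_ROLE_ARN, S3_BUCKET_REGION, S3_LOG_JSONPATH)
# build_copy_sql("staging_songs", S3_SONG_DATA, IAM_ROLE_ARN, S3_BUCKET_REGION)
print(build_copy_sql("staging_songs", "s3://udacity-dend/song_data",
                     "arn:aws:iam::123456789012:role/myRedshiftRole", "us-west-2"))
```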

```python
# Re-read dwh.cfg to pick up the HOST value added after Step 3.
with open("dwh.cfg") as cfg_file:
    config.read_file(cfg_file)
HOST = config.get("CLUSTER", "HOST")

connection_string = f"postgresql://{DB_USER}:{DB_PASSWORD}@{HOST}:{DB_PORT}/{DB_NAME}"
print("Connection String:", connection_string)
%sql $connection_string
```

```
Connection String: postgresql://awsuser:Passw0rd@myredshiftcluster.crqg5tfvx6hp.us-west-2.redshift.amazonaws.com:5439/dev
'Connected: awsuser@dev'
```
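Formatting the password directly into the URL breaks as soon as it contains characters such as `@`, `:` or `/`, which have a special meaning in URLs. The following sketch percent-encodes the user name and password with `urllib.parse.quote_plus` before building the SQLAlchemy-style URL that `%sql` accepts. `make_connection_url` is a hypothetical helper name.

```python
from urllib.parse import quote_plus

def make_connection_url(user, password, host, port, db_name):
    """Build a postgresql:// URL with the credentials percent-encoded."""
    return "postgresql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, db_name
    )

print(make_connection_url("awsuser", "p@ss/word:1", "example-host", 5439, "dev"))
# postgresql://awsuser:p%40ss%2Fword%3A1@example-host:5439/dev
```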

## Step 6: Sandbox

Sandbox for running test queries and verifying that the data have been loaded successfully.
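A quick first check is that none of the analytics tables is empty. The following sketch works with any DB-API 2.0 connection. For Redshift, that could be a `psycopg2` connection built from the same settings; the driver is an assumption, since the notebook itself uses `%sql`. The sketch is demonstrated against an in-memory SQLite database with hypothetical rows and covers only the four tables queried below.

```python
import sqlite3

TABLES = ["songplays", "users", "songs", "artists"]  # tables queried in this sandbox

def row_counts(conn, tables=TABLES):
    """Return {table: row count} using a DB-API connection; table names must be trusted."""
    cur = conn.cursor()
    counts = {}
    try:
        for table in tables:
            cur.execute(f"SELECT COUNT(*) FROM {table}")  # identifiers cannot be bound as parameters
            counts[table] = cur.fetchone()[0]
    finally:
        cur.close()
    return counts

# With Redshift (assumes psycopg2 is installed):
# conn = psycopg2.connect(host=HOST, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, port=DB_PORT)

# Local demonstration with an in-memory SQLite database and hypothetical data.
conn = sqlite3.connect(":memory:")
for table in TABLES:
    conn.execute(f"CREATE TABLE {table} (id INTEGER)")
conn.execute("INSERT INTO songplays VALUES (1), (2)")
counts = row_counts(conn)
empty = [t for t, n in counts.items() if n == 0]
print(counts)  # {'songplays': 2, 'users': 0, 'songs': 0, 'artists': 0}
print("Empty tables:", empty)
```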

```sql
%%sql
SELECT sp.user_id
      ,u.first_name
      ,u.last_name
      ,u.level
      ,COUNT(*) AS songplays
      ,COUNT(DISTINCT session_id) AS sessions
  FROM songplays sp
  JOIN users u ON sp.user_id = u.user_id
 GROUP BY 1, 2, 3, 4
 ORDER BY songplays DESC
 LIMIT 5;
```

```
 * postgresql://awsuser:***@myredshiftcluster.crqg5tfvx6hp.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.
```

| user_id | first_name | last_name | level | songplays | sessions |
|---|---|---|---|---|---|
| 49 | Chloe | Cuevas | paid | 42 | 21 |
| 97 | Kate | Harrell | paid | 32 | 11 |
| 80 | Tegan | Levine | paid | 31 | 17 |
| 44 | Aleena | Kirby | paid | 21 | 7 |
| 73 | Jacob | Klein | paid | 18 | 6 |
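To sanity-check what the first query computes (plays per user, and the number of distinct sessions those plays span), you can run the same SQL locally in SQLite against a few hypothetical rows. The table layouts below include only the columns the query touches. They are assumptions, not the project's DDL.

```python
import sqlite3

QUERY = """
SELECT sp.user_id, u.first_name, u.last_name, u.level,
       COUNT(*) AS songplays, COUNT(DISTINCT session_id) AS sessions
FROM songplays sp
JOIN users u ON sp.user_id = u.user_id
GROUP BY 1, 2, 3, 4
ORDER BY songplays DESC
LIMIT 5
"""

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE users (user_id INTEGER, first_name TEXT, last_name TEXT, level TEXT);
CREATE TABLE songplays (songplay_id INTEGER, user_id INTEGER, session_id INTEGER);
INSERT INTO users VALUES (1, 'Ada', 'Example', 'paid'), (2, 'Bob', 'Sample', 'free');
-- user 1: three plays across two sessions; user 2: one play in one session
INSERT INTO songplays VALUES (1, 1, 10), (2, 1, 10), (3, 1, 11), (4, 2, 20);
""")
rows = conn.execute(QUERY).fetchall()
for row in rows:
    print(row)
# (1, 'Ada', 'Example', 'paid', 3, 2)
# (2, 'Bob', 'Sample', 'free', 1, 1)
```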


```sql
%%sql
SELECT s.title AS song_title
      ,a.name AS artist_name
      ,COUNT(*) AS songplays
  FROM songplays sp
  JOIN songs s ON sp.song_id = s.song_id
  JOIN artists a ON sp.artist_id = a.artist_id
 GROUP BY 1, 2
 ORDER BY songplays DESC
 LIMIT 5;
```

```
 * postgresql://awsuser:***@myredshiftcluster.crqg5tfvx6hp.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.
```

| song_title | artist_name | songplays |
|---|---|---|
| You're The One | Dwight Yoakam | 37 |
| Catch You Baby (Steve Pitron & Max Sanna Radio Edit) | Lonnie Gordon | 9 |
| I CAN'T GET STARTED | Ron Carter | 9 |
| Nothin' On You [feat. Bruno Mars] (Album Version) | B.o.B | 8 |
| Hey Daddy (Daddy's Home) | Usher featuring Jermaine Dupri | 6 |


## Step 7: Destroy provisioned AWS resources

Once you no longer need the resources, run this step to avoid ongoing costs:

- Delete Redshift cluster
- Detach S3 read policy from Redshift role
- Delete Redshift role

```python
# SkipFinalClusterSnapshot=True deletes the cluster without keeping a final backup snapshot.
redshift.delete_cluster(ClusterIdentifier=DB_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)
```

Rerun this cell until it reports `ClusterNotFound`, which means the cluster has been deleted.

```python
try:
    print(redshift.describe_clusters(ClusterIdentifier=DB_CLUSTER_IDENTIFIER)['Clusters'][0]["ClusterStatus"])
except Exception as e:
    print(e)
```

```
deleting
```
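Instead of rerunning the cell above by hand until it reports `ClusterNotFound`, you can use Boto3's built-in `cluster_deleted` waiter, which blocks until the cluster is gone. The following sketch wraps it in a hypothetical helper, `wait_for_deletion`. The delay and attempt values are arbitrary choices.

```python
def wait_for_deletion(redshift_client, cluster_id, delay=30, max_attempts=40):
    """Block until the cluster no longer exists, using boto3's 'cluster_deleted' waiter.

    Raises botocore.exceptions.WaiterError if the cluster is still present after max_attempts.
    """
    redshift_client.get_waiter("cluster_deleted").wait(
        ClusterIdentifier=cluster_id,
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )

# Usage:
# wait_for_deletion(redshift, DB_CLUSTER_IDENTIFIER)
# print("Cluster deleted")
```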


Detach the policy, then delete the role:

```python
iam.detach_role_policy(RoleName=IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=IAM_ROLE_NAME)
```
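Both calls above fail on a second run, because the policy is already detached or the role is already gone. The following sketch defines a hypothetical helper, `delete_role_quietly`, that tolerates IAM's `NoSuchEntity` error so the cleanup can be rerun safely. The order matters: IAM refuses to delete a role that still has policies attached.

```python
def _error_code(exc):
    """Return the AWS error code carried by a botocore ClientError, else None."""
    return (getattr(exc, "response", None) or {}).get("Error", {}).get("Code")

def delete_role_quietly(iam_client, role_name, policy_arns):
    """Detach the given managed policies, then delete the role, ignoring NoSuchEntity.

    Returns True if the role was deleted now, False if it no longer existed.
    """
    for policy_arn in policy_arns:  # a role must have no attached policies before deletion
        try:
            iam_client.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        except Exception as exc:
            if _error_code(exc) != "NoSuchEntity":
                raise
    try:
        iam_client.delete_role(RoleName=role_name)
        return True
    except Exception as exc:
        if _error_code(exc) != "NoSuchEntity":
            raise
        return False  # the role was already deleted

# Usage:
# delete_role_quietly(iam, IAM_ROLE_NAME, ["arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"])
```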