# Step 01: Import necessary packages

In [1]:
# Load the `sql` IPython extension; the %sql line magic is used further down to connect to the cluster
%load_ext sql

In [2]:
# Import necessary packages
import configparser
import glob
import json
import os
from time import time
from urllib.parse import quote_plus

import boto3
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psycopg2
from botocore.exceptions import ClientError

from sql_queries import *

# Step 02: Load DB Params from dwh.cfg file

__Create an IAM user in your AWS account:__  
- Give it `AdministratorAccess` from the `Attach existing policies directly` tab  
- Take note of the access key and secret  
- Edit the file `dwh.cfg` in the same folder as this notebook and fill in the `KEY` and `SECRET` values of the `[AWS]` section

In [3]:
# Load the settings from dwh.cfg (edit that file before running this cell)
config = configparser.ConfigParser()
# Use a context manager so the file handle is closed as soon as parsing is done
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# Define the parameters in AWS
KEY = config.get('AWS','KEY')
SECRET = config.get('AWS','SECRET')

# Define the parameters in CLUSTER
DB_NAME = config.get("CLUSTER","DB_NAME")
DB_USER = config.get("CLUSTER","DB_USER")
DB_PASSWORD = config.get("CLUSTER","DB_PASSWORD")
DB_PORT = config.get("CLUSTER","DB_PORT")
REGION = config.get("CLUSTER","REGION")
DB_CLUSTER_TYPE = config.get("CLUSTER","DB_CLUSTER_TYPE")
DB_NUM_NODES = config.get("CLUSTER","DB_NUM_NODES")
DB_NODE_TYPE = config.get("CLUSTER","DB_NODE_TYPE")
DB_CLUSTER_IDENTIFIER = config.get("CLUSTER","DB_CLUSTER_IDENTIFIER")
DB_IAM_ROLE_NAME = config.get("CLUSTER","DB_IAM_ROLE_NAME")

# Define the parameters in IAM_ROLE
ARN = config.get("IAM_ROLE", "ARN")

# Show the loaded parameters as a quick sanity check.
# The password is masked so it never ends up in the saved notebook output.
pd.DataFrame({"Param":
                  ["DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT", "ARN", "REGION", "DB_CLUSTER_TYPE", "DB_NUM_NODES", "DB_NODE_TYPE", "DB_CLUSTER_IDENTIFIER", "DB_IAM_ROLE_NAME"],
              "Value":
                  [DB_NAME, DB_USER, "********", DB_PORT, ARN, REGION, DB_CLUSTER_TYPE, DB_NUM_NODES, DB_NODE_TYPE, DB_CLUSTER_IDENTIFIER, DB_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DB_NAME,dwh
1,DB_USER,dwhuser
2,DB_PASSWORD,********
3,DB_PORT,5439
4,ARN,'arn:aws:iam::837754688468:role/dwhRole'
5,REGION,us-west-2
6,DB_CLUSTER_TYPE,multi-node
7,DB_NUM_NODES,4
8,DB_NODE_TYPE,dc2.large
9,DB_CLUSTER_IDENTIFIER,dwhCluster


## Step 03: Create an IAM Role

Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)

In [4]:
# Build the IAM client from the credentials and region loaded from dwh.cfg
iam_client_kwargs = {
    "aws_access_key_id": KEY,
    "aws_secret_access_key": SECRET,
    "region_name": REGION,
}
iam = boto3.client('iam', **iam_client_kwargs)

In [5]:
# 1.1 Create the role that Redshift assumes to call other AWS services
assume_role_policy = {
    'Statement': [{'Action': 'sts:AssumeRole',
                   'Effect': 'Allow',
                   'Principal': {'Service': 'redshift.amazonaws.com'}}],
    'Version': '2012-10-17',
}

try:
    print("1.1 Creating a new IAM Role")
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DB_IAM_ROLE_NAME,
        Description="Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(assume_role_policy)
    )
except ClientError as e:
    # Re-running the notebook legitimately hits EntityAlreadyExists; any other
    # error (bad credentials, missing permissions, ...) must not be hidden.
    if e.response['Error']['Code'] != 'EntityAlreadyExists':
        raise
    print(e)


print("1.2 Attaching Policy")

# Grant the role read-only access to S3; boto3 raises if the call fails
iam.attach_role_policy(RoleName=DB_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DB_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::837754688468:role/dwhRole


## Step 04: Create a Redshift Cluster

In [6]:
# Build the Redshift client from the credentials and region loaded from dwh.cfg
redshift_client_kwargs = {
    "aws_access_key_id": KEY,
    "aws_secret_access_key": SECRET,
    "region_name": REGION,
}
redshift = boto3.client('redshift', **redshift_client_kwargs)

In [7]:
# Request a new Redshift cluster; the call returns immediately and the cluster
# keeps provisioning in the background.
try:
    response = redshift.create_cluster(
        # Hardware
        ClusterType=DB_CLUSTER_TYPE,
        NodeType=DB_NODE_TYPE,
        NumberOfNodes=int(DB_NUM_NODES),  # config values are strings; the API expects an int

        # Identifiers & credentials
        DBName=DB_NAME,
        ClusterIdentifier=DB_CLUSTER_IDENTIFIER,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,

        # Roles (for S3 access)
        IamRoles=[roleArn]
    )
except ClientError as e:
    # Re-running the notebook against an existing cluster is fine; every other
    # failure (quota, invalid parameters, credentials, ...) should surface.
    if e.response['Error']['Code'] != 'ClusterAlreadyExists':
        raise
    print(e)

### Verify the cluster status
Run this block several times until the cluster status becomes Available

In [10]:
def prettyRedshiftProps(props):
    """Summarise a Redshift cluster description as a two-column DataFrame.

    Parameters
    ----------
    props : dict
        Mapping of cluster property names to values; only a fixed subset of
        keys is kept, in the order they appear in ``props``.

    Returns
    -------
    pandas.DataFrame
        One row per retained property, with columns ``Key`` and ``Value``.

    Notes
    -----
    As a side effect, the pandas display option ``display.max_colwidth`` is
    set to ``None`` so long values are shown without truncation.
    """
    # None means "no truncation"; the legacy -1 sentinel was deprecated in
    # pandas 1.0 and is rejected by pandas 2.x.
    pd.set_option('display.max_colwidth', None)
    keysToShow = {"ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId'}
    rows = [(key, value) for key, value in props.items() if key in keysToShow]
    return pd.DataFrame(data=rows, columns=["Key", "Value"])

# Look up the cluster by identifier, keep its first description and show the summary
cluster_descriptions = redshift.describe_clusters(ClusterIdentifier=DB_CLUSTER_IDENTIFIER)
myClusterProps = cluster_descriptions['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.cbazftvy98uk.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-f0188288
7,NumberOfNodes,4


### Take note of the cluster endpoint and role ARN
<font color='red'>DO NOT RUN THIS until the cluster status is "Available" </font>

In [11]:
# Read the endpoint host name and the attached role ARN from the cluster description
DB_ENDPOINT = myClusterProps['Endpoint']['Address']
DB_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DB_ENDPOINT :: ", DB_ENDPOINT)
# Print the ARN reported by the cluster itself, not the one looked up from IAM earlier
print("DB_ROLE_ARN :: ", DB_ROLE_ARN)

DB_ENDPOINT ::  dwhcluster.cbazftvy98uk.us-west-2.redshift.amazonaws.com
DB_ROLE_ARN ::  arn:aws:iam::837754688468:role/dwhRole


# Step 05: Open an incoming TCP port to access the cluster endpoint

In [12]:
# Build the EC2 resource from the credentials and region loaded from dwh.cfg
ec2_resource_kwargs = {
    "region_name": REGION,
    "aws_access_key_id": KEY,
    "aws_secret_access_key": SECRET,
}
ec2 = boto3.resource('ec2', **ec2_resource_kwargs)

In [13]:
# Open the database port on a security group of the cluster's VPC.
# NOTE(review): this takes the FIRST security group the VPC returns, which is
# not guaranteed to be the default one — confirm it is the group attached to the cluster.
# NOTE(security): 0.0.0.0/0 exposes the port to the whole internet; restrict
# the CIDR to your own IP range for anything beyond a short-lived exercise.
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DB_PORT),
        ToPort=int(DB_PORT)
    )
except ClientError as e:
    # The rule already existing is expected on re-runs; anything else is a real failure
    if e.response['Error']['Code'] != 'InvalidPermission.Duplicate':
        raise
    print(e)

ec2.SecurityGroup(id='sg-be6841e4')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


# ETL Processes

Get the params of the created redshift cluster

In [14]:
# Parse dwh.cfg again and pick up the cluster identifier from its CLUSTER section
config = configparser.ConfigParser()
config.read(["dwh.cfg"])
DB_CLUSTER_IDENTIFIER = config.get(section="CLUSTER", option="DB_CLUSTER_IDENTIFIER")

In [15]:
# The ETL below needs the cluster to report 'available'
current_cluster = redshift.describe_clusters(ClusterIdentifier=DB_CLUSTER_IDENTIFIER)['Clusters'][0]
current_cluster["ClusterStatus"]

'available'

In [16]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, DB_ENDPOINT, DB_PORT, DB_NAME)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:***@dwhcluster.cbazftvy98uk.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'

In [19]:
# Run create_tables.py inside this kernel.
# Presumably it (re)creates the project's tables on the cluster — see the script to confirm.
%run create_tables.py

In [18]:
# Execute ETL on cluster: run etl.py, which logs the staging load and then
# each INSERT statement it executes (see the output below)
%run etl.py

Start load_staging_tables
End load_staging_tables
Start insert_tables
INSERT INTO users SELECT userid, firstname, lastname, gender, level FROM ( SELECT MAX(ts) as ts, userid, firstname, lastname, gender, level FROM staging_events WHERE page = 'NextSong'  GROUP BY  userid, firstname, lastname, gender, level ORDER BY userid ASC )
End query
INSERT INTO songs SELECT song_id, title, artist_id, year, duration FROM (SELECT MAX(year) AS year, song_id, title, artist_id, duration FROM staging_songs GROUP BY song_id, title, artist_id, duration )
End query
INSERT INTO artists SELECT DISTINCT artist_id, artist_name, artist_location, artist_latitude, artist_longitude FROM staging_songs
End query
INSERT INTO times SELECT start_time, extract(hour from start_time) as hour, extract(day from start_time) as day, extract(week from start_time) as week, extract(month from start_time) as month, extract(year from start_time) as year, extract(weekday from start_time) as weekday FROM (     SELECT DISTINCT     ti

Use this notebook to develop the ETL process for each of your tables before completing the etl.py file to load the whole datasets.

# Optimizing Redshift Table Design

# Clean up your resources

In [None]:
# Tear the cluster down without taking a final snapshot
redshift.delete_cluster(
    ClusterIdentifier=DB_CLUSTER_IDENTIFIER,
    SkipFinalClusterSnapshot=True,
)

In [None]:
# Re-query the cluster description and show its summary to follow the deletion
remaining_clusters = redshift.describe_clusters(ClusterIdentifier=DB_CLUSTER_IDENTIFIER)
myClusterProps = remaining_clusters['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
# Detach the S3 read-only policy first, then delete the role itself
s3_read_only_policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
iam.detach_role_policy(
    RoleName=DB_IAM_ROLE_NAME,
    PolicyArn=s3_read_only_policy_arn,
)
iam.delete_role(RoleName=DB_IAM_ROLE_NAME)