In [None]:
%load_ext sql

import pandas as pd
import boto3
import json
from time import time

In [None]:
import configparser

# Parse dwh.cfg. The file is opened in a `with` block so the handle is closed
# as soon as parsing finishes instead of being left open for the kernel's lifetime.
config = configparser.ConfigParser()
with open('dwh.cfg') as config_file:
    config.read_file(config_file)

# AWS access key pair (section [AWS]).
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# IAM role name (section [DWH]).
DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Cluster sizing (section [DWH]). config.get returns strings, so
# DWH_NUM_NODES is still text here and is converted with int() where it is used.
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

# Cluster identifier (section [DWH]) and database connection settings (section [CLUSTER]).
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DB_NAME                = config.get("CLUSTER","DB_NAME")
DB_USER                = config.get("CLUSTER","DB_USER")
DB_PASSWORD            = config.get("CLUSTER","DB_PASSWORD")
DB_PORT                = config.get("CLUSTER","DB_PORT")

# Summarise the loaded settings as a table (the last expression is displayed).
# The password is replaced by a fixed mask so the secret is not written into
# the notebook's saved output.
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DB_NAME, DB_USER, "********", DB_PORT, DWH_IAM_ROLE_NAME]
             })

# Create clients for IAM, S3 and Redshift

In [None]:
# All three AWS handles use the same region and key pair, so the keyword
# arguments are assembled once and unpacked into each constructor.
aws_connection_kwargs = dict(
    region_name='us-west-2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

iam = boto3.client('iam', **aws_connection_kwargs)
redshift = boto3.client('redshift', **aws_connection_kwargs)
s3 = boto3.resource('s3', **aws_connection_kwargs)

# List sample song and log (event) data objects in the S3 bucket

In [None]:
dbBucket = s3.Bucket("udacity-dend")

# (banner, key prefix) pairs: for each one, print the banner and then at most
# ten objects whose key starts with the prefix.
sample_listings = (
    ("===== song_data bucket: =====", "song_data"),
    ("\n===== log_data bucket: =====", "log_data"),
)
for banner, key_prefix in sample_listings:
    print(banner)
    for obj in dbBucket.objects.filter(Prefix=key_prefix).limit(10):
        print(obj)

# Setup IAM Role

In [None]:
from botocore.exceptions import ClientError

# 1.1 Create the role that Redshift will assume.
# Only an "already exists" response is tolerated, so the cell can be re-run;
# any other failure is re-raised instead of being printed and ignored, because
# the steps below cannot succeed without the role.
try:
    print("Creating a new IAM Role")
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )
except ClientError as e:
    if e.response['Error']['Code'] != 'EntityAlreadyExists':
        raise
    print(e)


# 1.2 Attach the AmazonS3ReadOnlyAccess managed policy to the role.
print("Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )

# 1.3 Look up the role's ARN; it is passed to the cluster when it is created.
print("Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

# Setup RedShift Cluster

In [None]:
# Request a new Redshift cluster from the settings read from dwh.cfg.
# An "already exists" response is tolerated so the cell can be re-run; every
# other error is re-raised rather than printed and ignored.
try:
    response = redshift.create_cluster(
        # Hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        # Identifiers & credentials
        DBName=DB_NAME,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,
        # Listen on the configured port: the connection string built later in
        # this notebook uses DB_PORT, so the cluster must be created with it.
        Port=int(DB_PORT),

        # Roles (for S3 access)
        IamRoles=[roleArn]
    )
except ClientError as e:
    if e.response['Error']['Code'] != 'ClusterAlreadyExists':
        raise
    print(e)

# Describe Redshift cluster for AVAILABLE status

In [None]:
def prettyRedshiftProps(props):
    """Return selected properties of a Redshift cluster as a two-column table.

    Args:
        props: Mapping of property name to value, e.g. one entry of the
            ``Clusters`` list returned by ``describe_clusters``.

    Returns:
        A DataFrame with columns ``Key`` and ``Value`` containing only the
        properties named in ``keysToShow``, in the order they occur in ``props``.

    Side effects:
        Changes the global pandas option ``display.max_colwidth`` so long
        values are displayed without truncation.
    """
    # None means "no limit". The older spelling -1 is deprecated since
    # pandas 1.0 and rejected by pandas 2.x; it is kept only as a fallback for
    # old pandas versions whose validator does not accept None.
    try:
        pd.set_option('display.max_colwidth', None)
    except ValueError:
        pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    rows = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=rows, columns=["Key", "Value"])

# Look the cluster up by its identifier, keep the first entry of 'Clusters',
# and display its selected properties.
cluster_lookup = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = cluster_lookup['Clusters'][0]
prettyRedshiftProps(myClusterProps)

# Write the cluster endpoint and role ARN to the config file

In [None]:
# The cluster description may not contain an endpoint address yet (for example
# while the cluster is still being created), so fail with a clear message
# instead of a bare KeyError when this cell is run too early.
endpoint = myClusterProps.get('Endpoint') or {}
if 'Address' not in endpoint:
    raise RuntimeError(
        "Cluster endpoint is not available yet (status: {}); "
        "re-run the describe cell and try again.".format(myClusterProps.get('ClusterStatus'))
    )
HOST = endpoint['Address']
ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

print("HOST :: ", HOST)
print("ARN  :: ", ARN)

# Write to dwh.cfg
# config.set raises NoSectionError for an unknown section, so create the
# target sections first if the file did not define them.
for section in ('CLUSTER', 'IAM_ROLE'):
    if not config.has_section(section):
        config.add_section(section)
config.set('CLUSTER', 'HOST', HOST)
config.set('IAM_ROLE', 'ARN', ARN)
with open('dwh.cfg', 'w') as configfile:
    config.write(configfile)

# Check connection to cluster

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, HOST, DB_PORT, DB_NAME)
print(conn_string)
%sql $conn_string

# Run create_tables.py

In [None]:
%%time
# Execute the create_tables.py script; %%time reports how long the cell took.
%run create_tables.py

# Verify tables are created

In [None]:
tables = """ SELECT DISTINCT tablename FROM pg_table_def WHERE  schemaname ='public'; """
%sql $tables

# Run etl.py

In [None]:
%%time
# Execute the etl.py script; %%time reports how long the cell took.
%run etl.py

In [None]:
# View error logs if any
# query = """ SELECT * FROM stl_load_errors; """
# %sql $query

# Count Records in each table

In [None]:
# songplays table
songplaySql = """ SELECT count(*) FROM songplays; """
%sql $songplaySql

In [None]:
# users table
userSql = """ SELECT count(*) FROM users; """
%sql $userSql

In [None]:
# songs table
songSql = """ SELECT count(*) FROM songs; """
%sql $songSql

In [None]:
# artists table
artistSql = """ SELECT count(*) FROM artists; """
%sql $artistSql

In [None]:
# time table
timeSql = """ SELECT count(*) FROM time; """
%sql $timeSql

# Give me the artist, song title and song's length in the music app history that was heard during sessionId = 139

In [None]:
%%time
%%sql
SELECT DISTINCT a.name, s.title, s.duration AS length
FROM songplays AS sp
JOIN artists AS a ON a.artist_id = sp.artist_id
JOIN songs AS s ON s.song_id = sp.song_id
WHERE sp.session_id = 139

# Give me every user name (first and last) in my music app history who listened to the song 'Up Up & Away'

In [None]:
%%time
%%sql
SELECT users.user_id, users.first_name, users.last_name
FROM users
JOIN songplays ON (users.user_id = songplays.user_id)
JOIN songs ON (songplays.song_id = songs.song_id)
WHERE songs.title = 'Up Up & Away'
GROUP BY users.user_id, users.first_name, users.last_name
-- LIMIT without ORDER BY returns an arbitrary subset so sort by user_id to make the ten rows repeatable
ORDER BY users.user_id
LIMIT 10;

# Clean up and delete cluster

In [None]:
#### CAREFUL!!
#-- Deleting the cluster is destructive: no final snapshot is taken.
#-- The call is guarded so that running the cell by accident (e.g. "Run All")
#-- does not delete anything. Set DELETE_CLUSTER = True and run to delete.
DELETE_CLUSTER = False
if DELETE_CLUSTER:
    redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)
else:
    print("Cluster NOT deleted. Set DELETE_CLUSTER = True in this cell and re-run to delete it.")
#### CAREFUL!!

# Describe Redshift cluster for DELETE status

In [None]:
# Fetch the cluster description again (first entry of 'Clusters') and display
# its selected properties, including the current status.
cluster_lookup = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = cluster_lookup['Clusters'][0]
prettyRedshiftProps(myClusterProps)

# Detach the S3 policy and delete the IAM role

In [None]:
#### CAREFUL!!
#-- Detaches the S3 read-only policy from the role and then deletes the role.
#-- The calls are guarded so that running the cell by accident (e.g. "Run All")
#-- does not delete anything. Set DELETE_IAM_ROLE = True and run to delete.
DELETE_IAM_ROLE = False
if DELETE_IAM_ROLE:
    iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
    iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
else:
    print("IAM role NOT deleted. Set DELETE_IAM_ROLE = True in this cell and re-run to delete it.")
#### CAREFUL!!