In [None]:
%load_ext sql

In [None]:
import boto3
import json
import time
import getpass
import configparser
import pandas as pd

## AWS CONFIGURATION

In [None]:
# Enter AWS KEY and Secret
KEY = getpass.getpass(prompt='Enter AWS Access key ID:')
SECRET = getpass.getpass(prompt='Enter AWS Secret access key')

In [None]:
# Load DWH Params from config file
CONFIG_FILE = 'dwh.cfg'
config = configparser.ConfigParser()
config.read(CONFIG_FILE)

REGION                 = config.get("AWS","REGION")

DWH_IAM_ROLE_NAME      = config.get("IAM","ROLE_NAME")

DWH_ENDPOINT           = config.get("CLUSTER","HOST")
DWH_DB                 = config.get("CLUSTER","DB_NAME")
DWH_DB_USER            = config.get("CLUSTER","DB_USER")
DWH_DB_PASSWORD        = config.get("CLUSTER","DB_PASSWORD")
DWH_PORT               = config.get("CLUSTER","DB_PORT")

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")

### Clients to access AWS resources

In [None]:
# Created clients for S3, AM and Redshift
s3 = boto3.resource('s3',
                    region_name=REGION,
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',
                   region_name=REGION,
                   aws_access_key_id=KEY,
                   aws_secret_access_key=SECRET
                  )

redshift = boto3.client('redshift',
                        region_name=REGION,
                        aws_access_key_id=KEY,
                        aws_secret_access_key=SECRET
                       )

In [None]:
# Check out the sample data sources on S3 
sampleDbBucket =  s3.Bucket("udacity-dend")

for obj in sampleDbBucket.objects.filter(Prefix="log_data"):
    print(obj)

### IAM ROLE

In [None]:
#Create an IAM Role that makes Redshift able to access S3 bucket (ReadOnly)
try:
    print('Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {
                'Statement': [
                    {
                        'Action': 'sts:AssumeRole',
                        'Effect': 'Allow',
                        'Principal': {'Service': 'redshift.amazonaws.com'}
                    }
                ],
                'Version': '2012-10-17'}
        )
    )
except Exception as e:
    print(e)

In [None]:
print('Attaching Policy')
iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
)['ResponseMetadata']['HTTPStatusCode']

In [None]:
print('Get the IAM role ARN')
DWH_ROLE_ARN = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(DWH_ROLE_ARN)

In [None]:
# Update config file with DWH_ROLE_ARN
config.set("IAM","ROLE_ARN", DWH_ROLE_ARN)
with open(CONFIG_FILE, "w+") as configfile:
    config.write(configfile)

#### REDSHIFT CLUSTER

In [None]:
# Create a RedShift Cluster
try:
    response = redshift.create_cluster(        
        # Add parameters for hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        # Add parameters for identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        # Add parameter for role (to allow s3 access)
        IamRoles=[DWH_ROLE_ARN]
    )
except Exception as e:
    print(e)

In [None]:
# Wait for cluster getting created
print('Redshift Cluster is getting created...')
cluster_status = 'creating'
while cluster_status != "available":
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    cluster_status = myClusterProps['ClusterStatus']
    time.sleep(5)
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
print(f'Cluster status: {cluster_status} \nEndpoint: {DWH_ENDPOINT}')

In [None]:
# Update config file with DWH_ENDPOINT
config.set("CLUSTER","HOST", DWH_ENDPOINT)
with open(CONFIG_FILE, "w+") as configfile:
    config.write(configfile)

In [None]:
# Establish conection to the cluster
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

### ETL


In [None]:
# Create staging, fact and dim tables
!python3 create_tables.py

In [None]:
# ETL
!python3 etl.py

### TESTING

#### Check count of staging, fact and dim tables entries

In [None]:
%sql SELECT count(*) FROM staging_events;

In [None]:
%sql SELECT count(*) FROM staging_songs;

In [None]:
%sql SELECT count(*) FROM songplays;

#### Cleaning up resoruces

In [None]:
# Delete cluster
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

In [None]:
# Wait for cluster deletion
print('Redshift Cluster is getting getting deleted...')
cluster_status = 'deleting'
while cluster_status == "deleting":
    try:
        myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
        cluster_status = myClusterProps['ClusterStatus']
        time.sleep(5)
    except Exception as e:
        break
print(f'{DWH_CLUSTER_IDENTIFIER} has been deleted!')

In [None]:
# Delete IAM Role
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)