# ETL Process

In [None]:
# Load the `sql` IPython extension, which provides the %sql / %%sql magics used below.
%load_ext sql

In [None]:
import boto3
import configparser
import json
import matplotlib.pyplot as plt
import pandas as pd
from io import BytesIO
from time import time

## Load DWH Params from a configuration file

In [None]:
# Load cluster and credential settings. ConfigParser.read() silently skips files it
# cannot open (the relative path is resolved against the current working directory)
# and returns the list of files it did read, so check that list and fail early with a
# clear message instead of a confusing NoSectionError on the first config.get() below.
config = configparser.ConfigParser()
if not config.read("dwh.cfg"):
    raise FileNotFoundError("Could not read configuration file 'dwh.cfg'")

# AWS ACCESS CONFIGS
KEY                             = config.get("AWS", "KEY")
SECRET                          = config.get("AWS", "SECRET")

# DATA WAREHOUSE CONFIGS
# DWH_NUM_NODES is kept as a string; it is converted with int() where it is used.
DWH_CLUSTER_TYPE                = config.get("DWH", "DWH_CLUSTER_TYPE")
DWH_NUM_NODES                   = config.get("DWH", "DWH_NUM_NODES")
DWH_NODE_TYPE                   = config.get("DWH", "DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER          = config.get("DWH", "DWH_CLUSTER_IDENTIFIER")
DWH_IAM_ROLE_NAME               = config.get("DWH", "DWH_IAM_ROLE_NAME")

# DATA WAREHOUSE DATABASE CONFIGS
# DB_PORT is kept as a string; it is converted with int() where a number is needed.
DB_NAME                         = config.get("CLUSTER", "DB_NAME")
DB_USER                         = config.get("CLUSTER", "DB_USER")
DB_PASSWORD                     = config.get("CLUSTER", "DB_PASSWORD")
DB_PORT                         = config.get("CLUSTER", "DB_PORT")

## Creating clients for IAM, EC2, S3, and Redshift

In [None]:
# Every AWS handle below uses the same region and credentials, so gather them once
# and unpack them into each boto3 factory call.
_aws_session_kwargs = {
    "region_name": "us-west-2",
    "aws_access_key_id": KEY,
    "aws_secret_access_key": SECRET,
}

# Resource (object-oriented) interfaces.
ec2 = boto3.resource("ec2", **_aws_session_kwargs)
s3 = boto3.resource("s3", **_aws_session_kwargs)

# Low-level clients.
iam = boto3.client("iam", **_aws_session_kwargs)
redshift = boto3.client("redshift", **_aws_session_kwargs)

# Temporary helper only; don't leave it in the notebook namespace.
del _aws_session_kwargs

## Check the sample datasets on S3

In [None]:
# Handle to the "udacity-dend" bucket; the cells below list and sample its song/log data.
datasetBucket = s3.Bucket("udacity-dend")

## Song Datasets


In [None]:
# List the first 10 object summaries under the song dataset prefix. Use the same
# "song_data" prefix the next cell reads from, so the listing shows the files that
# are actually loaded (the loop variable is an object summary, not a bucket).
for obj_summary in datasetBucket.objects.filter(Prefix="song_data").limit(10):
    print(obj_summary)

In [None]:
# Download one sample song file and parse it as JSON Lines into a DataFrame.
song_obj = s3.Object("udacity-dend", "song_data/A/A/A/TRAAAAK128F9318786.json")
song_payload = song_obj.get()["Body"].read()
with BytesIO(song_payload) as song_data:
    song_data_df = pd.read_json(song_data, lines=True)
song_data_df.head()

## Log Datasets

In [None]:
# Print the first 10 object summaries found under the "log-data" prefix.
log_listing = datasetBucket.objects.filter(Prefix="log-data").limit(10)
for log_summary in log_listing:
    print(log_summary)

In [None]:
# Download one day of sample log events and parse it as JSON Lines into a DataFrame.
log_obj = s3.Object("udacity-dend", "log-data/2018/11/2018-11-01-events.json")
log_payload = log_obj.get()["Body"].read()
with BytesIO(log_payload) as log_data:
    log_data_df = pd.read_json(log_data, lines=True)
log_data_df.head()

In [None]:
# Display (rows, columns) of the sample log DataFrame.
log_data_df.shape

In [None]:
# Fetch the sample JSONPaths file for the log data and print its parsed contents.
log_jsonpath = s3.Object("udacity-dend", "log_json_path.json")
jsonpath_spec = json.loads(log_jsonpath.get()["Body"].read())
print(jsonpath_spec)

## IAM ROLE

In [None]:
# Create an IAM role that Redshift can assume, then grant it read-only access to S3.
from botocore.exceptions import ClientError

try:
    dwhRole = iam.create_role(
        Path="/",
        RoleName=DWH_IAM_ROLE_NAME,
        Description="Allows Redshift clusters to call AWS services on my behalf.",
        # Trust policy: only the Redshift service may assume this role.
        AssumeRolePolicyDocument=json.dumps(
            {
                "Statement": [{"Action": "sts:AssumeRole",
                               "Effect": "Allow",
                               "Principal": {"Service": "redshift.amazonaws.com"}
                              }],
                "Version": "2012-10-17"
            }
        )
    )
except ClientError as e:
    # An existing role is expected when the notebook is re-run. Any other error
    # (bad credentials, missing permissions, ...) would only resurface as a more
    # confusing failure in attach_role_policy/get_role below, so raise it now.
    if e.response["Error"]["Code"] != "EntityAlreadyExists":
        raise
    print(e)

# Attach the AWS-managed S3 read-only policy and report the HTTP status code.
attach_status = iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
)["ResponseMetadata"]["HTTPStatusCode"]
print("attach_role_policy HTTP status:", attach_status)

# Role ARN passed to create_cluster(IamRoles=...) when the cluster is created.
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)["Role"]["Arn"]

In [None]:
# Show the IAM role ARN retrieved in the previous cell.
print(roleArn)

## Creating Redshift Cluster

In [None]:
# Request the Redshift cluster described in dwh.cfg and attach the IAM role created
# above so the cluster can read from S3. Use the "Cluster Status" cell below to check
# ClusterStatus before relying on the endpoint.
try:
    response = redshift.create_cluster(
        # Hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        # Identifiers and Credentials
        DBName=DB_NAME,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DB_USER,
        MasterUserPassword=DB_PASSWORD,

        # Roles (for s3 access)
        IamRoles=[roleArn]
    )
except redshift.exceptions.ClusterAlreadyExistsFault as e:
    # Expected when the notebook is re-run. Other failures (invalid parameters,
    # quota limits, bad credentials, ...) propagate instead of being printed and
    # ignored.
    print(e)

## Cluster Status

In [None]:
def prettyRedshiftProps(props):
    """Return a two-column (Key, Value) DataFrame of selected cluster properties.

    Only the keys listed in ``keysToShow`` are kept, in the order they appear in
    ``props`` (e.g. one entry of ``describe_clusters()["Clusters"]``). Keys that
    are absent from ``props`` are simply omitted.

    Side effect: sets pandas' global ``display.max_colwidth`` option to ``None``
    (no truncation) so long values such as the endpoint dict display in full.
    """
    # None means "no limit". The old -1 sentinel has been deprecated since pandas 1.0
    # and is no longer accepted by pandas 2.x.
    pd.set_option("display.max_colwidth", None)
    keysToShow = {"ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername",
                  "DBName", "Endpoint", "NumberOfNodes", "VpcId"}
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

# Fetch the current description of our cluster and display the selected properties.
# Re-run this cell to refresh ClusterStatus: the Endpoint read in the next section is
# presumably only present once provisioning has finished (status 'available').
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

## Getting Cluster endpoint/host and role ARN


In [None]:
# While the cluster is still being created, the describe_clusters() output may not
# contain an Endpoint yet; report that clearly instead of failing with a bare KeyError.
if "Endpoint" not in myClusterProps:
    raise RuntimeError(
        "Cluster {!r} has no endpoint yet (status: {}); re-run the 'Cluster Status' "
        "cell until it is available.".format(
            DWH_CLUSTER_IDENTIFIER, myClusterProps.get("ClusterStatus")
        )
    )
DWH_ENDPOINT = myClusterProps["Endpoint"]["Address"]
DWH_ROLE_ARN = myClusterProps["IamRoles"][0]["IamRoleArn"]
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

## Opening an incoming TCP port to access the cluster endpoint


In [None]:
# Allow inbound TCP traffic on the database port so this notebook can reach the cluster.
# Open the security group the cluster actually uses (reported in VpcSecurityGroups).
# The first group listed for the VPC is not necessarily that group, so it is only
# used as a fallback when the cluster reports none.
# NOTE(review): 0.0.0.0/0 exposes the port to the whole internet; narrow the CIDR to
# your own IP for anything beyond a short-lived test cluster.
from botocore.exceptions import ClientError

clusterSgs = myClusterProps.get("VpcSecurityGroups") or []
if clusterSgs:
    defaultSg = ec2.SecurityGroup(clusterSgs[0]["VpcSecurityGroupId"])
else:
    vpc = ec2.Vpc(id=myClusterProps["VpcId"])
    defaultSg = list(vpc.security_groups.all())[0]

try:
    # The SecurityGroup resource passes its own GroupId. GroupName is omitted because
    # EC2 only accepts it for security groups in the default VPC.
    defaultSg.authorize_ingress(
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DB_PORT),
        ToPort=int(DB_PORT)
    )
except ClientError as e:
    # Re-running this cell typically fails with InvalidPermission.Duplicate. Report
    # API errors without aborting the notebook.
    print(e)

## Checking connection to the cluster

In [None]:

conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, DWH_ENDPOINT, DB_PORT, DB_NAME)
# print(conn_string)
%sql $conn_string

## Run this test COPY command if you have not yet tested with etl.py

In [None]:
# Test COPY: load the log events from the S3 path in config["S3"]["LOG_DATA"] into
# log_events_staging, using the JSONPaths file config["S3"]["LOG_JSONPATH"].
# NOTE(review): this uses config["IAM_ROLE"]["ARN"] rather than DWH_ROLE_ARN read from
# the cluster; keep dwh.cfg in sync if the role is recreated.
staging_events_copy = ("""
    COPY log_events_staging FROM {}
    credentials 'aws_iam_role={}'
    json {}
    region 'us-west-2';
""").format(config["S3"]["LOG_DATA"], config["IAM_ROLE"]["ARN"], config["S3"]["LOG_JSONPATH"])
%sql $staging_events_copy

## Run these test SELECT queries if you have already run etl.py

In [None]:
# songplays: total row count, then a couple of sample rows.
%sql SELECT COUNT(*) FROM songplays;

In [None]:
%sql SELECT * FROM songplays LIMIT 2;

In [None]:
# users: row count, sample rows, and a single-user spot check.
%sql SELECT COUNT(*) FROM users;

In [None]:
%sql SELECT * FROM users LIMIT 2;

In [None]:
%sql SELECT * FROM users WHERE user_id = 10;

In [None]:
# songs: row count, sample rows, and a single-song spot check.
%sql SELECT COUNT(*) FROM songs;

In [None]:
%sql SELECT * FROM songs LIMIT 2;

In [None]:
%sql SELECT * FROM songs WHERE song_id='SOUBASN12AC468DB23' LIMIT 2;

In [None]:
# artists: row count, sample rows, and a single-artist spot check.
%sql SELECT COUNT(*) FROM artists;

In [None]:
%sql SELECT * FROM artists LIMIT 2;

In [None]:
%sql SELECT * FROM artists WHERE artist_id='ARGQJWL1187FB3CE9E';

In [None]:
# time: row count and sample rows.
%sql SELECT COUNT(*) FROM time;

In [None]:
%sql SELECT * FROM time LIMIT 2;

In [None]:
# Staging tables: peek at raw rows (log_events_staging is the COPY target above).
%sql SELECT * FROM log_events_staging LIMIT 2;

In [None]:
%sql SELECT * FROM songs_staging LIMIT 2;

In [None]:
%%sql 
-- Exploratory check: staged songs whose artist_name matches the artist of a staged
-- log event (events with a user id only), alongside that event's level (free/paid).
SELECT DISTINCT st.song_id, st.artist_id, st.artist_name, ls.artist, ls.level
FROM songs_staging st
JOIN log_events_staging ls
ON st.artist_name = ls.artist
WHERE ls.userid IS NOT NULL
LIMIT 10;

In [None]:
%%sql
-- Count users in the staged log events with one row per user: users ever seen with
-- level 'paid' come from paid_level; users only ever seen as 'free' come from
-- free_level. (Assumes firstName/lastName/gender are constant per user - TODO confirm,
-- otherwise DISTINCT yields several rows for the same userId.)
WITH free_level as (
    SELECT DISTINCT userId, firstName, lastName, gender, level
    FROM log_events_staging
    WHERE userID IS NOT NULL
    AND level='free'
),
paid_level as (
    SELECT DISTINCT userId, firstName, lastName, gender, level
    FROM log_events_staging
    WHERE userID IS NOT NULL
    AND level='paid'
),
unique_users as (
    SELECT *
    FROM free_level
    -- Drop free rows for users who also appear as paid. NOT IN is safe here because
    -- paid_level already excludes NULL userIds.
    WHERE userId NOT IN (SELECT userId FROM paid_level)
    UNION ALL
    SELECT *
    FROM paid_level
)
SELECT COUNT(*)FROM unique_users;

## Clean up resources after testing out the whole ETL pipeline in **etl.py**

In [None]:
#### CAREFUL!!
# Deletes the Redshift cluster. SkipFinalClusterSnapshot=True means no final snapshot
# is taken, so the warehouse data is not preserved by this call.
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

In [None]:
#### CAREFUL!!
# Detach the S3 read-only policy attached earlier, then delete the role itself
# (IAM requires managed policies to be detached before a role can be deleted).
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!