# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the whole datasets.

In [None]:
import configparser
import glob
import io
import os
from urllib.parse import quote

import boto3
import pandas as pd
import psycopg2

from sql_queries import *

### Load DWH Params from a file

In [None]:
# Read AWS credentials, cluster settings and the IAM role from dwh.cfg.
# The context manager closes the file once ConfigParser has consumed it.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

HOST = config.get("CLUSTER", "HOST")
CLUSTER_IDENTIFIER = config.get("CLUSTER", "CLUSTER_IDENTIFIER")
DB_NAME = config.get("CLUSTER", "DB_NAME")
DB_USER = config.get("CLUSTER", "DB_USER")
DB_PASSWORD = config.get("CLUSTER", "DB_PASSWORD")
DB_PORT = config.get("CLUSTER", "DB_PORT")
CLUSTER_TYPE = config.get("CLUSTER", "CLUSTER_TYPE")
NUM_NODES = config.get("CLUSTER", "NUM_NODES")
NODE_TYPE = config.get("CLUSTER", "NODE_TYPE")

IAM_ROLE_NAME = config.get("IAM_ROLE", "NAME")
IAM_ARN = config.get("IAM_ROLE", "ARN")

# Connect to the cluster

Get the endpoint address and the IAM role ARN from the cluster description.

In [None]:
# Redshift API client in us-west-2, authenticated with the KEY/SECRET read from dwh.cfg.
redshift = boto3.client(
    'redshift',
    region_name='us-west-2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

def prettyRedshiftProps(props):
    """Return a (Key, Value) DataFrame with a few selected cluster properties.

    Parameters
    ----------
    props : dict
        One cluster description, such as an element of
        ``redshift.describe_clusters()['Clusters']``.

    Returns
    -------
    pandas.DataFrame
        Columns ``Key`` and ``Value``; one row for each key of ``props`` that
        is in ``keysToShow``, in the order the keys appear in ``props``.

    Side effect: sets the global pandas option ``display.max_colwidth`` to
    ``None`` so long values (for example the Endpoint dict) are not truncated.
    """
    # None means "no limit". The old -1 spelling was deprecated in pandas 1.0
    # and is rejected by later pandas releases.
    pd.set_option('display.max_colwidth', None)
    keysToShow = {"ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername",
                  "DBName", "Endpoint", "NumberOfNodes", "VpcId"}
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

# Describe the configured cluster and display a summary of its key properties.
myClusterProps = redshift.describe_clusters(
    ClusterIdentifier=CLUSTER_IDENTIFIER,
)['Clusters'][0]
prettyRedshiftProps(myClusterProps)


In [None]:
# Host address of the cluster and the ARN of the first IAM role attached to it.
ENDPOINT, ROLE_ARN = (
    myClusterProps['Endpoint']['Address'],
    myClusterProps['IamRoles'][0]['IamRoleArn'],
)
print("ENDPOINT :: ", ENDPOINT)
print("ROLE_ARN :: ", ROLE_ARN)

## Check out the sample data sources on S3

In [None]:
# S3 resource used to browse the sample data and upload the JSONPaths file.
# Uses the same region as the Redshift client; the config bucket below was
# created in us-west-2. NOTE(review): udacity-dend is presumably also in
# us-west-2; confirm.
s3 = boto3.resource('s3',
                    region_name='us-west-2',
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET
                    )

## Connect to cluster

In [None]:
# Load the `sql` IPython extension, which provides the %sql / %%sql magics used below.
%load_ext sql

In [None]:
# SQLAlchemy-style connection URL for ipython-sql. User name and password are
# percent-encoded (safe='' also encodes '/') so characters such as '@', ':',
# '/' or '+' cannot break the URL. Plain alphanumeric values are unchanged.
conn_string = "postgresql://{}:{}@{}:{}/{}".format(
    quote(DB_USER, safe=''),
    quote(DB_PASSWORD, safe=''),
    HOST,
    DB_PORT,
    DB_NAME,
)


In [None]:
# Connect ipython-sql to the cluster using the URL in the Python variable conn_string.
%sql $conn_string

# Process `song_data`
In this first part, you'll load the song dataset into the `staging_artist_songs` staging table. Start by inspecting a single song file, then COPY the whole `song_data` prefix from S3 into the table.

In [None]:
songDataDbBucket = s3.Bucket("udacity-dend")
# Keys of every object under the song_data prefix of the sample bucket.
s3_song_files = [summary.key for summary in songDataDbBucket.objects.filter(Prefix="song_data")]
print(s3_song_files[:4])

In [None]:
# Download one sample song file and parse it as JSON Lines (lines=True).
# The client uses the same region as the Redshift client above.
s3_client = boto3.client('s3',
                         region_name='us-west-2',
                         aws_access_key_id=KEY,
                         aws_secret_access_key=SECRET)
obj = s3_client.get_object(Bucket='udacity-dend', Key=s3_song_files[1])
df = pd.read_json(io.BytesIO(obj['Body'].read()), lines=True)
df.head(3)

In [None]:
# Column dtypes and non-null counts of the sample song file loaded above.
df.info()

Use the COPY command to load everything under `s3://udacity-dend/song_data` into `staging_artist_songs`, authenticating with the cluster's IAM role.

In [None]:
%%time
qry = """
    copy staging_artist_songs from 's3://udacity-dend/song_data'
    credentials 'aws_iam_role={}'
    format as json 'auto';
""".format(ROLE_ARN)

%sql $qry

In [None]:
%%sql
-- Most recent COPY load errors first; without ORDER BY the two rows returned are arbitrary.
SELECT *
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 2;

# Process `log_data`
In this part, you'll perform ETL on the second dataset, `log_data`, loading it into the `staging_events` staging table. Start by inspecting a single log file, then COPY the whole `log_data` prefix from S3.


In [None]:
logDataDbBucket = s3.Bucket("udacity-dend")
# List keys through this cell's own bucket handle so the cell does not depend
# on songDataDbBucket from the song_data section.
log_data_files = [summary.key for summary in logDataDbBucket.objects.filter(Prefix="log_data")]
print(log_data_files[:4])

In [None]:
# Download one sample log file and parse it as JSON Lines.
obj = s3_client.get_object(
    Bucket='udacity-dend',
    Key=log_data_files[1],
)
json_bytes = io.BytesIO(obj['Body'].read())
df = pd.read_json(path_or_buf=json_bytes, lines=True)
df.head(n=1)

In [None]:
# One-time setup, presumably already run: the config bucket was created with
#   s3.create_bucket(Bucket='spartkify-etl-config',
#                    CreateBucketConfiguration={'LocationConstraint': 'us-west-2'})
# Upload the JSONPaths file referenced by the staging_events COPY below.
# The context manager closes the local file after the upload.
with open('jpath.json', 'rb') as jpath_file:
    s3.Object('spartkify-etl-config', 'jpath.json').put(Body=jpath_file)

In [None]:
# Show the key of the sample log file that was read above.
log_data_files[1]

In [None]:
%%time
qry = """
    copy staging_events 
    from 's3://udacity-dend/log_data'
    credentials 'aws_iam_role={}'
    FORMAT AS JSON 's3://spartkify-etl-config/jpath.json' ;
""".format(ROLE_ARN)

%sql $qry

In [None]:
%%time
%sql $qry

In [None]:
# Display the IAM role ARN used as COPY credentials above.
ROLE_ARN

In [None]:
%%sql
SELECT *
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 1;

In [None]:
%%sql
SELECT COUNT(*)
FROM stl_load_errors

# ELT from staging to Star schema

Here is the entity-relationship diagram of the star schema:

### Star Schema ER Diagram

![Sparkify star schema ER diagram](er-sparkify.png)

Convert the Unix timestamp `ts` (milliseconds since the epoch) into the hour of the day.

In [None]:
%%sql
-- ts is milliseconds since the epoch; turn it into a TIMESTAMP, then take the hour.
SELECT EXTRACT(hour FROM event_time)
FROM (
    SELECT TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second' AS event_time
    FROM staging_events
) AS converted
LIMIT 5

Populate the time table with data from the staging_events table

In [None]:
%%sql
-- One row per distinct song-play timestamp. The NextSong filter keeps the time
-- dimension aligned with the events that become songplays, and DISTINCT stops
-- repeated ts values from producing duplicate rows.
INSERT INTO time (start_time, hour, day, week, month, year, weekday)
SELECT start_time,
       EXTRACT(hour  FROM start_time) AS hour,
       EXTRACT(day   FROM start_time) AS day,
       EXTRACT(week  FROM start_time) AS week,
       EXTRACT(month FROM start_time) AS month,
       EXTRACT(year  FROM start_time) AS year,
       EXTRACT(dow   FROM start_time) AS weekday
FROM (
    SELECT DISTINCT TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second' AS start_time
    FROM staging_events
    WHERE page = 'NextSong'
      AND ts IS NOT NULL
) AS event_times;

Populate the songs table with data from the staging_artist_songs table

In [None]:
%%sql
-- One row per distinct (song_id, title, artist_id, year, duration) combination in staging.
INSERT INTO songs (song_id, title, artist_id, year, duration)
SELECT DISTINCT
    staged.song_id,
    staged.title,
    staged.artist_id,
    staged.year,
    staged.duration
FROM staging_artist_songs AS staged

Populate the artists table with data from the staging_artist_songs table

In [None]:
%%sql
-- DISTINCT over all five columns still yields several rows for an artist_id whose
-- name, location or coordinates differ between songs, so keep exactly one row
-- per non-null artist_id.
INSERT INTO artists (artist_id, name, latitude, location, longitude)
SELECT artist_id, artist_name, artist_latitude, artist_location, artist_longitude
FROM (
    SELECT artist_id,
           artist_name,
           artist_latitude,
           artist_location,
           artist_longitude,
           ROW_NUMBER() OVER (PARTITION BY artist_id ORDER BY artist_name) AS row_num
    FROM staging_artist_songs
    WHERE artist_id IS NOT NULL
) AS ranked
WHERE row_num = 1;

Preview the distinct users in the staging_events table; these rows will populate the users table. **TODO:** add the `INSERT INTO users` statement.

In [None]:
%%sql
-- Preview only (nothing is inserted here)
SELECT DISTINCT
    e.userid,
    e.firstName,
    e.lastName,
    e.gender,
    e.level
FROM staging_events AS e
LIMIT 10

Populate the songplays table with data from the staging_events, songs and artists tables

In [None]:
%%sql
-- One row per NextSong event whose song title and artist name both match the
-- dimension tables. Requiring s.artist_id = a.artist_id ensures the matched song
-- belongs to the matched artist, not another artist with the same title.
INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
SELECT TIMESTAMP 'epoch' + e.ts/1000 * INTERVAL '1 second' AS start_time,
       e.userId,
       e.level,
       s.song_id,
       a.artist_id,
       e.sessionId,
       e.location,
       e.userAgent
FROM staging_events e
JOIN songs s
  ON e.song = s.title
JOIN artists a
  ON e.artist = a.name
 AND s.artist_id = a.artist_id
WHERE e.page = 'NextSong';

Verify the `NextSong` events loaded into the staging_events table

In [None]:
%%sql
-- Spot-check a few song-play events in the staging table.
SELECT se.*
FROM staging_events AS se
WHERE se.page = 'NextSong'
LIMIT 5