# ETL Data

In [1]:
import pandas as pd
import configparser
import psycopg2
import os

In [2]:
# Move up one directory so the config file can be opened by its relative path.
# Guarded so that re-running this cell does not keep climbing the directory
# tree: once dwh.cfg is visible from the working directory, stay put.
if not os.path.exists('dwh.cfg'):
    os.chdir('..')
os.getcwd()

In [5]:
# Load the cluster connection settings from dwh.cfg and open a connection.

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so fail here with a clear message instead of a
# confusing KeyError on the first lookup below.
if not config.read('dwh.cfg'):
    raise FileNotFoundError("dwh.cfg not found in {}".format(os.getcwd()))

host = config['CLUSTER']['DB_ENDPOINT']
dbname = config['CLUSTER']['DB_NAME']
user = config['CLUSTER']['DB_USER']
password = config['CLUSTER']['DB_PASSWORD']
port = config['CLUSTER']['DB_PORT']

# Pass the settings as keyword arguments instead of formatting them into a DSN
# string: a value containing spaces or quotes (e.g. the password) would
# otherwise corrupt the connection string.
conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password, port=port)
cur = conn.cursor()

OperationalError: could not translate host name "dwhcluster.cysyqtwog01l.us-west-2.redshift.amazonaws.com" to address: Name or service not known


In [None]:
# Read the IAM role ARN from the [IAM_ROLE] section of the loaded config;
# it is substituted into the COPY statements as their credentials.
iam_section = config['IAM_ROLE']
iam_role = iam_section['ARN']

In [None]:
# Staging-table COPY statements.
#
# Events: bulk-load the JSON log files under the given S3 prefix into
# event_stage, using the JSONPaths file to map JSON fields to columns and the
# IAM role for S3 access.
staging_events_copy = """
copy event_stage
from 's3://udacity-dend/log_data/2018/11/2018'
credentials 'aws_iam_role={}'
format as json 's3://udacity-dend/log_json_path.json';
""".format(iam_role)



# Songs: bulk-load the song metadata into song_stage, the table the
# star-schema INSERT queries read song and artist attributes from.
# (Previously this was a duplicate of the events COPY, so event_stage was
# loaded twice and song_stage stayed empty.)
# NOTE(review): the S3 prefix and the 'auto' JSON mapping assume the song
# dataset lives next to the log data in the same bucket and that its JSON keys
# match the song_stage column names — confirm against the table definition.
staging_songs_copy = ("""
copy song_stage
from 's3://udacity-dend/song_data'
credentials 'aws_iam_role={}'
format as json 'auto';
""").format(iam_role)


In [None]:
# Execute the staging-table COPY statements one by one, committing after each
# so that every completed load is persisted before the next one starts.
copy_table_queries = [staging_events_copy, staging_songs_copy]
print("Loading Staging Tables...")
# The loop must sit at top level; it was indented under print(), which is an
# IndentationError.
for query in copy_table_queries:
    cur.execute(query)
    conn.commit()

In [None]:
# Star-schema INSERT ... SELECT statements.

# Fact table: one row per 'NextSong' event that can be matched to a staged song.
# The match requires BOTH the artist name and the song title to agree (the same
# rule the dim_artist query uses). Matching on either one alone pairs an event
# with every song by that artist and with same-titled songs by other artists.
# The WHERE clause drops events without a matching song (NULL song_id after the
# LEFT JOIN) as well as rows whose song_id is the literal string 'None'.
# NOTE(review): session_id is populated from itemInSession — confirm this is
# intended; the name suggests a per-session item counter, not a session id.
songplay_table_insert = ("""
INSERT INTO fact_songplay(songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
SELECT
    e.userId || st.song_id || e.itemInSession as songplay_id,
    CAST(e.ts as bigint) as start_time,
    CAST(e.userId as int) as user_id,
    e.level as level,
    st.song_id,
    st.artist_id,
    CAST(e.itemInSession as int) as session_id,
    e.location as location,
    e.userAgent as user_agent
FROM (select * from event_stage where page = 'NextSong') as e
LEFT JOIN song_stage st
    ON (e.artist = st.artist_name AND e.song = st.title)
WHERE st.song_id <> 'None'
ORDER BY start_time ASC
""")

# User dimension: distinct (user_id, first_name, last_name, gender, level)
# combinations taken from 'NextSong' events in event_stage.
# NOTE(review): level is part of the DISTINCT key, so a user seen with more
# than one level yields more than one row — confirm that is acceptable.
user_table_insert = """
INSERT INTO dim_user (user_id, first_name, last_name, gender, level)
SELECT DISTINCT
    CAST(e.userID as integer) AS user_id,
    e.firstName AS first_name,
    e.lastName AS last_name,
    e.gender AS gender,
    e.level AS level
FROM event_stage e
WHERE e.page = 'NextSong'
"""

# Song dimension: every distinct song in song_stage, with year cast to integer
# and duration cast to decimal(8,2).
song_table_insert = """
INSERT INTO dim_song (song_id, title, artist_id, year, duration)
SELECT DISTINCT
    s.song_id AS song_id,
    s.title AS title,
    s.artist_id AS artist_id,
    CAST(s.year as integer) AS year,
    CAST(s.duration as decimal(8,2)) AS duration
FROM song_stage s
"""

# Artist dimension: distinct artists from song_stage, restricted to those with
# at least one 'NextSong' event whose artist name and song title both match a
# staged song. Latitude and longitude are converted to float.
artist_table_insert = """
INSERT INTO dim_artist (artist_id, name, location, latitude, longitude)
SELECT DISTINCT
    s.artist_id AS artist_id,
    s.artist_name AS name,
    s.artist_location AS location,
    CONVERT(float, s.artist_latitude) AS latitude,
    CONVERT(float, s.artist_longitude) AS longitude
FROM song_stage s
JOIN event_stage e
    ON (e.artist = s.artist_name AND e.song = s.title)
WHERE e.page = 'NextSong'
"""

# Time dimension: one row per distinct ts among 'NextSong' events.
# time_key keeps the raw ts value; start_time is built by adding ts/1000
# seconds to the epoch (so ts is presumably epoch milliseconds — confirm).
# hour/day/week/month/year are extracted from that timestamp, and weekday is
# 'no' when the day-of-week part is 0 or 6 and 'yes' otherwise.
time_table_insert = """
INSERT INTO dim_time(time_key, start_time, hour, day, week, month, year, weekday)
SELECT DISTINCT
    CAST(e.ts as bigint) AS time_key,
    TIMESTAMP 'epoch' + e.ts/1000 *INTERVAL '1 second' as start_time,
    EXTRACT(hour from TIMESTAMP 'epoch' + e.ts/1000 *INTERVAL '1 second') AS hour,
    CAST(DATE_PART(day, TIMESTAMP 'epoch' + e.ts/1000 *INTERVAL '1 second')  as Integer) AS day,
    CAST(DATE_PART(week, TIMESTAMP 'epoch' + e.ts/1000 *INTERVAL '1 second') as Integer) AS week,
    CAST(DATE_PART(month, TIMESTAMP 'epoch' + e.ts/1000 *INTERVAL '1 second') as Integer) AS month,
    CAST(DATE_PART(year, TIMESTAMP 'epoch' + e.ts/1000 *INTERVAL '1 second') as Integer) AS year,
    CASE
        WHEN(
                DATE_PART(dayofweek, TIMESTAMP 'epoch' + e.ts/1000 *INTERVAL '1 second') = 0.0
                OR
                DATE_PART(dayofweek, TIMESTAMP 'epoch' + e.ts/1000 *INTERVAL '1 second') = 6.0
            )
        THEN 'no'
        ELSE 'yes'
        END
        AS weekday
FROM event_stage e
WHERE e.page = 'NextSong'
ORDER BY time_key ASC;

"""

In [None]:
# Populate the star schema: run each INSERT ... SELECT in list order (fact
# table first, then the dimensions) and commit after every statement.

insert_table_queries = [
    songplay_table_insert,
    user_table_insert,
    song_table_insert,
    artist_table_insert,
    time_table_insert,
]
for query in insert_table_queries:
    cur.execute(query)
    conn.commit()