In [48]:
import pandas as pd

In [49]:
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

In [52]:
%load_ext sql

### Look at files

In [4]:
# Peek at the song dataset: list the S3 objects under one prefix and show the first one.
import boto3

# NOTE(review): KEY and SECRET are not defined in any cell visible in this notebook —
# they must already exist in the kernel for this cell to run; confirm where they come from.
s3 = boto3.resource('s3',
                    region_name="us-west-2",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET
                   )

sampleDbBucket =  s3.Bucket("udacity-dend")
# The whole listing for the prefix is materialised before the first entry is taken.
obj_list = list(sampleDbBucket.objects.filter(Prefix='song_data/A/B/C/'))
obj = obj_list[0]
obj

In [50]:
# !pip install s3fs

In [31]:

# Read a single song file straight from S3; lines=True treats each line as one JSON record.
# NOTE(review): pandas needs the s3fs package for s3:// URLs — presumably why the
# commented-out `pip install s3fs` cell exists; confirm it is installed.
df_song_data = pd.read_json('s3://udacity-dend/song_data/A/B/C/TRABCAS128F14A25E2.json',  lines=True)

In [32]:
# Iterating a DataFrame yields its column labels; print them one per line.
for column_name in df_song_data:
    print(column_name)

artist_id
artist_latitude
artist_location
artist_longitude
artist_name
duration
num_songs
song_id
title
year


In [19]:
# List the objects under the log_data prefix and show the second entry.
# NOTE(review): presumably index 0 is skipped because it is not an events file — confirm.
obj_list = list(sampleDbBucket.objects.filter(Prefix='log_data'))
obj = obj_list[1]
obj

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')

In [21]:
# Read one day of event logs from S3; lines=True treats each line as one JSON record.
df_log_data = pd.read_json('s3://udacity-dend/log_data/2018/11/2018-11-01-events.json',  lines=True)

In [24]:
# Iterating a DataFrame yields its column labels; print them one per line.
for column_name in df_log_data:
    print(column_name)

artist
auth
firstName
gender
itemInSession
lastName
length
level
location
method
page
registration
sessionId
song
status
ts
userAgent
userId


In [37]:
# Show the raw lines of log_json_path.json.
# NOTE(review): the file is JSON, not CSV — read_csv with sep=';' is only used here to
# list it line by line (its first line, "{", ends up as the column header);
# pd.read_json would parse it properly.
df_log_meta = pd.read_csv('s3://udacity-dend/log_json_path.json', sep=';')
df_log_meta

Unnamed: 0,{
0,"""jsonpaths"": ["
1,"""$['artist']"","
2,"""$['auth']"","
3,"""$['firstName']"","
4,"""$['gender']"","
5,"""$['itemInSession']"","
6,"""$['lastName']"","
7,"""$['length']"","
8,"""$['level']"","
9,"""$['location']"","


In [25]:
import configparser
import psycopg2
from sql_queries import create_table_queries, drop_table_queries

In [26]:
# Load the settings from dwh.cfg; read() returns the list of files it managed to parse.
config = configparser.ConfigParser()
config.read('dwh.cfg')

['dwh.cfg']

In [27]:
# Option names available in the [CLUSTER] section (iterating a section yields its keys).
list(config['CLUSTER'])
# config['CLUSTER']

['host', 'db_name', 'db_user', 'db_password', 'db_port']

In [38]:
# Open the psycopg2 connection used by the cur.execute(...) cells.
# The settings are passed as keyword arguments rather than interpolated into a DSN
# string: psycopg2 then quotes each value itself, so a password or host containing
# spaces or quotes cannot corrupt the connection string.
conn = psycopg2.connect(
    host=config['CLUSTER']['host'],
    dbname=config['CLUSTER']['db_name'],
    user=config['CLUSTER']['db_user'],
    password=config['CLUSTER']['db_password'],
    port=config['CLUSTER']['db_port'],
)
cur = conn.cursor()

In [56]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(config['CLUSTER']['db_user'],
                                                 config['CLUSTER']['db_password'],
                                                 config['CLUSTER']['host'],
                                                 config['CLUSTER']['db_port'],
                                                 config['CLUSTER']['db_name'])
print(conn_string)
%sql $conn_string

postgresql://dwhuser:***@dwhcluster.cmv39mltiyk7.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'

In [39]:
# !pip install s3fs

In [200]:
# CONFIG
config = configparser.ConfigParser()
config.read('dwh.cfg')

# DROP TABLES
# Each statement has to name its table exactly as the matching CREATE TABLE statement
# in this cell does; the artist and time dimensions are created as "artists" and "times".

staging_events_table_drop = "drop table IF EXISTS staging_events"
staging_songs_table_drop = "drop table IF EXISTS staging_songs"

songplay_table_drop = "drop table IF EXISTS songplays"
user_table_drop = "drop table IF EXISTS users"
song_table_drop = "drop table IF EXISTS songs"
artist_table_drop = "drop table IF EXISTS artists"
time_table_drop = "drop table IF EXISTS times"


# CREATE TABLES

# Staging table for the event logs: one text column per field of the log records.
staging_events_table_create = """
CREATE TABLE IF NOT EXISTS staging_events
(
artist text,
auth text,
firstName text,
gender text,
itemInSession text,
lastName text,
length text,
level text,
location text,
method text,
page text,
registration text,
sessionId text,
song text,
status text,
ts text,
userAgent text,
userId text
);
"""

# Staging table for the song files: one text column per field of the song records.
staging_songs_table_create = """
CREATE TABLE IF NOT EXISTS staging_songs
(
artist_id text,
artist_latitude text,
artist_location text,
artist_longitude text,
artist_name text,
duration text,
num_songs text,
song_id text,
title text,
year text
);
"""

# Fact table. songplay_id is an IDENTITY column, so the INSERT for this table
# does not list it.
songplay_table_create = """
CREATE TABLE IF NOT EXISTS songplays 
(
songplay_id BIGINT IDENTITY(0,1),
start_time timestamp NOT NULL,
user_id int NOT NULL,
level varchar,
song_id varchar,
artist_id varchar,
session_id int NOT NULL,
location varchar,
user_agent varchar
);
"""

# Dimension: users, keyed by user_id.
user_table_create = """
CREATE TABLE IF NOT EXISTS users
(
user_id int PRIMARY KEY,
first_name varchar,
last_name varchar,
gender varchar,
level varchar
);
"""

# Dimension: songs, keyed by song_id.
song_table_create = """
CREATE TABLE IF NOT EXISTS songs 
(
song_id varchar PRIMARY KEY,
title varchar,
artist_id varchar,
year int,
duration real
);
"""

# Dimension: artists, keyed by artist_id.
artist_table_create = """
CREATE TABLE IF NOT EXISTS artists 
(
artist_id varchar PRIMARY KEY,
name varchar,
location varchar,
latitude real,
longitude real
);
"""

# Dimension: one row per distinct start_time, broken out into calendar parts.
time_table_create = """
CREATE TABLE IF NOT EXISTS times
(
start_time timestamp NOT NULL PRIMARY KEY,
hour int,
day int,
week int,
month int,
year int,
weekday int
);
"""

# Values spliced into the COPY statements below; all of them come from dwh.cfg.
IAM_ROLE = config['IAM_ROLE']['ARN']
LOG_DATA = config['S3']['LOG_DATA']
SONG_DATA = config['S3']['SONG_DATA']
LOG_JSONPATH = config['S3']['LOG_JSONPATH']

# STAGING TABLES
# LOG_DATA, SONG_DATA and LOG_JSONPATH are inserted without surrounding quotes, while
# IAM_ROLE is wrapped in single quotes by the template itself.
# NOTE(review): this presumably relies on the S3 values in dwh.cfg carrying their own
# quotes — confirm.
staging_events_copy = f"""
copy staging_events 
from {LOG_DATA}
iam_role '{IAM_ROLE}'
json {LOG_JSONPATH}
; """

# The song files are mapped onto columns with json 'auto' rather than a JSONPaths file.
staging_songs_copy = f"""
copy staging_songs 
from {SONG_DATA} 
iam_role '{IAM_ROLE}'
json 'auto'
;"""

# FINAL TABLES

# 'userId', 'firstName', 'lastName', 'gender', 'level'
# Load exactly one row per user. A SELECT DISTINCT over (user, names, gender, level)
# emits the same user twice when their level differs between events, and Redshift does
# not enforce the PRIMARY KEY declared on users.user_id, so those duplicates would be
# stored silently. Instead, rank each user's NextSong events from newest to oldest and
# keep only the newest, which also carries the user's most recent level.
user_table_insert = ("""
INSERT INTO users(user_id, first_name, last_name, gender, level)
SELECT user_id, first_name, last_name, gender, level
FROM (
    SELECT
        cast(userId as int) as user_id,
        firstName as first_name,
        lastName as last_name,
        gender,
        level,
        row_number() over (
            partition by cast(userId as int)
            order by cast(ts as BIGINT) desc
        ) as recency_rank
    FROM staging_events
    WHERE page = 'NextSong'
) AS ranked_events
WHERE recency_rank = 1
""")

# 'song_id', 'title', 'artist_id', 'year', 'duration'
# Songs come from the song staging table; year and duration are cast from text.
song_table_insert = """
INSERT INTO songs(song_id, title, artist_id, year, duration)
SELECT distinct
    song_id,
    title,
    artist_id,
    cast(year as INT) as year,
    cast(duration as REAL) as duration
FROM staging_songs
"""

# Artists also come from the song staging table; the coordinates are cast from text.
artist_table_insert = """
INSERT INTO artists(artist_id, name, location, latitude, longitude)
SELECT distinct
    artist_id,
    artist_name,
    artist_location,
    cast(artist_latitude as real),
    cast(artist_longitude as real)
FROM staging_songs
"""

# ts is divided by 1000 and added to the epoch as seconds to build start_time; the
# calendar parts are then extracted from that start_time alias.
time_table_insert = """
INSERT INTO times(start_time, hour, day, week, month, year, weekDay)
SELECT distinct
    timestamp 'epoch' + cast(ts as BIGINT)/1000 * interval '1 second' as start_time,
    extract(hour from start_time) as hour,
    extract(day from start_time) as day,
    extract(week from start_time) as week,
    extract(month from start_time) as month,
    extract(year from start_time) as year, 
    extract(dayofweek from start_time) as weekDay
FROM staging_events
WHERE page = 'NextSong'
"""

# A NextSong event becomes a songplay only when its song title, artist name and length
# all match a row of the song staging table (inner join).
songplay_table_insert = """
INSERT INTO songplays(start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
SELECT 
    timestamp 'epoch' + cast(e.ts as BIGINT)/1000 * interval '1 second' as start_time, 
    cast(e.userId as int) as user_id,
    e.level,
    s.song_id,
    s.artist_id,
    cast(e.sessionId as int) as session_id,
    e.location,
    e.userAgent as user_agent
FROM staging_songs s
JOIN staging_events e on e.song = s.title 
                    AND e.artist = s.artist_name
                    AND e.length = s.duration
WHERE e.page = 'NextSong'
"""

# staging_events_table_create= ("""
# CREATE TABLE IF NOT EXISTS staging_events
# (
# artist text,
# auth text,
# firstName text,
# gender text,
# itemInSession text,
# lastName text,
# length text,
# level text,
# location text,
# method text,
# page text,
# registration text,
# sessionId text,
# song text,
# status text,
# ts text,
# userAgent text,
# userId text
# );
# """)

# staging_songs_table_create =  ("""
# CREATE TABLE IF NOT EXISTS staging_songs
# (
# artist_id text,
# artist_latitude text,
# artist_location text,
# artist_longitude text,
# artist_name text,
# duration text,
# num_songs text,
# song_id text,
# title text,
# year text
# );
# """)

# QUERY LISTS
# Statements grouped by phase, in the order they are meant to be executed.

create_table_queries = [
    staging_events_table_create,
    staging_songs_table_create,
    songplay_table_create,
    user_table_create,
    song_table_create,
    artist_table_create,
    time_table_create,
]
drop_table_queries = [
    staging_events_table_drop,
    staging_songs_table_drop,
    songplay_table_drop,
    user_table_drop,
    song_table_drop,
    artist_table_drop,
    time_table_drop,
]
copy_table_queries = [
    staging_events_copy,
    staging_songs_copy,
]
insert_table_queries = [
    songplay_table_insert,
    user_table_insert,
    song_table_insert,
    artist_table_insert,
    time_table_insert,
]


['dwh.cfg']

In [201]:
# %%sql
# SELECT 
#     song_id,
#     title,
#     artist_id,
#     cast(year as INT) as year,
#     cast(duration as REAL) as duration
# FROM staging_songs
# limit 10

In [202]:
# Discard whatever transaction is currently open on the psycopg2 connection.
# NOTE(review): presumably run to recover the connection after a failed statement — confirm.
conn.rollback()

In [197]:
# Drop songplays through the psycopg2 cursor rather than the %sql magic: the magic runs
# on its own connection, so conn.commit() would not apply to it, and the IF EXISTS form
# in songplay_table_drop does not fail when the table is already gone.
cur.execute(songplay_table_drop)
conn.commit()

In [203]:
# Create the songplays table and commit the DDL.
cur.execute(songplay_table_create)
conn.commit()

In [204]:
%%time
# Fill songplays from the staging tables and commit; %%time reports how long it took.
cur.execute(songplay_table_insert)
conn.commit()

CPU times: user 1 ms, sys: 121 µs, total: 1.12 ms
Wall time: 10.6 s


In [188]:
# %sql truncate users
# Row count of songplays, queried through the %sql connection.
%sql select count(*) from songplays

 * postgresql://dwhuser:***@dwhcluster.cmv39mltiyk7.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
0


In [101]:
# %%sql
# SELECT distinct
#     cast(userId as int) user_id,
#     firstName first_name,
#     lastName last_name,
#     gender,
#     level 
# FROM staging_events
# WHERE page = 'NextSong'
# limit 10

### Drop, recreate, and load the tables

In [78]:
# Discard whatever transaction is currently open on the psycopg2 connection.
# NOTE(review): presumably run to recover the connection after a failed statement — confirm.
conn.rollback()

In [79]:
# Execute every DROP statement, committing after each one.
for drop_sql in drop_table_queries:
    cur.execute(drop_sql)
    conn.commit()

In [80]:
# Execute every CREATE TABLE statement, committing after each one.
for create_sql in create_table_queries:
    cur.execute(create_sql)
    conn.commit()

In [86]:
# Rebinds staging_events_copy / staging_songs_copy, which are also defined in the
# sql-queries cell of this notebook; this variant passes the role through the
# CREDENTIALS 'aws_iam_role=...' clause.
staging_events_copy = f"""
copy staging_events
from {LOG_DATA}
credentials 'aws_iam_role={IAM_ROLE}'
json {LOG_JSONPATH}
; """

staging_songs_copy = f"""
copy staging_songs 
from {SONG_DATA} 
credentials 'aws_iam_role={IAM_ROLE}'
json 'auto'
; 
"""

In [92]:
# %sql drop table staging_events
# cur.execute(create_table_queries[0])
# conn.commit()

In [90]:
%%time
# Load the event logs into staging_events and commit; %%time reports how long it took.
cur.execute(staging_events_copy)
conn.commit()

CPU times: user 1.01 ms, sys: 157 µs, total: 1.17 ms
Wall time: 1.27 s


In [83]:
%%time
# Load the song files into staging_songs and commit. This is the slow step: the
# recorded run below took a little over three minutes of wall time.
cur.execute(staging_songs_copy)
conn.commit()

CPU times: user 0 ns, sys: 1.74 ms, total: 1.74 ms
Wall time: 3min 13s


In [91]:
# Row counts of both staging tables after the COPY commands.
%sql select count(*) from staging_events;
%sql select count(*) from staging_songs;

 * postgresql://dwhuser:***@dwhcluster.cmv39mltiyk7.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
8056


 * postgresql://dwhuser:***@dwhcluster.cmv39mltiyk7.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
14896


In [205]:
# Close the psycopg2 connection; cur cannot be used after this.
conn.close()