# ETL PROCESSES

# Developing the ETL process for each of the tables.

In [29]:
# Imports
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [30]:
# Connectiong to the database
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkify user=franchise password='420GLOCKzone.'")
cur = conn.cursor()

In [31]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root, '*.json'))
        for f in files:
            all_files.append(os.path.abspath(f))

    return all_files

# Process song_data

In [32]:
# Creating the songs and artists dimensional tables.
song_files = get_files('./data/song_data/')
filepath = song_files[0]

In [33]:
# ETL on a single song and load the record into each table.
df = pd.read_json(filepath, lines=True)
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARDNS031187B9924F0,32.67828,-83.22295,Georgia,Tim Wilson,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,186.48771,2005


In [34]:
# Extracting data from the songs table
song_data_df = df[['song_id', 'title','artist_id','year','duration']]
song_data_values = song_data_df.values   # Returns a numpy representation of values in the dataframe
first_record_df = song_data_values[0]
song_data = first_record_df.tolist()
song_data


['SONYPOM12A8C13B2D7',
 'I Think My Wife Is Running Around On Me (Taco Hell)',
 'ARDNS031187B9924F0',
 2005,
 186.48771]

In [35]:
# Inserting the record into the song table
cur.execute(song_table_insert, song_data)
conn.commit()

## Artists table

In [36]:
# Extracting data for Artists table
artist_data_df = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]
artist_data_values = artist_data_df.values
first_record_df = artist_data_df.values[0]
artist_data = first_record_df.tolist()
artist_data

['ARDNS031187B9924F0', 'Tim Wilson', 'Georgia', 32.67828, -83.22295]

In [37]:
# Inserting the records into the artists table
cur.execute(artist_table_insert, artist_data)
conn.commit()

# Processing Log_data

In [39]:
log_files = get_files('./data/log_data/')
filepath = log_files[0]

In [40]:
df = pd.read_json(filepath, lines=True)
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Lily,F,0,Burns,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1540621000000.0,689,,200,1542592468796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",32.0
1,Explosions In The Sky,Logged In,Adelyn,F,0,Jordan,497.47546,free,"Chicago-Naperville-Elgin, IL-IN-WI",PUT,NextSong,1540131000000.0,458,Your Hand In Mine,200,1542592496796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3...",7.0
2,,Logged In,Adelyn,F,1,Jordan,,free,"Chicago-Naperville-Elgin, IL-IN-WI",PUT,Logout,1540131000000.0,458,,307,1542592497796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3...",7.0
3,,Logged Out,,,2,,,free,,GET,Home,,458,,200,1542592500796,,
4,,Logged Out,,,3,,,free,,PUT,Login,,458,,307,1542592501796,,


## Time_table

In [42]:
# FIltering records by next song action
df = df[df.page == 'NextSong']
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
1,Explosions In The Sky,Logged In,Adelyn,F,0,Jordan,497.47546,free,"Chicago-Naperville-Elgin, IL-IN-WI",PUT,NextSong,1540131000000.0,458,Your Hand In Mine,200,1542592496796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3...",7
6,Paul Van Dyk Featuring Jessica Sutta,Logged In,Layla,F,0,Griffin,425.66485,paid,"Lake Havasu City-Kingman, AZ",PUT,NextSong,1541057000000.0,672,White Lies (Dave Spoon Remix),200,1542592893796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",24
7,Tim Hughes,Logged In,Layla,F,1,Griffin,323.47383,paid,"Lake Havasu City-Kingman, AZ",PUT,NextSong,1541057000000.0,672,God of Justice,200,1542593318796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",24
8,David Cassidy & The Partridge Family,Logged In,Layla,F,2,Griffin,227.73506,paid,"Lake Havasu City-Kingman, AZ",PUT,NextSong,1541057000000.0,672,I'll Meet You Halfway,200,1542593641796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",24
9,Snow Patrol,Logged In,Layla,F,3,Griffin,200.93342,paid,"Lake Havasu City-Kingman, AZ",PUT,NextSong,1541057000000.0,672,Crack The Shutters,200,1542593868796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",24


In [43]:
# Converting the ts timestamp to datetime
t = pd.to_datetime(df['ts'], unit='ms')
t.head()

1   2018-11-19 01:54:56.796
6   2018-11-19 02:01:33.796
7   2018-11-19 02:08:38.796
8   2018-11-19 02:14:01.796
9   2018-11-19 02:17:48.796
Name: ts, dtype: datetime64[ns]

In [44]:
time_data = [t, t.dt.hour, t.dt.day, t.dt.isocalendar().week, t.dt.month, t.dt.year, t.dt.weekday]
column_labels = ['timestamp', 'hour', 'day', 'weekofyear', 'month', 'year', 'weekday']


In [46]:
time_dict = dict(zip(column_labels, time_data))
time_df = pd.DataFrame(time_dict)
time_df.head()

Unnamed: 0,timestamp,hour,day,weekofyear,month,year,weekday
1,2018-11-19 01:54:56.796,1,19,47,11,2018,0
6,2018-11-19 02:01:33.796,2,19,47,11,2018,0
7,2018-11-19 02:08:38.796,2,19,47,11,2018,0
8,2018-11-19 02:14:01.796,2,19,47,11,2018,0
9,2018-11-19 02:17:48.796,2,19,47,11,2018,0


In [47]:
# Inserting records into the time table
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()

## Users table

In [50]:
# Extracting data for users table
user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
user_df = user_df.drop_duplicates().dropna()
user_df.head()

Unnamed: 0,userId,firstName,lastName,gender,level
1,7,Adelyn,Jordan,F,free
6,24,Layla,Griffin,F,paid
41,66,Kevin,Arellano,M,free
47,37,Jordan,Hicks,F,free
51,15,Lily,Koch,F,paid


In [51]:
# Inserting records into the users table
for i, row in user_df.iterrows():
    cur.execute(user_table_insert, row)
    conn.commit()

In [52]:
# Inserting records into songplays table
for index, row in df.iterrows():

    # get songid and artistid from song and artist tables
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()

    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    # Insert songplay record
    songplay_data = (pd.to_datetime(row.ts, unit='ms'), row.userId, row.level, songid, artistid, row.sessionId, row.location, row.userAgent)
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()

In [None]:
# Closing connection to the Sparkify Database
#conn.close()