# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the whole datasets.

In [1]:
# Run create_tables.py so the SQL tables that the processed data is inserted into exist.
%run create_tables.py

In [2]:
#required imported for functional code
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [3]:
# Open a connection and a cursor to the Postgres server.
# The connection string can be overridden with the SPARKIFY_DSN environment
# variable, so the host address and credentials do not have to be edited in
# the notebook itself; the fallback is the local default used so far.
conn = psycopg2.connect(
    os.environ.get(
        "SPARKIFY_DSN",
        "host=127.0.0.1 dbname=sparkifydb user=student password=student",
    )
)
cur = conn.cursor()

In [4]:
#create list of file addresses for processing
def get_files(filepath):
    """Return the absolute paths of all ``*.json`` files under ``filepath``.

    The directory tree rooted at ``filepath`` is walked recursively and every
    directory is globbed for ``*.json`` entries.

    Args:
        filepath: Root directory to search.

    Returns:
        list[str]: Absolute paths of the matching files, in the order the
        directories are visited. Empty when nothing matches.
    """
    json_paths = []
    for folder, _subdirs, _names in os.walk(filepath):
        pattern = os.path.join(folder, '*.json')
        json_paths.extend(os.path.abspath(match) for match in glob.glob(pattern))

    return json_paths

# Process Million Song Data

## #1: `songs` Table


In [5]:
# Collect the paths of all song JSON files under data/song_data
# and display how many were found.
song_files = get_files('data/song_data')
len(song_files)

71

In [6]:
# Take the first song file as the sample to develop against and
# display its path to confirm it points at the expected location.
filepath = song_files[0]
filepath

'/workspace/home/data/song_data/A/B/B/TRABBNP128F932546F.json'

In [7]:
# Load the sample song file into a dataframe; lines=True reads the file
# as one JSON object per line.
song_df = pd.read_json(filepath, lines = True)
song_df.head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,AR62SOJ1187FB47BB5,,,,Chase & Status,337.68444,1,SOGVQGJ12AB017F169,Ten Tonne,2005


In [8]:
# Take the song columns of the first row of song_df and convert them to a
# plain list that can be passed as parameters to the SQL insert statement.

song_data = (song_df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[0]).tolist()

# Display the list to verify which values are passed to the SQL statement.
song_data

['SOGVQGJ12AB017F169', 'Ten Tonne', 'AR62SOJ1187FB47BB5', 2005, 337.68444]

In [9]:
# Execute the song insert statement with song_data as its parameters,
# then commit the transaction.
cur.execute(song_table_insert, song_data)
conn.commit()

Run `test.ipynb` if you want to check that the record was successfully inserted into this table.

## #2: `artists` Table



In [10]:
# Take the artist columns of the first row of song_df as a plain list to pass
# to the artist insert statement, and display it for verification.
# NOTE(review): the recorded output shows the missing latitude/longitude as nan
# rather than None - confirm that is the value that should be stored.
artist_data = (song_df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']].values[0]).tolist()
artist_data

['AR62SOJ1187FB47BB5', 'Chase & Status', '', nan, nan]

In [11]:
# Execute the artist insert statement with artist_data as its parameters,
# then commit the transaction.
cur.execute(artist_table_insert, artist_data)
conn.commit()

Run `test.ipynb` to see if you've successfully added a record to this table.

# Process Sparkify `log_data`


In [12]:
# Collect the paths of all log JSON files under data/log_data.
log_files = get_files('data/log_data')

In [13]:
# Take the first log file as the sample to develop against and
# display its path to confirm it points at the expected location.
# Note: this reuses (overwrites) the filepath variable set for the song file.
filepath = log_files[0]
filepath

'/workspace/home/data/log_data/2018/11/2018-11-04-events.json'

In [14]:
# Load the sample log file into a dataframe; lines=True reads the file
# as one JSON object per line.
log_df = pd.read_json(filepath, lines = True)
log_df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Theodore,M,0,Smith,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1540306000000.0,154,,200,1541290555796,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,52
1,Professor Longhair,Logged In,Ann,F,0,Banks,214.20363,free,"Salt Lake City, UT",PUT,NextSong,1540896000000.0,124,Mean Ol'World,200,1541292603796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,99
2,,Logged In,Jahiem,M,0,Miles,,free,"San Antonio-New Braunfels, TX",GET,Home,1540817000000.0,42,,200,1541299033796,"""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",43
3,Gary Hobbs,Logged In,Jahiem,M,1,Miles,245.52444,free,"San Antonio-New Braunfels, TX",PUT,NextSong,1540817000000.0,42,En Mi Mundo,200,1541300092796,"""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",43
4,Lifehouse,Logged In,Jahiem,M,2,Miles,203.59791,free,"San Antonio-New Braunfels, TX",PUT,NextSong,1540817000000.0,42,We'll Never Know,200,1541300337796,"""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",43


## #3: `time` Table
Convert the time data from an integer number of milliseconds to a timestamp.

In [15]:
# Keep only the rows whose page is 'NextSong'; log_df is replaced by the
# filtered frame, which keeps the original row labels.
log_df = log_df.loc[log_df['page'].eq('NextSong')]
log_df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
1,Professor Longhair,Logged In,Ann,F,0,Banks,214.20363,free,"Salt Lake City, UT",PUT,NextSong,1540896000000.0,124,Mean Ol'World,200,1541292603796,Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,99
3,Gary Hobbs,Logged In,Jahiem,M,1,Miles,245.52444,free,"San Antonio-New Braunfels, TX",PUT,NextSong,1540817000000.0,42,En Mi Mundo,200,1541300092796,"""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",43
4,Lifehouse,Logged In,Jahiem,M,2,Miles,203.59791,free,"San Antonio-New Braunfels, TX",PUT,NextSong,1540817000000.0,42,We'll Never Know,200,1541300337796,"""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",43
5,Olivia Ruiz,Logged In,Jahiem,M,3,Miles,254.74567,free,"San Antonio-New Braunfels, TX",PUT,NextSong,1540817000000.0,42,Cabaret Blanco,200,1541300540796,"""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",43
7,Jordan Rudess,Logged In,Cecilia,F,1,Owens,1367.84934,free,"Atlanta-Sandy Springs-Roswell, GA",PUT,NextSong,1541032000000.0,225,Tarkus,200,1541306152796,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:32.0) G...,6


In [16]:
# Convert the 'ts' column (interpreted as milliseconds via unit='ms') into a
# series of timestamps; the series keeps the row labels of log_df.
t = pd.to_datetime(log_df['ts'], unit = 'ms')
t.head()

1   2018-11-04 00:50:03.796
3   2018-11-04 02:54:52.796
4   2018-11-04 02:58:57.796
5   2018-11-04 03:02:20.796
7   2018-11-04 04:35:52.796
Name: ts, dtype: datetime64[ns]

In [17]:
# Break each timestamp into the individual components stored in the time table.
# The ISO week number is read from Timestamp.isocalendar() because the
# Series.dt.week accessor is deprecated and no longer available in pandas 2.x;
# the per-element call works on both old and new pandas versions.
time_data = [
    t,
    t.dt.hour,
    t.dt.day,
    t.map(lambda stamp: stamp.isocalendar()[1]),
    t.dt.month,
    t.dt.year,
    t.dt.weekday,
]

# Column labels for the time dataframe, in the same order as time_data.
column_labels = ('timestamp', 'hour', 'day', 'week', 'month', 'year', 'weekday')

In [18]:
# Map each column label to the component series at the same position,
# then build the time dataframe from that mapping.
time_dict = {
    label: time_data[position]
    for position, label in enumerate(column_labels)
}
time_df = pd.DataFrame(time_dict)

# Show the last rows as a sanity check of the assembled dataframe.
time_df.tail()

Unnamed: 0,timestamp,hour,day,week,month,year,weekday
180,2018-11-04 21:01:44.796,21,4,44,11,2018,6
181,2018-11-04 22:56:24.796,22,4,44,11,2018,6
182,2018-11-04 22:58:36.796,22,4,44,11,2018,6
183,2018-11-04 23:02:14.796,23,4,44,11,2018,6
188,2018-11-04 23:45:13.796,23,4,44,11,2018,6


In [19]:
# Write every row of the time dataframe to Postgres using the time insert
# statement, committing after each row.
for _, time_row in time_df.iterrows():
    cur.execute(time_table_insert, list(time_row))
    conn.commit()

If needed, run `test.ipynb` to check that the records were successfully added to this table.

## #4: `users` Table


In [20]:
# Filter the log data down to the user-related columns.
user_df = log_df[['userId', 'firstName', 'lastName', 'gender', 'level']]

In [21]:
# Display the first rows to check that the expected columns were selected.
user_df.head()

Unnamed: 0,userId,firstName,lastName,gender,level
1,99,Ann,Banks,F,free
3,43,Jahiem,Miles,M,free
4,43,Jahiem,Miles,M,free
5,43,Jahiem,Miles,M,free
7,6,Cecilia,Owens,F,free


In [22]:
# Write every row of the user dataframe to Postgres using the user insert
# statement, committing after each row.
# Each row is converted to a plain list first (as in the time table insert):
# handing the pandas Series itself to the driver relies on integer positions
# being accepted as keys of a label-indexed Series, which recent pandas
# versions deprecate.
for _, user_row in user_df.iterrows():
    cur.execute(user_table_insert, list(user_row))
    conn.commit()

If needed, run `test.ipynb` to check that the records were successfully added to this table.

## #5: `songplays` Table


In [23]:
# Build one songplay record per row of the filtered log data and insert it.
for _, row in log_df.iterrows():

    # get songid and artistid by running song_select with this event's
    # song, artist and length
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()

    # rows without a match are stored with empty song and artist ids
    songid, artistid = results if results else (None, None)

    # derive the start time from this row's own 'ts' value (milliseconds)
    # instead of looking it up by index label in time_df, so the cell does not
    # depend on time_df having been built from the same, identically indexed data
    start_time = pd.to_datetime(row.ts, unit='ms')

    # create the tuple of data to pass to the sql statement
    songplay_data = (
        start_time,
        row.userId,
        row.level,
        songid,
        artistid,
        row.sessionId,
        row.location,
        row.userAgent,
    )

    # insert songplay record into POSTGRES
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()

If needed, run `test.ipynb` to check that the records were successfully added to this table.

# Close Connection to Sparkify Database

In [24]:
# Close the database connection used by the cells above.
conn.close()

# Implement `etl.py`

If all file paths, tables and imports above ran without errors, run the cell below to perform the full import.

In [25]:
# Run create_tables.py first so that empty tables are present for the import
# and no duplicates are added.
%run create_tables.py
# Run etl.py to import and process all song and log data into the SQL database.
# NOTE(review): the recorded output of this cell ends in a ProgrammingError
# (the "start_time" column received a time value instead of a timestamp); this
# appears to come from etl.py - confirm the value it passes is a full timestamp.
%run etl.py

71 files found in data/song_data
1/71 files processed.
2/71 files processed.
3/71 files processed.
4/71 files processed.
5/71 files processed.
6/71 files processed.
7/71 files processed.
8/71 files processed.
9/71 files processed.
10/71 files processed.
11/71 files processed.
12/71 files processed.
13/71 files processed.
14/71 files processed.
15/71 files processed.
16/71 files processed.
17/71 files processed.
18/71 files processed.
19/71 files processed.
20/71 files processed.
21/71 files processed.
22/71 files processed.
23/71 files processed.
24/71 files processed.
25/71 files processed.
26/71 files processed.
27/71 files processed.
28/71 files processed.
29/71 files processed.
30/71 files processed.
31/71 files processed.
32/71 files processed.
33/71 files processed.
34/71 files processed.
35/71 files processed.
36/71 files processed.
37/71 files processed.
38/71 files processed.
39/71 files processed.
40/71 files processed.
41/71 files processed.
42/71 files processed.
43/71 file

ProgrammingError: column "start_time" is of type timestamp without time zone but expression is of type time without time zone
LINE 10:     VALUES ('00:50:03.796000'::time,0,4,44,11,2018,6)
                     ^
HINT:  You will need to rewrite or cast the expression.
