# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the full datasets.

In [40]:
# Import the required libraries
import os  # interact with the operating system
import glob  # find data files by pattern
import psycopg2  # PostgreSQL database adapter
import pandas as pd  # tabular data structures
import json  # read JSON files
from sql_queries import *  # SQL statements defined in sql_queries.py

In [3]:
# Connect to the PostgreSQL database with psycopg2.
# Run create_tables.py in a terminal before running this cell.
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()

In [4]:
# Return the absolute paths of all .json files under the given directory.
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files
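
For comparison, here is a minimal sketch of the same directory walk written with `pathlib`. The name `get_files_pathlib` and the throwaway file names are hypothetical, not part of the project template. Sorting the result is an added choice that makes "the first file" deterministic across machines.

```python
import tempfile
from pathlib import Path


def get_files_pathlib(filepath):
    """Return sorted absolute paths of every .json file under filepath."""
    return sorted(str(p.resolve()) for p in Path(filepath).rglob('*.json'))


# Small demonstration on a throwaway directory tree (hypothetical file names).
with tempfile.TemporaryDirectory() as tmp:
    nested = Path(tmp) / 'A' / 'B'
    nested.mkdir(parents=True)
    (nested / 'song_2.json').write_text('{}')
    (Path(tmp) / 'A' / 'song_1.json').write_text('{}')
    (nested / 'notes.txt').write_text('ignored')  # not a .json file

    found = get_files_pathlib(tmp)
    found_names = [Path(f).name for f in found]
```

Only the two `.json` files are returned; the `.txt` file is skipped.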

# Process `song_data`
In this first part, you'll perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` dimensional tables.

Let's perform ETL on a single song file and load a single record into each table to start.
- Use the `get_files` function provided above to get a list of all song JSON files in `data/song_data`
- Select the first song in this list
- Read the song file and view the data

In [5]:
# Call get_files() to collect every .json file in the song_data folder
# and store the resulting list in song_files.
song_files = get_files('data/song_data')
# Display the list of retrieved files as a check.
song_files

['/workspace/home/data/song_data/A/B/C/TRABCEC128F426456E.json',
 '/workspace/home/data/song_data/A/B/C/TRABCUQ128E0783E2B.json',
 '/workspace/home/data/song_data/A/B/C/TRABCFL128F149BB0D.json',
 '/workspace/home/data/song_data/A/B/C/TRABCTK128F934B224.json',
 '/workspace/home/data/song_data/A/B/C/TRABCPZ128F4275C32.json',
 '/workspace/home/data/song_data/A/B/C/TRABCKL128F423A778.json',
 '/workspace/home/data/song_data/A/B/C/TRABCIX128F4265903.json',
 '/workspace/home/data/song_data/A/B/C/TRABCAJ12903CDFCC2.json',
 '/workspace/home/data/song_data/A/B/C/TRABCRU128F423F449.json',
 '/workspace/home/data/song_data/A/B/C/TRABCYE128F934CE1D.json',
 '/workspace/home/data/song_data/A/B/C/TRABCXB128F4286BD3.json',
 '/workspace/home/data/song_data/A/B/C/TRABCEI128F424C983.json',
 '/workspace/home/data/song_data/A/B/B/TRABBJE12903CDB442.json',
 '/workspace/home/data/song_data/A/B/B/TRABBAM128F429D223.json',
 '/workspace/home/data/song_data/A/B/B/TRABBLU128F93349CF.json',
 '/workspace/home/data/so

In [6]:
# Count the .json files in the song_data folder (71 files).
# Each file holds the information for one song.
len(song_files)

71

In [7]:
# Take the first .json file in the song_files list as filepath.
filepath = song_files[0]
# Display the file path.
filepath

'/workspace/home/data/song_data/A/B/C/TRABCEC128F426456E.json'

In [8]:
# Read the first JSON file with pandas into the dataframe df.
df = pd.read_json(filepath, lines=True)
# Display the contents of df.
df.head()

| | artist_id | artist_latitude | artist_location | artist_longitude | artist_name | duration | num_songs | song_id | title | year |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | AR0IAWL1187B9A96D0 | 8.4177 | Panama | -80.11278 | Danilo Perez | 197.19791 | 1 | SONSKXP12A8C13A2C9 | Native Soul | 2003 |
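
The `lines=True` argument tells pandas to treat the file as line-delimited JSON, with one object per line. As a rough illustration of that layout, the sketch below parses such a file using only the standard library. The two records and the file name `sample.json` are hypothetical.

```python
import json
import os
import tempfile

# Two hypothetical records, written one JSON object per line.
sample_lines = [
    '{"song_id": "S1", "title": "First", "year": 2003}',
    '{"song_id": "S2", "title": "Second", "year": 0}',
]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'sample.json')
    with open(path, 'w') as f:
        f.write('\n'.join(sample_lines) + '\n')

    # Parse each non-empty line on its own, as a line-delimited reader would.
    with open(path) as f:
        records = [json.loads(line) for line in f if line.strip()]

titles = [r['title'] for r in records]
```

Each line becomes one dictionary, which is what pandas turns into one dataframe row.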


## #1: `songs` Table
#### Extract Data for Songs Table
- Select columns for song ID, title, artist ID, year, and duration
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `song_data`

In [9]:
# Select only the columns needed from the dataframe.
# These values will be inserted into the songs table in the PostgreSQL database later.
song_data = df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[0].tolist()
# Display the data as a check.
song_data


['SONSKXP12A8C13A2C9', 'Native Soul', 'AR0IAWL1187B9A96D0', 2003, 197.19791]

#### Insert Record into Song Table
Implement the `song_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song into the `songs` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songs` table in the sparkify database.

In [10]:
# Print the song_table_insert query from sql_queries.py to check its syntax.
print("SQL Query:", song_table_insert)
# Print the contents of song_data to check them before running the insert.
print("Data:", song_data)

SQL Query: INSERT INTO songs(song_id, title, artist_id, year, duration) VALUES(%s,%s,%s,%s,%s) ON CONFLICT (song_id) DO NOTHING;
Data: ['SONSKXP12A8C13A2C9', 'Native Soul', 'AR0IAWL1187B9A96D0', 2003, 197.19791]


In [11]:
# Use psycopg2's cur.execute() to insert the record into the table.
cur.execute(song_table_insert, song_data)
# Commit the change to the database.
conn.commit()

Run `test.ipynb` to see if you've successfully added a record to this table.

**The result from the test should show one record in the songs table.**
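
The `ON CONFLICT (song_id) DO NOTHING` clause makes the insert safe to re-run: a second insert with the same primary key is skipped instead of raising an error. The sketch below illustrates that behaviour with the standard library's `sqlite3` as a stand-in for PostgreSQL, so it runs without a database server. It assumes SQLite 3.24 or newer, and the table definition is a simplified guess, not the project's actual schema.

```python
import sqlite3

conn_demo = sqlite3.connect(':memory:')
cur_demo = conn_demo.cursor()
cur_demo.execute("""
    CREATE TABLE songs (
        song_id   TEXT PRIMARY KEY,
        title     TEXT,
        artist_id TEXT,
        year      INTEGER,
        duration  REAL
    )
""")

# sqlite3 uses "?" placeholders where psycopg2 uses "%s".
insert_sql = """
    INSERT INTO songs (song_id, title, artist_id, year, duration)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (song_id) DO NOTHING
"""
song = ['SONSKXP12A8C13A2C9', 'Native Soul', 'AR0IAWL1187B9A96D0', 2003, 197.19791]

cur_demo.execute(insert_sql, song)
cur_demo.execute(insert_sql, song)  # same primary key: silently skipped
conn_demo.commit()

row_count = cur_demo.execute("SELECT COUNT(*) FROM songs").fetchone()[0]
conn_demo.close()
```

After both inserts, `row_count` is 1, which matches the single record expected in the test above.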

## #2: `artists` Table
#### Extract Data for Artists Table
- Select columns for artist ID, name, location, latitude, and longitude
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `artist_data`

In [12]:
# Select the artist columns from the same JSON file and store them in artist_data.
artist_data = df[['artist_id','artist_name','artist_location','artist_latitude','artist_longitude']].values[0].tolist()

#### Insert Record into Artist Table
Implement the `artist_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song's artist into the `artists` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `artists` table in the sparkify database.

In [13]:
# Print the artist_table_insert query from sql_queries.py to check its syntax.
print("SQL Query:", artist_table_insert)
# Print the contents of artist_data to check them before running the insert.
print("Data:", artist_data)

SQL Query: INSERT INTO artists(artist_id, name, location, latitude, longitude) VALUES(%s,%s,%s,%s,%s) ON CONFLICT (artist_id) DO NOTHING;
Data: ['AR0IAWL1187B9A96D0', 'Danilo Perez', 'Panama', 8.4177, -80.11278]


In [14]:
# Use psycopg2's cur.execute() to insert the record into the table.
cur.execute(artist_table_insert, artist_data)
# Commit the change to the database.
conn.commit()

Run `test.ipynb` to see if you've successfully added a record to this table.

**(The table artists in the database should show one record for now).**

# Process `log_data`
In this part, you'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

Let's perform ETL on a single log file and load a single record into each table.
- Use the `get_files` function provided above to get a list of all log JSON files in `data/log_data`
- Select the first log file in this list
- Read the log file and view the data

In [15]:
# Call get_files() to collect every .json file in the log_data folder
# and store the resulting list in log_files.
log_files = get_files('data/log_data')
# Display the list of retrieved JSON files as a check.
log_files

['/workspace/home/data/log_data/2018/11/2018-11-28-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-16-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-05-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-06-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-14-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-24-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-13-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-19-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-25-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-07-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-21-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-11-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-20-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-27-events.json',
 '/workspace/home/data/log_data/2018/11/2018-11-22-events.json',
 '/workspace/home/data/lo

In [16]:
# Count the .json files in the log_data folder (30 files).
# Each file holds the user activity logged on one day of November 2018.
len(log_files)

30

In [17]:
# Take the first .json file in the log_files list as filepath.
filepath = log_files[0]
# Display the file path.
filepath

'/workspace/home/data/log_data/2018/11/2018-11-28-events.json'

In [18]:
# Read the first JSON file with pandas into the dataframe df.
df = pd.read_json(filepath, lines=True)
# Display the contents of df.
df.head()

| | artist | auth | firstName | gender | itemInSession | lastName | length | level | location | method | page | registration | sessionId | song | status | ts | userAgent | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Mitch Ryder & The Detroit Wheels | Logged In | Tegan | F | 65 | Levine | 205.03465 | paid | Portland-South Portland, ME | PUT | NextSong | 1540794000000.0 | 992 | Jenny Take A Ride (LP Version) | 200 | 1543363215796 | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... | 80 |
| 1 | The Spill Canvas | Logged In | Tegan | F | 66 | Levine | 358.03383 | paid | Portland-South Portland, ME | PUT | NextSong | 1540794000000.0 | 992 | The TIde (LP Version) | 200 | 1543363420796 | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... | 80 |
| 2 | Mogwai | Logged In | Tegan | F | 67 | Levine | 571.19302 | paid | Portland-South Portland, ME | PUT | NextSong | 1540794000000.0 | 992 | Two Rights Make One Wrong | 200 | 1543363778796 | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... | 80 |
| 3 | Spor | Logged In | Tegan | F | 68 | Levine | 380.3424 | paid | Portland-South Portland, ME | PUT | NextSong | 1540794000000.0 | 992 | Way Of The Samurai | 200 | 1543364349796 | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... | 80 |
| 4 | DJ Dizzy | Logged In | Tegan | F | 69 | Levine | 221.1522 | paid | Portland-South Portland, ME | PUT | NextSong | 1540794000000.0 | 992 | Sexy Bitch | 200 | 1543364729796 | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... | 80 |


## #3: `time` Table
#### Extract Data for Time Table
- Filter records by `NextSong` action
- Convert the `ts` timestamp column to datetime
  - Hint: the current timestamp is in milliseconds
- Extract the timestamp, hour, day, week of year, month, year, and weekday from the `ts` column and set `time_data` to a list containing these values in order
  - Hint: use pandas' [`dt` attribute](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.html) to easily access datetime-like properties.
- Specify labels for these columns and set to `column_labels`
- Create a dataframe, `time_df`, containing the time data for this file by combining `column_labels` and `time_data` into a dictionary and converting this into a dataframe

In [19]:
# Keep only the rows whose 'page' value is 'NextSong'.
# Reassign the result to df; .copy() avoids pandas' SettingWithCopyWarning when 'ts' is modified later.
df = df[df['page'] == 'NextSong'].copy()
# Show the number of records in this log file and the datatype of each column.
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 363 entries, 0 to 432
Data columns (total 18 columns):
artist           363 non-null object
auth             363 non-null object
firstName        363 non-null object
gender           363 non-null object
itemInSession    363 non-null int64
lastName         363 non-null object
length           363 non-null float64
level            363 non-null object
location         363 non-null object
method           363 non-null object
page             363 non-null object
registration     363 non-null float64
sessionId        363 non-null int64
song             363 non-null object
status           363 non-null int64
ts               363 non-null int64
userAgent        363 non-null object
userId           363 non-null object
dtypes: float64(2), int64(4), object(12)
memory usage: 53.9+ KB


In [20]:
# Convert the 'ts' column from integer milliseconds to the datetime datatype.
# t = pd.to_datetime(df['ts'],unit='ms')
df['ts'] = pd.to_datetime(df['ts'], unit='ms')

In [21]:
# Confirm that the 'ts' column was converted successfully.
df['ts'].dtypes

dtype('<M8[ns]')

In [22]:
# Programmatically check datatype of the 'ts' column.
assert pd.api.types.is_datetime64_dtype(df['ts'])
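
To see what `unit='ms'` does to a single value, the sketch below converts the first `ts` value from the log sample using only the standard library. It is an illustration of the unit conversion, not a replacement for the pandas call above.

```python
from datetime import datetime, timezone

ts_ms = 1543363215796  # first `ts` value in the log sample shown earlier

# fromtimestamp() expects seconds, so scale the millisecond value down first.
start_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)

print(start_time.strftime('%Y-%m-%d %H:%M:%S'))  # prints 2018-11-28 00:00:15
```

Passing the raw millisecond value without dividing by 1000 would be interpreted as seconds and land tens of thousands of years in the future, which is why the unit matters.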

In [23]:
# Start with an empty list, time_data, to hold the values extracted from the 'ts' column.
time_data = []
# Iterate over each row of the current df.
for index, row in df.iterrows():
    # From each row, extract the timestamp and its date and time components.
    time_data.append([row['ts'],
                      row['ts'].hour,
                      row['ts'].day,
                      row['ts'].weekofyear,
                      row['ts'].month,
                      row['ts'].year,
                      row['ts'].weekday()])
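
As an alternative to the row-by-row loop, the `dt` accessor mentioned in the hint can extract each component for the whole column at once. The sketch below assumes pandas 1.1 or newer (for `dt.isocalendar()`) and uses two `ts` values from the log sample; the variable names `ts` and `time_df_sketch` are hypothetical.

```python
import pandas as pd

# Two millisecond timestamps taken from the log sample shown earlier.
ts = pd.Series(pd.to_datetime([1543363215796, 1543363420796], unit='ms'))

# Each .dt property is computed for every row in one vectorized call.
time_df_sketch = pd.DataFrame({
    'start_time': ts,
    'hour': ts.dt.hour,
    'day': ts.dt.day,
    'week_of_year': ts.dt.isocalendar().week,  # requires pandas >= 1.1
    'month': ts.dt.month,
    'year': ts.dt.year,
    'weekday': ts.dt.weekday,  # Monday == 0
})
```

On older pandas versions, such as the one that produced the outputs in this notebook, `ts.dt.weekofyear` plays the role of `ts.dt.isocalendar().week`.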

In [24]:
# Define the column labels for the values extracted from the 'ts' column.
column_labels = ['start_time','hour','day','week_of_year','month','year','weekday']

In [25]:
# Build the time_df DataFrame from time_data, using column_labels as the column names.
time_df = pd.DataFrame(time_data, columns=column_labels)
# Display the first two rows of the new dataframe.
time_df.head(2)

| | start_time | hour | day | week_of_year | month | year | weekday |
|---|---|---|---|---|---|---|---|
| 0 | 2018-11-28 00:00:15.796 | 0 | 28 | 48 | 11 | 2018 | 2 |
| 1 | 2018-11-28 00:03:40.796 | 0 | 28 | 48 | 11 | 2018 | 2 |


In [26]:
# Count the records in time_df (including duplicate values).
time_df.count()

start_time      363
hour            363
day             363
week_of_year    363
month           363
year            363
weekday         363
dtype: int64

In [27]:
# Check for duplicate start_time values that could cause issues later.
#duplicated_time = time_df.duplicated('start_time')
#duplicated_time.sum()

#### Insert Records into Time Table
Implement the `time_table_insert` query in `sql_queries.py` and run the cell below to insert records for the timestamps in this log file into the `time` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `time` table in the sparkify database.

In [28]:
# Print the time_table_insert query from sql_queries.py to check its syntax.
print("SQL Query:", time_table_insert)

SQL Query: INSERT INTO time(start_time, hour, day, week, month, year, weekday) VALUES(%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (start_time) DO NOTHING;


In [29]:
# Iterate over each row of time_df.
for i, row in time_df.iterrows():
    # Insert the row's values into the time table in the database.
    cur.execute(time_table_insert, list(row))
    # Commit the change to the database.
    conn.commit()

Run `test.ipynb` to see if you've successfully added records to this table.

**(The result should show roughly 360 records in the time table for now).**

## #4: `users` Table
#### Extract Data for Users Table
- Select columns for user ID, first name, last name, gender and level and set to `user_df`

In [30]:
# Select the user columns and store them in the user_df DataFrame.
user_df = df[['userId','firstName','lastName','gender','level']]
# Display the first three rows of user_df.
user_df.head(3)

| | userId | firstName | lastName | gender | level |
|---|---|---|---|---|---|
| 0 | 80 | Tegan | Levine | F | paid |
| 1 | 80 | Tegan | Levine | F | paid |
| 2 | 80 | Tegan | Levine | F | paid |


In [31]:
# Count the rows in user_df (including duplicate values).
user_df.count()

userId       363
firstName    363
lastName     363
gender       363
level        363
dtype: int64
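
The same user appears once per song play, so most of these 363 rows are repeats. The insert query skips repeated keys, but the rows could also be deduplicated in pandas before any insert is attempted. The sketch below shows one way, using hypothetical rows shaped like `user_df`; the second user is invented for illustration.

```python
import pandas as pd

# Hypothetical rows shaped like user_df: one user appears several times.
user_rows = pd.DataFrame({
    'userId': ['80', '80', '80', '15'],
    'firstName': ['Tegan', 'Tegan', 'Tegan', 'Alex'],
    'lastName': ['Levine', 'Levine', 'Levine', 'Doe'],
    'gender': ['F', 'F', 'F', 'M'],
    'level': ['paid', 'paid', 'paid', 'free'],
})

# Keep the first row seen for each userId, mirroring an insert that
# ignores later rows with an already-present key.
unique_users = user_rows.drop_duplicates(subset='userId')
```

Here `unique_users` has two rows, one per `userId`, so only two insert statements would be needed.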

#### Insert Records into Users Table
Implement the `user_table_insert` query in `sql_queries.py` and run the cell below to insert records for the users in this log file into the `users` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `users` table in the sparkify database.

In [32]:
# Print the user_table_insert query from sql_queries.py to check its syntax.
print("SQL Query:", user_table_insert)

SQL Query: INSERT INTO users(user_id,first_name, last_name, gender, level) VALUES(%s,%s,%s,%s,%s) ON CONFLICT (user_id) DO NOTHING;


In [33]:
# Iterate over each row of user_df.
for i, row in user_df.iterrows():
    # Insert the row's values into the users table.
    cur.execute(user_table_insert, row)
    # Commit the change to the database.
    conn.commit()

Run `test.ipynb` to see if you've successfully added records to this table.

**(The users table should show one row of user information per unique user_id).**

## #5: `songplays` Table
#### Extract Data and Songplays Table
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the `songplays` table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.
- Implement the `song_select` query in `sql_queries.py` to find the song ID and artist ID based on the title, artist name, and duration of a song.
- Select the timestamp, user ID, level, song ID, artist ID, session ID, location, and user agent and set to `songplay_data`

#### Insert Records into Songplays Table
- Implement the `songplay_table_insert` query and run the cell below to insert records for the songplay actions in this log file into the `songplays` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songplays` table in the sparkify database.

In [35]:
# Print the song_select query from sql_queries.py to check its syntax.
print("SQL Query:", song_select)
# Print the songplay_table_insert query from sql_queries.py to check its syntax.
print("SQL INSERT QUERY:", songplay_table_insert)

SQL Query: SELECT songs.song_id, artists.artist_id FROM songs JOIN artists ON songs.artist_id = artists.artist_id WHERE songs.title = %s AND artists.name = %s AND songs.duration = %s
SQL INSERT QUERY: INSERT INTO songplays(start_time, user_id, level, song_id, artist_id, session_id, location, user_agent) VALUES(%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (songplay_id) DO NOTHING;


In [37]:
# The songs.duration column is defined as a float.
# Check the datatype of the length column so the comparison inside the song_select query is valid.
df.length.dtypes

dtype('float64')

In [38]:
# Iterate over each row of the df dataframe.
for index, row in df.iterrows():
    # Take the song, artist, and length values from the row
    # and pass them to the song_select query
    # to look up songs.song_id and artists.artist_id in the songs and artists tables.
    cur.execute(song_select, (row.song, row.artist, row.length))
    # cur.fetchone() returns the matching (song_id, artist_id) pair, or None if there is no match.
    results = cur.fetchone()
    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None
    # Build songplay_data, the list of values to insert into the songplays table.
    songplay_data = [
        row['ts'],
        row['userId'],
        row['level'],
        songid,
        artistid,
        row['sessionId'],
        row['location'],
        row['userAgent']]
    # Insert the row's values into the songplays table.
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()
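
The lookup-with-fallback step in the loop above can be tried in isolation. The sketch below runs the same join against an in-memory `sqlite3` database as a stand-in for PostgreSQL, loaded with the one song and artist inserted earlier in this notebook. The helper name `find_ids` and the simplified table definitions are hypothetical.

```python
import sqlite3

db = sqlite3.connect(':memory:')
db.executescript("""
    CREATE TABLE artists (artist_id TEXT PRIMARY KEY, name TEXT);
    CREATE TABLE songs (song_id TEXT PRIMARY KEY, title TEXT,
                        artist_id TEXT, duration REAL);
    INSERT INTO artists VALUES ('AR0IAWL1187B9A96D0', 'Danilo Perez');
    INSERT INTO songs VALUES ('SONSKXP12A8C13A2C9', 'Native Soul',
                              'AR0IAWL1187B9A96D0', 197.19791);
""")

# Same join as song_select, with "?" placeholders instead of psycopg2's "%s".
lookup_sql = """
    SELECT songs.song_id, artists.artist_id
    FROM songs JOIN artists ON songs.artist_id = artists.artist_id
    WHERE songs.title = ? AND artists.name = ? AND songs.duration = ?
"""


def find_ids(title, artist, length):
    """Return (song_id, artist_id), or (None, None) when nothing matches."""
    match = db.execute(lookup_sql, (title, artist, length)).fetchone()
    return match if match else (None, None)


hit = find_ids('Native Soul', 'Danilo Perez', 197.19791)
miss = find_ids('Way Of The Samurai', 'Spor', 380.3424)
db.close()
```

A log row whose title, artist, and duration all match gets both IDs; any other row gets `None` for both, which is stored as NULL in the `songplays` table.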
    

In [37]:
# Use this to roll back the failed transaction before re-running a cell that raised an error.
#conn.rollback()

Run `test.ipynb` to see if you've successfully added records to this table.

**(The songplays table should show the event information for each user account interacting with the company website).**

# Close Connection to Sparkify Database

In [39]:
conn.close()

# Implement `etl.py`
Use what you've completed in this notebook to implement `etl.py`.