# ETL Processes

This notebook contains the elements to construct ETL process for each of the tables in etl.py file to load the dataset.

In [1]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [15]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()

OperationalError: connection to server at "127.0.0.1", port 5432 failed: FATAL:  password authentication failed for user "student"


In [3]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root, '*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))

    return all_files

# Process song_data

Performing ETL on song_data to create song and artist dimension tables.

In [4]:
song_files = get_files('data/song_data/')

In [5]:
filepath = song_files[0]
print(filepath)

d:\Data-Engineering-Nanodegree\Data Modeling with Postgres\data\song_data\A\A\A\TRAAAAW128F429D538.json


In [6]:
df = pd.read_json(filepath, lines=True)
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARD7TVE1187B99BFB1,,,California - LA,Casual,SOMZWCG12A8C13C480,I Didn't Mean To,218.93179,0


# #1: songs table

## Extract Data for Songs Table
* Select columns for song_id, title, artist_id, year, and duration
* Use df.values to select just the values from the dataframe
* Index to select the first (only) record in the dataframe
* Convert the array to a list and set it to song_data   

In [7]:
num_songs, artist_id, artist_latitude, artist_longitude, artist_location, artist_name, song_id, title, duration, year = df.values[0]

In [8]:
song_data = [ song_id, title, artist_id, year, duration]

song_data

['SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1', 0, 218.93179]

<b>Insert record into Song Table</b>

Implement the song_table_insert query in sql_queries.py and run the cell below to insert a record for this song into the songs table. Remember to run create_table.py before running the cell below to ensure you have created/resetted the songs table in the sparkify database.

In [9]:
cur.execute(song_table_insert, song_data)
conn.commit()

NameError: name 'cur' is not defined

Run test.ipynb to see if you have successfully added a record to this table.

# #2: artists Table

<b>Extract Data for Artists Table</b>
* Select columns for artist ID, name, location, latitude, and longitude
* Use df.values to select just the values from the dataframe
* Index to select the first (only) record in the dataframe
* Convert the array to a list and set it to artist_data

In [None]:
artist_data = [artist_id, artist_latitude, artist_longitude, artist_location, artist_name]
artist_data

<b> Insert Record into Artist Table</b>

Implement the artist_table_insert query in sql_queries.py and run the cell below to insert a record for this song's artist into the artists table. Remember to run create_tables.py before running the cell below to ensure you have created resetted the artists table in sparkify database.

In [1]:
cur.execute(artist_table_insert, artist_data)
conn.commit()

NameError: name 'cur' is not defined

# Process log_data

In this part, you will perform ETL on the second dataset, log_data, to create the time and users dimensional tables, as well as the songplays fact table.


In [2]:
log_files = get_files('data/log_data/')

NameError: name 'get_files' is not defined

In [3]:
filepath = log_files[0]
print(filepath)

NameError: name 'log_files' is not defined

In [4]:
df = pd.read_json(filepath, lines=True)
print(df.size)
df.head()

NameError: name 'pd' is not defined

# #3: time Table

<b>Extract Data for Time Table</b>

* Filter records by NextSong action
* Convert the ts timestamp column to datetime
Hint: the current timestamp column to datetime
* Extract the timestamp, hour, day, week of year, month, year, and weekday from the ts column and set time_data to a list containing these values in order.
* Specify labels for these columns and set to column_labels
* Create a dataframe, time_df, containing the time data for this file by combining column_labels and time_data into a dictionary and converting this into a dataframe.

In [None]:
df = df[df['page']=='NextSong']
print(df.size)
df.head()

In [5]:
t = pd.to_datetime(df['ts'], unit='ms')
t.head()
                   

NameError: name 'pd' is not defined

In [None]:
#Extract the timestamp, hour, day, week of year, month, year, and weekday from the ts column and set time_data to a list containing these values in order
time_data = []
for line in t:
    time_data.append([line, line.hour, line.day, line.week, line.month, line.year, line.day_name()])
column_labels = ('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')

In [None]:
time_data[1]

In [None]:
# Create a dataframe, time_df, containing the time data for this file by combining column_labels and time_data into a dictionary and converting this into a dataframe
time_df = pd.DataFrame.from_records(time_data, columns=column_labels)
time_df.head()

<b> Insert Records into Time Table </b>
Implement the time_table_insert query in sql_queries.py and run the cell below to insert records for the timestamps in this log file into the time table. Remember to run create_table.py before running the cell below to ensure you have created/resetted the time table in the sparkify database.

In [6]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()

NameError: name 'time_df' is not defined

# #4: users Table

<b> Extract Data for Users Table</b>
* Select columns for user ID, first name, last name, gender and level and set to user_df

In [None]:
user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
user_df.head()

<b>Insert Records into Users Table</b>

Implement the user_table_insert query in sql_queries.py and run the cell below to insert records for the users in this log file into the users table. Remember to run create_tables.py before running the cell below to ensure you have created / resetted the users table in the sparkify database.

In [None]:
for i, row in user_df.iterrows():
    cur.execute(user_table_insert, row)
    conn.commit()

# #5: songplays Table

<b>Extract Data and Songplays Table</b>
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the songplays table. Since the log file does not specify an ID for either the song or the artist, you will need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.

In [None]:
for index, row in df.iterrows():

    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()

    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    # insert songplay records
    # (songplay_id int, start_time int, user_id int, level text, song_id text, artist_id text, session_id int, 
    # location text, user_agent text)

    songplay_data = (index, pd.to_datetime(row.ts, unit='ms'), int(row.userId), row.level, songid, artistid, row.sessionId, row.location, row.userAgent)
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()

# Close Connection to Sparkify Database

In [None]:
conn.close()

# Implement etl.py

Use what you have completed in this notebook to implement etl.py