# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the whole datasets.

In [None]:
import os
os.chdir('..')
import glob
import psycopg2
from psycopg2 import sql
import pandas as pd
from sql_queries import *

#### Define function to create list of all our raw JSON files

In [None]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        json_files = glob.glob(os.path.join(root, '*.json'))
        for f in json_files:
            all_files.append(os.path.abspath(f))
    
    return all_files
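`os.walk` combined with `glob` works, but `pathlib` can do the recursive search in one call. Below is a minimal alternative sketch; `get_json_files` is a hypothetical name and not part of the project template. Sorting the result makes "the first file" deterministic, because `os.walk` and `glob` return entries in arbitrary, filesystem-dependent order.

```python
from pathlib import Path


def get_json_files(filepath):
    """Return sorted absolute paths of every *.json file under filepath, searching recursively."""
    # rglob("*.json") matches files in filepath itself and in all of its subdirectories
    return sorted(str(p.resolve()) for p in Path(filepath).rglob("*.json"))
```

Calling `get_json_files("data/song_data")` should return the same set of paths as `get_files`, just in a stable order.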

# Process `song_data`
In this first part, you'll perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` dimensional tables.

Let's perform ETL on a single song file and load a single record into each table to start.
- Use the `get_files` function provided above to get a list of all song JSON files in `data/song_data`
- Select the first song in this list
- Read the song file and view the data

In [None]:
filepath = "data/song_data"

In [None]:
song_files = get_files(filepath)
print(song_files[0])  # the first song file in the list
print(len(song_files))
## There are 71 song files

There are 71 song files. Rather than reading only one, loop through all of them and combine them into a single dataframe.

In [None]:
# df = pd.read_json(song_files[0], lines=True)

# Update: read ALL song files into a single dataframe.
# DataFrame.append was removed in pandas 2.0, so collect the frames and concatenate once.
df = pd.concat([pd.read_json(f, lines=True) for f in song_files], ignore_index=True)

df

## #1: `songs` Table
#### Extract Data for Songs Table
- Select columns for song ID, title, artist ID, year, and duration
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `song_data`
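For a single song file, the steps above reduce to one chained expression. A minimal sketch on a made-up one-row DataFrame (all values are placeholders, not taken from the dataset; the `sample_` names avoid overwriting the notebook's own variables):

```python
import pandas as pd

# Made-up one-row frame standing in for a single song file
sample_song_df = pd.DataFrame([{
    "song_id": "SOEXAMPLE0000001", "title": "Example Song",
    "artist_id": "AREXAMPLE000001", "year": 2004, "duration": 218.93179,
    "num_songs": 1,
}])

# Select the songs-table columns, take row 0 of the 2-D NumPy array,
# and convert that 1-D row to a plain Python list for cur.execute()
sample_song_data = (
    sample_song_df[["song_id", "title", "artist_id", "year", "duration"]]
    .values[0]
    .tolist()
)
print(sample_song_data)
```

Without the `[0]`, `.tolist()` would return a list of lists (one per row), which does not match the five placeholders of a single-row `INSERT`.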

In [None]:
# song_data = df[['song_id', 'title', 'artist_id', 'year', 'duration']]  # pandas dataframe
# song_data = song_data.values   # values only (is now a numpy array)
# song_data = song_data[0, :]  # select the first (only) record: row 0 of the 2-D array, as a 1-D array
# song_data = song_data.tolist() # Convert array to list

# Create song DF from ALL song files
song_df = df[['song_id', 'title', 'artist_id', 'year', 'duration']]

# Sort by song title
song_df = song_df.sort_values('title')

# Save as CSV so we can use COPY for better data loading performance
song_df.to_csv("data/song_df.csv", index=False)

song_df

#### Reset tables and connection before inserting records

In [None]:
!python create_tables.py
conn = psycopg2.connect(dbstring)
cur = conn.cursor()

### Insert Record into Song Table
Implement the `song_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song into the `songs` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songs` table in the sparkify database.

In [None]:
# cur.execute(song_table_insert, song_data)
# conn.commit()

# Updated query using COPY
song_path = datapath+'/song_df.csv'
cur.execute(song_table_insert, (song_path,))
conn.commit()
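Passing a file path to `COPY` (as `song_table_insert` appears to do here) makes PostgreSQL read the file on the database server, so the CSV must be readable by the server process, and server-side file access generally requires superuser rights or the `pg_read_server_files` role. A client-side alternative is to stream the CSV from memory with psycopg2's `cursor.copy_expert` and `COPY ... FROM STDIN`. The sketch below assumes the DataFrame's column order matches the target table; both helper names are hypothetical.

```python
import io


def dataframe_to_csv_buffer(df):
    """Serialize df to an in-memory CSV (header row included) for COPY ... FROM STDIN."""
    buf = io.StringIO()
    df.to_csv(buf, index=False)
    buf.seek(0)  # rewind so COPY reads from the header line
    return buf


def copy_dataframe(cur, df, table):
    """Stream df into `table` with a client-side COPY via psycopg2's cursor.copy_expert.

    No column list is given, so df's column order must match the table's.
    `table` is interpolated directly and must be a trusted constant, never user input.
    """
    buf = dataframe_to_csv_buffer(df)
    cur.copy_expert("COPY {} FROM STDIN WITH (FORMAT csv, HEADER true)".format(table), buf)
```

For example, `copy_dataframe(cur, song_df, "songs")` followed by `conn.commit()` would load the songs without writing `song_df.csv` to a server-visible path.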

Run `test.ipynb` to see if you've successfully added a record to this table.  

Finally, close the database connection; otherwise `create_tables.py` cannot run.

In [None]:
cur.close()
conn.close()
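Every section repeats the same connect / use / close sequence, so a small helper can guarantee the connection is closed even if a cell raises. Note that psycopg2's `with conn:` block only commits or rolls back the transaction and does not close the connection, which is why this sketch uses `contextlib.closing`. `run_with_connection` is a hypothetical helper; `connect` is any zero-argument callable such as `lambda: psycopg2.connect(dbstring)`.

```python
from contextlib import closing


def run_with_connection(connect, work):
    """Open a connection, run work(cur), commit, and always close cursor and connection.

    connect: zero-argument callable returning a DB-API connection.
    work:    callable taking a cursor; its return value is passed through.
    """
    with closing(connect()) as conn:
        with closing(conn.cursor()) as cur:
            result = work(cur)
        conn.commit()  # only reached if work() did not raise
    return result
```

Usage might look like `run_with_connection(lambda: psycopg2.connect(dbstring), lambda cur: cur.execute(song_table_insert, (song_path,)))`, after which `create_tables.py` can run without a lingering session.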

## #2: `artists` Table
#### Extract Data for Artists Table
- Select columns for artist ID, name, location, latitude, and longitude
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `artist_data`

In [None]:
# artist_data = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]  # pandas df
# artist_data = artist_data.values   # values only (is now a numpy array)
# artist_data = artist_data[0, :]  # select the first (only) record: row 0 of the 2-D array, as a 1-D array
# artist_data = artist_data.tolist() # Convert array to list

# Create artist DF from ALL song files
artist_df = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]

# Sort by artist name
artist_df = artist_df.sort_values('artist_name')

# Save as CSV so we can use COPY for better data loading performance
artist_df.to_csv("data/artist_df.csv", index=False)

print("Total unique artists in song files:", len(artist_df['artist_name'].unique()))
artist_df

# There are 69 unique artists in the 71 song files.
# The artists Casual and Clp have 2 songs each
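Artists without a known location or coordinates end up as `NaN` in `artist_df`. By default `DataFrame.to_csv` writes missing values as empty fields (`na_rep=''`), and PostgreSQL's `COPY` in CSV format reads an unquoted empty field as `NULL`, so those gaps should load as `NULL` without extra handling. Empty strings are written the same way, so they would also load as `NULL`. A quick check on made-up rows:

```python
import io

import numpy as np
import pandas as pd

# Made-up artist rows; the second has no known location or coordinates
sample_artists = pd.DataFrame({
    "artist_id": ["AR1", "AR2"],
    "artist_name": ["Known Artist", "Unplaced Artist"],
    "artist_location": ["Hamilton, Ohio", np.nan],
    "artist_latitude": [39.4, np.nan],
    "artist_longitude": [-84.5, np.nan],
})

buf = io.StringIO()
sample_artists.to_csv(buf, index=False)  # default na_rep="" writes NaN as an empty field
csv_lines = buf.getvalue().splitlines()
print(csv_lines[2])  # the missing values appear as empty fields between commas
```

The location containing a comma is quoted automatically, so it still lands in a single column.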

#### Reset tables and connection before inserting records

In [None]:
!python create_tables.py
conn = psycopg2.connect(dbstring)
cur = conn.cursor()

#### Insert Record into Artist Table
Implement the `artist_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song's artist into the `artists` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `artists` table in the sparkify database.

In [None]:
#cur.execute(artist_table_insert, artist_data)
#conn.commit()

# Updated query using COPY
artist_path = datapath+'/artist_df.csv'
cur.execute(artist_table_insert, (artist_path,))
conn.commit()

Run `test.ipynb` to see if you've successfully added a record to this table.

Finally, close the database connection; otherwise `create_tables.py` cannot run.

In [None]:
cur.close()
conn.close()

# Process `log_data`
In this part, you'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

Let's perform ETL on a single log file and load a single record into each table.
- Use the `get_files` function provided above to get a list of all log JSON files in `data/log_data`
- Select the first log file in this list
- Read the log file and view the data

In [None]:
filepath = "data/log_data"

In [None]:
log_files = get_files(filepath)
print(log_files[0])
print(len(log_files))
## There are 30 log files

There are 30 log files. Rather than reading only one, loop through all of them and combine them into a single dataframe.

In [None]:
# df = pd.read_json(log_files[0], lines=True)

# Update: read ALL log files into a single dataframe.
# DataFrame.append was removed in pandas 2.0, so collect the frames and concatenate once.
df = pd.concat([pd.read_json(f, lines=True) for f in log_files], ignore_index=True)

print('Total number of events:', len(df))
df.head(5)
# 8056 total log events

## #3: `time` Table
#### Extract Data for Time Table
- Filter records by `NextSong` action
- Convert the `ts` timestamp column to datetime
  - Hint: the current timestamp is in milliseconds
- Extract the timestamp, hour, day, week of year, month, year, and weekday from the `ts` column and set `time_data` to a list containing these values in order
  - Hint: use pandas' [`dt` attribute](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.html) to easily access datetime-like properties.
- Specify labels for these columns and set to `column_labels`
- Create a dataframe, `time_df`, containing the time data for this file by combining `column_labels` and `time_data` into a dictionary and converting this into a dataframe
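The steps above, applied to a single timestamp chosen for illustration, look like this. `Series.dt.week` was deprecated in pandas 1.1 and removed in 2.0, so this sketch takes the ISO week from `Series.dt.isocalendar()` instead; `weekday` follows pandas' convention of Monday=0 through Sunday=6. The `sample_` names keep the notebook's own `t` and `time_df` intact.

```python
import pandas as pd

# A single epoch-millisecond timestamp, chosen for illustration
sample_ts = pd.Series([1541106106796])
sample_t = pd.to_datetime(sample_ts, unit="ms")  # timezone-naive, counted from the Unix epoch (UTC)

sample_time_data = list(zip(
    sample_t,
    sample_t.dt.hour,
    sample_t.dt.day,
    sample_t.dt.isocalendar().week.astype(int),  # replaces Series.dt.week (removed in pandas 2.0)
    sample_t.dt.month,
    sample_t.dt.year,
    sample_t.dt.weekday,                         # Monday=0 ... Sunday=6
))
sample_labels = ("start_time", "hour", "day", "week", "month", "year", "weekday")
sample_time_df = pd.DataFrame(sample_time_data, columns=sample_labels)
print(sample_time_df)
```

This timestamp falls on Thursday, 1 November 2018 at 21:01:46.796, which is ISO week 44.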

In [None]:
df = df[df['page'].str.contains("NextSong")].copy() # Filter to only NextSong actions; .copy() avoids SettingWithCopyWarning below
t = pd.to_datetime(df.ts, unit='ms') # convert ts column to datetime
df['ts'] = t

# Sort log data df by start_time
df = df.sort_values('ts')

# Save as CSV for temporary QC purposes
df.to_csv("data/log_df.csv", index=True)

print('Total number of song events:', len(df))
df.head(5)
# 6820 log events with NextSong (actual song events)

In [None]:
t = df['ts'] # ts was already converted to datetime in the previous cell
t

In [None]:
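# Series.dt.week was removed in pandas 2.0; take the ISO week from dt.isocalendar() instead
time_data = list(zip(t, t.dt.hour, t.dt.day, t.dt.isocalendar().week.astype(int), t.dt.month, t.dt.year, t.dt.weekday))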
column_labels = ("start_time", "hour", "day", "week", "month", "year", "weekday")
#print(time_data)

In [None]:
time_df = pd.DataFrame(time_data, columns = column_labels, dtype = int)

# Sort by start_time
time_df = time_df.sort_values('start_time')

# Save as CSV so we can use COPY for better data loading performance
time_df.to_csv("data/time_df.csv", index=False)

print(time_df.dtypes)
print('Total unique start times:',len(time_df['start_time'].unique()))
time_df

# NOTE: There are 6820 total song events but 6813 with unique start times.
# Therefore, there are 7 duplicate start times.
# The final time table in PostgreSQL will have the 6813 unique times
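The seven duplicate start times matter if `start_time` is the primary key of the `time` table (an assumption, since `sql_queries.py` is not shown): `COPY` has no conflict handling, so loading all 6820 rows would abort with a unique-violation error. One way to end up with exactly the unique times is to drop duplicates in pandas before writing the CSV. A minimal sketch on made-up rows:

```python
import pandas as pd

# Made-up rows: the first two events share the same millisecond timestamp
sample_times = pd.DataFrame({
    "start_time": pd.to_datetime([1541106106796, 1541106106796, 1541106352796], unit="ms"),
    "hour": [21, 21, 21],
})

n_dupes = int(sample_times["start_time"].duplicated().sum())        # rows repeating an earlier start_time
sample_unique_times = sample_times.drop_duplicates(subset="start_time")  # keeps the first occurrence
print(n_dupes, len(sample_unique_times))
```

Applied to the real `time_df`, the same two lines should report 7 duplicates and 6813 remaining rows, matching the counts noted above.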

#### Reset tables and connection before inserting records

In [None]:
!python create_tables.py
conn = psycopg2.connect(dbstring)
cur = conn.cursor()

#### Insert Records into Time Table
Implement the `time_table_insert` query in `sql_queries.py` and run the cell below to insert records for the timestamps in this log file into the `time` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `time` table in the sparkify database.

In [None]:
# for i, row in time_df.iterrows():
#     cur.execute(time_table_insert, list(row))
#     conn.commit()
    
# Updated query using COPY
time_path = datapath+'/time_df.csv'
cur.execute(time_table_insert, (time_path,))
conn.commit()

Run `test.ipynb` to see if you've successfully added records to this table.

Finally, close the database connection; otherwise `create_tables.py` cannot run.

In [None]:
cur.close()
conn.close()

## #4: `users` Table
#### Extract Data for Users Table
- Select columns for user ID, first name, last name, gender and level and set to `user_df`

In [None]:
user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]  # pandas df
user_df = user_df.astype({"userId": int})

# Sort by user_id
user_df = user_df.sort_values("userId")

# Save as CSV so we can use COPY for better data loading performance
user_df.to_csv("data/user_df.csv", index=False)

print(type(user_df))
print(user_df.dtypes)
print('Total unique users:',len(user_df['userId'].unique()))
user_df

# There are 96 unique users across all 6820 song events
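`user_df` still holds one row per song event, so each of the 96 users appears many times. If `user_id` is the primary key of `users` (again an assumption about `sql_queries.py`), those rows need collapsing before a `COPY`. A user's `level` can change between events (for example from `free` to `paid`), so one reasonable policy, sketched here on made-up rows, is to keep each user's most recent event:

```python
import pandas as pd

# Made-up log rows: user 10 upgrades from free to paid between two events
sample_log = pd.DataFrame({
    "ts": pd.to_datetime([1000, 2000, 3000], unit="ms"),
    "userId": [10, 10, 26],
    "firstName": ["Ann", "Ann", "Ryan"],
    "lastName": ["Lee", "Lee", "Smith"],
    "gender": ["F", "F", "M"],
    "level": ["free", "paid", "free"],
})

# Sort chronologically, then keep each user's last row so `level` reflects the latest state
sample_users = (
    sample_log.sort_values("ts")
    .drop_duplicates(subset="userId", keep="last")
    [["userId", "firstName", "lastName", "gender", "level"]]
    .sort_values("userId")
)
print(sample_users)
```

On the real data, the collapsed frame should have one row per user, i.e. 96 rows.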

#### Reset tables and connection before inserting records

In [None]:
!python create_tables.py
conn = psycopg2.connect(dbstring)
cur = conn.cursor()

#### Insert Records into Users Table
Implement the `user_table_insert` query in `sql_queries.py` and run the cell below to insert records for the users in this log file into the `users` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `users` table in the sparkify database.

In [None]:
# for i, row in user_df.iterrows():
#     cur.execute(user_table_insert, row)
#     conn.commit()

# Updated query using COPY
user_path = datapath+'/user_df.csv'
cur.execute(user_table_insert, (user_path,))
conn.commit()

Run `test.ipynb` to see if you've successfully added records to this table.

Finally, close the database connection; otherwise `create_tables.py` cannot run.

In [None]:
cur.close()
conn.close()

## #5: `songplays` Table
#### Reset tables and connection before inserting records

In [None]:
!python create_tables.py
conn = psycopg2.connect(dbstring)
cur = conn.cursor()

In [None]:
# Repopulate the other 4 tables before creating the songplays table
cur.execute(song_table_insert, (song_path,))
conn.commit()
cur.execute(artist_table_insert, (artist_path,))
conn.commit()
cur.execute(time_table_insert, (time_path,))
conn.commit()
cur.execute(user_table_insert, (user_path,))
conn.commit()

#### Extract Data for Songplays Table
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the `songplays` table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.
- Implement the `song_select` query in `sql_queries.py` to find the song ID and artist ID based on the title, artist name, and duration of a song.
- Select the timestamp, user ID, level, song ID, artist ID, session ID, location, and user agent and set to `songplay_data`
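`song_select` itself lives in `sql_queries.py` and is not shown. To illustrate the same matching logic, here is a pandas sketch that joins songs to artists and then left-joins the log events on title, artist name, and duration. The column names `name` and `duration` are assumptions about the table schema, and the data is made up. Like an equality test in SQL, the join compares durations exactly; unmatched events keep `NaN` IDs, which would load as `NULL`.

```python
import pandas as pd

# Made-up stand-ins for the songs table, artists table, and NextSong log rows
sample_songs_tbl = pd.DataFrame({"song_id": ["S1"], "title": ["Song A"],
                                 "artist_id": ["A1"], "duration": [200.5]})
sample_artists_tbl = pd.DataFrame({"artist_id": ["A1"], "name": ["Artist A"]})
sample_events = pd.DataFrame({"song": ["Song A", "Song B"],
                              "artist": ["Artist A", "Artist B"],
                              "length": [200.5, 180.0]})

# songs JOIN artists, keeping the IDs plus the three matching columns
lookup = sample_songs_tbl.merge(sample_artists_tbl, on="artist_id")[
    ["song_id", "artist_id", "title", "name", "duration"]]

# LEFT JOIN so every log event is kept, in its original order
sample_matched = sample_events.merge(
    lookup, how="left",
    left_on=["song", "artist", "length"],
    right_on=["title", "name", "duration"])
print(sample_matched[["song", "song_id", "artist_id"]])
```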

#### Insert Records into Songplays Table
- Implement the `songplay_table_insert` query and run the cell below to insert records for the songplay actions in this log file into the `songplays` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songplays` table in the sparkify database.

In [None]:
# Loop through each row of the log files
# NOTE: use enumerate as a loop counter for songplay_id
# Index is turned off so we get the row number, not the df index
for idx, row in enumerate(df.itertuples(index=False), start=1):

    # get songid and artistid from song and artist tables
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()
    
    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    # insert songplay record
    songplay_data = (idx, row.ts, row.userId, row.level, songid, artistid,
                     row.sessionId, row.location, row.userAgent)
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()
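The loop above runs one `SELECT`, one `INSERT`, and one `COMMIT` per log event. A sketch of a cheaper variant: resolve IDs from an in-memory dict (for example, built from a single query over songs joined to artists, which is a hypothetical step here), build all rows first, then insert them with the DB-API `executemany` and commit once. psycopg2's documentation notes that its `executemany` is not faster than calling `execute` in a loop, so the expected saving comes from the single commit and the avoided per-row lookups; `psycopg2.extras.execute_values` is the usual choice when insert throughput matters. Both helpers are hypothetical.

```python
def build_songplay_rows(log_rows, lookup):
    """Build all songplay tuples in memory, numbering songplay_id from 1.

    log_rows: iterable of records with attributes ts, userId, level, song, artist,
              length, sessionId, location, userAgent (e.g. df.itertuples(index=False)).
    lookup:   dict mapping (title, artist_name, duration) -> (song_id, artist_id).
    """
    rows = []
    for idx, row in enumerate(log_rows, start=1):
        # Unmatched events get (None, None), which psycopg2 sends as NULL
        song_id, artist_id = lookup.get((row.song, row.artist, row.length), (None, None))
        rows.append((idx, row.ts, row.userId, row.level, song_id, artist_id,
                     row.sessionId, row.location, row.userAgent))
    return rows


def insert_songplays(cur, conn, rows, insert_sql):
    """Send all rows through DB-API executemany(), then commit once."""
    cur.executemany(insert_sql, rows)
    conn.commit()
```

In this notebook it might be called as `insert_songplays(cur, conn, build_songplay_rows(df.itertuples(index=False), lookup), songplay_table_insert)`.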

In [None]:
# Export songplays table as CSV for QC (not included for final Python file)
csv_str = datapath+'/songplay_df.csv'
cur.execute("COPY songplays TO %s DELIMITER ',' CSV HEADER;", (csv_str,))

In [None]:
print(idx)
print(type(songplay_data))
print(songplay_data)

Run `test.ipynb` to see if you've successfully added records to this table.

# Close Connection to Sparkify Database

In [None]:
cur.close()
conn.close()

# Implement `etl.py`
### Test section to figure out how to implement `etl.py`.

In [None]:
import os
import glob
import psycopg2
from psycopg2 import sql
import pandas as pd
# Disable pandas SettingWithCopyWarning 
pd.options.mode.chained_assignment = None  # default='warn'

from sql_queries import *

In [None]:
def process_json_file(cur, filepath):
    # read json file
    df_temp = pd.read_json(filepath, lines=True)
    return df_temp
    
def insert_song_data(cur, conn, df, csvpath):
    # Create song DF from ALL song files
    song_df = df[['song_id', 'title', 'artist_id', 'year', 'duration']]
    # Sort by song title
    song_df = song_df.sort_values('title')

    # Create artist DF from ALL song files
    artist_df = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]
    # Sort by artist_name
    artist_df = artist_df.sort_values('artist_name')
    
    # Create CSVs so we can use COPY for better data loading performance    
    song_df.to_csv((csvpath+'/song_df.csv'), index=False)
    artist_df.to_csv((csvpath+'/artist_df.csv'), index=False)
    
    # Insert song data using COPY                            
    copy_path = datapath+'/csv_files/song_df.csv'
    cur.execute(song_table_insert, (copy_path,))
    conn.commit()

    # Insert artist data using COPY
    copy_path = datapath+'/csv_files/artist_df.csv'
    cur.execute(artist_table_insert, (copy_path,))
    conn.commit()
        
def insert_log_data(cur, conn, df, csvpath):
    # filter by NextSong action
    df = df[df['page'].str.contains("NextSong")]
    # convert timestamp column to datetime
    t = pd.to_datetime(df.ts, unit='ms') 
    df['ts'] = t
    
    # Create time DF from ALL log files
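    # Series.dt.week was removed in pandas 2.0; take the ISO week from dt.isocalendar() instead
    time_data = list(zip(t, t.dt.hour, t.dt.day, t.dt.isocalendar().week.astype(int), t.dt.month, t.dt.year, t.dt.weekday))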
    column_labels = ("start_time", "hour", "day", "week", "month", "year", "weekday")
    time_df = pd.DataFrame(time_data, columns = column_labels, dtype = int)
    # Sort by start_time
    time_df = time_df.sort_values('start_time')

    # Create user DF from ALL log files
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
    user_df = user_df.astype({"userId": int})
    # Sort by user_id
    user_df = user_df.sort_values("userId")
    
    # Create CSVs so we can use COPY for better data loading performance
    time_df.to_csv((csvpath+'/time_df.csv'), index=False)
    user_df.to_csv((csvpath+'/user_df.csv'), index=False)

    # Insert time data using COPY                            
    copy_path = datapath+'/csv_files/time_df.csv'
    cur.execute(time_table_insert, (copy_path,))
    conn.commit() 

    # Insert user data using COPY                            
    copy_path = datapath+'/csv_files/user_df.csv'
    cur.execute(user_table_insert, (copy_path,))
    conn.commit() 
        
    # INSERT SONGPLAY DATA: Loop through each row of the log files
    # NOTE: use enumerate as a loop counter for songplay_id
    # Index is False so we get the row number, not the df index
    for idx, row in enumerate(df.itertuples(index=False), start=1):

        # get songid and artistid from song and artist tables
        cur.execute(song_select, (row.song, row.artist, row.length))
        results = cur.fetchone()

        if results:
            songid, artistid = results
        else:
            songid, artistid = None, None

        # insert songplay record
        songplay_data = (idx, row.ts, row.userId, row.level, songid, artistid,
                         row.sessionId, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)
        conn.commit()
        
def fill_songplay_data(cur, conn, df, csvpath):
    # Create a filled songplay table in Postgres with complete artist and song information
    # Use artist_name and song_name from the log data instead of the ids from the song data
    # This is because there is only ONE row in the songplays table with NON NULL song_id and artist ids   
    df = df[df['page'].str.contains("NextSong")] # Filter to only NextSong actions
    t = pd.to_datetime(df.ts, unit='ms') # convert ts column to datetime
    df['ts'] = t
    # Sort by start_time
    df = df.sort_values('ts')   
    # Select columns (using artist NAME and song NAME instead of ids)
    songplay_df = df[['ts', 'userId', 'level', 'song', 'artist', 'sessionId', 'location', 'userAgent']]
    # Insert songplay_id column (each row is a different songplay_id)
    songplay_df['songplay_id'] = songplay_df.reset_index().index
    songplay_df = songplay_df[['songplay_id', 'ts', 'userId', 'level', 'song', 'artist', 'sessionId', 'location', \
                               'userAgent']]
    # Create CSV so we can use COPY for better data loading performance
    songplay_df.to_csv(csvpath+'/songplay_df.csv', index=False)
    # Insert data using COPY                            
    copy_path = datapath+'/csv_files/songplay_df.csv'
    cur.execute(songplay_table_insert_2, (copy_path,))
    conn.commit() 
    return songplay_df
        
def quality_check_data(cur, conn, df, idcol, tablename, reset_query):
    # Ensure the number of table rows in Postgres equals the number of unique ids 
    # from the json data. Reset the tables in Postgres if the check fails
    cur.execute(sql.SQL('SELECT COUNT(*) FROM {}').format(sql.Identifier(tablename)))
    table_rows = cur.fetchone() # cursor.execute() returns None; the count comes from fetchone()
    print('{} total rows in {} table in Postgres.'.format(table_rows[0], tablename))
    if tablename == "songplays":
        num_ids = len(df) # each row is a different songplay_id in the songplays table
    else:
        num_ids = len(df[idcol].unique())
    print('{} total unique {}s from the json files'.format(num_ids, idcol))
    if table_rows[0] != num_ids:
        cur.execute(sql.SQL('DROP TABLE IF EXISTS {}').format(sql.Identifier(tablename)))
        cur.execute(reset_query)
        conn.commit()
        raise ValueError("{} table: Table length and unique ids do not match. Resetting table in Postgres"
                         .format(tablename))
    else:
        print("Check passed!")
        
def process_data(cur, conn, filepath, func):
    # get all files matching extension from directory
    all_files = []
    for root, dirs, files in os.walk(filepath):
        json_files = glob.glob(os.path.join(root, '*.json'))
        for f in json_files:
            all_files.append(os.path.abspath(f))

    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))

    # iterate over files and process, collecting one DataFrame per file
    frames = [func(cur, datafile) for datafile in all_files]
    print('{}/{} total files processed.'.format(len(frames), num_files))

    # Return complete df with data from all files.
    # pd.concat replaces DataFrame.append (removed in pandas 2.0); guard against an empty directory.
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

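def main():
    conn = psycopg2.connect(dbstring) # dbstring defined at top of sql_queries
    cur = conn.cursor()

    # Load songs and artists, then time, users, and songplays (same order as the test cells below)
    df = process_data(cur, conn, filepath='data/song_data', func=process_json_file)
    insert_song_data(cur, conn, df, csvpath='data/csv_files')
    df = process_data(cur, conn, filepath='data/log_data', func=process_json_file)
    insert_log_data(cur, conn, df, csvpath='data/csv_files')

    cur.close()
    conn.close()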

In [None]:
!python create_tables.py
conn = psycopg2.connect(dbstring)
cur = conn.cursor()

In [None]:
# Create df of all song data
df = process_data(cur, conn, filepath='data/song_data', func=process_json_file)
# Create song and artist dfs and csv files, then copy the data into the Postgres tables
insert_song_data(cur, conn, df, csvpath='data/csv_files')

# Quality check song and artist tables: Ensure the number of table rows in Postgres
# equals the number of unique song and artist ids from the song data
quality_check_data(cur, conn, df, idcol="song_id", tablename="songs", reset_query=song_table_create)
quality_check_data(cur, conn, df, idcol="artist_id", tablename="artists", reset_query=artist_table_create)

In [None]:
# Create df of all the log data
df = process_data(cur, conn, filepath='data/log_data', func=process_json_file)
# Create time and user dfs and csv files, then copy into Postgres tables
# Then loop through all NextSong events and insert records into the songplay table
insert_log_data(cur, conn, df, csvpath='data/csv_files')

In [None]:
# Quality check time, user, and songplay tables: Ensure the number of table rows in Postgres
# equals the number of unique ids from the log data
df = df[df['page'].str.contains("NextSong")] # Filter to only NextSong actions
df = df.astype({"userId": int}) # Set userId to all integers
quality_check_data(cur, conn, df, idcol="ts", tablename="time", reset_query=time_table_create)
quality_check_data(cur, conn, df, idcol="userId", tablename="users", reset_query=user_table_create)
quality_check_data(cur, conn, df, idcol="songplay_id", tablename="songplays", reset_query=songplay_table_create)

In [None]:
# Create a filled songplay table in Postgres with complete artist and song information
# Use artist_name and song_name from the log data instead of the ids from the song data
# This is because there is only ONE row in the songplays table with NON NULL song_id and artist ids   
songplay_df = fill_songplay_data(cur, conn, df, csvpath='data/csv_files')
# Quality check filled songplays table: Ensure the number of table rows in Postgres
# equals the number of unique songplay ids from the log data
quality_check_data(cur, conn, songplay_df, idcol="songplay_id", tablename = "songplays_fill", \
                   reset_query=songplay_table_create_2)

In [None]:
cur.close()
conn.close()