# ETL processes

In [None]:
import os
import glob
import psycopg2
import pandas as pd

In [None]:
from sql import create_table_queries

In [None]:
def get_files(filepath, extension='.json'):
    """Return absolute paths of all files under *filepath* ending in *extension*.

    The directory tree is walked recursively with ``os.walk``. Matching is done
    on the file names that ``os.walk`` reports instead of re-globbing every
    directory. This has two effects:

    * directory names containing glob metacharacters (``[``, ``*``, ``?``)
      are handled correctly;
    * sub-directories whose names happen to end in *extension* are never
      returned.

    Names starting with ``.`` are skipped, matching what the previous
    ``*.json`` glob did.

    Args:
        filepath: Root directory to search.
        extension: Case-sensitive file-name suffix to match. Defaults to
            ``'.json'``.

    Returns:
        A sorted list of absolute file paths. Sorting makes ``files[0]``
        deterministic. The list is empty if nothing matches or *filepath*
        does not exist.
    """
    all_files = []
    for root, _dirs, names in os.walk(filepath):
        all_files.extend(
            os.path.abspath(os.path.join(root, name))
            for name in names
            if name.endswith(extension) and not name.startswith('.')
        )
    return sorted(all_files)

## Setup DB connection and tables

In [None]:
# libpq connection string for the Postgres instance used throughout this
# notebook. Setting the ETL_DB_DSN environment variable overrides it, so real
# credentials need not be committed with the notebook. The default is the
# original development DSN.
db_config = os.environ.get(
    "ETL_DB_DSN",
    "host=postgres-db dbname=udacity user=udacity password=udacity",
)

In [None]:
# Open the session used by the table-setup cells that follow, plus a cursor on it.
conn = psycopg2.connect(dsn=db_config)
cur = conn.cursor()

In [None]:
# Execute each CREATE TABLE statement from the sql module, then commit them together.
for ddl_statement in create_table_queries:
    cur.execute(ddl_statement)
conn.commit()

In [None]:
# Sanity check: print every table that now exists in the public schema.
cur.execute("""SELECT table_name FROM information_schema.tables
       WHERE table_schema = 'public'""")
public_tables = cur.fetchall()
for table_row in public_tables:
    print(table_row)

## Process `song_data`

First, we build a list of all song data file paths and inspect one of the JSON files.

In [None]:
# Absolute paths of every song JSON file under ./data/song_data.
song_files = get_files(filepath='./data/song_data')

In [None]:
# Load one song file as a Series (one entry per JSON key) and show it as a
# single-column frame to inspect its fields.
sample_file_path = song_files[0]
sample_series = pd.read_json(sample_file_path, typ='series', convert_dates=False)
df = pd.DataFrame(sample_series)
df.values

Then we read every song file into a list of single-row DataFrames, which the next cells combine into one DataFrame of all the song data.

In [None]:
# One single-row DataFrame per song file; later cells concatenate them.
dfs = [
    pd.DataFrame([pd.read_json(song_file, typ='series', convert_dates=False)])
    for song_file in song_files
]

Next, we combine these frames and deduplicate them by artist_id, keeping the first row for each artist. We then create the list of tuples to insert into the artists table.

In [None]:
# Build the rows for the artists table: one tuple per distinct artist_id, in
# the order (artist_id, name, location, latitude, longitude).
#
# Columns are selected by name instead of unpacking each row positionally.
# The result therefore no longer depends on the key order inside the song
# JSON files, and extra keys no longer break the unpacking.
artist_columns = ['artist_id', 'artist_name', 'artist_location',
                  'artist_latitude', 'artist_longitude']
result = pd.concat(dfs, ignore_index=True)
# Keep the first row seen for each artist_id.
result = result.drop_duplicates(subset='artist_id', keep='first')
# Missing values are NaN in pandas. psycopg2 sends a float NaN as 'NaN'::float
# rather than NULL, so map every missing value to None.
artist_data_list = [
    tuple(None if pd.isna(value) else value for value in row)
    for row in result[artist_columns].itertuples(index=False, name=None)
]

In [None]:
# Bulk-insert all artist rows with a single multi-row INSERT.
# cur.mogrify() renders each tuple as an escaped SQL literal, so joining the
# rendered rows is safe. The connection is closed afterwards so the notebook
# does not accumulate open database sessions.
conn = psycopg2.connect(db_config)
try:
    cur = conn.cursor()
    # An INSERT with an empty VALUES list is a syntax error, so skip the
    # statement when there is nothing to load.
    if artist_data_list:
        args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s)", x).decode("utf-8") for x in artist_data_list)
        cur.execute("INSERT INTO artists VALUES " + args_str)
    conn.commit()
finally:
    conn.close()

In [None]:
# Read the artists table back to check the load.
conn = psycopg2.connect(db_config)
try:
    cur = conn.cursor()
    cur.execute("SELECT * FROM artists")
    artist_records = cur.fetchall()
finally:
    # psycopg2 implicitly opened a transaction for the SELECT. Closing ends it
    # instead of leaving an idle-in-transaction session behind.
    conn.close()
artist_records

Similarly, we build the list of song tuples from all the song data. This time no deduplication is needed.

In [None]:
# Build the rows for the songs table, one tuple per song file, in the order
# (song_id, title, artist_id, year, duration). No deduplication is applied.
# Columns are selected by name rather than unpacked positionally, so the
# result does not depend on the key order in the JSON files. Missing values
# (NaN) are mapped to None so they are inserted as NULL.
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
result = pd.concat(dfs, ignore_index=True)
song_data_list = [
    tuple(None if pd.isna(value) else value for value in row)
    for row in result[song_columns].itertuples(index=False, name=None)
]

In [None]:
# Bulk-insert all song rows with a single multi-row INSERT.
# cur.mogrify() renders each tuple as an escaped SQL literal, so joining the
# rendered rows is safe. The connection is closed afterwards so the notebook
# does not accumulate open database sessions.
conn = psycopg2.connect(db_config)
try:
    cur = conn.cursor()
    # An INSERT with an empty VALUES list is a syntax error, so skip the
    # statement when there is nothing to load.
    if song_data_list:
        args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s)", x).decode("utf-8") for x in song_data_list)
        cur.execute("INSERT INTO songs VALUES " + args_str)
    conn.commit()
finally:
    conn.close()

In [None]:
# Read the songs table back to check the load.
conn = psycopg2.connect(db_config)
try:
    cur = conn.cursor()
    cur.execute("SELECT * FROM songs")
    song_records = cur.fetchall()
finally:
    # psycopg2 implicitly opened a transaction for the SELECT. Closing ends it
    # instead of leaving an idle-in-transaction session behind.
    conn.close()
song_records

## Process log data

In this part, you'll perform ETL on the second dataset, log_data, to create the time and users dimensional tables, as well as the songplays fact table.

Let's first inspect a single log file, then run the ETL over all the log files.

1. Use the get_files function provided above to get a list of all log JSON files in data/log_data.
2. Select the first log file in this list.
3. Read the log file and view the data.

In [None]:
# Collect every log file, then load the first one (newline-delimited JSON,
# one event per line) to inspect its columns.
log_files = get_files(filepath='./data/log_data')
sample_file_path = log_files[0]

df = pd.read_json(sample_file_path, typ='frame', convert_dates=False, lines=True)
df.head()

### Time Table

Extract Data for Time Table

Filter records by NextSong action

Convert the ts timestamp column to datetime

Hint: the ts timestamp is in milliseconds since the Unix epoch.

Extract the timestamp, hour, day, week of year, month, year, and weekday from the ts column and set time_data to a list containing these values in order

Hint: use the pandas dt accessor to easily access datetime-like properties.

Specify labels for these columns and set to column_labels

Create a dataframe, time_df, containing the time data for this file by combining column_labels and time_data into a dictionary and converting this into a dataframe

In [None]:
# Load every log file (newline-delimited JSON, one event per line) and stack
# them into a single frame.
log_dfs = [
    pd.read_json(log_file, typ='frame', convert_dates=False, lines=True)
    for log_file in log_files
]
# ignore_index=True gives the combined frame a fresh 0..n-1 index. Without it,
# every file's rows repeat the labels 0, 1, 2, ..., which makes label-based
# lookups and index alignment on log_result ambiguous.
log_result = pd.concat(log_dfs, ignore_index=True)
log_result.head()

In [None]:
# Keep only the log events whose page is "NextSong".
required_page = "NextSong"
is_next_song = log_result['page'] == required_page
next_song_df = log_result[is_next_song]
next_song_df.head()

In [None]:
# Return a copy with an extra 'datetime' column; ts holds epoch milliseconds,
# hence unit='ms'.
next_song_df_w_datetime = next_song_df.assign(
    datetime=pd.to_datetime(next_song_df['ts'], unit='ms')
)
next_song_df_w_datetime.head()

In [None]:
# Split each play timestamp into calendar parts. Resulting column order:
# datetime, hour, day, week (ISO week number), month, year, weekday (Monday=0).
play_times = next_song_df_w_datetime['datetime']
ts_df = next_song_df_w_datetime[['datetime']].assign(
    hour=play_times.dt.hour,
    day=play_times.dt.day,
    week=play_times.dt.isocalendar().week,
    month=play_times.dt.month,
    year=play_times.dt.year,
    weekday=play_times.dt.weekday,
)
ts_df.head()

In [None]:
# Insert one row per play into the time table. ON CONFLICT (start_time)
# DO NOTHING skips timestamps already present, so repeated timestamps (or a
# re-run) do not fail. The connection is closed when done.
time_table_insert = 'INSERT INTO time VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (start_time) DO NOTHING'
conn = psycopg2.connect(db_config)
try:
    cur = conn.cursor()
    # iterrows() builds each row from an object array, so list(row) holds
    # plain Python values for psycopg2 to adapt.
    # NOTE(review): passing numpy scalars directly (e.g. from the UInt32 week
    # column) may not be adaptable — keep this conversion.
    for _, row in ts_df.iterrows():
        cur.execute(time_table_insert, list(row))
    conn.commit()
finally:
    conn.close()

### users Table
Extract Data for Users Table

1. Select the columns for user ID, first name, last name, gender, and level, and assign them to user_df.

In [None]:
# User attributes taken from every log event (not only NextSong), dropping
# events that carry an empty userId.
user_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']
has_user_id = log_result['userId'] != ''
user_df = log_result.loc[has_user_id, user_columns].copy()
user_df.head()

In [None]:
# Upsert users. When a user_id already exists only the level column is
# overwritten, so the last row processed for a user decides its stored level.
# The connection is closed when done.
user_table_insert = '''INSERT INTO users (user_id, first_name, last_name, gender, level) VALUES (%s, %s, %s, %s, %s) 
                        ON CONFLICT (user_id) DO UPDATE SET 
                        level = EXCLUDED.level'''
conn = psycopg2.connect(db_config)
try:
    cur = conn.cursor()
    for _, row in user_df.iterrows():
        cur.execute(user_table_insert, list(row))
    conn.commit()
finally:
    conn.close()

### songplays table

Extract Data for Songplays Table

This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the songplays table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.

Implement the song_select query (defined in the cell below) to find the song ID and artist ID based on the title, artist name, and duration of a song.

Select the timestamp, user ID, level, song ID, artist ID, session ID, location, and user agent and set to songplay_data

Insert Records into Songplays Table

Implement the songplay_table_insert query and run the cell below to insert records for the songplay actions in the log data into the songplays table. Remember to run create_tables.py before running the cell below to ensure you've created/reset the songplays table in the sparkify database.

In [None]:
# Look up the song/artist IDs for one play by exact title, artist name and
# duration.
# NOTE(review): duration is matched with float equality; confirm both sides
# come from the same JSON representation.
song_select = '''SELECT song_id, artists.artist_id
    FROM songs JOIN artists ON songs.artist_id = artists.artist_id
    WHERE songs.title = %s
    AND artists.name = %s
    AND songs.duration = %s
'''
songplay_table_insert = '''
INSERT INTO songplays VALUES (DEFAULT, %s, %s, %s, %s, %s, %s, %s, %s )
'''
conn = psycopg2.connect(db_config)
try:
    cur = conn.cursor()
    for _, row in next_song_df_w_datetime.iterrows():
        # The log only carries song title, artist name and length, so resolve
        # the IDs from the songs/artists tables. Plays with no match are
        # stored with NULL song_id / artist_id.
        cur.execute(song_select, (row.song, row.artist, row.length))
        match = cur.fetchone()
        songid, artistid = match if match else (None, None)

        songplay_data = (row.datetime, row.userId, row.level, songid, artistid,
                         row.sessionId, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)
    # Commit once, so the whole load is one transaction rather than a commit
    # per row. If anything fails, nothing is half-loaded.
    conn.commit()
finally:
    conn.close()