# ETL Processes

In [1]:
import os
import glob
import mysql.connector as mysql
from mysql.connector import errorcode
from datetime import datetime
import pandas as pd
from sql_queries import *

In [11]:

"""
    PLEASE INSERT YOUR MYSQL USERNAME, PASSWORD AND DATABASE BELOW
"""

'\n    PLEASE INSERT YOUR MYSQL USERNAME, PASSWORD AND DATABASE BELOW\n'

In [2]:
try:
    db = mysql.connect(user='root', password='***',
                        host="localhost", database="***")
except mysql.Error as err:
    if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
        print("There is error with the username or password")
    elif err.errno == errorcode.ER_BAD_DB_ERROR:
        print("Database does not exist")
    else:
        print(err)

In [3]:
cursor = db.cursor()

In [4]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

# Process `song_data`

In [5]:
song_files = 'data/song_data'

In [6]:
filepath = get_files(song_files)

In [7]:
df = pd.read_json(filepath[0], lines=True)
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARGCY1Y1187B9A4FA5,36.16778,-86.77836,"Nashville, TN.",Gloriana,SOQOTLQ12AB01868D0,Clementina Santafè,153.33832,0


## #1: `songs` Table
#### Extract Data for Songs Table

In [None]:
song_data = list(df[["song_id", "title","artist_id", "year", "duration"]].values[0])
song_data 

#### Insert Record into Song Table

In [None]:
cursor.execute(song_table_insert, song_data)
db.commit()

## #2: `artists` Table
#### Extract Data for Artists Table

In [None]:
artist_data = list(df[["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]].values[0])
artist_data

#### Insert Record into Artist Table

In [None]:
cursor.execute(artist_table_insert, artist_data)
db.commit()

# Process `log_data`

In [None]:
log_files = 'data/log_data'

In [None]:
filepath = get_files(log_files)

In [None]:
df = pd.read_json(filepath[1], lines=True)
df.head()

## #3: `time` Table
#### Extract Data for Time Table

In [None]:
df = df[df["page"] == "NextSong"]
df.head()

In [None]:
t = df['ts'].apply(lambda x: datetime.utcfromtimestamp(x//1000.0))
t.head()

In [None]:
df['start_time'] = t
df['hour'] = t.apply(lambda x: x.hour)
df['day'] = t.apply(lambda x: x.day)
df['week'] = t.apply(lambda x: x.week)
df['month'] = t.apply(lambda x: x.month)
df['year'] = t.apply(lambda x: x.year)
df['weekday'] = t.apply(lambda x: x.day_name())

In [None]:
column_labels = ["start_time", "hour",
                 "day", "week", "month", "year", "weekday"]
time_df = df[column_labels]

time_df.head()

#### Insert Records into Time Table

In [None]:
for i, row in time_df.iterrows():
    cursor.execute(time_table_insert, list(row))
    db.commit()

## #4: `users` Table
#### Extract Data for Users Table

In [None]:
user_cols = ["userId", "firstName",
             "lastName", "gender", "level"]
user_df = df[user_cols].rename(columns=({
    "userId": "user_id",
    "firstName": "first_name",
    "lastName": "last_name"
    }))
user_df["user_id"] = user_df["user_id"].astype(int)
df.head()

In [None]:
user_df.reset_index(drop=True, inplace=True)
user_df.tail()

#### Insert Records into Users Table

In [None]:
# The table is denomalized and contains lot of duplicates, Postgres can accept duplicates but Mysql can not. 
# Therefore, we introduced auto-increement which we take the index.

In [None]:
for i in user_df.index:
    row = user_df.iloc[i]
    Selected_list = [int(row['user_id']), row['first_name'], \
                         row['last_name'], row['gender'], row['level'], i]
    # print(Selected_list)
    cursor.execute(user_table_insert, Selected_list)
    db.commit()

## #5: `songplays` Table
#### Extract Data and Songplays Table
#### Insert Records into Songplays Table

In [None]:
for index, row in df.iterrows():

    # get songid and artistid from song and artist tables
    
    insert_statement = song_select.format(*(row.song.replace("'", ""), row.artist.replace("'", ""), row.length))
    cursor.execute(insert_statement)
    results = cursor.fetchone()

    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None
    # insert songplay record
    songplay_data = (row.ts, row.start_time, int(row.userId), row.level, songid, artistid,
                     row.sessionId, row.location, row.userAgent)
    print(songplay_data)
    cursor.execute(songplay_table_insert, songplay_data)
    db.commit()

# Close Connection to Sparkify Database

In [None]:
db.close()