## In this notebook we will see how to perform Extract, Transform, Load (ETL) operations

In [77]:
import os
import glob # for file pattern searching
import pandas as pd
import numpy as np
from Sql_Queries import * # contains DDL and DML queries
import mysql.connector # for database

In [78]:
# Connecting to the database.
# Connection settings can be overridden through environment variables so that
# credentials do not have to be edited into (or committed with) the notebook;
# the fallbacks keep the original local-development values.
Sparkify_db = mysql.connector.connect(
    host=os.environ.get("SPARKIFY_DB_HOST", "localhost"),
    user=os.environ.get("SPARKIFY_DB_USER", "root"),
    password=os.environ.get("SPARKIFY_DB_PASSWORD", "rootuser"),
    database=os.environ.get("SPARKIFY_DB_NAME", "SparkifyDB"),
)

# Cursor used by the statements executed in the cells below.
cur = Sparkify_db.cursor()

In [79]:
# Directory (relative to the working directory) that is searched for song JSON files.
PATH = "data/song_data"

In [80]:
def get_files(filepath, pattern):
    """Recursively collect files under ``filepath`` whose names match ``pattern``.

    Parameters
    ----------
    filepath : str
        Root directory to walk. A directory that does not exist yields an
        empty list.
    pattern : str
        Glob pattern applied inside every visited directory (e.g. ``"*.json"``).

    Returns
    -------
    list of str
        Absolute paths of the matching files, gathered directory by directory
        in sorted order so the result is reproducible across runs.
    """
    all_files = []
    for root, dirs, _ in os.walk(filepath):
        # Sort in place so os.walk descends into sub-directories in a
        # deterministic order (callers index into the result, e.g. files[0]).
        dirs.sort()
        # Escape the directory part: glob metacharacters in a folder name
        # ("[", "*", "?") must be matched literally, not treated as a pattern.
        matches = glob.glob(os.path.join(glob.escape(root), pattern))
        all_files.extend(os.path.abspath(match) for match in sorted(matches))
    return all_files

## Process Song Data
### ETL on Song Data to add a single record into DB
    1. Using the get_files function, get a list of all the song JSON files in data/song_data
    2. Select a single song file from the list
    3. Read the song file and view the data

In [81]:
# Collect the absolute paths of every *.json file found under PATH.
song_files = get_files(filepath=PATH,pattern="*.json")

In [82]:
song_files[0]

'e:\\Udacity_Data_Eng_NanoDegree\\Relational_database_Project_1\\data\\song_data\\A\\A\\A\\TRAAAAW128F429D538.json'

In [83]:
# Pick one song file to walk through the ETL steps (index 1, i.e. the second entry).
filepath = song_files[1]
print(filepath)

e:\Udacity_Data_Eng_NanoDegree\Relational_database_Project_1\data\song_data\A\A\A\TRAAABD128F429CF47.json


In [84]:
# lines=True: read the file as line-delimited JSON (one JSON object per line).
df = pd.read_json(filepath,lines=True)

In [85]:
df

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969


### Now we will extract data for song table.

In [86]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 10 columns):
artist_id           1 non-null object
artist_latitude     1 non-null float64
artist_location     1 non-null object
artist_longitude    1 non-null float64
artist_name         1 non-null object
duration            1 non-null float64
num_songs           1 non-null int64
song_id             1 non-null object
title               1 non-null object
year                1 non-null int64
dtypes: float64(3), int64(2), object(5)
memory usage: 160.0+ bytes


In [87]:
# Columns in the order used by the songs INSERT statement.
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
# Take the first record and turn it into a plain list of values.
first_song_row = df[song_columns].values[0]
song_data = first_song_row.tolist()

In [88]:
song_data

['SOCIWDW12A8C13D406', 'Soul Deep', 'ARMJAGH1187FB546F3', 1969, 148.03546]

In [89]:
song_table_insert # insert query from the query file

'\nINSERT IGNORE INTO songs (song_id, title, artist_id, year, duration)\nVALUES (%s, %s, %s, %s, %s);\n'

In [90]:
try:
    # Parameterised insert: song_data supplies the values for the %s placeholders.
    cur.execute(song_table_insert,song_data)
    Sparkify_db.commit()
except Exception as e:
    # Best-effort: report the failure and let the notebook keep running.
    print(e)

## ETL for Artist table

In [91]:
df

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969


In [92]:
# Artist fields of the first record, extracted as a plain list of values.
artist_columns = ["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]
artist_data = df[artist_columns].values[0].tolist()
artist_data

['ARMJAGH1187FB546F3', 'The Box Tops', 'Memphis, TN', 35.14968, -90.04892]

In [93]:
try:
    # Parameterised insert: artist_data supplies the values for the query placeholders.
    cur.execute(artist_table_insert,artist_data)
    Sparkify_db.commit()
except Exception as e:
    # Best-effort: report the failure and let the notebook keep running.
    print(e)

# Process Log Data

In [94]:
# Collect the absolute paths of every *.json file found under data/log_data.
log_files = get_files(filepath="data/log_data",pattern="*.json")

In [95]:
# Work with a single log file (the first one returned) for this walkthrough.
log_filepath = log_files[0]

In [96]:
# lines=True: read the log file as line-delimited JSON (one event per line).
df_log = pd.read_json(log_filepath,lines=True)

In [97]:
df_log.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919166796,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540344794796,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540344794796,139,,200,1541106132796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [98]:
df_log.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15 entries, 0 to 14
Data columns (total 18 columns):
artist           11 non-null object
auth             15 non-null object
firstName        15 non-null object
gender           15 non-null object
itemInSession    15 non-null int64
lastName         15 non-null object
length           11 non-null float64
level            15 non-null object
location         15 non-null object
method           15 non-null object
page             15 non-null object
registration     15 non-null int64
sessionId        15 non-null int64
song             11 non-null object
status           15 non-null int64
ts               15 non-null int64
userAgent        15 non-null object
userId           15 non-null int64
dtypes: float64(1), int64(6), object(11)
memory usage: 2.2+ KB


In [99]:
df_log['page']

0         Home
1         Home
2     NextSong
3      Upgrade
4     NextSong
5     NextSong
6     NextSong
7     NextSong
8     NextSong
9     NextSong
10    NextSong
11        Home
12    NextSong
13    NextSong
14    NextSong
Name: page, dtype: object

In [100]:
# Keep only the rows whose page value is NextSong.
# .copy() makes the result an independent frame, so columns assigned on df_log
# in later cells do not trigger pandas' SettingWithCopyWarning.
df_log = df_log.query("page == 'NextSong'").copy()
df_log.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
5,Tamba Trio,Logged In,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Quem Quiser Encontrar O Amor,200,1541106496796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
6,The Mars Volta,Logged In,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Eriatarka,200,1541106673796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
7,Infected Mushroom,Logged In,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Becoming Insane,200,1541107053796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [101]:
# Let's Extract the data for our time table
t = pd.to_datetime(df_log['ts'],unit='ms') # miliseconds to date and time stamp
df_log['ts'] = pd.to_datetime(df_log['ts'],unit='ms')
df_log['ts'].head()

2   2018-11-01 21:01:46.796
4   2018-11-01 21:05:52.796
5   2018-11-01 21:08:16.796
6   2018-11-01 21:11:13.796
7   2018-11-01 21:17:33.796
Name: ts, dtype: datetime64[ns]

In [102]:
t

2    2018-11-01 21:01:46.796
4    2018-11-01 21:05:52.796
5    2018-11-01 21:08:16.796
6    2018-11-01 21:11:13.796
7    2018-11-01 21:17:33.796
8    2018-11-01 21:24:53.796
9    2018-11-01 21:28:54.796
10   2018-11-01 21:42:00.796
12   2018-11-01 21:52:05.796
13   2018-11-01 21:55:25.796
14   2018-11-01 22:23:14.796
Name: ts, dtype: datetime64[ns]

In [103]:
# Series.dt.weekofyear was deprecated in pandas 1.1 and removed in pandas 2.0.
# Use isocalendar().week where it exists and fall back on older pandas versions.
if hasattr(t.dt, "isocalendar"):
    # Cast to int64 so the column has the same dtype the old accessor produced.
    week_of_year = t.dt.isocalendar().week.astype("int64")
else:
    week_of_year = t.dt.weekofyear

# One Series per time-table column, in the same order as column_labels.
time_data = [t, t.dt.hour, t.dt.day, week_of_year, t.dt.month, t.dt.year, t.dt.weekday]
column_labels = ['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']

In [104]:
# Pair every label with its Series, then build the time frame from that mapping.
time_columns = dict(zip(column_labels, time_data))
time_df = pd.DataFrame.from_dict(time_columns)

In [105]:
time_df.head()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
2,2018-11-01 21:01:46.796,21,1,44,11,2018,3
4,2018-11-01 21:05:52.796,21,1,44,11,2018,3
5,2018-11-01 21:08:16.796,21,1,44,11,2018,3
6,2018-11-01 21:11:13.796,21,1,44,11,2018,3
7,2018-11-01 21:17:33.796,21,1,44,11,2018,3


In [106]:
# Now let's insert the records into the time table.
# Every row is sent first and the batch is committed once, instead of issuing
# one commit per row.
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
Sparkify_db.commit()

## Users table

In [107]:
df_log.head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,2018-11-01 21:01:46.796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,2018-11-01 21:05:52.796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [108]:
# Let's extract the data for the users table
user_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']
users_df = df_log[user_columns]

users_df.head(3)


Unnamed: 0,userId,firstName,lastName,gender,level
2,8,Kaylee,Summers,F,free
4,8,Kaylee,Summers,F,free
5,8,Kaylee,Summers,F,free


In [109]:
# Insert the records into the users table.
# Every row is sent first and the batch is committed once, instead of issuing
# one commit per row.
for i, row in users_df.iterrows():
    cur.execute(user_table_insert, list(row))
Sparkify_db.commit()

## Fact table :  SongPlay table
### required data : songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [110]:
df.head(2)

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969


In [111]:
df_log.head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,2018-11-01 21:01:46.796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,2018-11-01 21:05:52.796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [112]:
# First we run the song_select query to fetch song_id and artist_id, then we
# take the rest of the fields required for the songplays table from df_log.

for index, row in df_log.iterrows():
    print((row.song, row.artist, row.length))
    cur.execute(song_select, (row.song, row.artist, row.length))

    # Read the complete result set. The connector refuses to run the next
    # statement ("Unread result found") while rows of a previous SELECT are
    # still pending, which fetchone() would leave behind if several rows match.
    matches = cur.fetchall()
    if matches:
        songid, artistid = matches[0]
    else:
        songid, artistid = None, None

    # insert into songplays table
    # (the DataFrame index is passed as the first value, presumably songplay_id)
    songplay_data = (index, row.ts, row.userId, row.level, songid, artistid,
                     row.sessionId, row.location, row.userAgent)
    cur.execute(songplay_table_insert, songplay_data)

# Commit once for the whole batch rather than once per row.
Sparkify_db.commit()
    

('You Gotta Be', "Des'ree", 246.30812)
('Flat 55', 'Mr Oizo', 144.03873)
('Quem Quiser Encontrar O Amor', 'Tamba Trio', 177.18812)
('Eriatarka', 'The Mars Volta', 380.42077)
('Becoming Insane', 'Infected Mushroom', 440.2673)
('Congratulations', 'Blue October / Imogen Heap', 241.3971)
('Once again', 'Girl Talk', 160.15628)
('Pump It', 'Black Eyed Peas', 214.93506)
('Nobody Puts Baby In The Corner', 'Fall Out Boy', 200.72444)
('Mango Pickle Down River (With The Wilcannia Mob)', 'M.I.A.', 233.7171)
('Eye Of The Tiger', 'Survivor', 245.36771)


In [113]:
# Release the cursor before closing the connection it was created from.
cur.close()
Sparkify_db.close()