# ETL processes
Develop the ETL process for each of the tables (user, artist, song, time, songplay)

In [1]:
import os
import glob
#!pip install --user psycopg2
import psycopg2
import pandas as pd
from sql_queries import *

C:\Users\camib\anaconda3\lib\site-packages\numpy\.libs\libopenblas.FB5AE2TYXYH2IJRDKGDGQ3XBKLKTF43H.gfortran-win_amd64.dll
C:\Users\camib\anaconda3\lib\site-packages\numpy\.libs\libopenblas.XWYDX2IKJW2NMTWSFYNGFUWKQU3LYTCZ.gfortran-win_amd64.dll


Note : create a local postgresql database and connect to it before going further

In [2]:
# get all the files contained in the filepath to the 'song' or 'log' folder. Each file has one or multiple records
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

# Process `song_data`
Perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` tables.
Do it on a single song file and load a single record into each table to start.
We first get a list of all JSON files in 'song_data', select the first song in this list and read it to view the data

In [3]:
path = 'C:/Users/camib/Desktop/DATASCIENCE_PERSOPROJECTS/Music_ETL/'
song_files = get_files(path + 'data/song_data')

In [4]:
filepath = song_files[0]
filepath

'C:\\Users\\camib\\Desktop\\DATASCIENCE_PERSOPROJECTS\\Music_ETL\\data\\song_data\\A\\A\\A\\TRAAAAW128F429D538.json'

In [5]:
df = pd.read_json(filepath, typ='series', convert_dates=False)
df

num_songs                            1
artist_id           ARD7TVE1187B99BFB1
artist_latitude                   None
artist_longitude                  None
artist_location        California - LA
artist_name                     Casual
song_id             SOMZWCG12A8C13C480
title                 I Didn't Mean To
duration                     218.93179
year                                 0
dtype: object

## 1. Extract data for 'songs' table

In [6]:
num_songs, artist_id, artist_latitude, artist_longitude, artist_location, \
artist_name, song_id, title, duration, year = df.to_numpy()[:]     # or older version df.values[:]
print(type(df.to_numpy()[:]))
print(df.to_numpy()[:])

<class 'numpy.ndarray'>
[1 'ARD7TVE1187B99BFB1' None None 'California - LA' 'Casual'
 'SOMZWCG12A8C13C480' "I Didn't Mean To" 218.93179 0]


In [7]:
song_data = (song_id, title, artist_id, year, duration)
song_data

('SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1', 0, 218.93179)

### Insert record into 'songs' table

In [8]:
# Connect to the 'musicdb' database
try:
    conn = psycopg2.connect("host=127.0.0.1 dbname=musicdb user=postgres password=118218")
    #conn.autocommit = True
except(Exception, psycopg2.Error) as error:
    print("Error while connecting to PostgreSQL", error)

cur = conn.cursor()  #allows us to execute the SQL query once we’ve written it

"""Note :
Once create tables and connect to musicdb on Spyder , it doesn't mean you are also connected here in Jupyter and 
that you can execute queries from here !
Need to connect from here as well
"""

"Note :\nOnce create tables and connect to musicdb on Spyder , it doesn't mean you are also connected here in Jupyter and \nthat you can execute queries from here !\nNeed to connect from here as well\n"

In [9]:
# Disable Foreign key constraints, to add records into 'songs' table that has Fk (artist_ID is Pk in 'artists' table)

cur.execute("SET session_replication_role = 'replica';")

In [10]:
cur.execute(song_table_insert, song_data)
conn.commit()

# then test on PgAdmin the query [SELECT * from songs] and see if the record has correctly been added - OK

# also run `test.ipynb` to see if it has been added as well

In [11]:
# Re-enable Foreign key constraints

cur.execute("SET session_replication_role = 'origin';")

# (Explanations given on the text file)

In [12]:
# same thing, but as a function

def disableFKconstraints_addRecords(query_table_insert, record_to_add):
    cur.execute("SET session_replication_role = 'replica';")
    cur.execute(query_table_insert, record_to_add)
    conn.commit()
    cur.execute("SET session_replication_role = 'origin';")
    
disableFKconstraints_addRecords(song_table_insert, song_data)

Another test (from here) to see if the record has been successfully added :

In [13]:
%load_ext sql
#!pip install ipython-sql

In [14]:
%sql postgresql://postgres:118218@127.0.0.1/musicdb

In [15]:
%sql SELECT * FROM songs LIMIT 5;

 * postgresql://postgres:***@127.0.0.1/musicdb
5 rows affected.


song_id,title,artist_id,year,duration
SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.93179
SOCIWDW12A8C13D406,Soul Deep,ARMJAGH1187FB546F3,1969,148.03546
SOXVLOJ12AB0189215,Amor De Cabaret,ARKRRTF1187B9984DA,0,177.47546
SONHOTT12A8C13493C,Something Girls,AR7G5I41187FB4CE6C,1982,233.40363
SOFSOCN12A8C143F5D,Face the Ashes,ARXR32B1187FB57099,2007,209.60608


## 2. Extract data for 'artists' table

In [16]:
num_songs, artist_id, artist_latitude, artist_longitude, artist_location, artist_name, \
song_id, title, duration, year = df.to_numpy()[:]
print(type(df.to_numpy()[:]))
print(df.to_numpy()[:])

# insert artist record
artist_data = (artist_id, artist_name, artist_location, artist_latitude, artist_longitude)
artist_data

<class 'numpy.ndarray'>
[1 'ARD7TVE1187B99BFB1' None None 'California - LA' 'Casual'
 'SOMZWCG12A8C13C480' "I Didn't Mean To" 218.93179 0]


('ARD7TVE1187B99BFB1', 'Casual', 'California - LA', None, None)

### Insert record into 'artists' table

In [17]:
cur.execute(artist_table_insert, artist_data)
conn.commit()

And test again if record has been correctly added :

In [18]:
%sql SELECT * FROM artists LIMIT 5;

 * postgresql://postgres:***@127.0.0.1/musicdb
5 rows affected.


artist_id,name,location,latitude,longitude
ARMJAGH1187FB546F3,The Box Tops,"Memphis, TN",35.14968,-90.04892
ARKRRTF1187B9984DA,Sonora Santanera,,,
AR7G5I41187FB4CE6C,Adam Ant,"London, England",,
ARXR32B1187FB57099,Gob,,,
ARKFYS91187B98E58F,Jeff And Sheri Easter,,,


# Process 'log_data'
Perform ETL on the second dataset, 'log_data', to create the 'time', 'users' and 'songplays' tables.
Again, we do it on a single song file and load a single record into each table to start.
We first get a list of all JSON files in 'song_data', select the first song in this list and read it to view the data

In [19]:
log_files = get_files(path + 'data/log_data')

In [20]:
filepath = log_files[0]
filepath

'C:\\Users\\camib\\Desktop\\DATASCIENCE_PERSOPROJECTS\\Music_ETL\\data\\log_data\\2018\\11\\2018-11-01-events.json'

In [21]:
pd.get_option("display.max_columns")   # display all the columns
df = pd.read_json(filepath, convert_dates=False, lines=True)#, typ='series')
df.head(3)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919166796,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540344794796,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


## 3. Extract data for 'time' table

In [22]:
df.info()   # ts -> integer
# AttributeError: 'Series' object has no attribute 'info', if keep typ='series' above
df['ts']

# Need to convert timestamp in datetime. Note that current timestamp is in nanosecond (ns)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15 entries, 0 to 14
Data columns (total 18 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   artist         11 non-null     object 
 1   auth           15 non-null     object 
 2   firstName      15 non-null     object 
 3   gender         15 non-null     object 
 4   itemInSession  15 non-null     int64  
 5   lastName       15 non-null     object 
 6   length         11 non-null     float64
 7   level          15 non-null     object 
 8   location       15 non-null     object 
 9   method         15 non-null     object 
 10  page           15 non-null     object 
 11  registration   15 non-null     int64  
 12  sessionId      15 non-null     int64  
 13  song           11 non-null     object 
 14  status         15 non-null     int64  
 15  ts             15 non-null     int64  
 16  userAgent      15 non-null     object 
 17  userId         15 non-null     int64  
dtypes: float64(1

0     1541105830796
1     1541106106796
2     1541106106796
3     1541106132796
4     1541106352796
5     1541106496796
6     1541106673796
7     1541107053796
8     1541107493796
9     1541107734796
10    1541108520796
11    1541109015796
12    1541109125796
13    1541109325796
14    1541110994796
Name: ts, dtype: int64

In [23]:
# Convert timestamp (ns) into datetime format

df['ts'] = pd.to_datetime(df['ts'], unit='ns') 
df['ts'].head(15)

0    1970-01-01 00:25:41.105830796
1    1970-01-01 00:25:41.106106796
2    1970-01-01 00:25:41.106106796
3    1970-01-01 00:25:41.106132796
4    1970-01-01 00:25:41.106352796
5    1970-01-01 00:25:41.106496796
6    1970-01-01 00:25:41.106673796
7    1970-01-01 00:25:41.107053796
8    1970-01-01 00:25:41.107493796
9    1970-01-01 00:25:41.107734796
10   1970-01-01 00:25:41.108520796
11   1970-01-01 00:25:41.109015796
12   1970-01-01 00:25:41.109125796
13   1970-01-01 00:25:41.109325796
14   1970-01-01 00:25:41.110994796
Name: ts, dtype: datetime64[ns]

In [24]:
# hour, day, week of year, month, year, weekday

timestamp = df['ts']
time_hour = df['ts'].dt.hour
time_day = df['ts'].dt.day
time_week = df['ts'].dt.week
time_month = df['ts'].dt.month
time_year = df['ts'].dt.year
time_weekday = df['ts'].dt.weekday

  time_week = df['ts'].dt.week


In [25]:
time_data = (timestamp, time_hour, time_day, time_week, time_month, time_year, time_weekday)
column_labels = ('timestamp', 'hour', 'day', 'week of year', 'month', 'year', 'weekday')

time_data

(0    1970-01-01 00:25:41.105830796
 1    1970-01-01 00:25:41.106106796
 2    1970-01-01 00:25:41.106106796
 3    1970-01-01 00:25:41.106132796
 4    1970-01-01 00:25:41.106352796
 5    1970-01-01 00:25:41.106496796
 6    1970-01-01 00:25:41.106673796
 7    1970-01-01 00:25:41.107053796
 8    1970-01-01 00:25:41.107493796
 9    1970-01-01 00:25:41.107734796
 10   1970-01-01 00:25:41.108520796
 11   1970-01-01 00:25:41.109015796
 12   1970-01-01 00:25:41.109125796
 13   1970-01-01 00:25:41.109325796
 14   1970-01-01 00:25:41.110994796
 Name: ts, dtype: datetime64[ns],
 0     0
 1     0
 2     0
 3     0
 4     0
 5     0
 6     0
 7     0
 8     0
 9     0
 10    0
 11    0
 12    0
 13    0
 14    0
 Name: ts, dtype: int64,
 0     1
 1     1
 2     1
 3     1
 4     1
 5     1
 6     1
 7     1
 8     1
 9     1
 10    1
 11    1
 12    1
 13    1
 14    1
 Name: ts, dtype: int64,
 0     1
 1     1
 2     1
 3     1
 4     1
 5     1
 6     1
 7     1
 8     1
 9     1
 10    1
 11    

In [26]:
len(time_data), len(time_data[0]), type(time_data)

(7, 15, tuple)

In [27]:
data_dict = dict(zip(column_labels, time_data))
time_df = pd.DataFrame(data_dict) 
time_df.head()

Unnamed: 0,timestamp,hour,day,week of year,month,year,weekday
0,1970-01-01 00:25:41.105830796,0,1,1,1,1970,3
1,1970-01-01 00:25:41.106106796,0,1,1,1,1970,3
2,1970-01-01 00:25:41.106106796,0,1,1,1,1970,3
3,1970-01-01 00:25:41.106132796,0,1,1,1,1970,3
4,1970-01-01 00:25:41.106352796,0,1,1,1,1970,3


### Insert record into 'time' table

In [28]:
try:
    for i, row in time_df.iterrows():    
        cur.execute(time_table_insert, list(row))
        conn.commit()
        
except(Exception, psycopg2.Error):   # retry if the database has been disconnected (reconnection + query) - because I got disconnected at this step so I just tried to write it
    conn = psycopg2.connect("host=127.0.0.1 dbname=musicdb user=postgres password=118218")
    conn.autocommit = True
    cur = conn.cursor() 
    for i, row in time_df.iterrows():    
        cur.execute(time_table_insert, list(row))
        conn.commit()

then test if it has correctly been added :

In [29]:
%sql SELECT * FROM time LIMIT 200;

 * postgresql://postgres:***@127.0.0.1/musicdb
200 rows affected.


start_time,hour,day,week,month,year,weekday
1970-01-01 00:25:41.106107,0,1,1,1,1970,3
1970-01-01 00:25:41.106353,0,1,1,1,1970,3
1970-01-01 00:25:41.106497,0,1,1,1,1970,3
1970-01-01 00:25:41.106674,0,1,1,1,1970,3
1970-01-01 00:25:41.107054,0,1,1,1,1970,3
1970-01-01 00:25:41.107494,0,1,1,1,1970,3
1970-01-01 00:25:41.107735,0,1,1,1,1970,3
1970-01-01 00:25:41.108521,0,1,1,1,1970,3
1970-01-01 00:25:41.109126,0,1,1,1,1970,3
1970-01-01 00:25:41.109326,0,1,1,1,1970,3


## 4. Extract data for 'users' table

In [30]:
user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
user_df.head(10)

Unnamed: 0,userId,firstName,lastName,gender,level
0,39,Walter,Frye,M,free
1,8,Kaylee,Summers,F,free
2,8,Kaylee,Summers,F,free
3,8,Kaylee,Summers,F,free
4,8,Kaylee,Summers,F,free
5,8,Kaylee,Summers,F,free
6,8,Kaylee,Summers,F,free
7,8,Kaylee,Summers,F,free
8,8,Kaylee,Summers,F,free
9,8,Kaylee,Summers,F,free


In [31]:
# drop all the rows where userId is void

user_df.loc[:,'userId'] = user_df.loc[:,'userId'].replace("", float("NaN"))  
#user_df.replace("", float("NaN"), inplace=True)       # Replace "" (empty values) by "NaN" (which is a numeric data type in python)
user_df.dropna(subset = ["userId"], inplace=True)      # Drop all the rows that contain NaN under the column (the Primary key column 'userId')
user_df.head(10)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_column(ilocs[0], value, pi)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  user_df.dropna(subset = ["userId"], inplace=True)      # Drop all the rows that contain NaN under the column (the Primary key column 'userId')


Unnamed: 0,userId,firstName,lastName,gender,level
0,39,Walter,Frye,M,free
1,8,Kaylee,Summers,F,free
2,8,Kaylee,Summers,F,free
3,8,Kaylee,Summers,F,free
4,8,Kaylee,Summers,F,free
5,8,Kaylee,Summers,F,free
6,8,Kaylee,Summers,F,free
7,8,Kaylee,Summers,F,free
8,8,Kaylee,Summers,F,free
9,8,Kaylee,Summers,F,free


### Insert record into 'users' table

In [32]:
try:
    for i, row in user_df.iterrows():
        cur.execute(user_table_insert, row)
        conn.commit()

except:
    cur.execute("rollback")
    for i, row in user_df.iterrows():
        cur.execute(user_table_insert, row)
        conn.commit()

test if it has correctly been added :

In [33]:
%sql SELECT * FROM users LIMIT 5;

 * postgresql://postgres:***@127.0.0.1/musicdb
5 rows affected.


user_id,first_name,last_name,gender,level
61,Samuel,Gonzalez,M,free
50,Ava,Robinson,F,free
88,Mohammad,Rodriguez,M,paid
8,Kaylee,Summers,F,free
10,Sylvie,Cruz,F,free


## 5. Extract data for 'songplays' table

Information from the songs table, artists table, and original log file are all needed for the `songplays` table. 
Since the log file does not specify an ID for either the song or the artist,we need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.

In [34]:
for index, row in df.iterrows():

    # get songid and artistid from song and artist tables
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()
    #row = cursor.fetchone() -> This method retrieves the next row of a query result set and returns a single sequence, or None if no more rows are available.
    
    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None
    
    # insert songplay record
    #songplay_data = (df['ts'], df['userId'], df['level'], songid, artistid, df['sessionId'], df['location'], df['userAgent'])   ### WRONG!
    songplay_data = (row.ts, row.userId, row.level, songid, artistid, row.sessionId, row.location, row.userAgent)

    cur.execute(songplay_table_insert, songplay_data)
    
    conn.commit()

In [35]:
songplay_data

(Timestamp('1970-01-01 00:25:41.110994796'),
 101,
 'free',
 None,
 None,
 100,
 'New Orleans-Metairie, LA',
 '"Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"')

In [36]:
type(songplay_data)

tuple

test it :

In [37]:
%sql SELECT * FROM songplays LIMIT 5;

 * postgresql://postgres:***@127.0.0.1/musicdb
5 rows affected.


songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
1,1970-01-01 00:25:41.106107,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"""
2,1970-01-01 00:25:41.106353,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"""
3,1970-01-01 00:25:41.106497,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"""
4,1970-01-01 00:25:41.106674,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"""
5,1970-01-01 00:25:41.107054,8,free,,,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"""


# Close connection to musicdb database

In [38]:
conn.close()

Note :
Restart this notebook to close connection to musicdb.
Each time you run the cells above, remember to restart this notebook to close the connection to the database. Otherwise, won't be able to run the code in `create_tables.py`, `etl.py`, or `etl.ipynb` files since can't make multiple connections to the same database (in this case, musicdb).