# Project: Data modeling with Postgres (By using Sparkify data)

----

- PROJECT OBJECT

This project aims for creating an ETL data pipeline to extract data from log dataset and song dataset(both datasets reside in a local computer), transform these data into a format which analytics team prefers, and put data into a Postgres database

------------

## **PART 1: PREPROCESSING**
### Import packages this project needs

In [1]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *
import numpy as np

### Create connection to postgres database

In [3]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()

In [4]:
conn.set_session(autocommit=True)

### Define a function which helps us to get the path of log_file and song_file

In [5]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

## **PART 2: PROCESS SONG DATA**
### Read song files

In [6]:
# get song files path
song_files = get_files('data/song_data')

In [7]:
# read song json files
song_file_str = ""

for i in range(0, len(song_files)):
    if i == 0:
        with open(song_files[i]) as song_content:
            for line in song_content.readlines():
                song_file_str = song_file_str + line
    else:
        with open(song_files[i]) as song_content:
            for line in song_content.readlines():
                song_file_str = song_file_str + ',' + line

song_file_df = pd.read_json(song_file_str, lines=True)        
        


In [8]:
# clean the song_file_df by replaceing np.nan and '' with 'None' 
song_file_df = song_file_df.fillna(value='None')
song_file_df = song_file_df.replace('', 'None')

#drop duplicated rows
song_file_df = song_file_df.drop_duplicates(keep='first')

In [9]:
# check song file dataset
song_file_df.head(5)

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARD7TVE1187B99BFB1,,California - LA,,Casual,218.93179,1,SOMZWCG12A8C13C480,I Didn't Mean To,0
1,ARGSJW91187B9B1D6B,35.2196,North Carolina,-80.0195,JennyAnyKind,218.77506,1,SOQHXMF12AB0182363,Young Boy Blues,0
2,ARKRRTF1187B9984DA,,,,Sonora Santanera,177.47546,1,SOXVLOJ12AB0189215,Amor De Cabaret,0
3,ARKFYS91187B98E58F,,,,Jeff And Sheri Easter,267.7024,1,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),0
4,ARNTLGG11E2835DDB9,,,,Clp,266.39628,1,SOUDSGM12AC9618304,Insatiable (Instrumental Version),0


### Extract Data for Songs Table and put data into database in Postgres

In [10]:
# Extract Data for Songs Table
song_data = song_file_df.loc[:,['song_id', 'title', 'artist_id', 'year', 'duration']]
song_data = song_data.drop_duplicates(subset='song_id', keep='first')
song_data_list = song_data.values.tolist()

In [11]:
# insert data into song table in Postgres
for row in song_data_list:
    cur.execute(song_table_insert, row)
    conn.commit()

In [29]:
# check song table data in Postgres
cur.execute("select count(*) from song")
#row = cur.fetchone()

row = cur.fetchall()
#while row:
#    print(row)
#    row = cur.fetchone()
row[0][0]

71

### Extract Data for Artists Table and put data into database in Postgres

In [220]:
# Extract Data for Artists Table
artist_data = song_file_df.loc[:,['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude"']]
artist_data = artist_data.drop_duplicates(subset = 'artist_id', keep='first')
artist_data_list = artist_data.replace(np.nan, 'None').values.tolist()

Passing list-likes to .loc or [] with any missing label will raise
KeyError in the future, you can use .reindex() as an alternative.

See the documentation here:
https://pandas.pydata.org/pandas-docs/stable/indexing.html#deprecate-loc-reindex-listlike
  return self._getitem_tuple(key)


In [221]:
# put data into database in Postgres
for row in artist_data_list:
    cur.execute(artist_table_insert, row)
    conn.commit()

In [None]:
# check artist data resides in Postgres
cur.execute("select * from artist")
row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

## **PART 3: PROCESS LOG DATA**
### Read log files

In [222]:
# get log file path
log_files = get_files('data/log_data')

In [223]:
# read log files and put into data frame
log_file_str = ""

for i in range(0, len(log_files)):
    if i == 0:
        with open(log_files[i]) as log_content:
            for line in log_content.readlines():
                log_file_str = log_file_str + line
    else:
        with open(log_files[i]) as log_content:
            for line in log_content.readlines():
                log_file_str = log_file_str + '\n' + line

log_file_str_replaced = log_file_str.replace('}\n{', '},{')
log_file_df = pd.read_json(log_file_str_replaced, lines=True)        

In [224]:
# clean log_file_df by replacing np.nan and '' with 'None'
log_file_df = log_file_df.fillna(value='None')
log_file_df = log_file_df.replace('', 'None')

In [225]:
# drop duplicated rows
log_file_df = log_file_df.drop_duplicates(keep='first')

In [226]:
# check data in log_file_df
log_file_df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,The Grass Roots,Logged In,Sara,F,72,Johnson,166.713,paid,"Winston-Salem, NC",PUT,NextSong,1540810000000.0,411,Let's Live For Today,200,1542153802796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95
1,Stars,Logged In,Sara,F,73,Johnson,298.945,paid,"Winston-Salem, NC",PUT,NextSong,1540810000000.0,411,Time Can Never Kill The True Heart,200,1542153968796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95
2,Eddie Palmieri,Logged In,Sara,F,74,Johnson,391.836,paid,"Winston-Salem, NC",PUT,NextSong,1540810000000.0,411,Nada De Ti,200,1542154266796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95
3,The Bravery,Logged In,Sara,F,75,Johnson,168.15,paid,"Winston-Salem, NC",PUT,NextSong,1540810000000.0,411,Give In,200,1542154657796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95
4,K.U.K.L,Logged In,Sara,F,76,Johnson,181.289,paid,"Winston-Salem, NC",PUT,NextSong,1540810000000.0,411,Anna,200,1542154825796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95


### Extract Data for time Table and put data into database in Postgres

In [227]:
# filter log_file
filtered_log_file_df = log_file_df[log_file_df['page'] == 'NextSong']
filtered_log_file_df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,The Grass Roots,Logged In,Sara,F,72,Johnson,166.713,paid,"Winston-Salem, NC",PUT,NextSong,1540810000000.0,411,Let's Live For Today,200,1542153802796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95
1,Stars,Logged In,Sara,F,73,Johnson,298.945,paid,"Winston-Salem, NC",PUT,NextSong,1540810000000.0,411,Time Can Never Kill The True Heart,200,1542153968796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95
2,Eddie Palmieri,Logged In,Sara,F,74,Johnson,391.836,paid,"Winston-Salem, NC",PUT,NextSong,1540810000000.0,411,Nada De Ti,200,1542154266796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95
3,The Bravery,Logged In,Sara,F,75,Johnson,168.15,paid,"Winston-Salem, NC",PUT,NextSong,1540810000000.0,411,Give In,200,1542154657796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95
4,K.U.K.L,Logged In,Sara,F,76,Johnson,181.289,paid,"Winston-Salem, NC",PUT,NextSong,1540810000000.0,411,Anna,200,1542154825796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95


In [228]:
# convert value in ts column into datetime format
exact_time = pd.to_datetime(filtered_log_file_df['ts'], unit='ms')
exact_time.head()

0   2018-11-14 00:03:22.796
1   2018-11-14 00:06:08.796
2   2018-11-14 00:11:06.796
3   2018-11-14 00:17:37.796
4   2018-11-14 00:20:25.796
Name: ts, dtype: datetime64[ns]

In [229]:
# Extract the timestamp, hour, day, week of year, month, year, and weekday from the ts column and set time_data
timestamp = filtered_log_file_df['ts']
hour = exact_time.dt.hour
day = exact_time.dt.day
weekofYear = exact_time.dt.weekofyear
month = exact_time.dt.month
year = exact_time.dt.year
weekday = exact_time.dt.weekday

# create time_data_df
time_data_df = pd.concat([timestamp, hour, day, weekofYear, month, year, weekday], axis=1)

In [230]:
# change column label 
col_labels = ['timestamp', 'hour', 'day', 'weekofYear', 'month', 'year', 'weekday']
time_data_df.columns = col_labels

# drop duplicated data
time_data_df = time_data_df.drop_duplicates(subset='timestamp', keep='first')

# check data
time_data_df.head()

Unnamed: 0,timestamp,hour,day,weekofYear,month,year,weekday
0,1542153802796,0,14,46,11,2018,2
1,1542153968796,0,14,46,11,2018,2
2,1542154266796,0,14,46,11,2018,2
3,1542154657796,0,14,46,11,2018,2
4,1542154825796,0,14,46,11,2018,2


In [231]:
time_data_list = time_data_df.values.tolist()

In [232]:
# insert data into time table in Postgres
for row in time_data_list:
    cur.execute(time_table_insert, row)
    conn.commit()

In [None]:
# check data resides in Time table in Postgres
try: 
    cur.execute("SELECT * FROM time;")
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

### Extract Data for User Table and put data into database in Postgres

In [233]:
# Extract Data for User Table
user_df = log_file_df.loc[:,['userId', 'firstName', 'lastName', 'gender', 'level']]
user_df = user_df.drop_duplicates(subset='userId', keep='first')

In [234]:
user_list = user_df.values.tolist()

In [235]:
# put data into database in Postgres
for row in user_list:
    cur.execute(user_table_insert, row)
    conn.commit()

In [None]:
try: 
    cur.execute("""SELECT * FROM "user" """)
except psycopg2.Error as e: 
    print("Error: select *")
    print (e)

row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

### Extract Data for SongPlay Table and put data into database in Postgres

In [236]:
# Join song and artist tables, and get song_join_artist_df
song_join_artist_df = pd.read_sql(song_select,conn)

# change the data (for following merge operation)
song_join_artist_df['duration'] = song_join_artist_df['duration'].astype('object')

# reset the song_join_artist_df index (for following merge operation)
song_join_artist_df.index = list(song_join_artist_df.index)

In [237]:
# Join "log_file_df" and "song_join_artist_df", get song_join_artist_join_logfile_df
song_join_artist_join_logfile_df = pd.merge(log_file_df, song_join_artist_df, left_on=['artist', 'song', 'length'], right_on=['name', 'title', 'duration'], how='left')
song_join_artist_join_logfile_df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,song,status,ts,userAgent,userId,song_id,artist_id,name,title,duration
0,The Grass Roots,Logged In,Sara,F,72,Johnson,166.713,paid,"Winston-Salem, NC",PUT,...,Let's Live For Today,200,1542153802796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95,,,,,
1,Stars,Logged In,Sara,F,73,Johnson,298.945,paid,"Winston-Salem, NC",PUT,...,Time Can Never Kill The True Heart,200,1542153968796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95,,,,,
2,Eddie Palmieri,Logged In,Sara,F,74,Johnson,391.836,paid,"Winston-Salem, NC",PUT,...,Nada De Ti,200,1542154266796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95,,,,,
3,The Bravery,Logged In,Sara,F,75,Johnson,168.15,paid,"Winston-Salem, NC",PUT,...,Give In,200,1542154657796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95,,,,,
4,K.U.K.L,Logged In,Sara,F,76,Johnson,181.289,paid,"Winston-Salem, NC",PUT,...,Anna,200,1542154825796,"""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like...",95,,,,,


In [238]:
# extract the column for songplay table
songplay_df = song_join_artist_join_logfile_df.loc[:,['ts', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent']]

# reset the index for songplay_df
songplay_df.index = range(1,len(songplay_df)+1)
songplay_df = songplay_df.reset_index()

# change the column name for songplay_df
songplay_df.columns = ['songplay_id', 'start_time', 'user_id', 'level', 'song_id', 'artist_id', 'session_id', 'location', 'user_agent']

# clean the na value
songplay_df = songplay_df.fillna(value='None')
songplay_df = songplay_df.replace('', 'None')

In [239]:
# check data in songplay table
songplay_df.head()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,1,1542153802796,95,paid,,,411,"Winston-Salem, NC","""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like..."
1,2,1542153968796,95,paid,,,411,"Winston-Salem, NC","""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like..."
2,3,1542154266796,95,paid,,,411,"Winston-Salem, NC","""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like..."
3,4,1542154657796,95,paid,,,411,"Winston-Salem, NC","""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like..."
4,5,1542154825796,95,paid,,,411,"Winston-Salem, NC","""Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like..."


In [240]:
# save data into songplay table in Postgres
songplay_list = songplay_df.values.tolist()

for row in songplay_list:
    cur.execute(songplay_table_insert, row)
    conn.commit()

In [None]:
# check data in Songplay table in Postgres
cur.execute("select * from songplay")
row = cur.fetchone()
while row:
   print(row)
   row = cur.fetchone()

# Close Connection to Sparkify Database

In [241]:
conn.close()