# <font color = "blue"> Building an ETL Pipeline with PostgreSQL </font>

### By Otto Kwon

* The aim of this project is to extract data from two datasets,
"song_data" and "log_data",
to transform the extracted data (its types and contents),
and to load the transformed data into our PostgreSQL database.
<br></br>
* Our PostgreSQL database has five tables in total:
song_data feeds two tables (songs, artists),
log_data feeds two tables (users, time),
and both datasets together feed the remaining table (songplays).
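
The split of tables across the two datasets can be summarised as a small lookup structure. This is a hypothetical helper, not part of the project's code; the column lists are the ones shown in the query outputs further down this notebook.

```python
# Hypothetical summary of the five target tables: which source dataset(s)
# each one is built from, and the columns it holds.
TABLE_SOURCES = {
    "songs": {"sources": ["song_data"],
              "columns": ["song_id", "title", "artist_id", "year", "duration"]},
    "artists": {"sources": ["song_data"],
                "columns": ["artist_id", "name", "location", "latitude", "longitude"]},
    "users": {"sources": ["log_data"],
              "columns": ["user_id", "first_name", "last_name", "gender", "level"]},
    "time": {"sources": ["log_data"],
             "columns": ["start_time", "hour", "day", "week", "month", "year", "weekday"]},
    "songplays": {"sources": ["song_data", "log_data"],
                  "columns": ["songplay_id", "start_time", "user_id", "level", "song_id",
                              "artist_id", "session_id", "location", "user_agent"]},
}


def tables_from(dataset):
    """Return (sorted) the target tables that draw on the given source dataset."""
    return sorted(t for t, spec in TABLE_SOURCES.items() if dataset in spec["sources"])


print(tables_from("song_data"))  # ['artists', 'songplays', 'songs']
print(tables_from("log_data"))   # ['songplays', 'time', 'users']
```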

##### Run create_tables.py

In [1]:
%run create_tables.py

##### Run test.ipynb (every table is still empty)

In [2]:
%run test.ipynb

 * postgresql://student:***@127.0.0.1/sparkifydb
0 rows affected.
 * postgresql://student:***@127.0.0.1/sparkifydb
0 rows affected.
 * postgresql://student:***@127.0.0.1/sparkifydb
0 rows affected.
 * postgresql://student:***@127.0.0.1/sparkifydb
0 rows affected.
 * postgresql://student:***@127.0.0.1/sparkifydb
0 rows affected.


##### Importing packages

In [3]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

##### Creating connection and cursor for PostgreSQL

In [4]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()

In [5]:
# get_files: return the absolute paths of all .json files under filepath (searched recursively).
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        json_files = glob.glob(os.path.join(root, '*.json'))
        for f in json_files:
            all_files.append(os.path.abspath(f))

    return all_files
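
To see what `get_files` returns, the sketch below runs it against a throwaway nested directory (a hypothetical stand-in for the `data/...` folders) containing two `.json` files and one `.txt` file. `os.walk` visits every sub-directory and `glob` keeps only the `*.json` matches in each one.

```python
import glob
import os
import tempfile


def get_files(filepath):
    """Return absolute paths of every *.json file under filepath (recursively)."""
    all_files = []
    for root, _dirs, _files in os.walk(filepath):
        for f in glob.glob(os.path.join(root, "*.json")):
            all_files.append(os.path.abspath(f))
    return all_files


# Hypothetical nested tree, similar in shape to the song_data sub-directories.
with tempfile.TemporaryDirectory() as tmp:
    nested = os.path.join(tmp, "A", "B")
    os.makedirs(nested)
    for name in ("one.json", "two.json", "notes.txt"):
        with open(os.path.join(nested, name), "w") as fh:
            fh.write("{}\n")
    found = get_files(tmp)

print(len(found))  # 2: the .txt file is skipped
```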

## Procedure:
#### 1. ETL with 'song_data' for the 'songs' and 'artists' tables.
#### 2. ETL with 'log_data' for the 'time' and 'users' tables.
#### 3. ETL with both 'song_data' and 'log_data' for the 'songplays' table.

## `song_data`
##### * Collect the song_data files with the get_files function.
##### * Run the ETL for the 'songs' and 'artists' tables.

In [34]:
# get song_data
song_files = get_files('data/song_data')

In [7]:
# take the first song file as a sample
filepath = song_files[0]

In [8]:
# read the sample file (JSON Lines format) into a pandas DataFrame
df = pd.read_json(filepath, lines = True)

# preview the first rows
df.head()

|   | artist_id | artist_latitude | artist_location | artist_longitude | artist_name | duration | num_songs | song_id | title | year |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | AR8IEZO1187B99055E | | | | Marc Shaiman | 149.86404 | 1 | SOINLJW12A8C13314C | City Slickers | 2008 |
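
`lines=True` tells `pd.read_json` to treat each line of the file as one JSON object (the "JSON Lines" layout), which is why the sample song file becomes a one-row DataFrame. A small sketch with two hypothetical records, read from an in-memory buffer instead of a file:

```python
import io

import pandas as pd

# Two JSON objects, one per line: the layout that lines=True expects.
raw = (
    '{"song_id": "S1", "title": "First", "year": 2008}\n'
    '{"song_id": "S2", "title": "Second", "year": 2010}\n'
)
sample = pd.read_json(io.StringIO(raw), lines=True)
print(sample.shape)  # (2, 3): one row per line, one column per key
```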


### 1.`songs` Table
- Extract data from `song_data` to `songs` table.
- `songs` columns : song ID, title, artist ID, year, and duration

In [9]:
# get_val: return the value of column col_name in the first row of the global df
def get_val(col_name):
    index = list(df.columns).index(col_name)
    return df.values[0][index]

# creating song_data list
song_data = [get_val('song_id'), get_val('title'), get_val('artist_id'), 
             get_val('year'), get_val('duration')]
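
`get_val` above reads the global `df`. A minimal variant, assuming you would rather pass the DataFrame explicitly (the `get_val(frame, col_name, row=0)` signature is hypothetical), uses `Series.tolist()`, which returns built-in Python scalars. That matters because psycopg2 cannot adapt NumPy scalars such as `numpy.int64` unless an adapter is registered.

```python
import pandas as pd


def get_val(frame, col_name, row=0):
    """Hypothetical variant: value of col_name in the given row of frame."""
    # tolist() converts NumPy scalars (e.g. numpy.int64) to plain int/float,
    # which psycopg2 can pass to PostgreSQL without extra registration.
    return frame[col_name].tolist()[row]


df = pd.DataFrame([{"song_id": "SOINLJW12A8C13314C", "title": "City Slickers",
                    "artist_id": "AR8IEZO1187B99055E", "year": 2008,
                    "duration": 149.86404}])
song_data = [get_val(df, c) for c in ("song_id", "title", "artist_id", "year", "duration")]
print(song_data)
```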

In [10]:
song_data

['SOINLJW12A8C13314C', 'City Slickers', 'AR8IEZO1187B99055E', 2008, 149.86404]

#### Insert the Record into the songs Table
##### Execute the `song_table_insert` query from `sql_queries.py` with `song_data` as its parameters.

In [11]:
cur.execute(song_table_insert, song_data)
conn.commit()
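
The text of `song_table_insert` lives in `sql_queries.py` and is not shown in this notebook. Assuming it is a plain parameterised INSERT, it would look roughly like the hypothetical query below; the `ON CONFLICT (song_id) DO NOTHING` clause is an assumption that lets reruns skip songs already loaded. psycopg2 uses `%s` for every parameter regardless of type and fills the placeholders positionally, so their count must match `len(song_data)`.

```python
# Hypothetical shape of song_table_insert; the real query is in sql_queries.py.
song_table_insert = """
    INSERT INTO songs (song_id, title, artist_id, year, duration)
    VALUES (%s, %s, %s, %s, %s)
    ON CONFLICT (song_id) DO NOTHING;
"""

song_data = ["SOINLJW12A8C13314C", "City Slickers", "AR8IEZO1187B99055E", 2008, 149.86404]

# One %s per value: cur.execute(song_table_insert, song_data) binds them in order.
assert song_table_insert.count("%s") == len(song_data)
```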

##### As the `test.ipynb` check shows, `song_data` is now in the `songs` table

In [12]:
%sql SELECT * FROM songs LIMIT 5;

 * postgresql://student:***@127.0.0.1/sparkifydb
1 rows affected.


| song_id | title | artist_id | year | duration |
|---|---|---|---|---|
| SOINLJW12A8C13314C | City Slickers | AR8IEZO1187B99055E | 2008 | 149.86404 |


### 2.`artists` Table
- Extract data from `song_data` to `artists` table
- `artists` columns : artist ID, name, location, latitude, and longitude

##### Set artist_data by using get_val() function

In [13]:
artist_data = [get_val('artist_id'), get_val('artist_name'), get_val('artist_location'), 
               get_val('artist_latitude'), get_val('artist_longitude')]

In [14]:
artist_data

['AR8IEZO1187B99055E', 'Marc Shaiman', '', nan, nan]
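
The missing latitude and longitude arrive as float `nan`. psycopg2 passes a Python NaN through as the floating-point value NaN (`'NaN'::float`), not as SQL `NULL`. Whether that matters depends on the column types in `create_tables.py`; if `NULL` is wanted, a small hypothetical helper can swap NaN for `None` before the insert:

```python
import math

artist_data = ["AR8IEZO1187B99055E", "Marc Shaiman", "", float("nan"), float("nan")]


def nan_to_none(values):
    """Hypothetical helper: replace float NaN entries with None (stored as NULL)."""
    # numpy.float64 subclasses float, so NaNs coming from pandas are caught too.
    return [None if isinstance(v, float) and math.isnan(v) else v for v in values]


clean_artist_data = nan_to_none(artist_data)
print(clean_artist_data)  # ['AR8IEZO1187B99055E', 'Marc Shaiman', '', None, None]
```

The cleaned list would then be passed as `cur.execute(artist_table_insert, clean_artist_data)`.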

#### Insert the Record into the artists Table
##### Execute the `artist_table_insert` query from `sql_queries.py` with `artist_data` as its parameters.

In [15]:
cur.execute(artist_table_insert, artist_data)
conn.commit()

##### As the `test.ipynb` check shows, `artist_data` is now in the `artists` table

In [16]:
%sql SELECT * FROM artists LIMIT 5;

 * postgresql://student:***@127.0.0.1/sparkifydb
1 rows affected.


| artist_id | name | location | latitude | longitude |
|---|---|---|---|---|
| AR8IEZO1187B99055E | Marc Shaiman | | | |


## `log_data`
##### * Collect the log_data files with the get_files function.
##### * Run the ETL for the 'time' and 'users' tables.

In [17]:
# get log_data from its directory
log_files = get_files('data/log_data')

In [18]:
# take the first log file as a sample
filepath = log_files[0]

In [19]:
# read the sample log file (JSON Lines format) into a pandas DataFrame
df_log = pd.read_json(filepath, lines=True)

# preview the first rows
df_log.head()

|   | artist | auth | firstName | gender | itemInSession | lastName | length | level | location | method | page | registration | sessionId | song | status | ts | userAgent | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Sydney Youngblood | Logged In | Jacob | M | 53 | Klein | 238.07955 | paid | Tampa-St. Petersburg-Clearwater, FL | PUT | NextSong | 1540558000000.0 | 954 | Ain't No Sunshine | 200 | 1543449657796 | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... | 73 |
| 1 | Gang Starr | Logged In | Layla | F | 88 | Griffin | 151.92771 | paid | Lake Havasu City-Kingman, AZ | PUT | NextSong | 1541057000000.0 | 984 | My Advice 2 You (Explicit) | 200 | 1543449690796 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 24 |
| 2 | 3OH!3 | Logged In | Layla | F | 89 | Griffin | 192.522 | paid | Lake Havasu City-Kingman, AZ | PUT | NextSong | 1541057000000.0 | 984 | My First Kiss (Feat. Ke$ha) [Album Version] | 200 | 1543449841796 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 24 |
| 3 | RÃÂ¶yksopp | Logged In | Jacob | M | 54 | Klein | 369.81506 | paid | Tampa-St. Petersburg-Clearwater, FL | PUT | NextSong | 1540558000000.0 | 954 | The Girl and The Robot | 200 | 1543449895796 | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... | 73 |
| 4 | Kajagoogoo | Logged In | Layla | F | 90 | Griffin | 223.55546 | paid | Lake Havasu City-Kingman, AZ | PUT | NextSong | 1541057000000.0 | 984 | Too Shy | 200 | 1543450033796 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 24 |


In [20]:
# seeing df_log info
df_log.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 366 entries, 0 to 365
Data columns (total 18 columns):
artist           319 non-null object
auth             366 non-null object
firstName        352 non-null object
gender           352 non-null object
itemInSession    366 non-null int64
lastName         352 non-null object
length           319 non-null float64
level            366 non-null object
location         352 non-null object
method           366 non-null object
page             366 non-null object
registration     352 non-null float64
sessionId        366 non-null int64
song             319 non-null object
status           366 non-null int64
ts               366 non-null int64
userAgent        352 non-null object
userId           366 non-null object
dtypes: float64(2), int64(4), object(12)
memory usage: 51.5+ KB


### 3.`time` Table
##### 1. Filter df_log to rows where `page` = `NextSong`
##### 2. Convert the `ts` (timestamp) column of df_log to datetimes; `ts` is Unix epoch time in milliseconds
##### 3. Columns: start_time (timestamp), hour, day, week of year, month, year, and weekday
<br>
</br>
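
For a single value, the conversion in step 2 and the columns in step 3 can be checked with the standard library alone. This sketch uses the first `ts` of the sample log file and should agree with the first row pandas produces below; `ms_to_datetime` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_datetime(ts_ms):
    """Hypothetical helper: Unix timestamp in milliseconds -> aware UTC datetime."""
    # Adding integer milliseconds as a timedelta keeps the .796 part exact.
    return EPOCH + timedelta(milliseconds=ts_ms)


start = ms_to_datetime(1543449657796)  # first ts value in the sample log file
row = (start, start.hour, start.day, start.isocalendar()[1],
       start.month, start.year, start.strftime("%A"))
print(row[1:])  # (0, 29, 48, 11, 2018, 'Thursday')
```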

In [21]:
# keep only song-play events (page == "NextSong") with DataFrame.query
df_log = df_log.query('page == "NextSong"')

# checking for the filtering
if len(df_log.query('page != "NextSong"')) == 0:
    print('Filter Confirmed')

Filter Confirmed


In [22]:
# converting ts column to datetime format
t = pd.to_datetime(df_log.ts, unit='ms')
t.head()

0   2018-11-29 00:00:57.796
1   2018-11-29 00:01:30.796
2   2018-11-29 00:04:01.796
3   2018-11-29 00:04:55.796
4   2018-11-29 00:07:13.796
Name: ts, dtype: datetime64[ns]

In [23]:
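# derive each time-table column with the matching .dt accessor
# (Series.dt.week is deprecated since pandas 1.1 and removed in 2.0;
#  newer versions need t.dt.isocalendar().week instead)
time_data = (t, t.dt.hour, t.dt.day, t.dt.week, t.dt.month, t.dt.year, t.dt.day_name())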

# the names of columns
column_labels = ('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')
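
On pandas 2.x, `t.dt.week` no longer exists. A sketch of the same time table for newer pandas (1.1 or later), assuming the ISO week number from `isocalendar()` is the intended "week of year" (it matches the `week` values shown below). `build_time_df` is a hypothetical name, and the two timestamps are the first two `ts` values of the sample log.

```python
import pandas as pd


def build_time_df(ts_ms):
    """Hypothetical helper: build the time-table columns from millisecond timestamps."""
    t = pd.to_datetime(pd.Series(ts_ms), unit="ms")
    return pd.DataFrame({
        "start_time": t,
        "hour": t.dt.hour,
        "day": t.dt.day,
        # isocalendar() returns year/week/day columns as UInt32; cast to plain int64
        "week": t.dt.isocalendar().week.astype("int64"),
        "month": t.dt.month,
        "year": t.dt.year,
        "weekday": t.dt.day_name(),
    })


time_df = build_time_df([1543449657796, 1543449690796])
print(time_df)
```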

In [24]:
# creating dictionary to zip time_data and column_labels
dict1 = dict(zip(column_labels, time_data))

In [25]:
# converting the dictionary to pd.dataframe
time_df = pd.DataFrame.from_dict(dict1)
time_df.head()

|   | start_time | hour | day | week | month | year | weekday |
|---|---|---|---|---|---|---|---|
| 0 | 2018-11-29 00:00:57.796 | 0 | 29 | 48 | 11 | 2018 | Thursday |
| 1 | 2018-11-29 00:01:30.796 | 0 | 29 | 48 | 11 | 2018 | Thursday |
| 2 | 2018-11-29 00:04:01.796 | 0 | 29 | 48 | 11 | 2018 | Thursday |
| 3 | 2018-11-29 00:04:55.796 | 0 | 29 | 48 | 11 | 2018 | Thursday |
| 4 | 2018-11-29 00:07:13.796 | 0 | 29 | 48 | 11 | 2018 | Thursday |


#### Insert Records into the time Table
##### Execute the `time_table_insert` query from `sql_queries.py` once for each row of `time_df`.

In [26]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()
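
The loop above commits once per row, so every row is its own transaction. A hypothetical helper that sends all rows with the standard DB-API `executemany` call and commits once is sketched below; `itertuples(index=False, name=None)` yields plain tuples in column order, matching positional `%s` placeholders. Note that psycopg2's `executemany` still runs one statement per row; `psycopg2.extras.execute_values` is the usual choice when real batching is needed.

```python
import pandas as pd


def insert_dataframe(cur, conn, query, frame):
    """Hypothetical helper: insert every row of frame, then commit once."""
    # Plain tuples in column order line up with the INSERT's %s placeholders.
    rows = list(frame.itertuples(index=False, name=None))
    cur.executemany(query, rows)
    conn.commit()
    return len(rows)
```

Usage would be `insert_dataframe(cur, conn, time_table_insert, time_df)`.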

##### As the `test.ipynb` check shows, `time_df` is now in the `time` table

In [27]:
%sql SELECT * FROM time LIMIT 5;

 * postgresql://student:***@127.0.0.1/sparkifydb
5 rows affected.


| start_time | hour | day | week | month | year | weekday |
|---|---|---|---|---|---|---|
| 2018-11-29 00:00:57.796000 | 0 | 29 | 48 | 11 | 2018 | Thursday |
| 2018-11-29 00:01:30.796000 | 0 | 29 | 48 | 11 | 2018 | Thursday |
| 2018-11-29 00:04:01.796000 | 0 | 29 | 48 | 11 | 2018 | Thursday |
| 2018-11-29 00:04:55.796000 | 0 | 29 | 48 | 11 | 2018 | Thursday |
| 2018-11-29 00:07:13.796000 | 0 | 29 | 48 | 11 | 2018 | Thursday |


### 4.`users` Table
##### Columns : user ID, first name, last name, gender and level
##### Create a DataFrame with these columns and assign it to `user_df`
<br>
</br>

In [28]:
# creating user_df with the columns
user_df = df_log[['userId', 'firstName', 'lastName', 'gender', 'level']]
user_df.head()

|   | userId | firstName | lastName | gender | level |
|---|---|---|---|---|---|
| 0 | 73 | Jacob | Klein | M | paid |
| 1 | 24 | Layla | Griffin | F | paid |
| 2 | 24 | Layla | Griffin | F | paid |
| 3 | 73 | Jacob | Klein | M | paid |
| 4 | 24 | Layla | Griffin | F | paid |
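
`user_df` repeats a user on every song play (users 73 and 24 above), so the row-by-row insert relies on `user_table_insert` handling duplicates, which is an assumption about `sql_queries.py`. One alternative is to deduplicate first. The sketch below uses made-up toy rows; `keep="last"` retains each user's latest row, assuming rows are in time order, so a free-to-paid upgrade ends up in `level`.

```python
import pandas as pd

# Toy log rows (made up for illustration): user "1" upgrades from free to paid.
log = pd.DataFrame({
    "userId": ["1", "2", "1"],
    "firstName": ["Ann", "Bob", "Ann"],
    "lastName": ["Lee", "Ray", "Lee"],
    "gender": ["F", "M", "F"],
    "level": ["free", "free", "paid"],
})

# One row per user; keep="last" keeps the most recent occurrence of each userId.
unique_users = (log[["userId", "firstName", "lastName", "gender", "level"]]
                .drop_duplicates(subset="userId", keep="last"))
print(unique_users)
```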


#### Insert Records into the users Table
##### Execute the `user_table_insert` query from `sql_queries.py` once for each row of `user_df`.

In [29]:
for i, row in user_df.iterrows():
    cur.execute(user_table_insert, list(row))
    conn.commit()

In [30]:
%sql SELECT * FROM users LIMIT 5;

 * postgresql://student:***@127.0.0.1/sparkifydb
5 rows affected.


| user_id | first_name | last_name | gender | level |
|---|---|---|---|---|
| 73 | Jacob | Klein | M | paid |
| 50 | Ava | Robinson | F | free |
| 4 | Alivia | Terrell | F | free |
| 24 | Layla | Griffin | F | paid |
| 54 | Kaleb | Cook | M | free |


### 5.`songplays` Table
##### * Columns : songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
##### * Build `songplay_data` row by row in a for loop, looking up `song_id` and `artist_id` with the `song_select` query
<br>
</br>

In [31]:
for index, row in df_log.iterrows():

    # look up song_id and artist_id with the 'song_select' query
    cur.execute(song_select, (row.song, row.artist, row.length))
    
    # fetchone() returns None when no song matches; store NULL ids in that case
    results = cur.fetchone()
    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    # Creating songplay_data
    songplay_data = (index, t[index], row.userId, 
                     row.level, songid, artistid, 
                     row.sessionId, row.location, 
                     row.userAgent )
    
    # Inserting 'songplay_data' to 'songplays' table through 'songplay_table_insert' query
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()
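
The text of `song_select` is not shown here. Assuming it joins `songs` and `artists` on `artist_id` and filters on title, artist name and duration (the three parameters passed above), the lookup-or-`None` logic can be sketched with the standard library's `sqlite3` as an in-memory stand-in for PostgreSQL. Two differences to keep in mind: sqlite3 uses `?` placeholders where psycopg2 uses `%s`, and `SONG_SELECT` / `lookup_song` are hypothetical names.

```python
import sqlite3

# Assumed shape of song_select, written with sqlite3's ? placeholders.
SONG_SELECT = """
    SELECT s.song_id, a.artist_id
    FROM songs s JOIN artists a ON s.artist_id = a.artist_id
    WHERE s.title = ? AND a.name = ? AND s.duration = ?
"""


def lookup_song(cur, title, artist, length):
    """Return (song_id, artist_id), or (None, None) when nothing matches."""
    cur.execute(SONG_SELECT, (title, artist, length))
    result = cur.fetchone()  # None when the query returns no rows
    return result if result else (None, None)


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE songs (song_id TEXT, title TEXT, artist_id TEXT, year INT, duration REAL)")
cur.execute("CREATE TABLE artists (artist_id TEXT, name TEXT, location TEXT, latitude REAL, longitude REAL)")
cur.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?)",
            ("SOINLJW12A8C13314C", "City Slickers", "AR8IEZO1187B99055E", 2008, 149.86404))
cur.execute("INSERT INTO artists VALUES (?, ?, ?, ?, ?)",
            ("AR8IEZO1187B99055E", "Marc Shaiman", "", None, None))

hit = lookup_song(cur, "City Slickers", "Marc Shaiman", 149.86404)
miss = lookup_song(cur, "Ain't No Sunshine", "Sydney Youngblood", 238.07955)
conn.close()
print(hit, miss)
```

Matching on an exact float `duration` only works when both sides carry the identical value, which is one reason many log rows end up with `NULL` song and artist ids.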

##### As the `test.ipynb` check shows, the `songplays` table is now loaded

In [32]:
%sql SELECT * FROM songplays LIMIT 5;

 * postgresql://student:***@127.0.0.1/sparkifydb
5 rows affected.


| songplay_id | start_time | user_id | level | song_id | artist_id | session_id | location | user_agent |
|---|---|---|---|---|---|---|---|---|
| 0 | 2018-11-29 | 73 | paid | | | 954 | Tampa-St. Petersburg-Clearwater, FL | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2" |
| 1 | 2018-11-29 | 24 | paid | | | 984 | Lake Havasu City-Kingman, AZ | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36" |
| 2 | 2018-11-29 | 24 | paid | | | 984 | Lake Havasu City-Kingman, AZ | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36" |
| 3 | 2018-11-29 | 73 | paid | | | 954 | Tampa-St. Petersburg-Clearwater, FL | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2" |
| 4 | 2018-11-29 | 24 | paid | | | 984 | Lake Havasu City-Kingman, AZ | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36" |


# Close the Connection to the Sparkify Database

In [33]:
conn.close()
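
If a cell raises before reaching `conn.close()`, the connection stays open. psycopg2's own `with conn:` block only commits or rolls back the transaction and does not close the connection, so a common pattern is to wrap it in `contextlib.closing`. A minimal sketch with a hypothetical `run_with_connection` helper that takes any zero-argument connect function:

```python
from contextlib import closing


def run_with_connection(connect, work):
    """Hypothetical helper: open a connection, run work(conn), always close it."""
    # closing() calls conn.close() on exit, even when work() raises.
    with closing(connect()) as conn:
        return work(conn)
```

With psycopg2 this might be called as `run_with_connection(lambda: psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student"), process)`, where `process` is a function that runs the ETL steps on the connection.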