# ETL Processes
this notebook simulate the ETL process for every table
it consists of the following steps
- create database and tables
- load data from JSON files into DataFrame
- prepare data
- insert data into PostgreSQL Database

### you must create tables first by running creat_tables.py first

In [1]:
%run create_tables.py

##### import needed libiraries

In [2]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *
from app_config_reader import get_database_configuration

#### function to return all files to be loaded in a given directory

In [3]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

### get Configuration from configuration file that contains database credential and file paths

In [4]:
configurations = get_database_configuration()

In [5]:
configurations

{'host': 'localhost',
 'defaultdbname': 'postgres',
 'dbname': 'sparkifydb',
 'username': 'postgres',
 'password': 'sayed',
 'songs_path': 'data/song_data',
 'log_path': 'data/log_data'}

# Process `song_data`
In this first part, we will perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` dimensional tables.

Let's perform ETL on a all song files and insert data into database.
- Use the `get_files` function provided above to get a list of all song JSON files in `data/song_data`
- load data from files into DataFrame 
- extract Song Data and Artist Data from DataFrame and insert them into Database

In [6]:
song_files = get_files(configurations['songs_path'])

In [7]:
song_files[0:3]

['C:\\Users\\SayedAbdallah\\Downloads\\Nano\\Data-Modeling-with-Postgres\\data\\song_data\\A\\A\\A\\TRAAAAW128F429D538.json',
 'C:\\Users\\SayedAbdallah\\Downloads\\Nano\\Data-Modeling-with-Postgres\\data\\song_data\\A\\A\\A\\TRAAABD128F429CF47.json',
 'C:\\Users\\SayedAbdallah\\Downloads\\Nano\\Data-Modeling-with-Postgres\\data\\song_data\\A\\A\\A\\TRAAADZ128F9348C2E.json']

In [8]:
# return Series not DataFrame so we use list comprehension to create list of Series then pass this list to pd.DataFrame
pd.read_json(path_or_buf=song_files[0], typ='series', dtype=False)

num_songs                            1
artist_id           ARD7TVE1187B99BFB1
artist_latitude                   None
artist_longitude                  None
artist_location        California - LA
artist_name                     Casual
song_id             SOMZWCG12A8C13C480
title                 I Didn't Mean To
duration                       218.932
year                                 0
dtype: object

In [9]:
df = pd.DataFrame([pd.read_json(path_or_buf=f, typ='series', dtype=False) for f in song_files])

In [10]:
df.head(5)

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARD7TVE1187B99BFB1,,,California - LA,Casual,SOMZWCG12A8C13C480,I Didn't Mean To,218.93179,0
1,1,ARMJAGH1187FB546F3,35.14968,-90.04892,"Memphis, TN",The Box Tops,SOCIWDW12A8C13D406,Soul Deep,148.03546,1969
2,1,ARKRRTF1187B9984DA,,,,Sonora Santanera,SOXVLOJ12AB0189215,Amor De Cabaret,177.47546,0
3,1,AR7G5I41187FB4CE6C,,,"London, England",Adam Ant,SONHOTT12A8C13493C,Something Girls,233.40363,1982
4,1,ARXR32B1187FB57099,,,,Gob,SOFSOCN12A8C143F5D,Face the Ashes,209.60608,2007


## #1: `songs` Table
#### Extract Data for Songs Table
- Select columns for song ID, title, artist ID, year, and duration
- convert DataFrame to list and insert data in the list using `executemany()` method of cursor object

In [11]:
songs_df = df[['song_id', 'title', 'artist_id', 'year', 'duration']]
songs_df.head()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.93179
1,SOCIWDW12A8C13D406,Soul Deep,ARMJAGH1187FB546F3,1969,148.03546
2,SOXVLOJ12AB0189215,Amor De Cabaret,ARKRRTF1187B9984DA,0,177.47546
3,SONHOTT12A8C13493C,Something Girls,AR7G5I41187FB4CE6C,1982,233.40363
4,SOFSOCN12A8C143F5D,Face the Ashes,ARXR32B1187FB57099,2007,209.60608


# connect to Database

In [56]:
conn = psycopg2.connect(f"host={configurations['host']} dbname={configurations['dbname']} user={configurations['username']} password={configurations['password']}")
cur = conn.cursor()

In [13]:
songs_data = songs_df.values.tolist()

In [14]:
songs_data[0:3]

[['SOMZWCG12A8C13C480',
  "I Didn't Mean To",
  'ARD7TVE1187B99BFB1',
  0,
  218.93179],
 ['SOCIWDW12A8C13D406', 'Soul Deep', 'ARMJAGH1187FB546F3', 1969, 148.03546],
 ['SOXVLOJ12AB0189215', 'Amor De Cabaret', 'ARKRRTF1187B9984DA', 0, 177.47546]]

In [15]:
print(SONG_TABLE_INSERT)


INSERT INTO songs (song_id, title, artist_id, year, duration) VALUES(%s, %s, %s, %s, %s) ON CONFLICT (song_id) DO NOTHING;



In [16]:
cur.executemany(SONG_TABLE_INSERT, songs_data)

In [17]:
conn.commit()

## #2: `artists` Table
#### Extract Data for Artists Table
- Select columns for artist ID, name, location, latitude, and longitude
- convert DataFrame to list and insert data in the list using executemany method of cursor object

In [18]:
artists_df = df[['artist_id', 'artist_name', 'artist_location', 'artist_longitude', 'artist_latitude']]
artists_df.head(2)

Unnamed: 0,artist_id,artist_name,artist_location,artist_longitude,artist_latitude
0,ARD7TVE1187B99BFB1,Casual,California - LA,,
1,ARMJAGH1187FB546F3,The Box Tops,"Memphis, TN",-90.04892,35.14968


In [19]:
artists_data = artists_df.values.tolist()
artists_data[0:3]

[['ARD7TVE1187B99BFB1', 'Casual', 'California - LA', nan, nan],
 ['ARMJAGH1187FB546F3', 'The Box Tops', 'Memphis, TN', -90.04892, 35.14968],
 ['ARKRRTF1187B9984DA', 'Sonora Santanera', '', nan, nan]]

In [20]:
print(ARTIST_TABLE_INSERT)


INSERT INTO artists (artist_id, name, location, latitude, longitude) VALUES(%s, %s, %s, %s, %s) ON CONFLICT (artist_id) DO NOTHING;



In [21]:
cur.executemany(ARTIST_TABLE_INSERT, artists_data)

In [22]:
conn.commit()

# Process `log_data`
In this part, we'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

In [23]:
log_files = get_files(configurations['log_path'])

In [24]:
log_files[0:3]

['C:\\Users\\SayedAbdallah\\Downloads\\Nano\\Data-Modeling-with-Postgres\\data\\log_data\\2018\\11\\2018-11-01-events.json',
 'C:\\Users\\SayedAbdallah\\Downloads\\Nano\\Data-Modeling-with-Postgres\\data\\log_data\\2018\\11\\2018-11-02-events.json',
 'C:\\Users\\SayedAbdallah\\Downloads\\Nano\\Data-Modeling-with-Postgres\\data\\log_data\\2018\\11\\2018-11-03-events.json']

#### each file represents DataFrame so we use pd.concat method to concate all DataFrames using list comprehension

In [25]:
logs_df = pd.concat([pd.read_json(f, lines=True) for f in log_files])

In [26]:
logs_df.head(3)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919000000.0,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540345000000.0,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


## #3: `time` Table
#### Extract Data for Time Table
- Filter records by `NextSong` action
- Convert the `ts` timestamp column to datetime
  - Hint: the current timestamp is in milliseconds
- Extract the timestamp, hour, day, week of year, month, year, and weekday from the `ts` column and set `time_data` to a list containing these values in order
  - Hint: use pandas' [`dt` attribute](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.html) to access easily datetimelike properties.

In [27]:
logs_df = logs_df[logs_df['page'] == 'NextSong']

In [28]:
time_df = pd.DataFrame(logs_df['ts'], copy=True)
time_df['timestamp'] = pd.to_datetime(time_df['ts'], unit='ms')

time_df = time_df.assign(hour       = time_df['timestamp'].dt.hour, 
                         day        = time_df['timestamp'].dt.day,
                         weekofyear = time_df['timestamp'].dt.weekofyear,
                         month      = time_df['timestamp'].dt.month,
                         year       = time_df['timestamp'].dt.year,
                         weekday    = time_df['timestamp'].dt.weekday
                        )

time_df = time_df.drop(columns='timestamp')
time_df.head()

Unnamed: 0,ts,hour,day,weekofyear,month,year,weekday
2,1541106106796,21,1,44,11,2018,3
4,1541106352796,21,1,44,11,2018,3
5,1541106496796,21,1,44,11,2018,3
6,1541106673796,21,1,44,11,2018,3
7,1541107053796,21,1,44,11,2018,3


In [29]:
print(TIME_TABLE_INSERT)


INSERT INTO time(start_time, hour, day, week, month, year, weekday) VALUES(%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (start_time) DO NOTHING;



### convert time_df to list and insert it into database

In [30]:
time_data = time_df.values.tolist()

In [31]:
time_data[0:3]

[[1541106106796, 21, 1, 44, 11, 2018, 3],
 [1541106352796, 21, 1, 44, 11, 2018, 3],
 [1541106496796, 21, 1, 44, 11, 2018, 3]]

In [32]:
cur.executemany(TIME_TABLE_INSERT, time_data)

In [33]:
conn.commit()

## #4: `users` Table
#### Extract Data for Users Table
- Select columns for user ID, first name, last name, gender and level and set to `user_df`
- convert DataFrame to list and insert list into database

In [34]:
users_df = logs_df[['userId', 'firstName', 'lastName', 'gender', 'level']]
users_df.head()

Unnamed: 0,userId,firstName,lastName,gender,level
2,8,Kaylee,Summers,F,free
4,8,Kaylee,Summers,F,free
5,8,Kaylee,Summers,F,free
6,8,Kaylee,Summers,F,free
7,8,Kaylee,Summers,F,free


### drop full row duplicate of users DataFrame

In [35]:
len(users_df)

6820

In [36]:
len(users_df.drop_duplicates())

137

In [37]:
users_df = users_df.drop_duplicates()

In [38]:
len(users_df)

137

In [39]:
print(USER_TABLE_INSERT)


INSERT INTO users(user_id, first_name, last_name, gender, level) VALUES(%s, %s, %s, %s, %s) ON CONFLICT (user_id) DO UPDATE SET level = excluded.level;



In [40]:
users_data = users_df.values.tolist()
users_data[0:2]

[[8, 'Kaylee', 'Summers', 'F', 'free'], [10, 'Sylvie', 'Cruz', 'F', 'free']]

In [41]:
cur.executemany(USER_TABLE_INSERT, users_data)

In [42]:
conn.commit()

## #5: `songplays` Table
#### Extract Data and Songplays Table
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the `songplays` table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the song ID and artist ID by inserting first into temp table then join 3 tables together to get final data

In [43]:
logs_df.head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [44]:
print(SONGPLAY_TEMP_TABLE_INSERT)


INSERT INTO songplays_temp(ts, user_id, level, song_title, song_length, artist_name, session_id, location, user_agent) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s);



In [45]:
temp_df = logs_df[['ts', 'userId','level', 'song', 'length', 'artist', 'sessionId', 'location', 'userAgent']]

In [46]:
temp_df.head(3)

Unnamed: 0,ts,userId,level,song,length,artist,sessionId,location,userAgent
2,1541106106796,8,free,You Gotta Be,246.30812,Des'ree,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
4,1541106352796,8,free,Flat 55,144.03873,Mr Oizo,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
5,1541106496796,8,free,Quem Quiser Encontrar O Amor,177.18812,Tamba Trio,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."


### convert temp DataFrame into list and insert the list into temp table

In [47]:
temp_data = temp_df.values.tolist()
temp_data[0]

[1541106106796,
 8,
 'free',
 'You Gotta Be',
 246.30812,
 "Des'ree",
 139,
 'Phoenix-Mesa-Scottsdale, AZ',
 '"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"']

In [48]:
print(SONGPLAY_TEMP_TABLE_INSERT)


INSERT INTO songplays_temp(ts, user_id, level, song_title, song_length, artist_name, session_id, location, user_agent) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s);



In [49]:
cur.executemany(SONGPLAY_TEMP_TABLE_INSERT, temp_data)

In [50]:
conn.commit()

### run SONGPLAY_TABLE_INSERT to insert into SONGPLAY TABLE

In [51]:
cur.execute(SONGPLAY_TABLE_INSERT)

In [58]:
conn.commit()

#### drop Temp Table

In [57]:
cur.execute(SONGPLAY_TEMP_TABLE_DROP)

# Close Connection to Sparkify Database

In [59]:
conn.close()