# ETL Processes

In [69]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [70]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=test123")
cur = conn.cursor()

In [3]:
def get_files(file_path):
    all_files = []
    for root, dirs, files in os.walk(file_path):
        files = glob.glob(os.path.join(root, '*.json'))
        for f in files:
            all_files.append(os.path.abspath(f))
    return all_files

# Process song data

In [4]:
song_files = get_files('data/song_data')
print(song_files)

['/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/song_data/A/A/A/TRAAAMQ128F1460CD3.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/song_data/A/A/A/TRAAADZ128F9348C2E.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/song_data/A/A/A/TRAAAAW128F429D538.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/song_data/A/A/A/TRAAAVG12903CFA543.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/song_data/A/A/A/TRAAAPK128E0786D96.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/song_data/A/A/A/TRAAABD128F429CF47.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/song_data/A/A/A/TRAAAMO128F1481E7F.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/song_data/A/A/A/TRAAAEF128F4273421.json', '/home/winner/Documents/Data Engineer/p

In [5]:
file_path = song_files[0]
print(file_path)

/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/song_data/A/A/A/TRAAAMQ128F1460CD3.json


In [6]:
df = pd.DataFrame([pd.read_json(file_path, typ='series', convert_dates=False)])
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARD0S291187B9B7BF5,,,Ohio,Rated R,SOMJBYD12A6D4F8557,Keepin It Real (Skit),114.78159,0


## 1: artists table

In [7]:
artist_data = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]
artist_data = artist_data.values.tolist()[0]
artist_data

['ARD0S291187B9B7BF5', 'Rated R', 'Ohio', None, None]

In [8]:
cur.execute(artist_table_insert, artist_data)
conn.commit()

## 2: songs table

In [9]:
song_data = df[['num_songs','title', 'artist_id', 'year', 'duration']]
song_data = song_data.values.tolist()[0]
song_data

[1, 'Keepin It Real (Skit)', 'ARD0S291187B9B7BF5', 0, 114.78159]

In [10]:
cur.execute(song_table_insert, song_data)
conn.commit()

# Process log data

In [11]:
logs_file = get_files('data/log_data')
print(logs_file)

['/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/log_data/2018/11/2018-11-02-events.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/log_data/2018/11/2018-11-20-events.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/log_data/2018/11/2018-11-08-events.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/log_data/2018/11/2018-11-01-events.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/log_data/2018/11/2018-11-24-events.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/log_data/2018/11/2018-11-21-events.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/log_data/2018/11/2018-11-26-events.json', '/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/log_data/2018/11/2018-11-14-events.json', '/home/winner/Documents/Data Engineer/p

In [12]:
file_path = logs_file[0]
print(file_path)

/home/winner/Documents/Data Engineer/project/DataModeling_with_Postgres/data/log_data/2018/11/2018-11-02-events.json


In [33]:
df = pd.read_json(file_path, lines=True)
df = df[df['page']=='NextSong']

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,N.E.R.D. FEATURING MALICE,Logged In,Jayden,M,0,Fox,288.9922,free,"New Orleans-Metairie, LA",PUT,NextSong,1541033612796,184,Am I High (Feat. Malice),200,1541121934796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",101
2,Death Cab for Cutie,Logged In,Stefany,F,1,White,216.42404,free,"Lubbock, TX",PUT,NextSong,1540708070796,82,A Lack Of Color (Album Version),200,1541122241796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",83
3,Tracy Gang Pussy,Logged In,Stefany,F,2,White,221.33506,free,"Lubbock, TX",PUT,NextSong,1540708070796,82,I Have A Wish,200,1541122457796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",83
4,Skillet,Logged In,Kevin,M,0,Arellano,178.02404,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540006905796,153,Monster (Album Version),200,1541126568796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
5,Dance Gavin Dance,Logged In,Marina,F,0,Sutton,218.46159,free,"Salinas, CA",PUT,NextSong,1541064343796,47,Uneasy Hearts Weigh The Most,200,1541127957796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",48


In [36]:
import datetime
#time = datetime.datetime.fromtimestamp(1541033612796/1000.0)
t = pd.to_datetime(df['ts'], unit='ms')
t.head()

0   2018-11-02 01:25:34.796
1   2018-11-02 01:29:36.796
2   2018-11-02 01:30:41.796
3   2018-11-02 01:34:17.796
4   2018-11-02 02:42:48.796
Name: ts, dtype: datetime64[ns]

In [50]:
time_data = []
for data in t: 
    time_data.append([data, data.hour, data.day, data.weekofyear, data.month, data.year, data.day_name()])
column_labels = ["timestamp", "hour", "day", "week_of_year", "month", "year", "weekday"]

In [52]:
time_df = pd.DataFrame.from_records(data=time_data, columns=column_labels)
time_df.head()

Unnamed: 0,timestamp,hour,day,week_of_year,month,year,weekday
0,2018-11-02 01:25:34.796,1,2,44,11,2018,Friday
1,2018-11-02 01:29:36.796,1,2,44,11,2018,Friday
2,2018-11-02 01:30:41.796,1,2,44,11,2018,Friday
3,2018-11-02 01:34:17.796,1,2,44,11,2018,Friday
4,2018-11-02 02:42:48.796,2,2,44,11,2018,Friday


In [53]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()

## 3: users table

In [54]:
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,N.E.R.D. FEATURING MALICE,Logged In,Jayden,M,0,Fox,288.9922,free,"New Orleans-Metairie, LA",PUT,NextSong,1541033612796,184,Am I High (Feat. Malice),200,1541121934796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",101
1,,Logged In,Stefany,F,0,White,,free,"Lubbock, TX",GET,Home,1540708070796,82,,200,1541122176796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",83
2,Death Cab for Cutie,Logged In,Stefany,F,1,White,216.42404,free,"Lubbock, TX",PUT,NextSong,1540708070796,82,A Lack Of Color (Album Version),200,1541122241796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",83
3,Tracy Gang Pussy,Logged In,Stefany,F,2,White,221.33506,free,"Lubbock, TX",PUT,NextSong,1540708070796,82,I Have A Wish,200,1541122457796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",83
4,Skillet,Logged In,Kevin,M,0,Arellano,178.02404,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540006905796,153,Monster (Album Version),200,1541126568796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66


In [59]:
user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
user_df.head()


Unnamed: 0,userId,firstName,lastName,gender,level
0,101,Jayden,Fox,M,free
1,83,Stefany,White,F,free
2,83,Stefany,White,F,free
3,83,Stefany,White,F,free
4,66,Kevin,Arellano,M,free


In [62]:
for i, row in user_df.iterrows():
    cur.execute(user_table_insert, list(row))
    conn.commit()

## songplay

In [None]:
df = df[df['page'] == "NextSong"].astype({'ts': 'datetime64[ms]'})

# convert timestamp column to datetime
t = pd.Series(df['ts'], index=df.index)
for i, row in df.iterrows():
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()

    if results:
        song_id, artist_id = results
    else:
        song_id, artist_id = None, None
    
    songplay_data = (row.ts, row.userId, row.level, song_id, artist_id, row.sessionId, row.location, row.userAgent)
    cur.execute(songplay_table_insert, songplay_data)

In [68]:
conn.close()