# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file, which loads the complete datasets.

In [1]:
import os
import pandas as pd
import json
import psycopg2
import logging
from datetime import datetime
# Configure the root logger: INFO and above, each record rendered as "<time> - <level> - <message>".
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')

In [2]:
def get_paths(path:str)->list:
    '''
    Collect every JSON file found anywhere below a directory.

    Input(s)
    path(str): directory to walk; all of its sub-directories are visited
    Return
    paths(list): one entry per file whose name ends in ".json", built by
                 joining the directory reported by os.walk with the file name
                 (so entries are rooted at `path`, in os.walk order)
    '''
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(path)
        for name in names
        if name.endswith(".json")
    ]

# Process `song_data`
In this first part, you'll perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` dimensional tables.

Let's perform ETL on a single song file and load a single record into each table to start.
- Use the `get_paths` function defined above to get a list of all song JSON files in `data/song_data`
- Select the first song in this list
- Read the song file and view the data

In [3]:
# Root directory of the song dataset, and every *.json file found beneath it.
path_songs = "./data/song_data"
paths_songs = get_paths(path_songs)
def read_json(path):
    '''
    Parse a file that holds a single JSON document.

    Input(s)
    path(str): location of the JSON file
    Return
    data: the decoded JSON value
    '''
    # JSON text is UTF-8; do not depend on the platform's default encoding
    with open(path,'r',encoding='utf-8') as f:
        return json.load(f)

In [4]:
### SQL Connection and Database Creation Queries
#### Create Database and Tables ####

def create_conn(user_name,passwd,db_name='postgres'):
    '''
    Open a psycopg2 connection and switch it to autocommit mode.

    Input(s)
    user_name(str): role used to log in
    passwd(str): password for that role
    db_name(str): database to connect to (defaults to 'postgres')
    Return
    conn: the open connection, with autocommit enabled
    '''
    # Keyword arguments let psycopg2 assemble the connection string itself, so a
    # password or name containing spaces, quotes or backslashes cannot break it
    # or smuggle in extra connection parameters.
    conn = psycopg2.connect(dbname=db_name, user=user_name, password=passwd)
    conn.set_session(autocommit=True)
    return conn

def drop_if_db_exists(conn,db_name):
    '''
    Issue DROP DATABASE IF EXISTS for the given name.

    A failure is logged at ERROR level and swallowed; the cursor is closed
    afterwards in either case.

    Input(s)
    conn: open database connection
    db_name(str): name of the database to drop (interpolated into the SQL text)
    '''
    cursor = conn.cursor()
    try:
        logging.info(f"Checking if {db_name} exists already")
        statement = f"DROP DATABASE IF EXISTS {db_name}"
        cursor.execute(statement)
    except Exception as err:
        logging.error(f"Could not drop database {db_name}, error {err}")
    cursor.close()
    
    
def createdb(conn,db_name):
    '''
    Create a UTF-8 database from template0.

    A failure is logged at ERROR level and swallowed.

    Input(s)
    conn: open database connection
    db_name(str): name of the database to create; it is interpolated into the
                  SQL text, so it must come from a trusted source
    '''
    cur = conn.cursor()
    try:
        logging.info(f"Creating database {db_name}")
        cur.execute(f"CREATE DATABASE {db_name} WITH ENCODING 'utf8' TEMPLATE template0")
    except Exception as e:
        logging.error(f"Could not create database {db_name}, error {e}")
    finally:
        # previously `cur.close` without parentheses, which never closed the cursor
        cur.close()

def execute_query(conn,query):
    '''
    Run one SQL statement on a fresh cursor.

    A failure is logged at ERROR level, the connection is rolled back and the
    error is swallowed. The cursor is always closed.

    Input(s)
    conn: open database connection
    query(str): SQL text to execute
    '''
    logging.info(f"Executing query {query}")
    cur = conn.cursor()
    try:
        cur.execute(query)
    except Exception as e:
        logging.error(f"Query: {query} failed, error {e}")
        # rollback belongs to the connection; cursors have no such method, so the
        # old `cur.rollback()` raised AttributeError from inside the handler
        conn.rollback()
    finally:
        cur.close()

In [5]:
### Queries:- Song Data
# DDL for the songs table; song_id is the primary key.
song_table_create = """
CREATE TABLE IF NOT EXISTS SONGS (
song_id VARCHAR(100) PRIMARY KEY,
title VARCHAR(100) NOT NULL,
artist_id VARCHAR(100) NOT NUll,
year INTEGER NOT NULL,
duration FLOAT NOT NULL
);
"""

# DDL for the artists table; artist_id is the primary key, location/coordinates may be NULL.
artist_table_create = """
CREATE TABLE IF NOT EXISTS ARTISTS (
artist_id VARCHAR(100) PRIMARY KEY,
name VARCHAR(100) NOT NULL,
location VARCHAR(100),
latitude double precision,
longitude double precision
);
"""

# Parameterised insert (5 placeholders); a row that conflicts with an existing one is skipped.
song_table_insert = """
INSERT INTO songs (song_id,title,artist_id,year,duration) VALUES (%s,%s,%s,%s,%s)
ON CONFLICT DO NOTHING;
"""

# Parameterised insert (5 placeholders); a row that conflicts with an existing one is skipped.
artist_table_insert = """
INSERT INTO artists (artist_id,name,location,latitude,longitude) VALUES (%s,%s,%s,%s,%s)
ON CONFLICT DO NOTHING;
"""

def drop_table_query(tablename):
    '''
    Build the SQL text that drops a table when it exists.

    Input(s)
    tablename(str): table name, interpolated into the statement as-is
    Return
    q(str): the DROP TABLE IF EXISTS statement, padded with the same leading
            and trailing newline/indent as before
    '''
    return f"\n    DROP TABLE IF EXISTS {tablename};\n    "


In [6]:
def process_song_files(path,conn):
    '''
    Load one song JSON file into the artists and songs tables.

    Input(s)
    path(str): location of the song JSON file (read with read_json)
    conn: open database connection used for both inserts
    '''
    # Read and shape the record before opening a cursor, so a bad file cannot leak one
    data = read_json(path)
    data_artist = [data.get(key) for key in ('artist_id','artist_name',
                                             'artist_location',
                                             'artist_latitude',
                                             'artist_longitude')]
    data_song = [data.get(key) for key in ('song_id','title','artist_id',
                                           'year','duration')]
    curr = conn.cursor()
    try:
        # artist first, then song - same order as before
        curr.execute(artist_table_insert,data_artist)
        curr.execute(song_table_insert,data_song)
    finally:
        # release the cursor even when an insert raises
        curr.close()

In [7]:
### Create Database ###
# Credentials are read from the environment when present; the literals are only a
# fallback so the notebook keeps running exactly as before.
# NOTE(review): remove the hard-coded fallback password before sharing this notebook.
conn = create_conn(user_name=os.environ.get("SPARKIFY_DB_USER","postgres"),
                   passwd=os.environ.get("SPARKIFY_DB_PASSWORD","Gun125"))
try:
    drop_if_db_exists(conn,'sparkify')
    createdb(conn,'sparkify')
finally:
    # always release the connection, even if a step raises
    conn.close()

2022-11-21 11:16:32,516 - INFO - Checking if sparkify exists already
2022-11-21 11:16:32,728 - INFO - Creating database sparkify


In [8]:
### Create Songs and Artists Tables ###
# Credentials are read from the environment when present; the literals are only a
# fallback so the notebook keeps running exactly as before.
# NOTE(review): remove the hard-coded fallback password before sharing this notebook.
conn = create_conn(user_name=os.environ.get("SPARKIFY_DB_USER","postgres"),
                   passwd=os.environ.get("SPARKIFY_DB_PASSWORD","Gun125"),
                   db_name="sparkify")
try:
    execute_query(conn,song_table_create)
    execute_query(conn,artist_table_create)
finally:
    # always release the connection, even if a step raises
    conn.close()

2022-11-21 11:16:32,840 - INFO - Executing query 
CREATE TABLE IF NOT EXISTS SONGS (
song_id VARCHAR(100) PRIMARY KEY,
title VARCHAR(100) NOT NULL,
artist_id VARCHAR(100) NOT NUll,
year INTEGER NOT NULL,
duration FLOAT NOT NULL
);

2022-11-21 11:16:32,845 - INFO - Executing query 
CREATE TABLE IF NOT EXISTS ARTISTS (
artist_id VARCHAR(100) PRIMARY KEY,
name VARCHAR(100) NOT NULL,
location VARCHAR(100),
latitude double precision,
longitude double precision
);



In [9]:
## Insert data into Songs and artists table ##
# Credentials are read from the environment when present; the literals are only a
# fallback so the notebook keeps running exactly as before.
# NOTE(review): remove the hard-coded fallback password before sharing this notebook.
conn = create_conn(user_name=os.environ.get("SPARKIFY_DB_USER","postgres"),
                   passwd=os.environ.get("SPARKIFY_DB_PASSWORD","Gun125"),
                   db_name="sparkify")
try:
    for path in paths_songs:
        process_song_files(path,conn)
finally:
    # always release the connection, even if one file fails to load
    conn.close()

# Process `log_data`
In this part, you'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

Let's perform ETL on a single log file and load a single record into each table.
- Use the `get_paths` function provided above to get a list of all log JSON files in `data/log_data`
- Select the first log file in this list
- Read the log file and view the data

In [10]:
### SQL Queries log-data ###
# DDL for the songplays fact table; songplay_id is a SERIAL primary key.
songplay_table_create = ("""
CREATE TABLE IF NOT EXISTS songplays (
songplay_id SERIAL PRIMARY KEY,
start_time TIMESTAMP NOT NULL, 
user_id INTEGER NOT NULL, 
level VARCHAR(50), 
song_id VARCHAR(50), 
artist_id VARCHAR(50), 
session_id INTEGER, 
location VARCHAR(50), 
user_agent VARCHAR(150)
);
""")

# DDL for the users table; user_id is the primary key.
user_table_create = ("""
CREATE TABLE IF NOT EXISTS USERS (
user_id INTEGER PRIMARY KEY,
first_name VARCHAR(100),
last_name VARCHAR(100),
gender VARCHAR(50),
level VARCHAR(50)
);
""")

# DDL for the time table; start_time is the primary key.
time_table_create = ("""
CREATE TABLE IF NOT EXISTS time (
start_time TIMESTAMP PRIMARY KEY,
hour INTEGER,
day INTEGER,
week INTEGER,
month INTEGER,
year INTEGER,
weekday VARCHAR
);
""")

# Parameterised insert (9 placeholders). songplay_id is supplied explicitly by the
# caller rather than generated by the SERIAL default; an existing id is skipped.
songplay_table_insert = ("""
INSERT INTO songplays (songplay_id,start_time,user_id,level,
                      song_id,artist_id,session_id,location,user_agent)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT(songplay_id) DO NOTHING;
""")
# Parameterised insert (5 placeholders); a conflicting row is skipped, so an
# existing user's row is not updated.
user_table_insert = """
INSERT INTO USERS (user_id,first_name,last_name,gender,level)
VALUES (%s,%s,%s,%s,%s)
ON CONFLICT DO NOTHING;
"""
# Parameterised insert (7 placeholders); a conflicting start_time is skipped.
time_table_insert = """
INSERT INTO TIME (start_time,hour,day,week,month,year,weekday)
VALUES (%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT DO NOTHING;
"""

# Look up (song_id, artist_id) by exact artist name, song title and duration.
# Placeholders, in order: artist name, title, duration.
song_select = """
select songs.song_id, songs.artist_id from songs
join artists on artists.artist_id = songs.artist_id
where artists.name = %s and
songs.title = %s and
songs.duration = %s;
"""


In [11]:
### Process Log data ###
path_logs = "./data/log_data"
paths_logs = get_paths(path_logs)
log_data = []
for path in paths_logs:
    # every line of a log file is parsed as its own JSON object
    with open(path,'r',encoding='utf-8') as f:
        # skip blank lines (e.g. a stray empty line at the end of a file),
        # which would otherwise make json.loads raise
        log_data.extend(json.loads(line) for line in f if line.strip())

In [12]:
def validate_whitespace(val):
    '''
    Replace an empty string with the text 'Null'.

    Input(s)
    val: any value
    Return
    'Null' when val equals '', otherwise val unchanged
    '''
    return 'Null' if val=='' else val
def get_user_info(data):
    '''
    Extract the user columns from one log record.

    Input(s)
    data(dict): a log record
    Return
    info(tuple): (userId, firstName, lastName, gender, level); a missing key
                 or an empty string is reported as the text 'Null'
    '''
    fields = ('userId','firstName','lastName','gender','level')
    return tuple(validate_whitespace(data.get(field,'Null')) for field in fields)

def get_time_info(data):
    '''
    Break a log record's millisecond epoch timestamp into time-table columns.

    All fields are computed in UTC, matching the "zulu" suffix on start_time,
    and without locale- or platform-dependent strftime codes.

    Input(s)
    data(dict): a log record whose 'ts' entry is an epoch time in milliseconds
    Return
    info(tuple): (start_time, hour, day, week, month, year, weekday) where
                 start_time is "<year>-<month>-<day> HH:MM:SS zulu" (month and
                 day not zero-padded), week is the ISO week number and weekday
                 is the English day name
    '''
    # local import: the file only imports the `datetime` class from this module
    from datetime import timezone

    # indexed Sunday=0 .. Saturday=6, the same numbering strftime("%w") uses
    weekday_names = ('Sunday','Monday','Tuesday','Wednesday',
                     'Thursday','Friday','Saturday')
    # Convert in UTC: the previous naive fromtimestamp used the machine's local
    # zone while the string was labelled "zulu"
    ts = datetime.fromtimestamp(data.get('ts')/1000, tz=timezone.utc)
    hour = ts.hour
    day = ts.day
    week = ts.isocalendar()[1]  # ISO week, same meaning as "%V" but portable
    month = ts.month
    year = ts.year
    weekday = weekday_names[ts.isoweekday() % 7]  # isoweekday: Mon=1 .. Sun=7
    time_stamp = ts.strftime("%H:%M:%S")  # "%X" varies with the locale
    start_time = f"{year}-{month}-{day} {time_stamp} zulu"
    info = (start_time,
            hour,
            day,
            week,
            month,
            year,
            weekday)
    return info

In [13]:
def process_log_data(conn,log_data):
    '''
    Create the songplays, users and time tables and fill them from log records.

    Only records whose 'page' is 'NextSong' are loaded. For each one the song
    and artist ids are looked up by artist name, title and duration; when no
    match is found both ids are stored as NULL.

    Input(s)
    conn: open database connection
    log_data(list): log records (dicts); a record's position in this list is
                    used as its songplay_id
    '''
    execute_query(conn,songplay_table_create)
    execute_query(conn,user_table_create)
    execute_query(conn,time_table_create)
    curr = conn.cursor()
    try:
        for index,row in enumerate(log_data):
            if row.get('page')!='NextSong':
                continue
            # computed once and reused for both the fact row and the dimension rows
            info_user = get_user_info(row)
            info_time = get_time_info(row)
            curr.execute(song_select,(row.get('artist'),row.get('song'),row.get('length')))
            result = curr.fetchone()
            song_id,artist_id = result if result else (None,None)
            vals = [index, info_time[0],info_user[0],
                    info_user[-1],song_id,artist_id,row.get('sessionId'),
                    row.get('location'),row.get('userAgent')]
            curr.execute(songplay_table_insert,vals)
            curr.execute(user_table_insert,info_user)
            curr.execute(time_table_insert,info_time)
    finally:
        # release the cursor even when a lookup or insert raises
        curr.close()
    

In [14]:
# Credentials are read from the environment when present; the literals are only a
# fallback so the notebook keeps running exactly as before.
# NOTE(review): remove the hard-coded fallback password before sharing this notebook.
conn = create_conn(user_name=os.environ.get("SPARKIFY_DB_USER","postgres"),
                   passwd=os.environ.get("SPARKIFY_DB_PASSWORD","Gun125"),
                   db_name="sparkify")
try:
    process_log_data(conn,log_data)
finally:
    # always release the connection, even if loading fails part-way
    conn.close()

2022-11-21 11:16:38,494 - INFO - Executing query 
CREATE TABLE IF NOT EXISTS songplays (
songplay_id SERIAL PRIMARY KEY,
start_time TIMESTAMP NOT NULL, 
user_id INTEGER NOT NULL, 
level VARCHAR(50), 
song_id VARCHAR(50), 
artist_id VARCHAR(50), 
session_id INTEGER, 
location VARCHAR(50), 
user_agent VARCHAR(150)
);

2022-11-21 11:16:38,501 - INFO - Executing query 
CREATE TABLE IF NOT EXISTS USERS (
user_id INTEGER PRIMARY KEY,
first_name VARCHAR(100),
last_name VARCHAR(100),
gender VARCHAR(50),
level VARCHAR(50)
);

2022-11-21 11:16:38,504 - INFO - Executing query 
CREATE TABLE IF NOT EXISTS time (
start_time TIMESTAMP PRIMARY KEY,
hour INTEGER,
day INTEGER,
week INTEGER,
month INTEGER,
year INTEGER,
weekday VARCHAR
);

