ETL Process

In [None]:
import os
import glob
import psycopg2
import pandas as pd
from queries_sql import songs_table_insert, artists_table_insert, times_table_insert, users_table_insert, songplays_table_insert, songs_select

In [None]:
# Open the connection and cursor that the cells below share.
# NOTE(review): the credentials are hard-coded in the DSN string; consider
# loading them from the environment or a config file instead.
try:
    conn = psycopg2.connect("dbname=sparkifydb user=postgres password=admin123")
    cur = conn.cursor()
except psycopg2.Error as e:
    print("Error: Could not make connection to the Postgres database")
    print(e)
    # Re-raise: without `conn`/`cur` the later cells would only fail with a
    # less helpful NameError.
    raise
else:
    # Reported only once both the connection and the cursor exist.
    print("Connection to the database is successful")

Get file paths:

In [None]:
def getFiles(filePath):
    """Collect the absolute paths of all ``*.json`` entries under ``filePath``.

    The directory tree is traversed with ``os.walk`` and every visited
    directory is globbed for ``*.json``.

    Args:
        filePath: Directory at which the search starts.

    Returns:
        A list of absolute path strings, one per match, in the order the
        directories are visited.
    """
    json_paths = []
    # os.walk yields (current directory, its sub-directories, its files);
    # only the directory path is needed because glob does the matching.
    for current_dir, _subdirs, _names in os.walk(filePath):
        pattern = os.path.join(current_dir, '*.json')
        json_paths.extend(os.path.abspath(match) for match in glob.glob(pattern))

    return json_paths

Process song_data:

In [None]:
# Read every song file (pd.read_json with lines=True) into its own frame,
# then stack the frames into one.
song_df = []
song_files = getFiles('data/song_data')

for song_path in song_files:
    song_data = pd.read_json(song_path, lines=True)
    song_df += [song_data]

songs_df = pd.concat(song_df)
# Bare expression: shown as the cell output in the notebook.
songs_df

1. Extract to songs table (convert each row into a list and insert it)

In [None]:
# Load the songs table: one INSERT (and commit) per row of the selected columns.
try:
    song_df = songs_df[['song_id', 'title', 'artist_id', 'year', 'duration']]
    for _, row in song_df.iterrows():
        print(row)
        cur.execute(songs_table_insert, list(row))
        conn.commit()
    print("Insert data into song table successfully")
except Exception as e:
    print(e)
    # A failed statement leaves the psycopg2 transaction aborted; roll back so
    # the cells below can keep using the same connection.
    conn.rollback()

2. Extract to artists table (convert each row into a list and insert it)

In [None]:
# Load the artists table: one INSERT (and commit) per row of the artist columns.
try:
    artist_df = songs_df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]
    for _, row in artist_df.iterrows():
        cur.execute(artists_table_insert, list(row))
        conn.commit()
    print("Insert data into artist table successfully")
except Exception as e:
    print(e)
    # A failed statement leaves the psycopg2 transaction aborted; roll back so
    # the cells below can keep using the same connection.
    conn.rollback()

Process log_data:

In [None]:
# Read every log file (pd.read_json with lines=True) into its own frame.
log_df = []

for log_path in getFiles('data/log_data'):
    log_data = pd.read_json(log_path, lines=True)
    log_df.append(log_data)

# Each file's frame is numbered from 0, so a plain concat would repeat row
# labels. The songplays cell uses the row label as the first field of each
# record, so renumber the rows 0..n-1 to keep that value unique.
logs_df = pd.concat(log_df, ignore_index=True)
# Bare expression: shown as the cell output in the notebook.
logs_df

3. Extract to times table
    - Filter records by NextSong action
    - Convert the ts timestamp column to datetime
    - Extract the timestamp, hour, day, week of year, month, year, and weekday from the ts column and set time_data to a list containing these values in order
    - Specify labels for these columns and set to column_labels
    - Create a dataframe, time_df, containing the time data for this file by combining column_labels and time_data into a dictionary and converting this into a dataframe

In [None]:
# Keep only the 'NextSong' events, then parse their ts column into datetimes.
next_song_mask = logs_df['page'] == 'NextSong'
time_df = logs_df[next_song_mask]
# NOTE(review): no `unit` is passed, so integer ts values are read as
# nanoseconds since the epoch - confirm that matches the log data.
t = pd.to_datetime(time_df['ts'])

In [None]:
# Build one (timestamp, hour, day, week, month, year, weekday) tuple per
# 'NextSong' event.
# The stored timestamp is the raw ts value, so it matches the ts written by the
# songplays cell. The calendar fields come from ts parsed as epoch milliseconds;
# parsed with the default nanosecond unit they would all fall at the 1970 epoch.
# NOTE(review): assumes ts is epoch milliseconds - confirm against the log files.
next_song_ts = logs_df.loc[logs_df['page'] == 'NextSong', 'ts']
time_data = [
    (raw_ts, tt.hour, tt.day, tt.week, tt.month, tt.year, tt.weekday())
    for raw_ts, tt in zip(next_song_ts, pd.to_datetime(next_song_ts, unit='ms'))
]
column_labels = ('timestamp', 'hour', 'day', 'week', 'month', 'year', 'weekday')

Take the time values extracted from the log files and convert them into a dataframe

In [None]:
# One row per tuple in time_data, with columns named by column_labels.
# This rebinds time_df, which until now held the filtered log rows.
time_df = pd.DataFrame(data=time_data, columns=column_labels)
# Bare expression: shown as the cell output in the notebook.
time_df

Insert Records into times table

In [None]:
# Load the times table: one INSERT (and commit) per row of time_df.
try:
    for _, row in time_df.iterrows():
        cur.execute(times_table_insert, list(row))
        conn.commit()
    print("Insert data into time table successfully")
except Exception as e:
    print(e)
    # A failed statement leaves the psycopg2 transaction aborted; roll back so
    # the cells below can keep using the same connection.
    conn.rollback()

4. Extract to users table

In [None]:
# Load the users table: one INSERT (and commit) per log row.
# NOTE(review): rows come from every log event, not only 'NextSong'; confirm
# that users_table_insert tolerates events with missing or blank user fields.
try:
    user_df = logs_df[['userId', 'firstName', 'lastName', 'gender', 'level']]
    for _, row in user_df.iterrows():
        print(list(row))
        cur.execute(users_table_insert, list(row))
        conn.commit()
    print("Insert data into user table successfully")
except Exception as e:
    print(e)
    # A failed statement leaves the psycopg2 transaction aborted; roll back so
    # the cells below can keep using the same connection.
    conn.rollback()

5. Extract to songplays table

In [None]:
# Load the songplays table: for every log row, look up the matching song and
# artist ids, then insert (and commit) one songplay record.
try:
    for index, row in logs_df.iterrows():
        # Parameters passed into the SQL statement.
        cur.execute(songs_select, (row.song, row.artist, row.length))
        results = cur.fetchone()

        # No match: store the play without song/artist ids.
        songid, artistid = results if results else (None, None)

        # insert songplay record
        songplay_data = (index, row['ts'], row['userId'], row['level'], songid, artistid, row['sessionId'],
                         row['location'], row['userAgent'])
        # Show the record being inserted (previously this printed the unrelated
        # `song_data` frame left over from the song-loading cell).
        print(songplay_data)
        cur.execute(songplays_table_insert, songplay_data)
        conn.commit()
except Exception as e:
    print(e)
    # A failed statement leaves the psycopg2 transaction aborted; roll back so
    # the connection is left in a usable state.
    conn.rollback()

In [None]:
# Close the database connection opened at the top of the notebook.
conn.close()