# In [2]:  (Jupyter notebook cell prompt left over from the export)
import os
import glob
import json
import uuid
from sql_queries import *
from datetime import datetime
import pandas as pd
import mysql.connector

def process_data(folderpath, cur, func):
    """Apply *func* to every ``.json`` file found under *folderpath*.

    The directory tree is walked recursively and the matching paths are
    processed in sorted order, so a run is reproducible regardless of the
    order in which the filesystem happens to list directory entries.

    Args:
        folderpath: Root directory to search. A missing directory simply
            yields no files.
        cur: Object passed through unchanged as the second argument of
            *func* (a database cursor in this script).
        func: Callable invoked as ``func(file_path, cur)`` once per file.
    """
    all_files = sorted(
        os.path.join(root, name)
        for root, _dirs, names in os.walk(folderpath)
        for name in names
        if name.endswith('.json')
    )

    for file_path in all_files:
        func(file_path, cur)

def _first_row_params(df, columns, nullable):
    """Return the first row of ``df[columns]`` as a list of SQL parameters.

    Missing values become ``None`` for columns listed in *nullable* and ``0``
    for every other column; NumPy scalars are unwrapped to plain Python
    values so the database driver receives native types.
    """
    params = []
    for column, value in zip(columns, df[columns].values[0]):
        if pd.isna(value):
            value = None if column in nullable else 0
        elif hasattr(value, 'item'):
            value = value.item()
        params.append(value)
    return params


def process_songfile(file_path, cur):
    """Insert the artist and the song described by one song JSON file.

    Only the first record of the file is loaded. The artist row is written
    before the song row.

    Missing artist location/latitude/longitude and song year are stored as
    NULL rather than 0: 0 is a real coordinate, and process_logfile already
    writes NULL into exactly these columns, so both loaders now agree.
    Missing values in any other column keep the previous fallback of 0.

    Args:
        file_path: Path to a JSON-lines song file.
        cur: Database cursor used to execute the insert statements.
    """
    df = pd.read_json(file_path, lines=True)
    if df.empty:
        # Nothing to load; indexing row 0 below would fail.
        return

    nullable = ('artist_location', 'artist_latitude', 'artist_longitude', 'year')

    artist_columns = ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
    artist_data = _first_row_params(df, artist_columns, nullable)
    cur.execute(insert_dimartist, artist_data)

    song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
    song_data = _first_row_params(df, song_columns, nullable)
    cur.execute(insert_dimsong, song_data)
        
        
def process_logfile(file_path, cur):
    """Load the 'NextSong' events of one JSON-lines log file into the database.

    For the file as a whole, in this order:
      1. one time-dimension row per event (timestamp, hour, day, ISO week,
         month, year, weekday);
      2. one user-dimension row per event;
      3. one songplay row per event. The song and artist ids come from
         ``song_select``; when it finds nothing, a new artist and song are
         inserted under freshly generated 10-character ids.

    Notes:
      * Each line is parsed with ``json.loads`` so values keep their JSON
        types. Parsing every line with ``pd.read_json`` re-infers dtypes per
        line and turns numeric-looking strings (a title such as "007") into
        numbers.
      * Event timestamps are converted from epoch milliseconds to naive UTC
        datetimes once, and the same value is used for the time dimension
        and the songplay row so the two always match, whatever the local
        timezone of the machine is.
      * A file without any 'NextSong' event is skipped.

    Args:
        file_path: Path to a JSON-lines event log (read as UTF-8).
        cur: Database cursor used to execute the statements.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        events = [json.loads(line) for line in file if line.strip()]

    #filter Next songs
    plays = [event for event in events if event.get('page') == 'NextSong']
    if not plays:
        return

    # convert timestamp to datetime
    start_times = [
        pd.to_datetime(play['ts'], unit='ms').to_pydatetime() for play in plays
    ]

    for moment in start_times:
        time_data = (
            moment,
            moment.hour,
            moment.day,
            moment.isocalendar()[1],
            moment.month,
            moment.year,
            moment.weekday(),
        )
        cur.execute(insert_dimtime, time_data)

    user_fields = ('userId', 'firstName', 'lastName', 'gender', 'level')
    for play in plays:
        cur.execute(insert_dimuser, tuple(play.get(field) for field in user_fields))

    for play, moment in zip(plays, start_times):
        song = play.get('song')
        artist = play.get('artist')
        length = play.get('length')

        cur.execute(song_select, (song, artist, length))
        # Drain the whole result set: leaving rows unread on an unbuffered
        # cursor makes the next execute() on the connection fail.
        matches = cur.fetchall()

        if matches:
            song_id = matches[0][0]
            artist_id = matches[0][1]
        else:
            # Unknown song: register a placeholder artist and song.
            artist_id = uuid.uuid4().hex[:10].upper()
            song_id = uuid.uuid4().hex[:10].upper()

            cur.execute(insert_dimartist, (artist_id, artist, None, None, None))
            cur.execute(insert_dimsong, (song_id, song, artist_id, None, length))

        songplay_data = (
            moment,
            artist_id,
            play.get('userId'),
            play.get('sessionId'),
            song_id,
            play.get('location'),
            play.get('userAgent'),
            play.get('level'),
        )

        cur.execute(insert_songplay, songplay_data)
    
def main():
    """Run the full load: song files first, then log files.

    Opens one autocommit connection to the 'sparkify' database and a single
    cursor shared by both passes. Any MySQL error is reported on stdout
    instead of being propagated.
    """
    connection_settings = dict(
        host='*****',
        user='****',
        password='********',
        database='sparkify',
        autocommit=True,
    )
    # (source folder, per-file loader), in the order they must run.
    jobs = (
        (r'localdrive:\pathto\song_data', process_songfile),
        (r'localdrive:\path\to\log_data', process_logfile),
    )

    try:
        with mysql.connector.connect(**connection_settings) as conn, conn.cursor() as cur:
            for folder, loader in jobs:
                process_data(folderpath=folder, cur=cur, func=loader)
    except mysql.connector.Error as err:
        print(f"Error: {err}")
    
                

# Run the load only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
    