In [None]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [None]:
# Connection settings default to the local sandbox values this notebook has
# always used. Each one can be overridden through the standard libpq
# environment variables, so real credentials never need to be written into
# (or committed with) the notebook.
conn = psycopg2.connect(
    host=os.environ.get("PGHOST", "127.0.0.1"),
    dbname=os.environ.get("PGDATABASE", "sparkifydb"),
    user=os.environ.get("PGUSER", "student"),
    password=os.environ.get("PGPASSWORD", "student"),
)
cur = conn.cursor()

In [None]:
def get_files(filepath):
    """Recursively collect the absolute paths of all ``*.json`` files under a directory.

    Args:
        filepath: Root directory to search. A path that does not exist
            yields an empty list.

    Returns:
        list[str]: Absolute paths of every matching file, sorted so that
        repeated runs process the files in the same order.
    """
    all_files = []
    for root, _dirs, _files in os.walk(filepath):
        # Escape the directory part so glob metacharacters in a folder name
        # ('[', ']', '*', '?') are matched literally rather than as a pattern.
        pattern = os.path.join(glob.escape(root), '*.json')
        all_files.extend(os.path.abspath(match) for match in glob.glob(pattern))

    # os.walk and glob return entries in arbitrary filesystem order.
    return sorted(all_files)

In [None]:
#process_data(cur, conn, filepath='data/song_data', func=process_song_file)
#process_data(cur, conn, filepath='data/log_data', func=process_log_file)

In [None]:
# Collect the paths of every JSON file found under data/song_data.
song_files = get_files('data/song_data')
#song_files

In [None]:
# First collected song file.
# NOTE(review): `filepath` is not referenced again in the visible cells — confirm it is still needed.
filepath = song_files[0]

In [None]:
import io
import numpy as np
def process_song_file(song_files):
    """Load all song JSON files into one de-duplicated DataFrame.

    Each path is read as line-delimited JSON and the resulting frames are
    concatenated. The first row for every ``song_id`` is kept, then the
    first row for every ``artist_id``, and finally every NaN is replaced
    with 0.

    Args:
        song_files: Iterable of paths to line-delimited JSON song files.

    Returns:
        pandas.DataFrame: The combined, de-duplicated, NaN-free frame.
    """
    frames = [pd.read_json(path, lines=True) for path in song_files]
    combined = pd.concat(frames)

    # NOTE(review): de-duplicating on artist_id here also discards any
    # further songs by an artist that was already seen — confirm intended.
    one_row_per_song = combined.drop_duplicates('song_id')
    one_row_per_artist = one_row_per_song.drop_duplicates('artist_id')

    # NOTE(review): missing values (e.g. artist latitude/longitude) end up
    # stored as 0 rather than as NULL — confirm intended.
    return one_row_per_artist.replace(np.nan, 0)

def songs_table(song_files_list, cur):
    """Bulk-load the song columns of the combined song frame into ``songs``.

    The ``song_id``, ``title``, ``artist_id``, ``year`` and ``duration``
    columns are serialised as '|'-delimited CSV in memory and streamed to
    the database with a CSV-format COPY. Using the CSV format means the
    quoting pandas applies to values containing '"' or '|' is decoded by
    the server instead of being stored literally or splitting the row.
    Missing values are written as ``\\N`` and loaded as NULL; empty strings
    stay empty strings.

    Args:
        song_files_list: DataFrame containing at least the five columns
            listed above.
        cur: Open database cursor. The load is committed on the cursor's
            own connection, so no module-level connection is required.
    """
    song_rows = song_files_list[['song_id', 'title', 'artist_id', 'year', 'duration']]

    buffer = io.StringIO()
    song_rows.to_csv(buffer, header=False, sep='|', index=False, na_rep='\\N')
    buffer.seek(0)

    cur.copy_expert(
        "COPY songs FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL '\\N')",
        buffer,
    )
    cur.connection.commit()

def artists_table(song_files_list, cur):
    """Bulk-load the artist columns of the combined song frame into ``artists``.

    The ``artist_id``, ``artist_name``, ``artist_location``,
    ``artist_latitude`` and ``artist_longitude`` columns are serialised as
    '|'-delimited CSV in memory and streamed to the database with a
    CSV-format COPY. Using the CSV format means the quoting pandas applies
    to values containing '"' or '|' is decoded by the server instead of
    being stored literally or splitting the row. Missing values are written
    as ``\\N`` and loaded as NULL; empty strings stay empty strings.

    Args:
        song_files_list: DataFrame containing at least the five columns
            listed above.
        cur: Open database cursor. The load is committed on the cursor's
            own connection, so no module-level connection is required.
    """
    artist_rows = song_files_list[
        ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
    ]

    buffer = io.StringIO()
    artist_rows.to_csv(buffer, header=False, sep='|', index=False, na_rep='\\N')
    buffer.seek(0)

    cur.copy_expert(
        "COPY artists FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL '\\N')",
        buffer,
    )
    cur.connection.commit()

# Build the combined song frame once, then load the songs and artists tables from it.
song_files_list = process_song_file(song_files)
# NOTE(review): a bare expression that is not the last line of a cell displays nothing.
song_files_list
songs_table(song_files_list,cur)
artists_table(song_files_list, cur)

In [None]:
# Collect the paths of every JSON file found under data/log_data.
log_files = get_files('data/log_data')

In [None]:
def process_log_file(log_files):
    """Load all log JSON files, derive time columns and keep only NextSong events.

    Each path is read as line-delimited JSON. The millisecond ``ts`` column
    is converted to a timestamp rounded to the nearest second and stored as
    ``start_time``; ``hour``, ``day``, ``week``, ``month``, ``year`` and
    ``weekday`` are derived from it. The frames are then concatenated and
    filtered to rows whose ``page`` equals ``'NextSong'``.

    Args:
        log_files: Iterable of paths to line-delimited JSON log files. Each
            file must provide ``ts`` and ``page`` columns.

    Returns:
        pandas.DataFrame: The NextSong rows of all files, with the derived
        time columns added.
    """
    all_log_files = []
    for log_f in log_files:
        df = pd.read_json(log_f, lines=True)
        # Lower-case 's' is the non-deprecated alias for seconds.
        t = pd.to_datetime(df['ts'], unit='ms').dt.round('s')

        # Series.dt.week was removed in pandas 2.0. isocalendar().week
        # (pandas >= 1.1) yields the same ISO week number; fall back to the
        # old accessor only on versions that predate it.
        if hasattr(t.dt, 'isocalendar'):
            week = t.dt.isocalendar().week
        else:
            week = t.dt.week

        df['start_time'] = t
        df['hour'] = t.dt.hour
        df['day'] = t.dt.day
        df['week'] = week
        df['month'] = t.dt.month
        df['year'] = t.dt.year
        df['weekday'] = t.dt.weekday
        all_log_files.append(df)

    log_files_concat = pd.concat(all_log_files)
    return log_files_concat.loc[log_files_concat['page'] == 'NextSong']

def log_files_staging_table(log_files_list, cur):
    """Bulk-load the processed log events into ``log_files_staging``.

    The time, user and song-play columns selected below are serialised as
    '|'-delimited CSV in memory and streamed to the database with a
    CSV-format COPY. Using the CSV format means the quoting pandas applies
    to values containing '"' or '|' (user-agent strings, for example) is
    decoded by the server instead of being stored literally or splitting
    the row. Missing values are written as ``\\N`` and loaded as NULL;
    empty strings stay empty strings.

    Args:
        log_files_list: DataFrame containing at least the columns selected
            below.
        cur: Open database cursor. The load is committed on the cursor's
            own connection, so no module-level connection is required.
    """
    staging_columns = [
        'start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday',
        'userId', 'firstName', 'lastName', 'gender', 'level', 'artist',
        'song', 'length', 'sessionId', 'location', 'userAgent',
    ]
    staging_rows = log_files_list[staging_columns]

    buffer = io.StringIO()
    staging_rows.to_csv(buffer, header=False, sep='|', index=False, na_rep='\\N')
    buffer.seek(0)

    cur.copy_expert(
        "COPY log_files_staging FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL '\\N')",
        buffer,
    )
    cur.connection.commit()
    
"""
def time_table(log_files_list, cur):
    t = pd.to_datetime(log_files_list['ts'], unit='ms').dt.round('S')
    time_data = [t, t.dt.hour, t.dt.day, t.dt.weekofyear, t.dt.month, t.dt.year, t.dt.weekday]
    column_labels = ['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']
    time_df = pd.DataFrame(dict(zip(column_labels, time_data))).drop_duplicates('start_time')
#    print(time_df)
    log_files_csv = time_df.to_csv(header=False, sep='|', index=False)
    csv_file_like_object = io.StringIO()
    csv_file_like_object.write(log_files_csv)
    csv_file_like_object.seek(0)
    cur.copy_from(csv_file_like_object, 'time', sep='|')
    conn.commit()

def user_table(log_files_list, cur):
    user_df = log_files_list[['userId','firstName','lastName','gender','level']]
#    print(user_df)
    log_files_csv = user_df.to_csv(header=False, sep='|', index=False)
#    print(log_files_csv)
    csv_file_like_object = io.StringIO()
    csv_file_like_object.write(log_files_csv)
    csv_file_like_object.seek(0)
    cur.copy_from(csv_file_like_object, 'users', sep='|')
    conn.commit()
"""

# Stage the NextSong events, then run the follow-up statements one at a
# time, committing after each so every step is persisted before the next
# one starts.
# NOTE(review): the three statement names are not defined in the visible
# cells; presumably they come from the `sql_queries` star import — confirm.
log_files_list = process_log_file(log_files)
log_files_staging_table(log_files_list, cur)

for statement in (
    time_table_staging_insert,
    merge_user_table,
    songplay_table_staging_insert,
):
    cur.execute(statement)
    conn.commit()