In [94]:
# Reset the database: create_tables.py drops and recreates the tables so they start empty
%run create_tables.py

Table dropped successfully!!
Table created successfully!!


In [95]:
import pandas as pd
import psycopg2
import os
import glob
from sql_queries import *

In [96]:
def process_song_file(cur, filepath):
    """
    Process a song file and insert its artist and song records into the Postgres database.

    The file is read as a single JSON object describing one song. Fields are
    looked up by name, so the order of the keys inside the JSON file does not
    matter and extra keys are ignored.

    :param cur: cursor reference
    :param filepath: complete file path for the file to load
    :raises KeyError: if the file lacks one of the fields listed below
    """
    # column order here must match the placeholder order of the insert statements
    artist_columns = ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
    song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']

    # load the single JSON object as a one-row frame
    df = pd.DataFrame([pd.read_json(filepath, typ='series')])

    # insert artist record; tolist() hands the driver plain Python values
    artist_data = df[artist_columns].values[0].tolist()
    cur.execute(artist_table_insert, artist_data)

    # insert song record
    song_data = df[song_columns].values[0].tolist()
    cur.execute(song_table_insert, song_data)

In [97]:
def process_log_file(cur, filepath):
    """
    Process a log file and insert time, user and songplay records into the Postgres database.

    The file is read as line-delimited JSON. Only events whose ``page`` is
    ``'NextSong'`` are loaded. For each such event a time row and a user row
    are inserted; then the song and artist ids are looked up with
    ``song_select`` (by song title, artist name and length) and a songplay row
    is inserted, with ``None`` for both ids when the lookup finds nothing.

    :param cur: cursor reference
    :param filepath: complete file path for the file to load
    """
    # open log file (one JSON object per line)
    df = pd.read_json(filepath, lines=True)

    # filter by NextSong action; copy so the ts conversion below does not write into a view
    df = df[df['page'] == 'NextSong'].copy()

    # ts holds epoch milliseconds; convert it to timestamps
    df['ts'] = pd.to_datetime(df['ts'], unit='ms')

    # insert time data records:
    # start_time, hour, day, week, month, year, weekday
    for start_time in df['ts']:
        time_data = [
            start_time,
            start_time.hour,
            start_time.day,
            start_time.weekofyear,
            start_time.month,
            start_time.year,
            start_time.day_name(),
        ]
        cur.execute(time_table_insert, time_data)

    # load user table
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]

    # insert user records; pass a plain list (as the other inserts do) rather
    # than a pandas Series, whose positional integer lookup is deprecated
    for _, row in user_df.iterrows():
        cur.execute(user_table_insert, list(row))

    # insert songplay records
    for _, row in df.iterrows():
        # get songid and artistid from song and artist tables
        cur.execute(song_select, (row['song'], row['artist'], row['length']))
        results = cur.fetchone()

        if results:
            songid, artistid = results
        else:
            songid, artistid = None, None

        # insert songplay record
        songplay_data = (
            row['ts'], row['userId'], row['level'], songid, artistid,
            row['sessionId'], row['location'], row['userAgent'],
        )
        cur.execute(songplay_table_insert, songplay_data)

In [98]:
def process_data(cur, conn, filepath, func):
    """
    Find every ``*.json`` file under a directory tree and process each one with ``func``.

    Files are processed in sorted path order, the connection is committed
    after each file, and a progress line is printed.

    :param cur: cursor reference, passed through to ``func``
    :param conn: database connection; ``commit()`` is called after every file
    :param filepath: root directory that is searched recursively
    :param func: callable invoked as ``func(cur, file)`` for each file found
    """
    # get all files matching extension from directory
    all_files = []
    for root, _dirs, _names in os.walk(filepath):
        # escape the directory part so characters such as '[' or '*' in a
        # folder name are matched literally instead of as glob wildcards
        pattern = os.path.join(glob.escape(root), '*.json')
        all_files.extend(os.path.abspath(f) for f in glob.glob(pattern))

    # sort so the processing order does not depend on the filesystem's listing order
    all_files.sort()

    # get total number of files found
    num_files = len(all_files)

    # iterate over files and process
    for i, file in enumerate(all_files, 1):
        func(cur, file)
        conn.commit()
        print('{}/{} files processed.'.format(i, num_files))

In [99]:
def main():
    """
    Driver function for loading songs and log data into Postgres database

    Opens one connection, loads the song files and then the log files, and
    always closes the connection, even if processing fails part-way.
    """
    # NOTE(review): the connection string (including the password) is hard-coded;
    # consider reading it from configuration or the environment.
    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=postgres")
    try:
        cur = conn.cursor()

        # song data is loaded before log data, same order as before
        process_data(cur, conn, filepath='data/song_data', func=process_song_file)
        process_data(cur, conn, filepath='data/log_data', func=process_log_file)
    finally:
        # release the connection whether or not the load succeeded
        conn.close()

In [100]:
# Run the ETL driver, then print a completion banner once it returns.
main()
print("\n\nFinished processing!!!\n\n")

1/60 files processed.
2/60 files processed.
3/60 files processed.
4/60 files processed.
5/60 files processed.
6/60 files processed.
7/60 files processed.
8/60 files processed.
9/60 files processed.
10/60 files processed.
11/60 files processed.
12/60 files processed.
13/60 files processed.
14/60 files processed.
15/60 files processed.
16/60 files processed.
17/60 files processed.
18/60 files processed.
19/60 files processed.
20/60 files processed.
21/60 files processed.
22/60 files processed.
23/60 files processed.
24/60 files processed.
25/60 files processed.
26/60 files processed.
27/60 files processed.
28/60 files processed.
29/60 files processed.
30/60 files processed.
31/60 files processed.
32/60 files processed.
33/60 files processed.
34/60 files processed.
35/60 files processed.
36/60 files processed.
37/60 files processed.
38/60 files processed.
39/60 files processed.
40/60 files processed.
41/60 files processed.
42/60 files processed.
43/60 files processed.
44/60 files processe