In [3]:
import psycopg2
import pandas as pd
from sql_queries import *
from connect import *
import glob as glob

In [4]:
def get_all_files_matching_from_directory(directorypath, match):
    """
    Get all the files that match into a directory recursively.
    :param directorypath: path/to/directory.
    :param match: match expression.
    :return: array with all the files that match.
    """
    # get all files matching extension from directory
    all_files = []
    for root, dirs, files in os.walk(directorypath):
        files = glob.glob(os.path.join(root, match))
        for f in files :
            all_files.append(os.path.abspath(f))

    return all_files


In [5]:
allFiles = get_all_files_matching_from_directory('./data/song_data' , '*.json')

In [6]:
print(allFiles[0])

c:\Users\say\Desktop\course-learning\Data-engineer-course\Data-engineer-project\project01-data-modeling-with-postgres\data\song_data\A\A\A\TRAAAAW128F429D538.json


In [7]:
## connect to database
conn , cur = connect_database('sparkifydb')

In [8]:
songData = get_all_files_matching_from_directory('./data/song_data' , '*.json')
logData = get_all_files_matching_from_directory('./data/log_data' , '*.json')


In [9]:
songData[0]

'c:\\Users\\say\\Desktop\\course-learning\\Data-engineer-course\\Data-engineer-project\\project01-data-modeling-with-postgres\\data\\song_data\\A\\A\\A\\TRAAAAW128F429D538.json'

In [10]:
print("length of song data is :", len(songData))
print("length of log data is :", len(logData))

length of song data is : 71
length of log data is : 30


In [47]:
def insertRecord(cur , insertQuery , df , fields):
    record = df[fields].values[0].tolist()
    cur.execute(insertQuery , record)

def insertDataframe(cur , df , insertQuery):
    for i, row in df.iterrows():
        cur.execute(insertQuery, list(row))


def get_songid_artistid(cur, song, artist, length):
    """
    Gets the song_id and the artist_id from song tittle, artist name and gon duration.
    :param cur: connection cursor to query the data in DB.
    :param song: song tittle
    :param artist: artist name
    :param length: song duration
    :return: returns song_id and artist_id
    """

    # get songid and artistid from song and artist tables
    cur.execute(song_select, (song, artist, length))
    results = cur.fetchone()

    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    return songid, artistid


def insert_facts_songplays(cur, df):
    """
    Insert songplays fact table
    :param cur: connection cursor to insert the data in DB.
    :param df: dataframe with song plays data.
    """

    # insert songplay records
    for index, row in df.iterrows():
        song_id, artist_id = get_songid_artistid(cur, row.song, row.artist, row.length)

        # insert songplay record
        songplay_data = (row.ts, row.userId, row.level, song_id, artist_id,
                         row.itemInSession, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)




def process_song_file(cur , filePath):
    df = pd.read_json(filePath, lines=True)
        # insert song record
    insertRecord(cur, song_table_insert, df, ['song_id', 'title', 'artist_id', 'year', 'duration'])
    
    # insert artist record
    insertRecord(cur, artist_table_insert, df,
                  ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'])
    return df

def expand_time_data(df , ts):
    df['datetime'] = pd.to_datetime(df[ts], unit='ms')
    t = df
    t['year'] = t['datetime'].dt.year
    t['month'] = t['datetime'].dt.month
    t['day'] = t['datetime'].dt.day
    t['hour'] = t['datetime'].dt.hour
    t['weekday_name'] = t['datetime'].dt.day_name()
    t['week'] = t['datetime'].dt.week

    return t


def process_log_file(cur, filepath):
    df = pd.read_json(filepath , lines = True)
    columns = df.columns
    print(columns)
    
    df = df.loc[df['page'] == 'NextSong']

    t = expand_time_data(df , 'ts')
    print(t.columns)
    time_df = t[['ts', 'hour', 'day', 'week', 'month', 'year', 'weekday_name']]
    insertDataframe(cur, time_df, time_table_insert)

        # load user table
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
    insertDataframe(cur, user_df, user_table_insert)

    # insert songplay records
    insert_facts_songplays(cur, df)


    print(t)
    


def processData(cur, conn, filepath, func):
    all_files = get_all_files_matching_from_directory(filepath, '*.json')
    for i , dataFile in enumerate(allFiles ,1):
        func(cur , dataFile)
        conn.commit()
        print("path file :{} : {} ".format(filepath ,i))




In [48]:
def main():
    conn , cur = connect_database("sparkifydb")

    processData(cur, conn, filepath='./data/song_data', func=process_song_file)
    processData(cur, conn, filepath='data/log_data', func=process_log_file)

    close_connection(cur , conn)

In [49]:
main()

path file :./data/song_data : 1 
path file :./data/song_data : 2 
path file :./data/song_data : 3 
path file :./data/song_data : 4 
path file :./data/song_data : 5 
path file :./data/song_data : 6 
path file :./data/song_data : 7 
path file :./data/song_data : 8 
path file :./data/song_data : 9 
path file :./data/song_data : 10 
path file :./data/song_data : 11 
path file :./data/song_data : 12 
path file :./data/song_data : 13 
path file :./data/song_data : 14 
path file :./data/song_data : 15 
path file :./data/song_data : 16 
path file :./data/song_data : 17 
path file :./data/song_data : 18 
path file :./data/song_data : 19 
path file :./data/song_data : 20 
path file :./data/song_data : 21 
path file :./data/song_data : 22 
path file :./data/song_data : 23 
path file :./data/song_data : 24 
path file :./data/song_data : 25 
path file :./data/song_data : 26 
path file :./data/song_data : 27 
path file :./data/song_data : 28 
path file :./data/song_data : 29 
path file :./data/song_

KeyError: 'page'