In [1]:
import os
import glob
import psycopg2
import pandas as pd
import numpy as np
import json
from Create_Table_queries import *


def process_youtubedata_file(cur, filepath):
    """
        Read one youtube-data JSON file and insert its youtuber and video rows.

        Each field in the file maps to a dict keyed by row label; only the row
        labelled "0" is read (presumably files written by pandas `to_json`
        with the default column orientation -- confirm against the data).
        Arguments:
        cur: Database Cursor (psycopg2); assumed to be inside a transaction
             (autocommit off), which SAVEPOINT requires
        filepath: location of the JSON file
        Return: None
    """
    # JSON text is UTF-8; don't depend on the platform's default encoding.
    with open(filepath, encoding="utf-8") as f:
        data = json.load(f)

    # ---------insert youtuber record----------
    youtuber_record = [data[key]["0"] for key in (
        "youtuber_id", "youtuber_name", "youtuber_location",
        "youtuber_latitude", "youtuber_longitude",
    )]
    # A failed statement aborts the whole PostgreSQL transaction. Without the
    # savepoint, the video insert below would fail with "current transaction
    # is aborted" after the youtuber error is printed.
    cur.execute("SAVEPOINT youtuber_insert")
    try:
        cur.execute(Youtubers_table_insert, youtuber_record)
    except psycopg2.Error as e:
        print(e)
        cur.execute("ROLLBACK TO SAVEPOINT youtuber_insert")
    else:
        cur.execute("RELEASE SAVEPOINT youtuber_insert")

    # ---------insert video record--------------
    video_record = [data[key]["0"] for key in (
        "video_id", "title", "youtuber_id", "year", "duration",
    )]
    cur.execute(Videos_table_insert, video_record)

def process_log_file(cur, filepath):
    """
        Read one JSON-lines log file and load its NextVideo events.

        For every event whose page is 'NextVideo' this inserts a Time_dim row,
        a Users_dim row and a Videoplay_fact row. The fact row's video_id and
        youtuber_id are looked up by video title and youtuber name; both are
        None when no match is found.
        Arguments:
        cur: Database Cursor
        filepath: location of the log file (one JSON object per line)
        Return: None
    """
    df = pd.read_json(filepath, lines=True)
    # No records means nothing to load, and the frame may lack a 'page' column.
    if df.empty:
        return

    # Keep only NextVideo events. .copy() detaches the filtered frame so the
    # 'ts' assignment below writes to a real frame, not a view of the original.
    df = df[df['page'] == 'NextVideo'].copy()

    # ts is stored as epoch milliseconds; convert to datetime.
    df['ts'] = pd.to_datetime(df['ts'], unit='ms')

    _insert_time_records(cur, df['ts'])
    _insert_user_records(cur, df)
    _insert_videoplay_records(cur, df)


def _insert_time_records(cur, ts):
    """Insert one Time_dim row (start_time, hour, day, week, month, year, weekday) per timestamp."""
    time_df = pd.DataFrame({
        'start_time': ts.dt.date,
        'hour': ts.dt.hour,
        'day': ts.dt.day,
        # Series.dt.weekofyear was removed in pandas 2.0; isocalendar().week is
        # the same ISO-8601 week number. Cast its UInt32 dtype to plain int64.
        'week': ts.dt.isocalendar().week.astype('int64'),
        'month': ts.dt.month,
        'year': ts.dt.year,
        'weekday': ts.dt.weekday,
    })
    # itertuples yields plain Python scalars, which psycopg2 can adapt.
    for row in time_df.itertuples(index=False):
        cur.execute(Time_table_insert, list(row))


def _insert_user_records(cur, df):
    """Insert one Users_dim row (userId, firstName, lastName, gender, level) per event."""
    # NOTE(review): one row per event, so duplicate users reach the insert;
    # presumably Users_table_insert handles conflicts -- confirm.
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
    for row in user_df.itertuples(index=False):
        # Pass a plain list rather than a label-indexed Series, which psycopg2
        # would read positionally via deprecated Series.__getitem__.
        cur.execute(Users_table_insert, list(row))


def _insert_videoplay_records(cur, df):
    """Insert one Videoplay_fact row per event, resolving video/youtuber ids by title and youtuber name."""
    video_select = (
        "SELECT videos.video_id, youtubers.youtuber_id FROM videos "
        "JOIN youtubers ON videos.youtuber_id = youtubers.youtuber_id "
        "WHERE videos.title = %s AND youtubers.name = %s;"
    )
    for row in df.itertuples(index=False):
        cur.execute(video_select, (row.video, row.youtuber))
        # fetchone() returns None when no video/youtuber pair matches.
        results = cur.fetchone()
        videoid, youtuberid = results if results else (None, None)

        # start_time is the event's calendar date, matching Time_dim.start_time.
        videoplay_record = (
            row.ts.date(), row.userId, row.level, videoid, youtuberid,
            row.sessionId, row.location, row.userAgent,
        )
        cur.execute(Videoplay_table_insert, videoplay_record)
    


def process_data(cur, conn, filepath, func):
    """
        Find every JSON file under a directory tree and process each one with func.

        Walks filepath recursively and collects regular files named '*.json'.
        Hidden files are skipped, as with a '*.json' glob. The absolute paths
        are sorted so the load order is deterministic. func(cur, path) is then
        called on each file, and conn is committed after every file.
        Example: process_data(cur, conn, 'data/youtube_data', process_youtubedata_file)
        processes every JSON file below data/youtube_data.
        Arguments:
        cur: Database Cursor
        conn: Database connection (committed after each file)
        filepath: root directory to search
        func: callable(cur, path) used to process each file
        Return: None
    """
    all_files = []
    for root, _dirs, names in os.walk(filepath):
        for name in names:
            # The name is matched directly instead of re-globbing root:
            # - glob treats '[', '*', '?' in directory names as wildcards
            #   and also matches directories.
            # - normcase keeps fnmatch's platform case rules.
            # - the dot check keeps glob's rule that '*' skips hidden files.
            if not name.startswith('.') and os.path.normcase(name).endswith('.json'):
                all_files.append(os.path.abspath(os.path.join(root, name)))
    # os.walk order is filesystem-dependent; sort for reproducible loads.
    all_files.sort()

    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))

    # iterate over files and process
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()
        print('{}/{} files processed.'.format(i, num_files))



def main():
    """
        Load all youtube-data files, then all log files, into youtubedb.

        Always closes the cursor and the connection, even if loading fails
        part-way.
        Return: None
    """
    # NOTE(review): credentials are hard-coded; consider reading the DSN from
    # the environment instead.
    conn = psycopg2.connect("host=127.0.0.1 dbname=youtubedb user=postgres password=12345678")
    try:
        # Leaving the cursor's with-block closes the cursor.
        with conn.cursor() as cur:
            # Youtube data first: process_log_file looks up video_id/youtuber_id
            # in the videos and youtubers tables.
            process_data(cur, conn, filepath='data/youtube_data', func=process_youtubedata_file)
            process_data(cur, conn, filepath='data/log_data', func=process_log_file)
    finally:
        conn.close()


if __name__ == "__main__":
    main()



71 files found in data/youtube_data
1/71 files processed.
2/71 files processed.
3/71 files processed.
4/71 files processed.
5/71 files processed.
6/71 files processed.
7/71 files processed.
8/71 files processed.
9/71 files processed.
10/71 files processed.
11/71 files processed.
12/71 files processed.
13/71 files processed.
14/71 files processed.
15/71 files processed.
16/71 files processed.
17/71 files processed.
18/71 files processed.
19/71 files processed.
20/71 files processed.
21/71 files processed.
22/71 files processed.
23/71 files processed.
24/71 files processed.
25/71 files processed.
26/71 files processed.
27/71 files processed.
28/71 files processed.
29/71 files processed.
30/71 files processed.
31/71 files processed.
32/71 files processed.
33/71 files processed.
34/71 files processed.
35/71 files processed.
36/71 files processed.
37/71 files processed.
38/71 files processed.
39/71 files processed.
40/71 files processed.
41/71 files processed.
42/71 files processed.
43/71 f