In [1]:
import json
import pandas as pd
import numpy as np

import re, os

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [2]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    movie = dict(movie)  # create a non-desctructive copy
    alt_titles = {}
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune–Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles

    # Add function to change/consolidate column names
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    
    # list the column pairs that need to be changed and call the function
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')
    
    # return our data
    return movie

In [3]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)
def extract_transform_load(wiki_file, kaggle_file, ratings_file):
# def extract_transform_load():
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_df = pd.read_csv(kaggle_file, low_memory=False)
    ratings_df = pd.read_csv(ratings_file)

    # Open and read the Wikipedia data JSON file.
    with open(wiki_file, mode='r') as file:
        wiki_movies_raw = json.load(file)    
    
    # Write a list comprehension to filter out TV shows.
    wiki_no_tv = [movie for movie in wiki_movies_raw if 'No. of episodes' not in movie]

    # Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    clean_wiki_movies = [clean_movie(this_movie) for this_movie in wiki_no_tv]

    # Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(clean_wiki_movies)

    # Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    except:
        print(Exception)

    #  Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    keep_columns = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[keep_columns]

    # Create a variable that will hold the non-null values from the “Box office” column.
    non_null_box_office = wiki_movies_df['Box office'].dropna()
    
    # Convert the box office data created in Step 8 to string values using the lambda and join functions.
    non_null_box_office = non_null_box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

    # Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$\d+.?\d*\s*[mb]illion'

    # Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$\d{1,3}(?:,\d{3})+'

    # Add the parse_dollars function.
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            # return value
            return value

        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value

        # otherwise, return NaN
        else:
            return np.nan
        
    # Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['non_null_box_office'] = non_null_box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    # Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df['Budget'].dropna()
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    # Clean the release date column in the wiki_movies_df DataFrame.
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'

    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)

    # Clean the running time column in the wiki_movies_df DataFrame.
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')

    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
     
    # 2. Clean the Kaggle metadata.
    # drop corrupted data from adult column
    kaggle_df = kaggle_df[kaggle_df['adult'] == 'False'].drop('adult',axis='columns')

    # create boolean and assign back to video column
    kaggle_df['video'] = kaggle_df['video'] == 'True'

    # convert to int and assign back to budget
    kaggle_df['budget'] = kaggle_df['budget'].astype(int)
    
    # convert to number, raise any errors
    kaggle_df['id'] = pd.to_numeric(kaggle_df['id'], errors='raise')
    
    # convert to number, raise any errors
    kaggle_df['popularity'] = pd.to_numeric(kaggle_df['popularity'], errors='raise')
    
    # convert to datetime and assign back to release_date
    kaggle_df['release_date'] = pd.to_datetime(kaggle_df['release_date'])

    # 3. Merged the two DataFrames into the movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_df, on='imdb_id', suffixes=['_wiki','_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

    # 5. Add in the function to fill in the missing Kaggle data.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
            , axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'non_null_box_office')

    movies_df.drop(columns=['video'], inplace=True)

    # 7. Filter the movies DataFrame for specific columns.
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                        'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                        'genres','original_language','overview','spoken_languages','Country',
                        'production_companies','production_countries','Distributor',
                        'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                        ]]

    # 8. Rename the columns in the movies DataFrame.
    movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

    # 9. Transform and merge the ratings DataFrame.
    r_counts = ratings_df.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1) \
                    .pivot(index='movieId',columns='rating', values='count')
    
    r_counts.columns = ['rating_' + str(col) for col in r_counts.columns]

    movies_with_ratings_df = pd.merge(movies_df, r_counts, left_on='kaggle_id', right_index=True, how='left')

    # Create PostgreSQL database connection
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/KU-challenge-eight"

    engine = create_engine(db_string)

    # Add movies_df DataFrame to SQL database
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')

    rows_imported = 0
    start_time = time.time()

    for ratings in pd.read_csv(ratings_file, chunksize=1000000):
        print(f'Importing rows {rows_imported} to {rows_imported + len(ratings)}...', end='')

        ratings.to_sql(name='ratings', con=engine, if_exists='append')

        rows_imported += len(ratings)

        # Add the code that prints out the elapsed time to import each row.
        print(f"Done. {time.time() - start_time} total seconds elapsed.")
    
    # return wiki_movies_df, movies_with_ratings_df, movies_df


In [4]:
# Using OS since it functions better with my system and setup

# The Wikipedia data
wiki_file = os.path.join("_Resources", "wikipedia-movies.json")
# The Kaggle metadata
kaggle_file = os.path.join("_Resources", "movies_metadata.csv")
# The MovieLens rating data.
ratings_file = os.path.join("Resources", "ratings.csv")

In [5]:
# 11. Set the three variables equal to the function created in D1.
# Refactor Step 11 of Deliverable 3 so that you pass in the variables for the files created in Step 10 of Deliverable 3 in the function created in Deliverable 1.

# wiki_file, kaggle_file, ratings_file = extract_transform_load()
extract_transform_load(wiki_file, kaggle_file, ratings_file)

Importing rows 0 to 1000000...Done. 65.11760640144348 total seconds elapsed.
Importing rows 1000000 to 2000000...Done. 121.47563552856445 total seconds elapsed.
Importing rows 2000000 to 3000000...Done. 177.0687186717987 total seconds elapsed.
Importing rows 3000000 to 4000000...Done. 232.46597361564636 total seconds elapsed.
Importing rows 4000000 to 5000000...Done. 293.6130015850067 total seconds elapsed.
Importing rows 5000000 to 6000000...Done. 374.69804430007935 total seconds elapsed.
Importing rows 6000000 to 7000000...Done. 436.9491813182831 total seconds elapsed.
Importing rows 7000000 to 8000000...Done. 510.2530450820923 total seconds elapsed.
Importing rows 8000000 to 9000000...Done. 586.0755383968353 total seconds elapsed.
Importing rows 9000000 to 10000000...Done. 658.9752147197723 total seconds elapsed.
Importing rows 10000000 to 11000000...Done. 753.2659101486206 total seconds elapsed.
Importing rows 11000000 to 12000000...Done. 854.634697675705 total seconds elapsed.
Imp