In [1]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [2]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    """Return a cleaned copy of one scraped Wikipedia movie record.

    Two clean-ups are applied to a shallow copy of ``movie``; the caller's
    dictionary is never modified:

    1. Every alternate-title key (one per language or romanization system)
       is removed and gathered into a single ``'alt_titles'`` dictionary.
       The ``'alt_titles'`` key is only added when at least one was found.
    2. Synonymous column names are consolidated under one canonical name.

    Parameters
    ----------
    movie : mapping
        One movie record; keys are the scraped field names.

    Returns
    -------
    dict
        The cleaned record.

    Notes
    -----
    Consolidation *overwrites*: when a record holds several synonyms for the
    same canonical column (or the canonical column itself), the synonym that
    appears last in ``merge_columns`` wins.
    """
    movie = dict(movie)  # local copy of the input parameter dictionary

    # Keys that hold an alternate title rather than a regular attribute
    language_columns = [
        'Also known as',
        'Arabic',
        'Bopomofo',
        'Cantonese',
        'Chinese',
        'French',
        'Gwoyeu Romatzyh',
        'Hangul',
        'Hanyu Pinyin',
        'Hebrew',
        'Hepburn',
        'Hokkien POJ',
        'IPA',
        'Japanese',
        'Jyutping',
        'Literally',
        'Mandarin',
        'McCune–Reischauer',
        'Original title',
        'Polish',
        'Revised Romanization',
        'Romanized',
        'Russian',
        'Simplified',
        'Simplified Chinese',
        'Traditional',
        'Traditional Chinese',
        'Wade–Giles',
        'Yale Romanization',
        'Yiddish'
    ]

    # Move every alternate title that is present into one nested dictionary
    alt_titles = {key: movie.pop(key) for key in language_columns if key in movie}
    if alt_titles:
        movie['alt_titles'] = alt_titles

    # (synonym, canonical name) pairs, applied in order.
    # The trailing spaces in the 'Productioncompan…' keys are intentional:
    # they are reproduced exactly as the original code spelled them.
    merge_columns = [
        ('Adaptation by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Music by', 'Composer(s)'),
        ('Original language(s)', 'Language'),
        ('Original release', 'Release date'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Release Date', 'Release date'),
        # Fixed: this used to target 'Release Date' (capital D), which had
        # already been folded into 'Release date' by the entry above, so the
        # value was stranded under a stray key.
        ('Released', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Written by', 'Writer(s)')
    ]

    for old_name, new_name in merge_columns:
        if old_name in movie:
            # Rename the column, overwriting the canonical one if it exists
            movie[new_name] = movie.pop(old_name)

    return movie

In [3]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load(wiki_file, kaggle_file, ratings_file):
    """Run the movie ETL pipeline and load the results into PostgreSQL.

    Extract: read the Wikipedia JSON, the Kaggle metadata CSV and the
    MovieLens ratings CSV.
    Transform: filter and clean the Wikipedia records, clean the Kaggle
    metadata, merge the two on ``imdb_id``, back-fill missing values, then
    select and rename the final columns.
    Load: write the merged movies to the ``movies`` table and the raw ratings
    (in chunks of one million rows) to the ``ratings`` table of the
    ``movie_data`` database on 127.0.0.1:5432.

    Parameters
    ----------
    wiki_file : str
        Path to the Wikipedia movies JSON file.
    kaggle_file : str
        Path to the Kaggle movie metadata CSV file.
    ratings_file : str
        Path to the MovieLens ratings CSV file.

    Returns
    -------
    None
        Results are written to the database; upload progress is printed.

    Notes
    -----
    Both tables are rebuilt on every call, so running the pipeline twice does
    not duplicate rows.
    """
    def join_if_list(value):
        """Join list elements into one space-separated string; pass anything else through."""
        return ' '.join(value) if isinstance(value, list) else value

    # ------------------------------------------------------------------ Extract
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    df_kaggle = pd.read_csv(kaggle_file, low_memory=False)
    df_ratings = pd.read_csv(ratings_file)

    # Open and read the Wikipedia data JSON file.
    # The encoding is explicit so the result does not depend on the platform default.
    with open(wiki_file, mode='r', encoding='utf-8') as file:
        wiki_movies_raw = json.load(file)

    # ------------------------------------------------------- Transform: Wikipedia
    # Keep only entries that have a director and an IMDb link, and filter out TV shows.
    wiki_movies = [
        movie for movie in wiki_movies_raw
        if ('Directed by' in movie or 'Director' in movie)
        and 'imdb_link' in movie
        and 'No. of episodes' not in movie
    ]

    # Call the clean_movie function on each movie and load the result as a DataFrame.
    clean_movies = [clean_movie(movie) for movie in wiki_movies]
    df_wiki = pd.DataFrame(clean_movies)

    # Extract the IMDb ID using a regular expression and drop any imdb_id duplicates.
    # If there is an error, capture and print the exception. `Exception` (rather than
    # `BaseException`) is caught so that Ctrl-C / SystemExit still stop the run.
    try:
        df_wiki['imdb_id'] = (
            df_wiki['imdb_link']
            .str.extract(r'(tt\d{7})')
        )

        df_wiki.drop_duplicates(subset='imdb_id', inplace=True)
    except Exception as err:
        print(f"Unexpected {err}, {type(err)}")  # From https://docs.python.org/3/tutorial/errors.html

    # Keep only the columns in which fewer than 90% of the values are null.
    # `.copy()` makes the subset an independent frame, so the column assignments
    # below do not operate on a view of the unfiltered DataFrame.
    columns_to_keep = [col for col in df_wiki.columns if df_wiki[col].isnull().sum() < 0.9*len(df_wiki)]
    df_wiki = df_wiki[columns_to_keep].copy()

    # Non-null "Box office" values as strings, with dollar ranges replaced by just the upper value.
    box_office = (
        df_wiki['Box office']
        .dropna()
        .apply(join_if_list)
        .str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    )

    # "form_one" of the money data: $###.# million / $###.# billion
    dollars_pattern_mil = r'\$\s*\d+\.?\d*\s*mill?i?on'
    dollars_pattern_bil = r'\$\s*\d+\.?\d*\s*bill?i?on'

    # "form_two" of the money data: $###,###,### (comma or period as thousands separator)
    dollars_pattern_num = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'

    dollars_pattern_combined = f'({dollars_pattern_mil}|{dollars_pattern_bil}|{dollars_pattern_num})'

    def parse_dollars(s):
        """Convert a dollar-amount string to a float number of dollars (NaN if unrecognized)."""
        # if s is not a string, return NaN
        if not isinstance(s, str):
            return np.nan

        # $###.# million: keep only digits and the decimal point, then scale
        match = re.match(dollars_pattern_mil, s, flags=re.IGNORECASE)
        if match:
            return float(re.sub(r'[^\d.]', '', match.group(0))) * 10**6

        # $###.# billion: keep only digits and the decimal point, then scale
        match = re.match(dollars_pattern_bil, s, flags=re.IGNORECASE)
        if match:
            return float(re.sub(r'[^\d.]', '', match.group(0))) * 10**9

        # $###,###,###: every separator the pattern accepts (',' or '.') is a thousands
        # separator, so keep the digits only. Stripping just '$' and ',' used to make
        # "$1.234.567" raise ValueError and "$1.234" parse as 1.234.
        match = re.match(dollars_pattern_num, s, flags=re.IGNORECASE)
        if match:
            return float(re.sub(r'[^\d]', '', match.group(0)))

        # otherwise, return NaN
        return np.nan

    # Clean the box office column.
    df_wiki['box_office'] = (
        box_office
        .str.extract(dollars_pattern_combined, flags=re.IGNORECASE)[0]
        .apply(parse_dollars)
    )

    df_wiki.drop('Box office', axis=1, inplace=True)

    # Clean the budget column. (The raw 'Budget' column is left in place.)
    df_wiki['budget'] = (
        df_wiki['Budget']
        .dropna()
        .apply(join_if_list)                                    # Join list elements into space-separated strings
        .str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)    # Replace dollar ranges with just the upper value
        .str.replace(r'\s*\[\d+\]', '', regex=True)             # Remove citations (numbers within square brackets)
        .str.extract(dollars_pattern_combined, flags=re.IGNORECASE)[0]
        .apply(parse_dollars)
    )

    # Clean the release date column. (The raw 'Release date' column is left in place.)
    # Fixed: the year-month-day pattern contained a stray ']' that required a literal
    # ']' in the text, so ISO-style dates never matched.
    date_pattern_ymd = r'\d{4}[/-][01]?\d[/-][0123]?\d'
    date_pattern_mdy = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},\s+\d{4}'
    date_pattern_dmy = r'\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}'
    date_pattern_my = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}'
    date_pattern_yo = r'^\d{4}$|(?<=\( )\d{4}(?= \))'

    date_pattern_combined = f'({date_pattern_ymd}|{date_pattern_mdy}|{date_pattern_dmy}|{date_pattern_my}|{date_pattern_yo})'

    # NOTE(review): `infer_datetime_format` is deprecated in pandas 2.x; it is kept
    # because the pandas version this notebook runs on is not visible here.
    df_wiki['release_date'] = pd.to_datetime(
        df_wiki['Release date']
        .dropna()
        .apply(join_if_list)
        .str.extract(date_pattern_combined, flags=re.IGNORECASE)[0],
        infer_datetime_format=True
    )

    # Clean the running time column.
    run_time_pattern_hrs = r'(\d+)\s+h(?:(?:ou)?rs?)?(?:\s(\d+)\s+m(?:in(?:utes)?)?)?'
    run_time_pattern_min = r'(\d+)\s+m(?:in(?:utes)?)?'

    run_time_pattern_combined = f'{run_time_pattern_hrs}|{run_time_pattern_min}'

    run_time_parts = (
        df_wiki['Running time']
        .dropna()
        .apply(join_if_list)
        .str.extract(run_time_pattern_combined)                 # Column 0 is hours; columns 1 & 2 are minutes
        .apply(lambda col: pd.to_numeric(col, errors='coerce')) # Coerce empty strings (errors) into NaNs
        .fillna(0)
        .astype(int)
    )

    # Convert hours/minutes/minutes to minutes
    df_wiki['running_time'] = 60*run_time_parts[0] + run_time_parts[1] + run_time_parts[2]

    df_wiki.drop('Running time', axis=1, inplace=True)

    # ---------------------------------------------------------- Transform: Kaggle
    # 2. Clean the Kaggle metadata.
    df_kaggle = df_kaggle[df_kaggle['adult'] == 'False'].drop('adult', axis=1)  # Drop adult movies

    # Make the video column genuinely boolean. Comparing the string form handles both
    # real booleans and the strings 'True'/'False'; NaNs (and anything else) become False.
    df_kaggle['video'] = df_kaggle['video'].astype(str) == 'True'

    df_kaggle['budget'] = df_kaggle['budget'].astype(int)                       # Convert budget column to integer type
    df_kaggle['id'] = pd.to_numeric(                                            # Convert id column to numeric type
        df_kaggle['id'],
        errors='raise'  # see https://pandas.pydata.org/docs/reference/api/pandas.to_numeric.html
    )
    df_kaggle['popularity'] = pd.to_numeric(df_kaggle['popularity'], errors='raise')
    df_kaggle['release_date'] = pd.to_datetime(df_kaggle['release_date'])

    # ------------------------------------------------------------ Transform: merge
    # 3. Merge the two DataFrames into the movies DataFrame.
    # Known bad merge that is deliberately left in: The Holiday (2006) is joined
    # with From Here to Eternity (1953).
    df_movies = pd.merge(df_wiki, df_kaggle, on='imdb_id', suffixes=('_wiki', '_kaggle'))

    # 4. Drop unnecessary columns from the merged DataFrame.
    df_movies.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

    # 5. Function to fill in missing data from the redundant column, then drop that column.
    def fill_missing_data(df, column_to_keep, column_to_drop):
        """Back-fill `column_to_keep` from `column_to_drop` where it is 0 or NaN, then drop the latter."""
        kept = df[column_to_keep]
        # Fixed: NaN now counts as missing too. The values parsed from Wikipedia use NaN
        # (not 0) for "unknown", so testing only `== 0` never back-filled them.
        is_missing = kept.isna() | (kept == 0)
        df[column_to_keep] = kept.mask(is_missing, df[column_to_drop])
        df.drop(columns=column_to_drop, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_data(df_movies, 'runtime', 'running_time')
    fill_missing_data(df_movies, 'budget_kaggle', 'budget_wiki')
    fill_missing_data(df_movies, 'box_office', 'revenue')  # Wikipedia box office kept, filled from Kaggle revenue

    # 7. Filter the movies DataFrame for specific columns, and
    # 8. rename the columns in the movies DataFrame.
    df_movies = (
        df_movies.loc[:, [
            'imdb_id',
            'id',
            'title_kaggle',
            'original_title',
            'belongs_to_collection',
            'tagline',
            'overview',
            'url',
            'imdb_link',
            'runtime',
            'budget_kaggle',
            'box_office',  # Differs from Module
            'year',
            'release_date_kaggle',
            'popularity',
            'vote_average',
            'vote_count',
            'genres',
            'original_language',
            'spoken_languages',
            'Country',
            'production_companies',
            'production_countries',
            'Distributor',
            'Producer(s)',
            'Director',
            'Writer(s)',
            'Based on',
            'Composer(s)',
            'Starring',
            'Cinematography',
            'Editor(s)'
        ]]
        .rename(columns={
            'id':'kaggle_id',
            'title_kaggle':'title',
            'url':'wikipedia_url',
            'budget_kaggle':'budget',
            'box_office':'revenue',
            'release_date_kaggle':'release_date',
            'Country':'country',
            'Distributor':'distributor',
            'Producer(s)':'producers',
            'Director':'director',
            'Starring':'starring',
            'Cinematography':'cinematography',
            'Editor(s)':'editors',
            'Writer(s)':'writers',
            'Composer(s)':'composers',
            'Based on':'based_on'
        })
    )

    # 9. Transform and merge the ratings DataFrame.
    # NOTE(review): `df_movies_w_ratings` is neither uploaded nor returned below, yet
    # building it needs the whole ratings file in memory. Consider removing this step
    # or exposing its result.
    df_ratings['timestamp'] = pd.to_datetime(df_ratings['timestamp'], unit='s') # unit='s' is for seconds

    rating_counts = (
        df_ratings.groupby(['movieId','rating'], as_index=False).count()
        .rename({'userId':'count'}, axis=1)  # the userId and timestamp columns each now contain counts for the above
        .pivot(index='movieId',columns='rating', values='count')
    )

    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

    df_movies_w_ratings = pd.merge(
        df_movies,              # The left table
        rating_counts,          # The right table
        how='left',             # Join type
        left_on='kaggle_id',    # Column from the left table to use as the join key
        right_index=True        # Use the right table's index column for the join key instead of using 'right_on='
    )

    df_movies_w_ratings[rating_counts.columns] = df_movies_w_ratings[rating_counts.columns].fillna(0).astype(int)

    # --------------------------------------------------------------------- Load
    # create connection to PostgreSQL database
    engine = create_engine(f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data")

    # Upload df_movies as SQL table
    df_movies.to_sql(name='movies', con=engine, if_exists='replace')

    # Export Ratings data
    rows_imported = 0
    start_time = time.time()

    for data in pd.read_csv(ratings_file, chunksize=1000000):

        # print out the range of rows that are being imported
        print(f'Uploading rows {rows_imported} to {rows_imported + len(data)}... ', end='')

        # The first chunk replaces any existing table and later chunks append to it.
        # Appending every chunk used to duplicate the whole ratings table on each re-run.
        data.to_sql(
            name='ratings',
            con=engine,
            if_exists='replace' if rows_imported == 0 else 'append'
        )

        # increment the number of rows imported by the size of 'data'
        rows_imported += len(data)

        # print that the rows have finished importing
        print(f'done. {time.time() - start_time} total seconds elapsed.')

    print('Upload complete.')
    
#     return df_wiki, df_movies_w_ratings, df_movies

In [4]:
# Directory holding the input data, and the path of each of the three files in it.
filepath = './Resources'
wiki_file, kaggle_file, ratings_file = (
    f'{filepath}/{file_name}'
    for file_name in (
        'wikipedia-movies.json',  # The Wikipedia data
        'movies_metadata.csv',    # The Kaggle metadata
        'ratings.csv',            # The MovieLens rating data
    )
)

In [5]:
# Run the full ETL pipeline on the three input files.
# The function prints its progress while the ratings are uploaded in chunks.
extract_transform_load(wiki_file, kaggle_file, ratings_file)

Uploading rows 0 to 1000000... done. 38.77549386024475 total seconds elapsed.
Uploading rows 1000000 to 2000000... done. 77.16441988945007 total seconds elapsed.
Uploading rows 2000000 to 3000000... done. 114.51331233978271 total seconds elapsed.
Uploading rows 3000000 to 4000000... done. 151.37267780303955 total seconds elapsed.
Uploading rows 4000000 to 5000000... done. 188.2217025756836 total seconds elapsed.
Uploading rows 5000000 to 6000000... done. 226.1458158493042 total seconds elapsed.
Uploading rows 6000000 to 7000000... done. 265.24549198150635 total seconds elapsed.
Uploading rows 7000000 to 8000000... done. 302.25529193878174 total seconds elapsed.
Uploading rows 8000000 to 9000000... done. 340.3410987854004 total seconds elapsed.
Uploading rows 9000000 to 10000000... done. 377.23037457466125 total seconds elapsed.
Uploading rows 10000000 to 11000000... done. 414.3162200450897 total seconds elapsed.
Uploading rows 11000000 to 12000000... done. 453.0658755302429 total secon