In [1]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [2]:
# Define clean_movie, which takes a single argument, "movie" (one scraped movie record).
def clean_movie(movie):
    """Return a tidied copy of a single movie record.

    Alternate-title fields are gathered into one nested ``'alt_titles'`` dict,
    and field names that mean the same thing are collapsed onto one canonical
    name. The mapping passed in is not modified.
    """
    # Work on a shallow copy so the caller's record is left untouched.
    record = dict(movie)

    # Field names that each hold an alternate title for the film.
    alt_columns = (
        'Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French', 'Hangul',
        'Hebrew', 'Hepburn', 'Japanese', 'Literally', 'Mandarin', 'McCune–Reischauer',
        'Original title', 'Polish', 'Revised Romanization', 'Romanized', 'Russian',
        'Simplified', 'Traditional', 'Yiddish',
    )

    # Pull every alternate-title field that is present out of the record.
    alt_titles = {key: record.pop(key) for key in alt_columns if key in record}

    # Only attach the nested dict when at least one alternate title was found.
    if alt_titles:
        record['alt_titles'] = alt_titles

    # (old name, canonical name) pairs, applied in this exact order. When
    # several old names share a canonical name, the last one present wins.
    renames = (
        ('Adaptation by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Original release', 'Release date'),
        ('Music by', 'Composer(s)'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release Date'),
        ('Release Date', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Written by', 'Writer(s)'),
    )
    for old_name, new_name in renames:
        if old_name in record:
            record[new_name] = record.pop(old_name)

    # Hand back the cleaned-up copy.
    return record

In [6]:
# 1. Define the ETL function, which takes three arguments: the Wikipedia data,
# the Kaggle metadata, and the MovieLens rating data (from Kaggle).

def extract_transform_load(wiki_data, kaggle_data, rating_data):
    """Extract the three movie sources, clean and merge them, and load PostgreSQL.

    Parameters
    ----------
    wiki_data : str
        Path to the Wikipedia movies JSON file.
    kaggle_data : str
        Path to the Kaggle movie metadata CSV file.
    rating_data : str
        Path to the MovieLens ratings CSV file.

    Returns
    -------
    None
        The merged movie data is written to the ``movies`` table and the raw
        ratings to the ``ratings`` table of the ``movie_data`` database.
        Both tables are replaced on every run. Progress is printed while the
        ratings are loaded.
    """
    # ------------------------------------------------------------------
    # Extract
    # ------------------------------------------------------------------
    # Read in the Kaggle metadata and MovieLens ratings CSV files as DataFrames.
    kaggle_metadata = pd.read_csv(kaggle_data, low_memory=False)
    ratings = pd.read_csv(rating_data)
    # Open and read the Wikipedia data JSON file. JSON is UTF-8 by specification,
    # so do not depend on the platform's default encoding.
    with open(wiki_data, mode='r', encoding='utf-8') as f:
        wiki_movies_raw = json.load(f)

    def join_if_list(value):
        """Join a list-valued cell into one space-separated string."""
        return ' '.join(value) if isinstance(value, list) else value

    # ------------------------------------------------------------------
    # 1. Clean the Wikipedia data.
    # ------------------------------------------------------------------
    # Keep entries that have a director and an IMDb link; drop TV shows.
    wiki_movies = [movie for movie in wiki_movies_raw
                   if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]

    # Normalise each record's field names, then build the DataFrame.
    cleaned_wiki_movies = [clean_movie(movie) for movie in wiki_movies]
    wiki_movies_df = pd.DataFrame(cleaned_wiki_movies)

    # Extract the IMDb ID from the link and drop duplicate IDs. Errors are
    # reported rather than raised (deliberate best-effort step).
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    except Exception as e:
        print(f'There was an error of type: {e}')

    # Keep only the columns that are less than 90% null.
    columns_to_keep = [column for column in wiki_movies_df.columns
                       if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    # .copy() so the column assignments below act on an independent frame
    # instead of a slice (avoids SettingWithCopyWarning).
    wiki_movies_df = wiki_movies_df[columns_to_keep].copy()

    # Non-null box office values, with list cells flattened to strings.
    box_office = wiki_movies_df['Box office'].dropna().apply(join_if_list)

    # "form one": $123.4 million / billion.
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'

    # "form two": $123,456,789 (comma or period as the thousands separator).
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illi?on)'

    def parse_dollars(s):
        """Convert a matched dollar string to a float; NaN if it is not parseable."""
        # Non-strings (e.g. NaN from a failed extract) cannot be parsed.
        if not isinstance(s, str):
            return np.nan

        # $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
            # Strip the dollar sign, whitespace and letters, then scale.
            s = re.sub(r'\$|\s|[a-zA-Z]', '', s)
            return float(s) * 10**6

        # $###.# billion
        if re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):
            s = re.sub(r'\$|\s|[a-zA-Z]', '', s)
            return float(s) * 10**9

        # $###,###,###
        if re.match(form_two, s, flags=re.IGNORECASE):
            # form_two accepts both ',' and '.' as thousands separators, so
            # strip both (plus '$' and whitespace) before converting.
            s = re.sub(r'[$,.\s]', '', s)
            return float(s)

        # Anything else is unrecognised.
        return np.nan

    # Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(
        f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)

    # Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df['Budget'].dropna().apply(join_if_list)
    # Collapse ranges such as "$5-10 million" down to the upper value.
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    # Remove citation markers like "[4]". regex=True is required: without it
    # pandas >= 2.0 treats the pattern as a literal string.
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    wiki_movies_df['budget'] = budget.str.extract(
        f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    # Clean the release date column in the wiki_movies_df DataFrame.
    release_date = wiki_movies_df['Release date'].dropna().apply(join_if_list)
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[0123]\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}'
    date_form_four = r'\d{4}'
    # NOTE(review): infer_datetime_format is deprecated in pandas 2.x; kept so
    # behaviour on older pandas is unchanged - revisit when pinning a version.
    wiki_movies_df['release_date'] = pd.to_datetime(
        release_date.str.extract(
            f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})',
            flags=re.IGNORECASE)[0],
        infer_datetime_format=True)

    # Clean the running time column in the wiki_movies_df DataFrame.
    running_time = wiki_movies_df['Running time'].dropna().apply(join_if_list)
    # Groups 0/1 capture "H hours M"; group 2 captures plain minutes.
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(
        lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    # Use hours*60 + minutes when no plain-minutes value was captured.
    wiki_movies_df['running_time'] = running_time_extract.apply(
        lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    wiki_movies_df.drop('Running time', axis=1, inplace=True)

    # ------------------------------------------------------------------
    # 2. Clean the Kaggle metadata.
    # ------------------------------------------------------------------
    # Keep only the rows where 'adult' is 'False', then drop the column.
    # drop() returns a new frame, so the assignments below do not write
    # into a slice of the original.
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult', axis=1)
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    # Convert the numeric columns, raising on anything unparseable.
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    # Convert 'release_date' to a datetime.
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

    # 3. Merge the two DataFrames into the movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki', '_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki', 'release_date_wiki', 'Language', 'Production company(s)'], inplace=True)

    # 5. Fill zero-valued Kaggle data from the matching Wikipedia column.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        """Replace zeros in kaggle_column with wiki_column, then drop wiki_column."""
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column],
            axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # 7. Filter the movies DataFrame for specific columns.
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]

    # 8. Rename the columns in the movies DataFrame.
    movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

    # 9. Transform and merge the ratings DataFrame.
    # Count ratings per (movieId, rating) pair, relabel the count column, then
    # pivot so movieIds are the index and each rating value is a column.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1) \
                    .pivot(index='movieId', columns='rating', values='count')
    # Rename the columns to make it clear that they are rating counts.
    rating_counts.columns = [f'rating_{rating}' for rating in rating_counts.columns]
    # Left-merge the rating counts onto the movies.
    # NOTE(review): movies_with_ratings_df is neither loaded nor returned; the
    # 'movies' table below is built from movies_df. Confirm this is intended.
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    # Movies with no ratings get a count of 0 instead of NaN.
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    # ------------------------------------------------------------------
    # Load
    # ------------------------------------------------------------------
    # Connection string for the local server. SQLAlchemy >= 1.4 only accepts
    # the "postgresql" dialect name; the "postgres" alias was removed.
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5434/movie_data"
    # Create the database connection.
    engine = create_engine(db_string)
    # Import the movie data into the database.
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')
    # Import the ratings file in chunks, tracking rows imported and elapsed time.
    rows_imported = 0
    start_time = time.time()
    # Read from the rating_data argument rather than a hard-coded path.
    for data in pd.read_csv(rating_data, chunksize=1000000):
        # Print out the range of rows that are being imported.
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        # Replace the table on the first chunk and append afterwards, so that
        # re-running the pipeline does not duplicate every rating.
        data.to_sql(name='ratings', con=engine,
                    if_exists='replace' if rows_imported == 0 else 'append')
        # Increment the row import tracker.
        rows_imported += len(data)
        # Print that the rows have finished importing.
        print(f'Done. {time.time() - start_time} total seconds elapsed.')


In [4]:
# Directory that holds the three input files.
file_dir = 'Resources'
# Paths to the Wikipedia data (JSON), the Kaggle metadata (CSV) and the
# MovieLens rating data (CSV), all built from the same directory.
wiki_file, kaggle_file, ratings_file = (
    f'{file_dir}/{file_name}'
    for file_name in ('wikipedia.movies.json', 'movies_metadata.csv', 'ratings.csv')
)

In [5]:
# Run the full ETL pipeline on the three source files.
extract_transform_load(
    wiki_data=wiki_file,
    kaggle_data=kaggle_file,
    rating_data=ratings_file,
)

importing rows 0 to 1000000...Done. 60.480669021606445 total seconds elapsed.
importing rows 1000000 to 2000000...Done. 121.23763871192932 total seconds elapsed.
importing rows 2000000 to 3000000...Done. 181.4698691368103 total seconds elapsed.
importing rows 3000000 to 4000000...Done. 241.07330989837646 total seconds elapsed.
importing rows 4000000 to 5000000...Done. 301.14439034461975 total seconds elapsed.
importing rows 5000000 to 6000000...Done. 361.5916237831116 total seconds elapsed.
importing rows 6000000 to 7000000...Done. 422.348685503006 total seconds elapsed.
importing rows 7000000 to 8000000...Done. 482.3037893772125 total seconds elapsed.
importing rows 8000000 to 9000000...Done. 543.1535172462463 total seconds elapsed.
importing rows 9000000 to 10000000...Done. 603.9518105983734 total seconds elapsed.
importing rows 10000000 to 11000000...Done. 664.66903424263 total seconds elapsed.
importing rows 11000000 to 12000000...Done. 725.5275750160217 total seconds elapsed.
impo