In [1]:
import json
import pandas as pd
import numpy as np
import re
from sqlalchemy import create_engine
import psycopg2
from config import db_password
import time

In [2]:
# Create a clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    movie = dict(movie) # non-destructive copy of the movie-data
    
    alt_titles = {}
    alt_title_fields = ['Also known as','Arabic', 'Cantonese', 'Chinese', 'French', 'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally',
    'Mandarin', 'McCune–Reischauer', 'Original title', 'Polish', 'Revised Romanization', 'Romanized', 'Russian',
    'Simplified', 'Traditional', 'Yiddish']
    
    # Consolidate any/all alternate titles into a single column
    for a_t in alt_title_fields:
        if a_t in movie:
            alt_titles[a_t] = movie.pop(a_t)
            
        if len(alt_titles) > 0:
            movie['alt_titles'] = alt_titles
            
    # Consolidate column names with a function
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    
    # Define list of column name transformations
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Written by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Original language(s)', 'Language')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Screen story by', 'Screenplay by')
    change_column_name('Length', 'Running time')
    change_column_name('Released', 'Release date')
    
    return(movie)

In [3]:
# Add a function that processes three input files;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load():
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    
    kaggle_metadata_df = pd.read_csv(kaggle_file,low_memory=False)
    
    ratings_df = pd.read_csv(ratings_file)    

    # Open and read the Wikipedia data JSON file.
    with open(wiki_file, mode='r') as file:
        wiki_movies_raw = json.load(file)    

        
    # Clean the Wikipedia Movie data
    #----------------------------------------------------------------------
    # Write a list comprehension to filter out TV shows.
    wiki_movies_1 = [movie for movie in wiki_movies_raw 
        if ('Director' in movie or 'Directed by' in movie and 'imdb_link' in movie and
            'No. of episodes' not in movie)]
        
    # Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    clean_movies = [clean_movie(movie) for movie in wiki_movies_1]

    # Read in the cleaned movies list as a DataFrame.
    wiki_movies_df = pd.DataFrame(clean_movies)    

    # Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    # dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')

        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)    
        
    except Exception as e: 
        print(f'There was a problem - {e} --------')
        
    # Write a list comprehension filter to evaluate the column content and keep only those
    # that have a small amount of null or missing data within the wiki_movies_df DataFrame.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]

    # Apply the filter (wiki_columns_to_keep) to remove empty or mostly-empty columns.
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]
    
    # Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
     
    # Convert the box office data to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

    # Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$*\d+\.?\d*\s*[mb]illi?on'
    
    # Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'

    # Add the parse_dollars function.
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            return value

        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            return value

        else:
            return np.nan
    
    # Create and populate a new box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})',flags=re.IGNORECASE)[0].apply(parse_dollars)
   
    # Drop the original 'Box office' column
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    # Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df['Budget'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    
    # Create and populate a new budget column in the wiki_movies_df DataFrame
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    # Drop the original Budget column
    wiki_movies_df.drop('Budget', axis=1, inplace=True)
    
    # Clean the release date column in the wiki_movies_df DataFrame.
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    # This conversion is required to eliminate "AttributeError: Can only use .str accessor with string values!"
    release_date_str = release_date.astype(str)
    
    # Create date_form regexs for the release date values
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    
    # Create a populate a new release date colum in the wiki_movies_df DataFrame.
    wiki_movies_df['release_date'] = pd.to_datetime(release_date_str.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)

    # Drop the original release date column
    wiki_movies_df.drop('Release date', axis=1, inplace=True)  
    
    # Clean the running time column in the wiki_movies_df DataFrame.
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    
    # Create and populate a new running time column in the wiki_movies_df DataFrame
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    
    # Drop the original running time column
    wiki_movies_df.drop('Running time', axis=1, inplace=True)       


    # Clean the Kaggle metadata.
    #----------------------------------------------------------------------    
    kaggle_metadata_df = kaggle_metadata_df[kaggle_metadata_df['adult'] == 'False'].drop('adult',axis='columns')
    kaggle_metadata_df['video'] == 'True'
    kaggle_metadata_df['budget'] = kaggle_metadata_df['budget'].astype(int)
    kaggle_metadata_df['id'] = pd.to_numeric(kaggle_metadata_df['id'], errors='raise')
    kaggle_metadata_df['popularity'] = pd.to_numeric(kaggle_metadata_df['popularity'], errors='raise')
    kaggle_metadata_df['release_date'] = pd.to_datetime(kaggle_metadata_df['release_date'])
    

    # Merge the wiki_movie and kaggle DataFrames into a consolidated movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata_df, on='imdb_id', suffixes=['_wiki','_kaggle'])
    
    # Drop the outliers
    movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)
    
    # Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

    # 5. Add a function to fill in the missing Kaggle data.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
            , axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # Use the function to leverage wiki data for Kaggle zero values.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # Filter the movies DataFrame for specific columns.
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url',
                                  'imdb_link','runtime','budget_kaggle','revenue','release_date_kaggle','popularity',
                                  'vote_average','vote_count','genres','original_language','overview','spoken_languages',
                                  'Country','production_companies','production_countries','Distributor','Producer(s)',
                                  'Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on']]

    # Rename the columns in the movies DataFrame.
    movies_df.rename({'id':'kaggle_id',
                      'title_kaggle':'title',
                      'url':'wikipedia_url',
                      'budget_kaggle':'budget',
                      'release_date_kaggle':'release_date',
                      'Country':'country',
                      'Distributor':'distributor',
                      'Producer(s)':'producers',
                      'Director':'director',
                      'Starring':'starring',
                      'Cinematography':'cinematography',
                      'Editor(s)':'editors',
                      'Writer(s)':'writers',
                      'Composer(s)':'composers',
                      'Based on':'based_on'}, axis='columns', inplace=True)
    
    # Transform and merge the ratings DataFrame.
    #----------------------------------------------------------------------   
    # Group and count the ratings so the data is easier to manage - reduces the size of the dataset.

    ratings_df['timestamp'] = pd.to_datetime(ratings_df['timestamp'], unit='s')
    
    rating_counts = ratings_df.groupby(['movieId','rating'], as_index=False).count() \
        .rename({'userId':'count'}, axis=1).pivot(index='movieId',columns='rating', values='count')
    
    # Rename the rating columns so we know the data source after creating a joined table.
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    
    # left-join the DataFrames to keep all the movies, and apply relevant ratings for those movies.
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    
    # Convert Nulls to zero values
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)


    # Load the data into the Postgresql database
    #----------------------------------------------------------------------   
    # Create the connection string and start the connection to the db engine

    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
    engine = create_engine(db_string)
    
    # Import the movie_df DataFrame to the database SQL table
    movies_df.to_sql(name='movies', con=engine)
    
    # Create a function to import the ratings data in chunks
    rows = 0

    # Get the start time for the import operation
    start_time = time.time()

    for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):

        print (f'Importing rows {rows} to {rows + len(data)} --- ', end='')

        data.to_sql(name='ratings', con=engine, if_exists='append')
        rows += len(data)
        
        print(f'Done. {time.time() - start_time:.2f} total seconds elapsed')

    print('Import operation completed!')

    # Return three variables. The first is the wiki_movies_df DataFrame
    return wiki_movies_df, kaggle_metadata_df, movies_with_ratings_df 

# End of extract_transform_load function

In [4]:
# Create the path to the file directory and variables for the three files
file_dir = 'Resources/'
# Wikipedia data
wiki_file = f'{file_dir}wikipedia_movies.json'
# Kaggle metadata
kaggle_file = f'{file_dir}movies_metadata.csv'
# MovieLens rating data.
ratings_file = f'{file_dir}ratings.csv'

In [5]:
# Set the three variables equal to the function call.
wiki_file, kaggle_file, ratings_file = extract_transform_load()

Importing rows 0 to 1000000 --- Done. 23.59 total seconds elapsed
Importing rows 1000000 to 2000000 --- Done. 46.72 total seconds elapsed
Importing rows 2000000 to 3000000 --- Done. 69.86 total seconds elapsed
Importing rows 3000000 to 4000000 --- Done. 92.63 total seconds elapsed
Importing rows 4000000 to 5000000 --- Done. 115.46 total seconds elapsed
Importing rows 5000000 to 6000000 --- Done. 138.33 total seconds elapsed
Importing rows 6000000 to 7000000 --- Done. 162.08 total seconds elapsed
Importing rows 7000000 to 8000000 --- Done. 184.86 total seconds elapsed
Importing rows 8000000 to 9000000 --- Done. 208.06 total seconds elapsed
Importing rows 9000000 to 10000000 --- Done. 231.37 total seconds elapsed
Importing rows 10000000 to 11000000 --- Done. 254.10 total seconds elapsed
Importing rows 11000000 to 12000000 --- Done. 278.11 total seconds elapsed
Importing rows 12000000 to 13000000 --- Done. 302.42 total seconds elapsed
Importing rows 13000000 to 14000000 --- Done. 325.72 t

In [6]:
# Set the DataFrames from the return statement equal to the file names. Ensure the right DataFrame is matched to the correct
# output file data.
wiki_movies_df = wiki_file
movies_df = kaggle_file
movies_with_ratings_df = ratings_file

In [7]:
# Check the wiki_movies_df DataFrame. 
wiki_movies_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 7033 entries, 0 to 7075
Data columns (total 21 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   url                    7033 non-null   object        
 1   year                   7033 non-null   int64         
 2   imdb_link              7033 non-null   object        
 3   title                  7032 non-null   object        
 4   Based on               2181 non-null   object        
 5   Starring               6849 non-null   object        
 6   Cinematography         6342 non-null   object        
 7   Country                6797 non-null   object        
 8   Language               6947 non-null   object        
 9   Composer(s)            6515 non-null   object        
 10  Writer(s)              6834 non-null   object        
 11  Director               7033 non-null   object        
 12  Editor(s)              6485 non-null   object        
 13  Dis

In [8]:
# Check the movies_with_ratings_df DataFrame.
movies_with_ratings_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 6051 entries, 0 to 6051
Data columns (total 41 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   imdb_id                6051 non-null   object        
 1   kaggle_id              6051 non-null   int64         
 2   title                  6051 non-null   object        
 3   original_title         6051 non-null   object        
 4   tagline                4917 non-null   object        
 5   belongs_to_collection  1029 non-null   object        
 6   wikipedia_url          6051 non-null   object        
 7   imdb_link              6051 non-null   object        
 8   runtime                6050 non-null   float64       
 9   budget                 4609 non-null   float64       
 10  revenue                5172 non-null   float64       
 11  release_date           6051 non-null   datetime64[ns]
 12  popularity             6051 non-null   float64       
 13  vot

In [9]:
# Check the movies_df DataFrame.
movies_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 45454 entries, 0 to 45465
Data columns (total 23 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   belongs_to_collection  4491 non-null   object        
 1   budget                 45454 non-null  int32         
 2   genres                 45454 non-null  object        
 3   homepage               7777 non-null   object        
 4   id                     45454 non-null  int64         
 5   imdb_id                45437 non-null  object        
 6   original_language      45443 non-null  object        
 7   original_title         45454 non-null  object        
 8   overview               44500 non-null  object        
 9   popularity             45451 non-null  float64       
 10  poster_path            45068 non-null  object        
 11  production_companies   45451 non-null  object        
 12  production_countries   45451 non-null  object        
 13  r