# RICE-VIRT-DATA-PT-05-2022-U-B-MW Module 8 Challenge

# Deliverable 4: Create the Movie Database
## Code Summary
- **Purpose  :** Creates SQL DB from ETL of Kaggle and Wikipedia data
- **Created  :** 2022 Jul 06 18:34:11 UTC (Meghan E. Hull)
- **Modified :** 2022 Jul 06 23:59:11 UTC (Meghan E. Hull)


## Dependencies

In [1]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

import time

## Database Engine

In [2]:
# Import database password
from urllib.parse import quote_plus

from config import db_password

# Define connection string.
# The password is percent-encoded before being placed in the URL so that
# characters with special meaning there (e.g. "@", "/", ":", "#") cannot
# corrupt the host/port/database portion.  Passwords made only of
# URL-safe characters are left unchanged by quote_plus.
db_string = f"postgresql://postgres:{quote_plus(str(db_password))}@127.0.0.1:5432/movie_data"


# Define Functions

## Function ***clean_movie***,  with the input *movie*
**Note:** Taken from Deliverable 2

In [3]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    """Return a cleaned, shallow copy of one movie record.

    Two clean-up passes are applied to the copy (the input dict itself is
    not modified):

    1. Every alternative-title key that is present is removed and
       collected into a single nested dict stored under ``'alt_titles'``
       (only added when at least one such key was found).
    2. A fixed, ordered list of column renames is applied.  Several old
       names share one new name; when more than one of them is present,
       the rename applied last overwrites the value.
    """
    record = dict(movie)

    # Keys that hold a title in another language / script / romanization
    alternate_title_keys = (
        'Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French',
        'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally',
        'Mandarin', 'McCune–Reischauer', 'Original title', 'Polish',
        'Revised Romanization', 'Romanized', 'Russian',
        'Simplified', 'Traditional', 'Yiddish',
    )

    # Move every alternative title found into one nested dict
    found_titles = {
        key: record.pop(key) for key in alternate_title_keys if key in record
    }
    if found_titles:
        record['alt_titles'] = found_titles

    # (old name, new name) pairs, applied strictly in this order.
    # 'Released' -> 'Release Date' -> 'Release date' is a deliberate chain.
    renames = (
        ('Adaptation by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Original release', 'Release date'),
        ('Music by', 'Composer(s)'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release Date'),
        ('Release Date', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Written by', 'Writer(s)'),
    )
    for old_name, new_name in renames:
        if old_name in record:
            record[new_name] = record.pop(old_name)

    # Output cleaned data
    return record

## Function ***extract_transform_load***, with inputs *wiki*, *kaggle*, & *ratings*
**Note:** Refactored from Deliverable 3; added export to the created SQL DB

In [4]:
# Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load(wiki,kaggle,ratings):
    """Extract, clean and merge the movie data, then load it into PostgreSQL.

    Parameters
    ----------
    wiki : path of the Wikipedia movies JSON file (opened with ``open``).
    kaggle : path of the Kaggle metadata CSV (read with ``pd.read_csv``).
    ratings : path of the MovieLens ratings CSV (read with ``pd.read_csv``,
        once in full for the rating counts and once in chunks for the load).

    Side effects
    ------------
    * Replaces the ``movies`` table with the merged Wikipedia/Kaggle data.
    * Replaces the ``ratings`` table with the raw ratings file, imported in
      chunks of 1,000,000 rows.
    * Prints the name of every merged column holding a single value, and a
      progress line per imported ratings chunk.

    The database is reached through the module-level ``db_string``.

    Returns
    -------
    None.  The results are written to the database only.
    """
    def join_if_list(value):
        # Some scraped fields arrive as a list of strings; flatten those.
        return ' '.join(value) if isinstance(value, list) else value

    #******************************************************************************************************************
    # Load Data
    #------------------------------------------------------------------------------------------------------------------
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(kaggle, low_memory=False)
    ratings_df = pd.read_csv(ratings, low_memory=False)

    # Open and read the Wikipedia data JSON file.
    # The encoding is given explicitly so the result does not depend on the
    # platform's default locale encoding.
    with open(wiki, mode='r', encoding='utf-8') as file:
        wiki_movies_raw = json.load(file)

    #******************************************************************************************************************
    # Clean Wikipedia Data
    #------------------------------------------------------------------------------------------------------------------
    # Cleaned data stored in wiki_movies_df Dataframe
    #------------------------------------------------------------------------------------------------------------------
    # Filter out TV shows (records that carry an episode count).
    wiki_movies = [movie for movie in wiki_movies_raw
                   if 'No. of episodes' not in movie]

    # Clean each remaining record and build a DataFrame from the result.
    clean_movies = [clean_movie(movie) for movie in wiki_movies]
    wiki_movies_df = pd.DataFrame(clean_movies)

    # Extract the IMDb ID from the IMDb link and drop duplicate IDs.
    # Any error is captured and printed so the problem is visible.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset="imdb_id", inplace=True)
    except Exception as error:
        print(f"Error occured... {error}")

    # Keep only the columns that are less than 90% null.
    # .copy() makes the result an independent frame, so the column
    # assignments below do not write into a view of the original.
    row_count = len(wiki_movies_df)
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns
                            if wiki_movies_df[column].isnull().sum() < row_count * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep].copy()

    # Non-null "Box office" values, with list entries flattened to strings.
    box_office = wiki_movies_df['Box office'].dropna().apply(join_if_list)

    # form_one: "$123.4 million" / "$1.2 billion"
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'

    # form_two: "$123,456,789" (comma or period as the thousands separator)
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'

    def parse_dollars(mystr):
        """Convert a matched dollar string to a float, or NaN if unparseable."""
        # Error Case 1 - mystr isn't a string to parse
        if not isinstance(mystr, str):
            return np.nan

        # Case 1 - mystr is of form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', mystr, flags=re.IGNORECASE):
            # remove dollar sign, whitespace and " million", then scale
            return float(re.sub(r'\$|\s|[a-zA-Z]', '', mystr)) * 10**6

        # Case 2 - mystr is of form $###.# billion
        if re.match(r'\$\s*\d+\.?\d*\s*billi?on', mystr, flags=re.IGNORECASE):
            # remove dollar sign, whitespace and " billion", then scale
            return float(re.sub(r'\$|\s|[a-zA-Z]', '', mystr)) * 10**9

        # Case 3 - mystr is of form $###,###,### (or $###.###.###)
        if re.match(form_two, mystr, flags=re.IGNORECASE):
            # In this form both "," and "." are thousands separators, so
            # both must be stripped; leaving the periods in would make
            # float() raise on values such as "$1.234.567".
            return float(re.sub(r'\$|\s|[,\.]', '', mystr))

        # Error Case 2 - mystr in an unknown form
        return np.nan

    # Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})',flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)

    # Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df["Budget"].dropna().map(join_if_list)
    # Collapse ranges such as "$10–12 million" to their upper bound.
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    # Strip citation markers such as "[4]".  regex=True is required:
    # without it newer pandas versions treat the pattern as literal text.
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    # Clean the release date column in the wiki_movies_df DataFrame.
    release_date = wiki_movies_df['Release date'].dropna().apply(join_if_list)
    # "January 5, 2000" - the leading day digit is optional so that
    # single-digit days are matched as full dates.
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    # "2000-01-05" - the day may start with 0 (days 01-09).
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    # "January 2000"
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    # "2000"
    date_form_four = r'\d{4}'
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)

    # Clean the running time column in the wiki_movies_df DataFrame.
    running_time = wiki_movies_df['Running time'].dropna().apply(join_if_list)
    # Groups: 0 = hours, 1 = minutes after the hours, 2 = minutes-only form.
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    wiki_movies_df.drop('Running time', axis=1, inplace=True)

    #******************************************************************************************************************
    # Clean Kaggle Data
    #------------------------------------------------------------------------------------------------------------------
    # Cleaned data stored in kaggle_metadata Dataframe
    #------------------------------------------------------------------------------------------------------------------
    # 2. Clean the Kaggle metadata: keep rows whose 'adult' is 'False', then drop that column.
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')

    # Set as boolean
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'

    # Convert columns to dtypes
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')

    # Convert to datetime
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

    #******************************************************************************************************************
    # Create Dataframe with Kaggle Data and Wikipedia Data
    #------------------------------------------------------------------------------------------------------------------
    # Combined data stored in movies_df Dataframe
    #------------------------------------------------------------------------------------------------------------------
    # 3. Merge the two DataFrames into the movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

    # 5. Helper that fills zero-valued Kaggle entries from the Wikipedia column,
    #    then drops the Wikipedia column.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column], axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # 7. Report columns that hold a single value across all rows.
    #    Lists are unhashable, so they are converted to tuples before counting.
    def lists_to_tuples(value):
        return tuple(value) if isinstance(value, list) else value

    for col in movies_df.columns:
        value_counts = movies_df[col].apply(lists_to_tuples).value_counts(dropna=False)
        if len(value_counts) == 1:
            print(col)

    # 8. Select and order the columns, then rename them.
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]

    movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

    #******************************************************************************************************************
    # Create Dataframe with Ratings, Kaggle, and Wikipedia Data
    #------------------------------------------------------------------------------------------------------------------
    # Combined data stored in movies_with_ratings_df Dataframe
    #------------------------------------------------------------------------------------------------------------------
    # 9. Transform and merge the ratings DataFrame.
    # Count how many times each rating value was given to each movie,
    # with one column per rating value.
    rating_counts = ratings_df.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')

    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

    # Merging with movies_df
    # NOTE(review): movies_with_ratings_df is built but neither exported nor
    # returned by this function - confirm whether it is still needed here.
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    #******************************************************************************************************************
    # Output to PostgreSQL DB
    #------------------------------------------------------------------------------------------------------------------
    # Connect to DB engine
    engine = create_engine(db_string)

    try:
        # Import movies_df to DB
        movies_df.to_sql(name='movies', con=engine, if_exists='replace')

        #--------------------------------------------------------------------------------------------------------------
        # Add ratings chunksize to save memory
        #--------------------------------------------------------------------------------------------------------------
        # Initialize the number of rows imported
        rows_imported = 0

        # Get the start_time from time.time()
        start_time = time.time()

        for data in pd.read_csv(ratings, chunksize=1000000):
            # Print out the range of rows that are being imported
            print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

            # The first chunk replaces any existing table; later chunks append.
            data.to_sql(name='ratings', con=engine,
                        if_exists='replace' if rows_imported == 0 else 'append')

            # Increment the number of rows imported by the size of 'data'
            rows_imported += len(data)

            # Print that the rows have finished importing, with elapsed time
            print(f'Done. {time.time() - start_time} total seconds elapsed')
    finally:
        # Release pooled database connections even if an import step fails.
        engine.dispose()


# Extract Data

## Specify input file names

In [5]:
# Directory that holds all three input files
file_dir = 'C://Users/megha/OneDrive/Desktop/Rice Bootcamp 2022/0 Analysis/Movies-ETL/Resources'

# Full paths, in order:
#   wiki_file    - Wikipedia data
#   kaggle_file  - Kaggle metadata
#   ratings_file - MovieLens rating data
wiki_file, kaggle_file, ratings_file = (
    f'{file_dir}/{file_name}'
    for file_name in ('wikipedia_movies.json', 'movies_metadata.csv', 'ratings.csv')
)

## Run the function ***extract_transform_load***.

In [6]:
# Run the full pipeline on the three input files named above.
extract_transform_load(
    wiki=wiki_file,
    kaggle=kaggle_file,
    ratings=ratings_file,
)



video
importing rows 0 to 1000000...Done. 47.06868600845337 total seconds elapsed
importing rows 1000000 to 2000000...Done. 97.4979567527771 total seconds elapsed
importing rows 2000000 to 3000000...Done. 138.69687247276306 total seconds elapsed
importing rows 3000000 to 4000000...Done. 177.57474207878113 total seconds elapsed
importing rows 4000000 to 5000000...Done. 224.31711292266846 total seconds elapsed
importing rows 5000000 to 6000000...Done. 262.6731367111206 total seconds elapsed
importing rows 6000000 to 7000000...Done. 296.69806933403015 total seconds elapsed
importing rows 7000000 to 8000000...Done. 337.64159178733826 total seconds elapsed
importing rows 8000000 to 9000000...Done. 373.6870918273926 total seconds elapsed
importing rows 9000000 to 10000000...Done. 414.4004852771759 total seconds elapsed
importing rows 10000000 to 11000000...Done. 454.40690779685974 total seconds elapsed
importing rows 11000000 to 12000000...Done. 494.15832805633545 total seconds elapsed
impor