In [37]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [38]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    """Return a cleaned copy of one scraped Wikipedia movie record.

    Two normalisations are applied to a shallow copy of ``movie``:

    1. Every alternate-language title field that is present is removed from
       the record and collected into a single ``"Alternate Titles"`` dict
       (the key is only added when at least one such field was found).
    2. Synonymous column names are renamed to one canonical name. Renames
       run in the listed order; when several source columns map to the same
       target, the last one present in the record overwrites the earlier ones.

    Args:
        movie: Mapping of column name -> value for a single movie.

    Returns:
        dict: The cleaned record. The input mapping is not modified.
    """
    movie = dict(movie)  # shallow copy so the caller's record is left untouched

    # Fields that hold a title in another language or script.
    alt_lang = ['Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French',
                'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally',
                'Mandarin', 'McCune–Reischauer', 'Original title', 'Polish',
                'Revised Romanization', 'Romanized', 'Russian',
                'Simplified', 'Traditional', 'Yiddish']
    # Move each language field that is present into one nested dict.
    alt_titles = {lang: movie.pop(lang) for lang in alt_lang if lang in movie}
    if alt_titles:
        movie["Alternate Titles"] = alt_titles

    # (old name, canonical name) pairs, applied in order. Some entries chain
    # on purpose, e.g. 'Produced by' -> 'Producer' -> 'Producer(s)'. The
    # trailing spaces in the 'Productioncompan...' keys are part of the keys.
    column_renames = (
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Produced by', 'Producer'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Productioncompany '),
        ('Productioncompany ', 'Production company(s)'),
        ('Country of origin', 'Country'),
        ('Adaptation by', 'Writer(s)'),
        ('Length', 'Running time'),
        ('Music by', 'Composer(s)'),
        ('Original release', 'Release date'),
        ('Original language(s)', 'Language'),
        ('Released', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Written by', 'Writer(s)'),
        ('Voices of', 'Narrated by'),
    )
    for old_name, new_name in column_renames:
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)

    return movie

In [39]:
# 1. ETL entry point. It accepts the three input files (Wikipedia data, Kaggle
# metadata and MovieLens rating data from Kaggle) as optional arguments.

def extract_transform_load(wiki_file=None, kaggle_file=None, ratings_file=None):
    """Extract, clean and merge the movie data, then load it into PostgreSQL.

    Args:
        wiki_file: Path to the Wikipedia movies JSON file.
        kaggle_file: Path to the Kaggle movie metadata CSV file.
        ratings_file: Path to the MovieLens ratings CSV file.
        Any argument left as ``None`` falls back to the notebook-level
        variable of the same name, so ``extract_transform_load()`` with no
        arguments keeps working.

    Returns:
        None. The merged movie data is written to the ``movies`` table and
        the ratings CSV to the ``ratings`` table of the ``movie_data``
        database; both tables are rebuilt on every run.
    """
    # Backward compatibility with the original zero-argument call.
    notebook_vars = globals()
    if wiki_file is None:
        wiki_file = notebook_vars["wiki_file"]
    if kaggle_file is None:
        kaggle_file = notebook_vars["kaggle_file"]
    if ratings_file is None:
        ratings_file = notebook_vars["ratings_file"]

    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(kaggle_file, low_memory=False)
    ratings = pd.read_csv(ratings_file)

    # Open and read the Wikipedia data JSON file. The encoding is explicit so
    # the result does not depend on the platform's default locale encoding.
    with open(wiki_file, "r", encoding="utf-8") as wiki_data:
        wiki_movies_raw = json.load(wiki_data)

    # 3. Filter out TV shows: keep records that have a director and an IMDb
    # link and that do not list a number of episodes.
    wiki_movies = [movie for movie in wiki_movies_raw
                   if 'No. of episodes' not in movie
                   and ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie]

    # 4. Clean every remaining record.
    wiki_movies = [clean_movie(movie) for movie in wiki_movies]

    # 5. Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(wiki_movies)

    # 6. Extract the IMDb ID with a regular expression and drop duplicate IDs.
    # If there is an error, capture and print the exception.
    try:
        exp = r"(tt\d{7})"
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(exp)
        wiki_movies_df.drop_duplicates("imdb_id", inplace=True)
    except Exception as e:
        print(e)

    # 7. Drop the columns that are more than 90% null. The ratio is taken
    # against the DataFrame's current length (after de-duplication), not the
    # length of the raw list.
    total_rows = len(wiki_movies_df)
    drop_col_list = [col for col in wiki_movies_df.columns
                     if wiki_movies_df[col].isna().sum() / total_rows > 0.9]
    wiki_movies_df.drop(drop_col_list, axis="columns", inplace=True)

    # 8. Hold the non-null values from the "Box office" column.
    box_office = wiki_movies_df["Box office"].dropna()

    # 9. Convert list-valued box office entries to a single string.
    box_office = box_office.apply(lambda x: " ".join(x) if type(x) == list else x)
    # Collapse ranges such as "$1-2 million" down to the upper bound.
    box_office = box_office.str.replace(r'\$.*[-–—](?![a-z])', '$', regex=True)
    # 10. "form_one": dollar sign, number with optional decimals, then million/billion.
    form_one = r"\$\s*\d+\.?\d*\s*[mb]illi?on"
    # 11. "form_two": dollar sign, then digit groups separated by "," or ".".
    form_two = r"\$\s*\d{1,3}(?:[\.,]\d{3})+(?!\s[mb]illion)"

    # 12. Add the parse_dollars function.
    def parse_dollars(s):
        """Convert an extracted dollar-amount string to a float (NaN if unparseable)."""
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r"\$\s*\d+\.?\d*\s*milli?on", s, flags=re.IGNORECASE):
            # remove dollar sign, whitespace and " million", then scale
            s = re.sub(r"\$|\s|[a-zA-Z]", '', s)
            return float(s) * 10**6

        # if input is of the form $###.# billion
        elif re.match(r"\$\s*\d+\.?\d*\s*billi?on", s, flags=re.IGNORECASE):
            # remove dollar sign, whitespace and " billion", then scale
            s = re.sub(r"\$|\s|[a-zA-Z]", '', s)
            return float(s) * 10**9

        # if input is of the form $###,###,### (or $###.###.###)
        elif re.match(form_two, s, flags=re.IGNORECASE):
            # form_two accepts both "," and "." between three-digit groups,
            # so both are thousands separators here and both are stripped.
            s = re.sub(r"\$|\s|[,.]", '', s)
            return float(s)

        # otherwise, return NaN
        else:
            return np.nan

    # 13. Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df["box_office"] = (
        box_office.str.extract(f"({form_one}|{form_two})", flags=re.I)[0]
        .apply(parse_dollars)
    )

    # 14. Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df["Budget"].dropna().apply(lambda x: " ".join(x) if type(x) == list else x)
    budget = budget.str.replace(r'\$.*[-–—](?![a-z])', '$', regex=True)  # Cleaning ranges
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)  # Removing citations
    wiki_movies_df["budget"] = (
        budget.str.extract(f"({form_one}|{form_two})", flags=re.IGNORECASE)[0]
        .apply(parse_dollars)
    )

    # 15. Clean the release date column in the wiki_movies_df DataFrame.
    release_date = wiki_movies_df["Release date"].dropna().apply(lambda x: " ".join(x) if type(x) == list else x)
    date_form_one = r"(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[0123]?\d,\s*\d{4}"
    date_form_two = r"\d{4}.[01]\d.[0123]\d"
    date_form_three = r"(?:January|February|March|April|May|June|July|August|September|October|November|December)\s*\d{4}"
    date_form_four = r"\d{4}"
    release_date_extract = release_date.str.extract(
        f"({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})",
        flags=re.IGNORECASE)[0]
    wiki_movies_df['release_date'] = pd.to_datetime(release_date_extract, infer_datetime_format=True)

    # 16. Clean the running time column in the wiki_movies_df DataFrame.
    running_time = wiki_movies_df["Running time"].dropna().apply(lambda x: " ".join(x) if type(x) == list else x)
    time_form_one = r'(\d*)\s*m'
    time_form_two = r'(\d+)\s*ho?u?r?s?\s*(\d*)\s*'
    # Capture group 0 is minutes; groups 1 and 2 are hours and leftover minutes.
    running_time_extract = (
        running_time.str.extract(f"{time_form_one}|{time_form_two}", flags=re.I)
        .apply(lambda col: pd.to_numeric(col, errors='coerce'))
        .fillna(0)
    )
    wiki_movies_df['running_time'] = running_time_extract.apply(
        lambda row: row[1]*60 + row[2] if row[0] == 0 else row[0], axis=1)

    # Dropping the raw fields that now have cleaned counterparts
    wiki_movies_df.drop(["Box office", "Budget", "Release date", "Running time"], axis="columns", inplace=True)

    # 2. Clean the Kaggle metadata.
    # Collecting only non-adult movies
    kaggle_metadata = kaggle_metadata[kaggle_metadata["adult"] == "False"].drop("adult", axis='columns')
    # Converting to numeric
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    # Converting release date to date
    kaggle_metadata["release_date"] = pd.to_datetime(kaggle_metadata["release_date"], infer_datetime_format=True)

    # 3. Merge the two DataFrames into the movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on="imdb_id", suffixes=["_wiki", "_kaggle"])

    # 4. Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki', 'release_date_wiki', 'Language', 'Production company(s)'], inplace=True)

    # 5. Add in the function to fill in the missing Kaggle data.
    def fill_missing_column_data(df, kaggle_column, wiki_column):
        """Fill missing or zero Kaggle values from the Wikipedia column, then drop the latter."""
        # fillna returns a new Series; it has to be assigned back, otherwise
        # NaN values never compare equal to 0 below and stay unfilled.
        df[kaggle_column] = df[kaggle_column].fillna(0)
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column], axis=1)
        df.drop(wiki_column, axis='columns', inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_column_data(movies_df, "budget_kaggle", "budget_wiki")
    fill_missing_column_data(movies_df, 'runtime', 'running_time')
    fill_missing_column_data(movies_df, 'revenue', 'box_office')

    # 7. Filter the movies DataFrame for specific columns.
    movies_df = movies_df.loc[:, ['imdb_id', 'id', 'title_kaggle', 'original_title', 'tagline', 'belongs_to_collection', 'url', 'imdb_link',
                                  'runtime', 'budget_kaggle', 'revenue', 'release_date_kaggle', 'popularity', 'vote_average', 'vote_count',
                                  'genres', 'original_language', 'overview', 'spoken_languages', 'Country',
                                  'production_companies', 'production_countries', 'Distributor',
                                  'Producer(s)', 'Director', 'Starring', 'Cinematography', 'Editor(s)', 'Writer(s)', 'Composer(s)', 'Based on'
                                  ]]

    # 8. Rename the columns in the movies DataFrame.
    movies_df = movies_df.rename(
        {'id': 'kaggle_id', 'title_kaggle': 'title', 'url': 'wikipedia_url', 'budget_kaggle': 'budget',
         'release_date_kaggle': 'release_date', 'Country': 'country', 'Distributor': 'distributor',
         'Producer(s)': 'producers', 'Director': 'director', 'Starring': 'starring', 'Cinematography': 'cinematography',
         'Editor(s)': 'editors', 'Writer(s)': 'writers', 'Composer(s)': 'composers', 'Based on': 'based_on'},
        axis='columns')

    # 9. Transform and merge the ratings DataFrame.
    # Count, per movie, how many times each rating value was given.
    rating_counts = ratings.groupby(['movieId', 'rating'], as_index=False).count()\
                .rename({'userId': 'count'}, axis=1)\
                .pivot(index="movieId", columns="rating", values="count")
    # Renaming columns
    rating_counts.columns = ["rating_" + str(col) for col in rating_counts.columns]
    # Merging the movies df with ratings df
    # NOTE(review): movies_with_ratings_df is built but neither saved nor
    # returned below; confirm whether it should be persisted as well.
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    # Movies without any rating of a given value get a count of 0.
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    # DB Connection
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
    engine = create_engine(db_string)
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')
    # Saving ratings data. Dividing into chunks since file is large
    start_time = time.time()
    row_count = 0
    # Re-read the same ratings file that was loaded above (not a separately
    # hard-coded, OS-specific path).
    for data in pd.read_csv(ratings_file, chunksize=1000000):

        print(f"Importing {row_count} to {row_count+len(data)}...", end="")
        # The first chunk replaces the table so that re-running the notebook
        # does not duplicate every rating; later chunks append to it.
        data.to_sql(name='ratings', con=engine,
                    if_exists='replace' if row_count == 0 else 'append')
        row_count = row_count + len(data)
        print(f'Done. {time.time()-start_time} seconds have elapsed')

In [40]:
# 10. Input locations: the data directory, then one path variable per source
# file (Wikipedia data, Kaggle metadata, MovieLens rating data), in that order.
file_dir = 'Resources'
wiki_file, kaggle_file, ratings_file = (
    f'{file_dir}/{file_name}'
    for file_name in ('wikipedia-movies.json', 'movies_metadata.csv', 'ratings.csv')
)

In [41]:
# Run the full ETL pipeline. It reads the three input files, writes the
# 'movies' and 'ratings' tables to the database, and prints one progress line
# per chunk of ratings imported.
extract_transform_load()

Importing 0 to 1000000...Done. 39.261881828308105 seconds have elapsed
Importing 1000000 to 2000000...Done. 76.6726176738739 seconds have elapsed
Importing 2000000 to 3000000...Done. 113.17659449577332 seconds have elapsed
Importing 3000000 to 4000000...Done. 151.04798579216003 seconds have elapsed
Importing 4000000 to 5000000...Done. 187.89104175567627 seconds have elapsed
Importing 5000000 to 6000000...Done. 225.4394519329071 seconds have elapsed
Importing 6000000 to 7000000...Done. 262.72111916542053 seconds have elapsed
Importing 7000000 to 8000000...Done. 300.7182400226593 seconds have elapsed
Importing 8000000 to 9000000...Done. 337.9566259384155 seconds have elapsed
Importing 9000000 to 10000000...Done. 374.90412163734436 seconds have elapsed
Importing 10000000 to 11000000...Done. 412.27722120285034 seconds have elapsed
Importing 11000000 to 12000000...Done. 450.06471824645996 seconds have elapsed
Importing 12000000 to 13000000...Done. 487.43779373168945 seconds have elapsed
Imp