In [2]:
import json
import pandas as pd
import numpy as np
import re
from sqlalchemy import create_engine
import psycopg2
from config import db_password
import time


In [4]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    """Return a cleaned copy of one scraped Wikipedia movie record.

    Two things happen to the copy (the caller's ``movie`` is left untouched):

    1. Every alternate-title field that is present is removed and gathered
       into a single ``'alt_titles'`` dict (only added when at least one
       alternate title was found).
    2. Synonymous field names are merged onto one canonical name. Renames are
       applied in the listed order, so when several source fields map to the
       same target, the last one present overwrites the earlier value.
    """
    cleaned = dict(movie)  # shallow copy so the input record is not mutated

    # Fields that hold a title in another language / transliteration.
    alt_title_keys = (
        'Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French',
        'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally',
        'Mandarin', 'McCune-Reischauer', 'Original title', 'Polish',
        'Revised Romanization', 'Romanized', 'Russian',
        'Simplified', 'Traditional', 'Yiddish',
    )
    found_titles = {key: cleaned.pop(key) for key in alt_title_keys if key in cleaned}
    if found_titles:
        cleaned['alt_titles'] = found_titles

    # (old name, canonical name) pairs; order matters, see docstring.
    # The trailing space in the 'Productioncompan...' keys is intentional:
    # it is part of the key being looked up.
    column_renames = (
        ('Adaptation by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Original release', 'Release date'),
        ('Music by', 'Composer(s)'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release Date'),
        ('Release Date', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Written by', 'Writer(s)'),
    )
    for old_name, new_name in column_renames:
        if old_name in cleaned:
            cleaned[new_name] = cleaned.pop(old_name)

    return cleaned

In [8]:
# 1. Add the ETL function. It takes three file paths, in this order:
# x = Kaggle metadata (CSV), y = Wikipedia data (JSON), z = MovieLens rating data (CSV, from Kaggle).

def extract_transform_load(x, y, z):
    """Extract the three movie data sets, clean and merge them, and load the
    results into PostgreSQL.

    Parameters
    ----------
    x : str or path-like
        Path to the Kaggle metadata CSV.
    y : str or path-like
        Path to the Wikipedia movies JSON file.
    z : str or path-like
        Path to the MovieLens ratings CSV. It is read once in full (to build
        the per-movie rating counts) and once in chunks (to load the table).

    Returns
    -------
    tuple of (DataFrame, DataFrame, DataFrame)
        ``(wiki_movies_df, movies_with_ratings_df, movies_df)``: the cleaned
        Wikipedia frame, the merged movies frame with one ``rating_<value>``
        count column per rating value, and the merged movies frame.

    Side effects
    ------------
    Writes the ``movies`` and ``ratings`` tables of the ``movie_data``
    database on 127.0.0.1:5432 (user ``postgres``, password taken from the
    module-level ``db_password``). Both tables are replaced on every call so
    that re-running the function does not fail on an existing ``movies``
    table or duplicate the rows of ``ratings``. Progress is printed per chunk.

    Relies on the module-level names ``clean_movie``, ``create_engine`` and
    ``db_password``.
    """

    def join_if_list(value):
        # Some scraped fields are lists of fragments; flatten them to one string.
        return ' '.join(value) if isinstance(value, list) else value

    def parse_dollars(s):
        """Convert a matched dollar string to a float amount, or NaN."""
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # form "$###.# million": strip "$", whitespace and letters, then scale
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
            return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**6

        # form "$###.# billion"
        if re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):
            return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**9

        # form "$###,###,###": strip "$" and commas
        if re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):
            return float(re.sub(r'\$|,', '', s))

        # otherwise, return NaN
        return np.nan

    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        # Where the Kaggle value is 0, fall back to the Wikipedia value,
        # then drop the now-redundant Wikipedia column (mutates df).
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column],
            axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # ------------------------------------------------------------------
    # Extract
    # ------------------------------------------------------------------
    # Read in the Kaggle metadata and MovieLens ratings CSV files.
    kaggle_metadata = pd.read_csv(x, low_memory=False)
    ratings = pd.read_csv(z)

    # Open and read the Wikipedia data JSON file (explicit encoding so the
    # result does not depend on the platform default).
    with open(y, mode='r', encoding='utf-8') as file:
        wiki_raw = json.load(file)

    # ------------------------------------------------------------------
    # Transform: Wikipedia data
    # ------------------------------------------------------------------
    # Keep entries that have a director and an IMDb link, and filter out TV shows.
    wiki_movies = [movie for movie in wiki_raw
                   if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]

    # Normalise the field names of every remaining movie.
    wiki_movies_df = pd.DataFrame([clean_movie(movie) for movie in wiki_movies])

    # Extract the IMDb ID from the link and drop duplicate IDs.
    # If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    except (RuntimeError, TypeError, NameError) as error:
        print(f"Oops! There is an error: {error!r}")

    # Keep only the columns that are less than 90% null.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns
                            if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]

    # Dollar amounts are recognised in two forms:
    #   form_one: "$123.4 million" / "$1.2 billion"
    #   form_two: "$123,456,789"
    form_one = r'\$\d+\.?\d*\s*[mb]illion'
    form_two = r'\$\d{1,3}(?:,\d{3})+'
    dollar_pattern = f'({form_one}|{form_two})'

    # Clean the box office column.
    box_office = wiki_movies_df['Box office'].dropna().map(join_if_list)
    wiki_movies_df['box_office'] = (
        box_office.str.extract(dollar_pattern, flags=re.IGNORECASE)[0].apply(parse_dollars)
    )
    wiki_movies_df = wiki_movies_df.drop(columns='Box office')

    # Clean the budget column.
    budget = wiki_movies_df['Budget'].dropna().map(join_if_list)
    # Drop everything between the "$" and the last dash, so a range such as
    # "$10-15 million" keeps only its upper bound.
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    # Strip citation markers such as "[4]". regex=True must be explicit:
    # without it the pattern is matched literally on newer pandas versions.
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    wiki_movies_df['budget'] = (
        budget.str.extract(dollar_pattern, flags=re.IGNORECASE)[0].apply(parse_dollars)
    )
    wiki_movies_df = wiki_movies_df.drop(columns='Budget')

    # Clean the release date column.
    release_date = wiki_movies_df['Release date'].dropna().map(join_if_list)
    month_names = (r'(?:January|February|March|April|May|June|July|August'
                   r'|September|October|November|December)')
    date_form_one = month_names + r'\s[123]?\d,\s\d{4}'   # "Month D, YYYY"
    date_form_two = r'\d{4}.[01]\d.[0123]\d'              # "YYYY-MM-DD"
    date_form_three = month_names + r'\s\d{4}'            # "Month YYYY"
    date_form_four = r'\d{4}'                             # "YYYY"
    date_pattern = f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})'
    # NOTE(review): infer_datetime_format is deprecated in pandas 2.x; kept
    # unchanged here because the pandas version in use is not visible - confirm.
    wiki_movies_df['release_date'] = pd.to_datetime(
        release_date.str.extract(date_pattern)[0], infer_datetime_format=True)
    wiki_movies_df = wiki_movies_df.drop(columns='Release date')

    # Clean the running time column.
    running_time = wiki_movies_df['Running time'].dropna().map(join_if_list)
    # Groups 0/1 capture "H hours M", group 2 captures plain "M minutes".
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(
        lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    wiki_movies_df['running_time'] = running_time_extract.apply(
        lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    wiki_movies_df = wiki_movies_df.drop(columns='Running time')

    # ------------------------------------------------------------------
    # Transform: Kaggle metadata
    # ------------------------------------------------------------------
    # Keep non-adult rows only, then fix the column dtypes.
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult', axis='columns')
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

    # ------------------------------------------------------------------
    # Transform: merge Wikipedia and Kaggle data
    # ------------------------------------------------------------------
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id',
                         suffixes=('_wiki', '_kaggle'))

    # Drop rows whose two release dates disagree wildly (wiki after 1996,
    # Kaggle before 1965), then the redundant Wikipedia-side columns.
    mismatched_dates = movies_df[(movies_df['release_date_wiki'] > '1996-01-01')
                                 & (movies_df['release_date_kaggle'] < '1965-01-01')].index
    movies_df = movies_df.drop(mismatched_dates)
    movies_df = movies_df.drop(
        columns=['title_wiki', 'release_date_wiki', 'Language', 'Production company(s)'])

    # Fill zero-valued Kaggle data from the matching Wikipedia columns.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # Select and order the final columns.
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                                  'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                                  'genres','original_language','overview','spoken_languages','Country',
                                  'production_companies','production_countries','Distributor',
                                  'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                                  ]]

    # Rename the columns to consistent snake_case names.
    movies_df = movies_df.rename(columns={'id':'kaggle_id',
                                          'title_kaggle':'title',
                                          'url':'wikipedia_url',
                                          'budget_kaggle':'budget',
                                          'release_date_kaggle':'release_date',
                                          'Country':'country',
                                          'Distributor':'distributor',
                                          'Producer(s)':'producers',
                                          'Director':'director',
                                          'Starring':'starring',
                                          'Cinematography':'cinematography',
                                          'Editor(s)':'editors',
                                          'Writer(s)':'writers',
                                          'Composer(s)':'composers',
                                          'Based on':'based_on'
                                          })

    # ------------------------------------------------------------------
    # Transform: ratings
    # ------------------------------------------------------------------
    ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')
    # One row per movie, one column per rating value, cells = number of ratings.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    movies_with_ratings_df = pd.merge(movies_df, rating_counts,
                                      left_on='kaggle_id', right_index=True, how='left')
    # Movies without any rating get a count of 0 instead of NaN.
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    # ------------------------------------------------------------------
    # Load
    # ------------------------------------------------------------------
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
    engine = create_engine(db_string)

    # Replace rather than fail when the table is left over from an earlier run.
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')

    # Import the ratings data in chunks, reading from the path passed in as
    # `z` rather than from a notebook-level global.
    rows_imported = 0
    start_time = time.time()
    for data in pd.read_csv(z, chunksize=1000000):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        # The first chunk replaces any table from a previous run; later chunks
        # append, so a re-run does not duplicate rows.
        data.to_sql(name='ratings', con=engine,
                    if_exists='append' if rows_imported else 'replace')
        rows_imported += len(data)
        print(f'Done. {time.time() - start_time} total seconds elapsed')

    # Return the three DataFrames so callers can inspect the results.
    return wiki_movies_df, movies_with_ratings_df, movies_df
        

In [9]:
# 11. Build the paths to the three input files inside the resources folder.
file_dir = 'Resources'
# In order: the Wikipedia data, the Kaggle metadata, the MovieLens rating data.
wiki_dir, kaggle_dir, ratings_dir = (
    f'{file_dir}/{file_name}'
    for file_name in ('wikipedia_movies.json', 'movies_metadata.csv', 'ratings.csv')
)

In [11]:
# 12. Run the ETL function. Its parameters are positional placeholders:
# x is the Kaggle metadata, y the Wikipedia data, z the ratings data.
# The trailing semicolon keeps the cell from echoing the call's return value.
extract_transform_load(x=kaggle_dir, y=wiki_dir, z=ratings_dir);

  budget = budget.str.replace(r'\[\d+\]\s*', '')


importing rows 0 to 1000000...Done. 29.089967012405396 total seconds elapsed
importing rows 1000000 to 2000000...Done. 57.8761990070343 total seconds elapsed
importing rows 2000000 to 3000000...Done. 87.1897840499878 total seconds elapsed
importing rows 3000000 to 4000000...Done. 117.28276681900024 total seconds elapsed
importing rows 4000000 to 5000000...Done. 146.92089295387268 total seconds elapsed
importing rows 5000000 to 6000000...Done. 177.1719410419464 total seconds elapsed
importing rows 6000000 to 7000000...Done. 206.39723300933838 total seconds elapsed
importing rows 7000000 to 8000000...Done. 236.53729605674744 total seconds elapsed
importing rows 8000000 to 9000000...Done. 266.2454149723053 total seconds elapsed
importing rows 9000000 to 10000000...Done. 296.6093180179596 total seconds elapsed
importing rows 10000000 to 11000000...Done. 325.27084493637085 total seconds elapsed
importing rows 11000000 to 12000000...Done. 355.34184885025024 total seconds elapsed
importing ro