In [3]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [4]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    
    movie = dict(movie) #create a non-destructive copy
    alt_titles = {}
    
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune–Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles
    
    # Merge column names
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')

    return movie

In [5]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load():
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(f'{file_dir}/movies_metadata.csv', low_memory = False)

    ratings = pd.read_csv(f'{file_dir}/ratings.csv')

    # Open and read the Wikipedia data JSON file.
    with open(f'{file_dir}/wikipedia-movies.json', mode='r') as file:
        wiki_movies_raw = json.load(file)    
    
    wiki_movies_df = pd.DataFrame(wiki_movies_raw)
    
    # Write a list comprehension to filter out TV shows.
    wiki_movies = [movie for movie in wiki_movies_raw
               if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]

    # Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    clean_movies = [clean_movie(movie) for movie in wiki_movies]

    # Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(clean_movies)
    sorted(wiki_movies_df.columns.tolist())

    # Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
        
    except Exception as e:
        print(e)

    #  Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]

    # Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
    
    # Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

    # Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
    
    # Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    box_office.str.extract(f'({form_one}|{form_two})')

    # Add the parse_dollars function.
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            # return value
            return value

        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value

        # otherwise, return NaN
        else:
            return np.nan
        
    # Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    # Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df['Budget'].dropna()
    
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    
    budget.str.extract(f'({form_one}|{form_two})')
    
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    
    matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE)
    matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE)
    
    budget[~matches_form_one & ~matches_form_two]
    
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    

    # Clean the release date column in the wiki_movies_df DataFrame.
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'

    release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)

    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)

    # Clean the running time column in the wiki_movies_df DataFrame.
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

    running_time[running_time.str.contains(r'^\d*\s*m', flags=re.IGNORECASE) != True]

    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)

    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)

    wiki_movies_df.drop('Running time', axis=1, inplace=True)
     
    # 2. Clean the Kaggle metadata.
    kaggle_metadata[~kaggle_metadata['adult'].isin(['True','False'])]
    
    # The following code will keep rows where the adult column is False, and then drop the adult column.
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')

    # This code creates the Boolean column we want.
    #kaggle_metadata['video'] == 'True'

    # Assign the boolean column we created avoce back to 'video'
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    
    # creating these to numeric values
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    
    # Converting it to date time format.
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
    
    
    # 3. Merged the two DataFrames into the movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

    # 5. Add in the function to fill in the missing Kaggle data.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
            , axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # 7. Filter the movies DataFrame for specific columns.
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]

    # 8. Rename the columns in the movies DataFrame.
    movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

    # 9. Transform and merge the ratings DataFrame.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count()
        
    
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1)
        
    
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')
        
    
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    
    db_string = f"postgres://postgres:{db_password}@127.0.0.1:5432/movie_data"

    engine = create_engine(db_string)
    
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')
    
    rows_imported = 0                           # Creates variable for the number of rows imported
    start_time = time.time()                    # get the start_time from time.time()
    

    for data in pd.read_csv('ratings.csv', chunksize=50000):

        # print out the range of rows that are being imported
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        
        data.to_sql(name='ratings', con=engine, if_exists='append')
        
        # increment the number of rows imported by the size of 'data'
        rows_imported += len(data)

        # print that the rows have finished importing
        # add elapsed time to final print out
        print(f'Done. {time.time() - start_time} total seconds elapsed')
    


In [6]:
# 10. Create the path to your file directory and variables for the three files.
file_dir = '/Users/Tommy/Professional/Data Analytics and Visualization/Week 8 - Extract, Transform, Lord (ETL)'
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia.movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [7]:
# 11. Set the three variables equal to the function created in D1.
extract_transform_load()

importing rows 0 to 50000...Done. 5.37873911857605 total seconds elapsed
importing rows 50000 to 100000...Done. 10.703013896942139 total seconds elapsed
importing rows 100000 to 150000...Done. 15.878643989562988 total seconds elapsed
importing rows 150000 to 200000...Done. 21.020561933517456 total seconds elapsed
importing rows 200000 to 250000...Done. 26.19023609161377 total seconds elapsed
importing rows 250000 to 300000...Done. 31.322097301483154 total seconds elapsed
importing rows 300000 to 350000...Done. 36.51977896690369 total seconds elapsed
importing rows 350000 to 400000...Done. 41.73559784889221 total seconds elapsed
importing rows 400000 to 450000...Done. 46.85221195220947 total seconds elapsed
importing rows 450000 to 500000...Done. 52.057424545288086 total seconds elapsed
importing rows 500000 to 550000...Done. 57.217998027801514 total seconds elapsed
importing rows 550000 to 600000...Done. 62.47741460800171 total seconds elapsed
importing rows 600000 to 650000...Done. 67

importing rows 5000000 to 5050000...Done. 522.2047593593597 total seconds elapsed
importing rows 5050000 to 5100000...Done. 527.3108110427856 total seconds elapsed
importing rows 5100000 to 5150000...Done. 532.4919245243073 total seconds elapsed
importing rows 5150000 to 5200000...Done. 537.668261051178 total seconds elapsed
importing rows 5200000 to 5250000...Done. 542.7837216854095 total seconds elapsed
importing rows 5250000 to 5300000...Done. 547.9068276882172 total seconds elapsed
importing rows 5300000 to 5350000...Done. 553.0883996486664 total seconds elapsed
importing rows 5350000 to 5400000...Done. 558.256217956543 total seconds elapsed
importing rows 5400000 to 5450000...Done. 563.3975126743317 total seconds elapsed
importing rows 5450000 to 5500000...Done. 568.5196132659912 total seconds elapsed
importing rows 5500000 to 5550000...Done. 573.6940619945526 total seconds elapsed
importing rows 5550000 to 5600000...Done. 578.9017715454102 total seconds elapsed
importing rows 560

importing rows 10000000 to 10050000...Done. 1040.3206372261047 total seconds elapsed
importing rows 10050000 to 10100000...Done. 1045.4716050624847 total seconds elapsed
importing rows 10100000 to 10150000...Done. 1050.6869525909424 total seconds elapsed
importing rows 10150000 to 10200000...Done. 1055.895096063614 total seconds elapsed
importing rows 10200000 to 10250000...Done. 1061.062451839447 total seconds elapsed
importing rows 10250000 to 10300000...Done. 1066.2159190177917 total seconds elapsed
importing rows 10300000 to 10350000...Done. 1071.4297723770142 total seconds elapsed
importing rows 10350000 to 10400000...Done. 1076.6630651950836 total seconds elapsed
importing rows 10400000 to 10450000...Done. 1081.8162016868591 total seconds elapsed
importing rows 10450000 to 10500000...Done. 1086.9620411396027 total seconds elapsed
importing rows 10500000 to 10550000...Done. 1092.2641229629517 total seconds elapsed
importing rows 10550000 to 10600000...Done. 1097.6011610031128 tota

importing rows 14850000 to 14900000...Done. 1550.3703229427338 total seconds elapsed
importing rows 14900000 to 14950000...Done. 1555.5534904003143 total seconds elapsed
importing rows 14950000 to 15000000...Done. 1560.7826118469238 total seconds elapsed
importing rows 15000000 to 15050000...Done. 1565.9398849010468 total seconds elapsed
importing rows 15050000 to 15100000...Done. 1571.1839611530304 total seconds elapsed
importing rows 15100000 to 15150000...Done. 1576.339898109436 total seconds elapsed
importing rows 15150000 to 15200000...Done. 1581.5629749298096 total seconds elapsed
importing rows 15200000 to 15250000...Done. 1586.7982845306396 total seconds elapsed
importing rows 15250000 to 15300000...Done. 1592.074989080429 total seconds elapsed
importing rows 15300000 to 15350000...Done. 1597.3603079319 total seconds elapsed
importing rows 15350000 to 15400000...Done. 1602.7173352241516 total seconds elapsed
importing rows 15400000 to 15450000...Done. 1608.0450439453125 total s

importing rows 19700000 to 19750000...Done. 2061.941079854965 total seconds elapsed
importing rows 19750000 to 19800000...Done. 2067.131870985031 total seconds elapsed
importing rows 19800000 to 19850000...Done. 2072.359872341156 total seconds elapsed
importing rows 19850000 to 19900000...Done. 2077.591572999954 total seconds elapsed
importing rows 19900000 to 19950000...Done. 2082.809893131256 total seconds elapsed
importing rows 19950000 to 20000000...Done. 2087.985760450363 total seconds elapsed
importing rows 20000000 to 20050000...Done. 2093.2192809581757 total seconds elapsed
importing rows 20050000 to 20100000...Done. 2098.4806356430054 total seconds elapsed
importing rows 20100000 to 20150000...Done. 2103.6461606025696 total seconds elapsed
importing rows 20150000 to 20200000...Done. 2108.829466819763 total seconds elapsed
importing rows 20200000 to 20250000...Done. 2114.0829784870148 total seconds elapsed
importing rows 20250000 to 20300000...Done. 2119.3398485183716 total sec

importing rows 24550000 to 24600000...Done. 2571.405806541443 total seconds elapsed
importing rows 24600000 to 24650000...Done. 2576.6808049678802 total seconds elapsed
importing rows 24650000 to 24700000...Done. 2581.9622082710266 total seconds elapsed
importing rows 24700000 to 24750000...Done. 2587.119214296341 total seconds elapsed
importing rows 24750000 to 24800000...Done. 2592.3274714946747 total seconds elapsed
importing rows 24800000 to 24850000...Done. 2597.5264780521393 total seconds elapsed
importing rows 24850000 to 24900000...Done. 2602.750995874405 total seconds elapsed
importing rows 24900000 to 24950000...Done. 2607.923166513443 total seconds elapsed
importing rows 24950000 to 25000000...Done. 2613.1943593025208 total seconds elapsed
importing rows 25000000 to 25050000...Done. 2618.443568468094 total seconds elapsed
importing rows 25050000 to 25100000...Done. 2623.683258533478 total seconds elapsed
importing rows 25100000 to 25150000...Done. 2628.8799390792847 total se