In [1]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

import warnings
warnings.filterwarnings("ignore")

In [2]:
pip install psycopg2-binary

Note: you may need to restart the kernel to use updated packages.


In [3]:
# 1. The Kaggle metadata is cleaned (4 pt)
def clean_movie(movie):
    movie = dict(movie) #create a non-destructive copy
    return movie

In [4]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)
# Before Step 2, add all the code written for Deliverable 2

# 2 Add the function that takes in three arguments; Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)
# The Wikipedia and Kaggle DataFrames are merged (3 pt)

def extract_transform_load(wiki_file, kaggle_file, ratings_file):    
    
    file_dir = 'C://Users/KenAk/ETL/Movies-ETL/'
    
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.

    kaggle_metadata = pd.read_csv(f'{file_dir}movies_metadata.csv', low_memory=False)
    ratings = pd.read_csv(f'{file_dir}ratings.csv')
    
    # Open and read the Wikipedia data JSON file.
 
    with open(f'{file_dir}/wikipedia-movies.json', mode='r') as file:
        wiki_movies_raw = json.load(file)
    
    # 3. Write a list comprehension to filter out TV shows. Code from near bottom of 8.3.3
    # The TV shows are filtered out, and the wiki_movies_df DataFrame is created (3 pt)
    
    wiki_movies = [movie for movie in wiki_movies_raw
                   if ('Director' in movie or 'Directed by' in movie)
                       and 'imdb_link' in movie
                       and 'No. of episodes' not in movie]
    
    # 4. Write a list comprehension to iterate through the cleaned wiki movies list and call the clean_movie function on each movie.
    # We can make a list of cleaned movies with a list comprehension
    
    clean_movies = [clean_movie(movie) for movie in wiki_movies]

    # 5. Read in the cleaned movies list from Step 4 as a DataFrame.

    wiki_movies_df = pd.DataFrame(clean_movies)
    
    # 6. Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    # A try-except block is used to catch errors while extracting the IMDb IDs with a regular expression and dropping duplicate IDs. (5 pt)
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        print("Number of movies before dropping duplicates:", len(wiki_movies_df))
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
        print("Number of movies after dropping duplicates: ",len(wiki_movies_df))
    except:
        print("An exception occurred")

    #  7. Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    # That will give us the columns that we want to keep, which we can select from our Pandas DataFrame
    # A list comprehension is used to keep columns with non-null values (3 pt)
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]
     
    # 8. Create a variable that will hold the non-null values from the “Box office” column.
    # The non-null box office data is converted to string values using the lambda and join functions (3 pt)

    Box_Office = wiki_movies_df['Box office'].dropna() #drop missing values
    
    # 9. Convert the box office data created in Step 8 to string values using the lambda and join functions.  
    # Lambda functions don't have a name and automatically return a variable
    Box_Office[Box_Office.map(lambda x: type(x) != str)]
    # Instead of creating a new function with a block of code and the def keyword, we can create an anonymous lambda function right inside the map() call

    # 10. Write a regular expression to match the six elements of "form_one" of the box office data.  
    # 8.3.10 Parse the Box Office Data; A regular expression is used to match the six elements of "form_one" of the box office data (2 pt)
    form_one = r'\$\d+\.?\d*\s*[mb]illi?on'
    Box_Office.str.contains(form_one, flags=re.IGNORECASE, na=False).sum()
    
    # 11. Write a regular expression to match the three elements of "form_two" of the box office data.
    # A regular expression is used to match the three elements of "form_two" of the box office data (2 pt)
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illi?on)'
    Box_Office.str.contains(form_two, flags=re.IGNORECASE, na=False).sum()
    
    # 12. Add the parse_dollars function.
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            # return value
            return value

        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illi?on)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value

        # otherwise, return NaN
        else:
            return np.nan
    
    # The following columns are cleaned in the Wikipedia DataFrame: (8 pt)
    
    # The box office column
    # The budget column
    # The release date column
    # The running time column
    
    # 13. Clean the box office column in the wiki_movies_df DataFrame.

    wiki_movies_df['Box_Office'] = Box_Office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    # 14. Clean the budget column in the wiki_movies_df DataFrame.
    
    budget = wiki_movies_df['Budget'].dropna()
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE, na=False)
    matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE, na=False)
    budget[~matches_form_one & ~matches_form_two]
    # Remove the citation references with the following:
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    budget[~matches_form_one & ~matches_form_two]
    # make a variable that holds the non-null values of Release date in the DataFrame, converting lists to strings
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    # We can also drop the original Budget column
    wiki_movies_df.drop('Budget', axis=1, inplace=True)
    
    # 15. Clean the release date column in the wiki_movies_df DataFrame.
    
    # make a variable that holds the non-null values of Release date in the DataFrame, converting lists to strings

    Release_Date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    # The forms we'll be parsing are:

    # 1. Full month name, one- to two-digit day, four-digit year (i.e., January 1, 2000)
    # 2. Four-digit year, two-digit month, two-digit day, with any separator (i.e., 2000-01-01)
    # 3. Full month name, four-digit year (i.e., January 2000)
    # 4. Four-digit year

    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    
    # Extract the dates

    Release_Date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)
    wiki_movies_df['Release_Date'] = pd.to_datetime(Release_Date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)
    # We can also drop the original Release date column
    wiki_movies_df.drop('Release date', axis=1, inplace=True)
    
    # 16. Clean the running time column in the wiki_movies_df DataFrame.
    # Parse running time
    Running_Time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    running_time_extract = Running_Time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    wiki_movies_df['Running_Time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
    
    # Return three variables. The first is the wiki_movies_df DataFrame  
    #return wiki_movies_df, kaggle_metadata, ratings 

    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    
    # Open and read the Wikipedia data JSON file.
      
    # Write a list comprehension to filter out TV shows.  

    # Write a list comprehension to iterate through the cleaned wiki movies list and call the clean_movie function on each movie.
 
    # Read in the cleaned movies list from Step 4 as a DataFrame.

    # Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    # dropping any imdb_id duplicates. If there is an error, capture and print the exception.

    #  Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.   

    # Create a variable that will hold the non-null values from the “Box office” column.
    
    # Convert the box office data created in Step 8 to string values using the lambda and join functions.    

    # Write a regular expression to match the six elements of "form_one" of the box office data.
   
    # Write a regular expression to match the three elements of "form_two" of the box office data.   

    # Add the parse_dollars function.    
        
    # Clean the box office column in the wiki_movies_df DataFrame.
    
    # Clean the budget column in the wiki_movies_df DataFrame.
    
    # Clean the release date column in the wiki_movies_df DataFrame.
    
    # Clean the running time column in the wiki_movies_df DataFrame.    
     
    # 2. Clean the Kaggle metadata
    
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

    # 3. Merged the two DataFrames into the movies DataFrame.
    # 8.4.1 Merge Wikipedia and Kaggle Metadata
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

    #Test code
    movies_df.columns.to_list()
    
    # 4. Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki','Language'], inplace=True)
    
    # 5. Add in the function to fill in the missing Kaggle data.
    # Next, to save a little time, we'll make a function that fills in missing data for a column pair and then drops the redundant column
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
             df[kaggle_column] = df.apply(
                lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column], axis=1)
             df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    # Now we can run the function for the column pairs that we decided to fill in zeros
    fill_missing_kaggle_data(movies_df, 'runtime', 'Running_Time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'Box_Office')
    fill_missing_kaggle_data(movies_df, 'production_companies', 'Productioncompanies ')
    fill_missing_kaggle_data(movies_df, 'release_date', 'Release_Date')
    
    # The above procedures, along with filtering and renaming movies_df DataFrame columns (see below), are worth 8 points.
        
    # 7. Filter the movies DataFrame for specific columns.
    # Since we've merged our data and filled in values, it's good to check that there aren't any columns with only one value, since
    # that doesn't really provide any information. Don't forget, we need to convert lists to tuples for value_counts() to work.

    for col in movies_df.columns:
        lists_to_tuples = lambda x: tuple(x) if type(x) == list else x
        value_counts = movies_df[col].apply(lists_to_tuples).value_counts(dropna=False)
        num_values = len(value_counts)
        if num_values == 1:
            movies_df.drop(columns=col, inplace=True)
            print("We have dropped the", col, "column because it only has one value.")
                
    # 8. Rename the columns in the movies DataFrame # Reorder, then rename the columns

    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date','popularity','vote_average','vote_count','genres',
                       'original_language','overview','spoken_languages','Country','production_companies','production_countries',
                       'Distributed by','Produced by','Directed by','Starring','Cinematography','Edited by','Written by','Screenplay by',
                       'Music by','Based on','Productioncompany ','homepage'
                      ]]

    movies_df.rename({'id':'kaggle_id',
                   'title_kaggle':'title',
                   'url':'wikipedia_url',
                   'budget_kaggle':'budget',
                   'Productioncompany ':'production_company',
                   'Country':'country',
                   'Distributed by':'distributor',
                   'Produced by':'producer',
                   'Directed by':'director',
                   'Starring':'starring',
                   'Cinematography':'cinematography',
                   'Edited by':'editors',
                   'Written by':'writers',
                   'Screenplay by':'screenplay',
                   'Music by':'composers',
                   'Based on':'based_on'
                  }, axis='columns', inplace=True)
    
    # 9. Transform and merge the ratings DataFrame # 8.4.2 Transform and Merge Rating Data (10 points overall)
    
    # Add the code to create the connection to the PostgreSQL database, then add the movies_df DataFrame to a SQL database.
    
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count().rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count') # (Clean 3 pt)
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left') # (Merge 4 pt)
    print("Number of movies with ratings:",len(movies_with_ratings_df))
    
    #  We'll prepend rating_ to each column with a list comprehension:
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1) \
                    .pivot(index='movieId',columns='rating', values='count')
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')

    # Fill in missing values with zeros (3 pt)
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    #"postgresql://[user]:[db_password]@[location]:[port]/[database]"
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
    
    engine = create_engine(db_string)
    
    movies_df.to_sql(name='movies', con=engine)
    
    rows_imported = 0
    # get the start_time from time.time()
    start_time = time.time()
    for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        data.to_sql(name='ratings', con=engine, if_exists='append')
        rows_imported += len(data)

        # add elapsed time to final print out
        print(f'Done. {time.time() - start_time} total seconds elapsed')
    
    # Remove the return statement (Step 2)
    # return wiki_movies_df, movies_with_ratings_df, movies_df

In [5]:
# 10. Create the path to your file directory and variables for the three files.
file_dir = 'C://Users/KenAk/ETL/Movies-ETL'
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia_movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [6]:
# 11. Set the three variables equal to the function created in D1.
#wiki_file_returned, kaggle_file_returned, ratings_file_returned = extract_transform_load(wiki_file, kaggle_file, ratings_file)
extract_transform_load(wiki_file, kaggle_file, ratings_file)

Number of movies before dropping duplicates: 7076
Number of movies after dropping duplicates:  7033
We have dropped the video column because it only has one value.
Number of movies with ratings: 6052
importing rows 0 to 1000000...Done. 78.13694977760315 total seconds elapsed
importing rows 1000000 to 2000000...Done. 148.35907196998596 total seconds elapsed
importing rows 2000000 to 3000000...Done. 218.6779317855835 total seconds elapsed
importing rows 3000000 to 4000000...Done. 292.3657851219177 total seconds elapsed
importing rows 4000000 to 5000000...Done. 373.9654657840729 total seconds elapsed
importing rows 5000000 to 6000000...Done. 454.2576470375061 total seconds elapsed
importing rows 6000000 to 7000000...Done. 532.7935273647308 total seconds elapsed
importing rows 7000000 to 8000000...Done. 609.7865397930145 total seconds elapsed
importing rows 8000000 to 9000000...Done. 686.6409142017365 total seconds elapsed
importing rows 9000000 to 10000000...Done. 761.3819470405579 total 

In [None]:
# 12. Set the DataFrames from the return statement equal to the file names in Step 11. 
wiki_movies_df = wiki_file_returned
movies_with_ratings_df = kaggle_file_returned
movies_df = ratings_file_returned

In [None]:
#Test code
movies_with_ratings_df

In [None]:
#Test code
movies_with_ratings_df.columns.to_list()

In [None]:
#Test code
movies_df.columns.to_list()

In [None]:
# 13. Check the wiki_movies_df DataFrame. 
wiki_movies_df.head()

In [None]:
# 14. Check the movies_with_ratings_df DataFrame.
movies_with_ratings_df.loc[3513:3521]

In [None]:
# 15. Check the movies_df DataFrame
movies_df.loc[3012:3040]