In [1]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [2]:
# 1. Add the clean movie function that takes in the argument, "movie".

def clean_movie(movie):
    movie = dict(movie) #create a non-destructive copy
    alt_titles = {}
    # combine alternate titles into one column
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune-Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        # If the key/column exists for the movie 
        if key in movie:
            # add it to the alt_titles value 
            alt_titles[key] = movie[key]
            # and remove the key/column from the movie
            movie.pop(key)
    # If the length of alt_title is > 0 (i.e has values) then it will become a column in our dataframe 
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles

    # We want to change the redundant column names into the one column so it is easier to read
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')

    return movie

In [3]:
# 2 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load():
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(kaggle_file, low_memory=False)
    ratings = pd.read_csv(ratings_file)

    # Open and read the Wikipedia data JSON file.
    with open(wiki_file, mode='r') as file:
        wiki_movies_raw_json = json.load(file)
        
    # 3. Write a list comprehension to filter out TV shows.
    # Assumption is that movies have a director and no episodes 
    wiki_filtered_df = [movie for movie in wiki_movies_raw_json
               if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]
    
    # 4. Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    cleaned_movies = [clean_movie(cleaning_movies) for cleaning_movies in wiki_filtered_df]

    # 5. Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(cleaned_movies)

    # 6. Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        # The imdb links look like this https://www.imdb.com/title/tt1234567/," with "tt1234567" as the imdb_id
        # Using the .str.extract function, we can code to extract tt followed by the 7 digits after 
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')  
        # Use the drop_duplicates function to remove duplicates in the 'imdb_id' subset
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    except Exception as error:
        print(error)

    #  7. Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    # We can modify the list comprehension to where if the count of null values is < 90% of the total values we will keep those columns
    # In otherwords, if the column has 90% null data we won't keep it 
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]

    # 8. Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
    
    # 9. Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

    # 10. Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
   
    # 11. Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
    
    # Remember, there are two main forms the box office data is written in: 
    # "$123.4 million" (or billion), and "$123,456,789."
    # Our above forms will allow us to match these 

    # 12. Add the parse_dollars function.
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # We'll use re.match(pattern, string) to see if our string matches a pattern
        # we can slighthly modify form_one to check if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # Next, we'll use re.sub(pattern, replacement_string, string) to remove dollar signs, spaces, commas, and letters
                # this checks after the dollar sign for any non-white space characters or any characters and replaces it with blank
            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            # return value
            return value

        # The formula to covert the value to billion is different but we can use a similar code to above 
        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value

        # otherwise, return NaN
        else:
            return np.nan

        
    # 13. Clean the box office column in the wiki_movies_df DataFrame.
    #box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    # We no longer need the Box Office column, so we'll just drop it
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    # 14. Clean the budget column in the wiki_movies_df DataFrame.
    # First create a variable to store budget once we drop the NA values 
    budget = wiki_movies_df['Budget'].dropna()
    
    # Convert any lists to strings using the .join method 
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    
    # Remove values between the dollar sign and hypen (for budgets displayed in range) using the same code as for box office 
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    
    # We notice that there are values that show a # within a bracket such as $40 [4] million which are just citation references 
    # We can use this code to replace it with a blank
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    
    # Extract values that match our form and place it in the dataframe 
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    # 15. Clean the release date column in the wiki_movies_df DataFrame.
    # Drop NA values and convert lists to string 
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    # The forms we'll be parsing are:
    # A. Full month name, one- to two-digit day, four-digit year (i.e., January 1, 2000)
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{1,2},\s\d{4}'
    # B. Four-digit year, two-digit month, two-digit day, with any separator (i.e., 2000-01-01)
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    # C. Full month name, four-digit year (i.e., January 2000)
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    # D. Four-digit year
    date_form_four = r'\d{4}'
    
    # Once we have the 4 forms, we can extract data that fit under those 4 forms 
    release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)
    
    # We can then use the to_datetime method to convert our data into dates
    # set the infer_datetime_format option to True so that it infers the date when it isn't fully specified (i.e. form 4)
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)
                                                    
    # 16. Clean the running time column in the wiki_movies_df DataFrame.
    # First drop any NA values and place any lists into a string 
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
   
    # We will extract the data that meets our form
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    
    # Using to_numeric we can convert strings to numeric values 
    # Coercing the errors will turn the empty strings into Not a Number (NaN)
    # then we can use fillna() to change all the NaNs to zeros.
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    
    # Now we can apply a function that will convert the hour capture groups and minute capture groups to minutes 
    # if the pure minutes capture group is zero, and save the output to wiki_movies_df
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)                                                
    
    # Drop the running time column because we don't need it 
    wiki_movies_df.drop('Running time', axis=1, inplace=True)

    
# DELIVERABLE 3 

    # 2. Clean the Kaggle metadata.
    
    # Reviewing again, we probably don't want adult movies in our dataset so we keep values where adult is false, and drop the rest
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')
    
    # This converts the column to a boolean
    kaggle_metadata['video'] = kaggle_metadata['video'] == True

    # This converts the other columns into numeric 
    # using errors = 'raise' will let us know if any data could not be converted 
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    # Using Panda's built in .to_datetime function will convert the data into date
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])


    # 3. Merged the two DataFrames into the movies DataFrame.
    # Using the suffixes will allow us to show which dataframe the data comes from
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    # First we will drop the wiki columns we have decided to drop 
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

    # 5. Add in the function to fill in the missing Kaggle data.
    # In order to keep kaggle and fill 0s with wiki data we will create a fuction
    # This will take a df and 2 "columns" as paramaters
    # It will apply to the kaggle_column where if the value is 0 it will take the wiki_column else keep the kaggle_column
    # It will then drop the wiki_column
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
            , axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    # We can then call our function to keep kaggle data and fill in 0s with wikipedia data 
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # 7. Filter the movies DataFrame for specific columns.
    # Use the .loc method to re-arrange the columns
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                           'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                           'genres','original_language','overview','spoken_languages','Country',
                           'production_companies','production_countries','Distributor',
                           'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                          ]]

    # We want to check if there are any columns that hold one value since it won't provide us much information 
    # the result of this code tells us that the video column only holds one value "false" where as the other columns have various info
    [col for col in movies_df.columns if len(movies_df[col].apply(lambda x: tuple(x) if type(x) == list else x).value_counts(dropna=False)) == 1]
    
    # 8. Rename the columns in the movies DataFrame.
    # Then use the .rename function to rename the columns to be more consistent 
    movies_df.rename({'id':'kaggle_id',
                      'title_kaggle':'title',
                      'url':'wikipedia_url',
                      'budget_kaggle':'budget',
                      'release_date_kaggle':'release_date',
                      'Country':'country',
                      'Distributor':'distributor',
                      'Producer(s)':'producers',
                      'Director':'director',
                      'Starring':'starring',
                      'Cinematography':'cinematography',
                      'Editor(s)':'editors',
                      'Writer(s)':'writers',
                      'Composer(s)':'composers',
                      'Based on':'based_on'
                     }, axis='columns', inplace=True)


    # 9. Transform and merge the ratings DataFrame.
    # Convert Unix dates to regular date format
    ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')
    # Finally we can use the pivot function to make the index the movieID then the columns as rating and the value as count 
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1) \
                    .pivot(index='movieId',columns='rating', values='count')
    
    # We can then rename the column headers to show rating_ followed by the rating scale value 
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    
    # We can then use a left merge to keep everything in the movies_df but merge in these rating counts
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')

    # Since there may be some movies that don't have ratings on that scale/#, we'd want to fill those in with 0 
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    #return wiki_movies_df, movies_with_ratings_df, movies_df

# DELIVERABLE 4: 
    # We can create the link to our sql database using this syntax
    # Remember, we shouldn't type passwords as code so we will store it in our config.py file which will not be uploaded into github
    "postgres://[user]:[password]@[location]:[port]/[database]"

    # We will use that syntax and call our password to import into our port and database
    db_string = f"postgres://postgres:{db_password}@127.0.0.1:5432/Challenge: movie_data"

    # This is all the information that SQLAlchemy needs to create a database engine.
    # SQLAlchemy handles connections to different SQL databases and manages the conversion between data types. 
    # The way it handles all the communication and conversion is by creating a database engine
    engine = create_engine(db_string)

    # Some users may need an additional package installed for this section. 
    # In your terminal, run the following code: pip install psycopg2-binary to add it to your coding environment.

    # Import the movie df to sql
    # we need to specify the name of the table and the engine in the to_sql() method.
    movies_df.to_sql(name='movies', con=engine)

    # create a variable for the number of rows imported
    rows_imported = 0
    # get the start_time from time.time()
    # .time() stores the time when it is called 
    start_time = time.time()

    for data in pd.read_csv(f'{file_dir}/ratings.csv', chunksize=1000000):

        # print out the range of rows that are being imported
        # This tells us when import has started and ended 
        # use the end= parameter in the print function. 
            # Setting the end to an empty string will prevent the output from going to the next line.
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

        data.to_sql(name='ratings', con=engine, if_exists='replace')

        # increment the number of rows imported by the size of 'data'
        rows_imported += len(data)

        # add elapsed time to final print out
        # print that the rows have finished importing
        print(f'Done. {time.time() - start_time} total seconds elapsed')

In [4]:
# 10. Create the path to your file directory and variables for the three files.
file_dir = 'C://Users/100556036/Downloads/UofT_Class_Folder/Module_8_ETL/Movies-ETL/Challenge/Resources'
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia-movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [5]:
# 11. Set the three variables equal to the function created in D1.
wiki_file, kaggle_file, ratings_file = extract_transform_load()

importing rows 0 to 1000000...Done. 154.41036343574524 total seconds elapsed
importing rows 1000000 to 2000000...Done. 256.36280727386475 total seconds elapsed
importing rows 2000000 to 3000000...Done. 348.9034192562103 total seconds elapsed
importing rows 3000000 to 4000000...Done. 441.7857971191406 total seconds elapsed
importing rows 4000000 to 5000000...Done. 534.1046726703644 total seconds elapsed
importing rows 5000000 to 6000000...Done. 627.4117023944855 total seconds elapsed
importing rows 6000000 to 7000000...Done. 720.0915493965149 total seconds elapsed
importing rows 7000000 to 8000000...Done. 813.2584228515625 total seconds elapsed
importing rows 8000000 to 9000000...Done. 906.0936703681946 total seconds elapsed
importing rows 9000000 to 10000000...Done. 999.7310209274292 total seconds elapsed
importing rows 10000000 to 11000000...Done. 1092.8619666099548 total seconds elapsed
importing rows 11000000 to 12000000...Done. 1186.1735708713531 total seconds elapsed
importing row

TypeError: cannot unpack non-iterable NoneType object