In [2]:
pip install psycopg2-binary

Note: you may need to restart the kernel to use updated packages.


In [3]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [4]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):

    movie = dict(movie)
    
    

    return movie

In [11]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load():
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata_df = pd.DataFrame(pd.read_csv(kaggle_file, low_memory=False))
    ratings_df = pd.DataFrame(pd.read_csv(ratings_file))

    # Open and read the Wikipedia data JSON file.
    with open(wiki_file, mode='r') as file:
            wiki_file_raw = json.load(file)
        
    # Write a list comprehension to filter out TV shows.
    wiki_movies = [movie for movie in wiki_file_raw
                   if 'No. of episodes' not in movie]

    # Write a list comprehension to iterate through the cleaned wiki movies list
    #  and call the clean_movie function on each movie.
    wiki_movies_clean = [clean_movie(movie) for movie in wiki_movies]

    # Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(wiki_movies_clean)
    
    # Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
                
    except:
        print('IMDb_id error captured.')
        
    wiki_movies_df.dropna(subset=['imdb_id'], inplace=True)
    wiki_movies_df.drop_duplicates(subset=['imdb_id'], inplace=True)
    
    # Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]
    
    # Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
        
    # Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

    # Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
   
    # Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$\s*\d{1,3}(?:[,|.]\d{3})+(?!\s[mb]illion)'

    # Add the parse_dollars function.
    def parse_dollars(s):
    
        # If s is not a string, return NaN
        if type(s) != str:
            return np.nan
    
        # Values $###.# million.
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.I):
            s = re.sub('\$|\s|[a-zA-Z]','', s)
            value = float(s) *10**6
            return value
                
        # Values $###.# billion.
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.I):
            s = re.sub('\$|\s|[a-zA-Z]','', s)
            value = float(s) *10**9
            return value
                
        # Values $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,|.]\d{3})+(?!\s[mb]illion)', s, flags=re.I):
            s = re.sub('\$|\s|,','', s)
            value = float(s)
            return value
                
        else:
            return np.nan
            
    # Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.I)[0].apply(parse_dollars)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
        
    # Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df['Budget'].dropna()
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    bedget = budget.str.replace(r'\$.*[---](?![a-z])', '$', regex=True)

    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.I)[0].apply(parse_dollars)
    wiki_movies_df.drop('Budget', axis=1, inplace=True)
    
    # Clean the release date column in the wiki_movies_df DataFrame.
    release_date = wiki_movies_df['Release date'].dropna()
    release_date = release_date.apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    
    release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.I)
        
    # Clean the running time column in the wiki_movies_df DataFrame.
    running_time = wiki_movies_df['Running time'].dropna()
    running_time = running_time.apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
    
    
    # 2. Clean the Kaggle metadata.
    # Keep rows where 'adult' column is False and drop 'adult' column.
    kaggle_metadata = kaggle_metadata_df[kaggle_metadata_df['adult'] == 'False'].drop('adult',axis='columns')
    
    # Create Boolean column and assign it back to 'video'
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    
    # Convert numeric columns
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    
    # Convert 'release_date'
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
    
    # 3. Merged the two DataFrames into the movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki','Release date','Productioncompany ','Language'], inplace=True)
        
    # 5. Add in the function to fill in the missing Kaggle data.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column], axis=1)
        df.drop(columns=wiki_column, inplace=True)
        
    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')
    
    # 7. Filter the movies DataFrame for specific columns.
    for col in movies_df.columns:
        lists_to_tuples = lambda x: tuple(x) if type(x) == list else x
        if len(movies_df[col].apply(lists_to_tuples).value_counts(dropna=False)) < 5:
            movies_df.drop(col, axis=1, inplace=True)
    
    # 8. Rename the columns in the movies DataFrame.
    movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

    # 9. Transform and merge the ratings DataFrame.
    # Take the count for each movieId-rating group and rename 'userId' column to 'count'
    rating_counts = ratings_df.groupby(['movieId','rating'], as_index=False).count() \
                    .rename({'userId':'count'}, axis=1)
    
    # Pivot the data so that 'movieId' is index, columns are rating values, rows are the counts.
    rating_counts =rating_counts.pivot(index='movieId',columns='rating', values='count')
    
    # Rename columns.
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    
    # Merge rating counts, keeping everything in 'movies_df'
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    
    # Fill missing values.
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)
    
    # Create connection to PostgreSQL database
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
    engine = create_engine(db_string)
    
    # Add 'movies_df' to SQL database.
    movies_df.to_sql(name='movies', con=engine)
    
    # Add 'ratings.csv' to SQL database.
    rows_imported = 0
    # Get start_time
    start_time = time.time()
    
    for data in pd.read_csv(ratings_file, chunksize=1000000):
        
        # Print out the range of rows that are being imported
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        data.to_sql(name='ratings', con=engine, if_exists='append')
        
        # Increment the number of rows imported
        rows_imported += len(data)
        
        # Print rows have finished importing.
        print(f'Complete. {time.time() - start_time} total seconds elapsed.')

In [12]:
# 10. Create the path to your file directory and variables for the three files.
file_dir = 'C:\\Users\\happi\\Movies-ETL\\'

# The Wikipedia data
wiki_file = f'{file_dir}wikipedia-movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}ratings.csv'

In [14]:
# 11. Set the three variables equal to the function created in D1.
wiki_file, kaggle_file, ratings_file = extract_transform_load()



importing rows 0 to 1000000...Complete. 49.089471101760864 total seconds elapsed.
importing rows 1000000 to 2000000...Complete. 96.73899364471436 total seconds elapsed.
importing rows 2000000 to 3000000...Complete. 139.85245537757874 total seconds elapsed.
importing rows 3000000 to 4000000...Complete. 182.64520645141602 total seconds elapsed.
importing rows 4000000 to 5000000...Complete. 225.2983250617981 total seconds elapsed.
importing rows 5000000 to 6000000...Complete. 267.1156163215637 total seconds elapsed.
importing rows 6000000 to 7000000...Complete. 306.8283383846283 total seconds elapsed.
importing rows 7000000 to 8000000...Complete. 344.2215647697449 total seconds elapsed.
importing rows 8000000 to 9000000...Complete. 384.2225000858307 total seconds elapsed.
importing rows 9000000 to 10000000...Complete. 421.14152121543884 total seconds elapsed.
importing rows 10000000 to 11000000...Complete. 458.3143036365509 total seconds elapsed.
importing rows 11000000 to 12000000...Comp

TypeError: cannot unpack non-iterable NoneType object

In [20]:
# 12. Set the DataFrames from the return statement equal to the file names in Step 11. 
wiki_movies_df = wiki_file
movies_with_ratings_df = kaggle_file
movies_df = ratings_file

In [22]:
# 13. Check the wiki_movies_df DataFrame. 
wiki_movies_df

'C:\\Users\\happi\\Movies-ETL\\wikipedia-movies.json'

In [23]:
# 14. Check the movies_with_ratings_df DataFrame.
movies_with_ratings_df

'C:\\Users\\happi\\Movies-ETL\\movies_metadata.csv'

In [24]:
# 15. Check the movies_df DataFrame. 
movies_df

'C:\\Users\\happi\\Movies-ETL\\ratings.csv'