In [1]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_pass

import time

In [2]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    
    # 1. make a copy of the dict to save it in memory to avoid destructable edits 
    # (using a local variable movie that can only be referenced inside the function)
    movie = dict(movie)
    alt_titles = {}
    
    # 2. use a for loop to loop through columns with these names and remove them with pop()
    for key in ['Also known as','Arabic','Cantonese','Chinese',
                'French','Hangul','Hebrew','Hepburn','Japanese',
                'Literally','Mandarin','McCune–Reischauer','Original title',
                'Polish','Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        
        # 2a. if the key exists in the movie object remove it and append it to the created dict above
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
            
        # 3. After loop add alt_titles dict to movies
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles
        
    # 4. Merge column names by 
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name("Adaptation by", "Writer(s)")
    change_column_name("Country of origin", "Country")
    change_column_name("Directed by", "Director")
    change_column_name("Distributed by", "Distributor")
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')
    
   
    return movie

In [3]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load(wiki_file, kaggle_file, ratings_file):
    
    # 1. Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv('resources/movies_metadata.csv', low_memory = False)
    ratings = pd.read_csv("resources/ratings.csv", low_memory=False)
    
    # 2. Open and read the Wikipedia data JSON file.
    with open ("resources/wikipedia_movies.json", mode="r") as file:
        wiki_movies_raw = json.load(file)
        
    # 3. Write a list comprehension to filter out TV shows.
    wiki_movies1 = [movie for movie in wiki_movies_raw
               if ("Director" in movie or "Directed by" in movie)
               and "imdb_link" in movie
               and "No. of episodes" not in movie]
    
    # 4 write a list comprehension to iterate through the cleaned movies list from 3 
    clean_movies = [clean_movie(movie) for movie in wiki_movies1]

    # 5. Return the clean_movies list as dataframe
    wiki_movies_df = pd.DataFrame(clean_movies)
    
    # 6. Write a try-except block to catch errors while extracting the IMDb ID using a regular expression 
    #  string and dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset="imdb_id", inplace=True)

    except error as e:
        print(e)
        
    # 7. Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df 
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if 
                            wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]
    
    # 8. Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df["Box office"].dropna()
    
    # 9. Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

    # 10. Write a regular expression to match the six elements of "form_one" of the box office data.
    form_1 = r'\$\s*\d+\.?\d*\s*[mb]illi?on'

    # 11. Write a regular expression to match the three elements of "form_two" of the box office data.
    form_2 = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'

    # 12. Add the parse_dollars function.
    def parse_dollars(v):
        
    # if the value is not a string then return it as NaN
        if type(v) != str:
            return np.nan
    
    # if the value is in the form of $NNN.N million, remove the '$' and ' million'
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', v, flags=re.IGNORECASE):
            v = re.sub('\$|\s|[a-zA-Z]','', v)
        
        # convert to float and multiply by 1,000,000
            value = float(v) * 10**6
    
        # return the new value
            return value
    
    # if the value is in the form of $NNN.N billion, remove the '$' and ' billion'
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', v, flags=re.IGNORECASE):
            v = re.sub('\$|\s|[a-zA-Z]','', v)
        
        # convert to float and multiply by 1,000,000,000
            value = float(v) * 10**9
        
        # return new value
            return value
        
    # if value is in the form of $NNN,NNN,NNN
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', v, flags=re.IGNORECASE):
    
        # remove '$' and commas
            v = re.sub('\$|,','', v)
        
        # convert to float
            value = float(v)
        
        # return new value
            return value
        
    # otherwise return NaN
        else:
            return np.nan
    
        
    # 13. Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df["box_office"] = box_office.str.extract(f'({form_1}|{form_2})', 
                                                          flags=re.IGNORECASE)[0].apply(parse_dollars)
    box_office = box_office.str.replace(r'\$.*[---](?![a-z])', '$', regex=True)
    wiki_movies_df.drop("Box office", axis=1, inplace=True)
        
    # 14. Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df['Budget'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    budget.str.extract(f'({form_1}|{form_2})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    # 15. Clean the release date column in the wiki_movies_df DataFrame.
    release_date = wiki_movies_df["Release date"].dropna().apply(lambda x: ' ' .join(x) if type(x) == list else x)
    
    date_1 = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    date_2 = r'\d{4}-[01]\d-[0123]\d'
    date_3 = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_4 = r'\d{4}'
    
    wiki_movies_df['Release date'] = pd.to_datetime(release_date.str.extract(f'({date_1}|{date_2}|{date_3}|{date_4})', flags=re.IGNORECASE)[0], infer_datetime_format=True)
    wiki_movies_df['release_date'] = wiki_movies_df['Release date']
    
    # 16. Clean the running time column in the wiki_movies_df DataFrame.
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    running_time_extract = running_time.str.extract(r'(\d+)\s*h?o?u?r?s\s*(\d*)|(\d+)|\s*m')
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
            
# 2. Clean the Kaggle metadata.
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult', axis=1)
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['budget'], errors='raise')
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
    
    # 3. Merged the two DataFrames into the movies DataFrame.
    final_movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki', '_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    final_movies_df.drop("title_wiki", axis=1, inplace=True)
    final_movies_df.drop("Release date", axis=1, inplace=True)
    final_movies_df.drop("Running time", axis=1, inplace=True)
    
    # 5. Add in the function to fill in the missing Kaggle data.
    def fill_missing_kaggle_data(df, k_column, w_column):
        df[k_column] = df.apply(
        lambda row: row[w_column] if row[k_column] == 0 else row[k_column], axis=1)
        df.drop(columns=w_column, inplace=True)
    
    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    final_movies_df['budget_wiki'] = final_movies_df['Budget']
    final_movies_df['budget_kaggle'] = final_movies_df['budget']
    fill_missing_kaggle_data(final_movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(final_movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(final_movies_df, 'revenue', 'box_office')

    # 7. Filter the movies DataFrame for specific columns.
    final_movies_df.drop('video', axis=1)

    # 8. Rename the columns in the movies DataFrame.
    final_movies_df = final_movies_df[['imdb_id','id','title_kaggle','original_title','tagline',
                                   'belongs_to_collection','url','imdb_link','runtime','budget_kaggle',
                                   'revenue','release_date_kaggle','popularity','vote_average','vote_count',
                                   'genres','original_language','overview','spoken_languages','Country',
                                   'production_companies','production_countries','Distributor','Producer(s)',
                                   'Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)',
                                   'Based on']]
    
    final_movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

    # 9. Transform and merge the ratings DataFrame.
    ratings_count = ratings.groupby(['movieId', 'rating'], as_index=False).count() \
    .rename({'userId':'count'}, axis=1) \
    .pivot(index='movieId', columns='rating', values='count')
    
    ratings_count.columns = ['rating_' + str(col) for col in ratings_count.columns]
    
    movies_and_ratings = pd.merge(final_movies_df, ratings_count, left_on='kaggle_id', right_index=True, how='left')
    movies_and_ratings = movies_and_ratings.fillna(0)
    
    # connect to the database and load data
    
    path = f'postgresql+psycopg2://postgres:{db_pass}@127.0.0.1:5432/movie_data'
    engine = create_engine(path)
    
    movies_and_ratings.to_sql(name='movies', con=engine)
    
    rows_imported = 0
    start_time = time.time()
    
    for data in pd.read_csv(f'resources/ratings.csv', chunksize=1000000):
    
    # print the range of rows being imported
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
    
        data.to_sql(name='ratings', con=engine, if_exists='append')
    
    # increment the number of rows that have been imported by chunksize
        rows_imported += len(data)
    # print rows done importing
        print(f'Done. {time.time() - start_time} total seconds elapsed')
    pass

In [4]:
# 10. Create the path to your file directory and variables for the three files.
file_dir = 'resources'
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia_movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [5]:
# 11. Set the three variables equal to the function created in D1.
wiki_file, kaggle_file, ratings_file = extract_transform_load(wiki_file, kaggle_file, ratings_file)

  pat = re.compile(pat, flags=flags)


importing rows 0 to 1000000...Done. 18.962802171707153 total seconds elapsed
importing rows 1000000 to 2000000...Done. 37.714519023895264 total seconds elapsed
importing rows 2000000 to 3000000...Done. 56.313849210739136 total seconds elapsed
importing rows 3000000 to 4000000...Done. 75.11194491386414 total seconds elapsed
importing rows 4000000 to 5000000...Done. 94.96938610076904 total seconds elapsed
importing rows 5000000 to 6000000...Done. 114.80791902542114 total seconds elapsed
importing rows 6000000 to 7000000...Done. 134.38051915168762 total seconds elapsed
importing rows 7000000 to 8000000...Done. 153.80329608917236 total seconds elapsed
importing rows 8000000 to 9000000...Done. 172.59975719451904 total seconds elapsed
importing rows 9000000 to 10000000...Done. 193.21335005760193 total seconds elapsed
importing rows 10000000 to 11000000...Done. 213.24917006492615 total seconds elapsed
importing rows 11000000 to 12000000...Done. 232.58713817596436 total seconds elapsed
importi

TypeError: cannot unpack non-iterable NoneType object

In [6]:
# 12. Set the DataFrames from the return statement equal to the file names in Step 11. 
wiki_movies_df = wiki_file.pd.Dataframe
movies_and_ratings = kaggle_file
final_movies_df = ratings_file

In [7]:
# 13. Check the wiki_movies_df DataFrame. 
wiki_movies_df.head()

AttributeError: 'str' object has no attribute 'head'

In [None]:
# 14. Check the movies_with_ratings_df DataFrame.
movies_and_ratings.head()

In [None]:
# 15. Check the movies_df DataFrame. 
final_movies_df.head()

In [None]:
path = f'postgresql+psycopg2://postgres:{db_pass}@127.0.0.1:5432/movie_data'
engine = create_engine(path)

query1 = """
        select
            count(*)
        from 
            ratings
        """
conn = engine.connect()
ratings = pd.read_sql(query1, conn)
ratings

In [None]:
query2 = """
        select
            count(*)
        from 
            movies
        """
conn = engine.connect()
movies = pd.read_sql(query2, conn)
movies