In [1]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [2]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    '''
    This function takes in a movie from our 'wikipedia-movies.json'. 
    If one or more alternate titles exist, those are added to a new 'alt_title' key, which will manifest as a column in our subsequently created wiki_movies_df DataFrame. 
    Additionally, it will take remove redundant columns, adding the value to the desired new column name. 
    It returns the cleaned-up movie dictionary. 
    '''
    movie = dict(movie)
    
    alt_titles = {}
    alt_title_keys =[    
        'Also known as','Arabic','Cantonese','Chinese','French',
        'Hangul','Hebrew','Hepburn','Japanese','Literally',
        'Mandarin','McCune–Reischauer','Original title','Polish',
        'Revised Romanization','Romanized','Russian',
        'Simplified','Traditional','Yiddish'
    ]
    for key in alt_title_keys:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
            
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles
        
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    
    # a list of column names to use with change_column_name(); each tuple is the old name and the desired new name, respectively 
    column_names_to_change = [
        ('Adaptation by', 'Writer(s)'), ('Country of origin', 'Country'), ('Directed by', 'Director'), 
        ('Distributed by', 'Distributor'), ('Edited by', 'Editor(s)'), ('Length', 'Running time'), 
        ('Original release', 'Release date'), ('Music by', 'Composer(s)'), ('Produced by', 'Producer(s)'), 
        ('Producer', 'Producer(s)'), ('Productioncompanies ', 'Production company(s)'), ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release Date'), ('Release Date', 'Release date'), ('Screen story by', 'Writer(s)'), 
        ('Screenplay by', 'Writer(s)'), ('Story by', 'Writer(s)'), ('Theme music composer', 'Composer(s)'), 
        ('Written by', 'Writer(s)')
    ]
    
    for old_name, new_name in column_names_to_change:
        change_column_name(old_name, new_name)
        
    return movie

In [3]:
# Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load(wikipedia_input, kaggle_input, movielens_input):
    
    kaggle_metadata = pd.read_csv(kaggle_input, low_memory=False)
    ratings = pd.read_csv(movielens_input, low_memory=False)

    with open(wikipedia_input, mode='r') as file:
        wiki_movies_raw = json.load(file)    
    wiki_movies_df = pd.DataFrame(wiki_movies_raw)
    
    
    # Write a list comprehension to filter out TV shows.
    wiki_movies = [
        movie for movie in wiki_movies_raw
        if ('Director' in movie or 'Directed by' in movie)
        and 'imdb_link' in movie
        and 'No. of episodes' not in movie
    ] 

    # Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    clean_movies = [clean_movie(movie) for movie in wiki_movies]

    # Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(clean_movies)

    # Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    except Exception as e:
        print(e)

    #  Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.

    wiki_columns_to_keep = [
        column for column in wiki_movies_df.columns 
        if wiki_movies_df[column].isnull().sum() < (len(wiki_movies_df) * 0.9)
    ]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]

    # Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
    
    # Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)

    # Write a regular expression to match the six elements of "form_one" of the box office data.
    # Write a regular expression to match the three elements of "form_two" of the box office data.
    
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
    
    box_office_matches_form_one = box_office.str.contains(form_one, flags=re.IGNORECASE, na=False)
    box_office_matches_form_two = box_office.str.contains(form_two, flags=re.IGNORECASE, na=False)

    # Add the parse_dollars function.
    def parse_dollars(s):
        if type(s) != str:
            return np.nan
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
            s = re.sub('\$|\s|[a-zA-Z]','', s)
            value = float(s) * 10**6
            return value
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):
            s = re.sub('\$|\s|[a-zA-Z]','', s)
            value = float(s) * 10**9
            return value
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):
            s = re.sub('\$|,', '', s)
            value = float(s)
            return value
        else:
            value = np.nan
        return value
        
    # Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    # Clean the budget column in the wiki_movies_df DataFrame.
    budget = wiki_movies_df['Budget'].dropna()
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    
    budget_matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE, na=False)
    budget_matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE, na=False)
    
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_movies_df.drop('Budget', axis=1, inplace=True)

    # Clean the release date column in the wiki_movies_df DataFrame.
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)
    wiki_movies_df.drop('Release date', axis=1, inplace=True)
    
    # Clean the running time column in the wiki_movies_df DataFrame.
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
    
    # Clean the Kaggle metadata.
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult', axis='columns')
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
    
    # Merge the two DataFrames into the movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])


    # Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)


    # Add in the function to fill in the missing Kaggle data.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] 
            if row[kaggle_column] == 0 
            else row[kaggle_column], 
            axis=1
        )
        df.drop(columns=wiki_column, inplace=True)

    # Call the function in Step 5 with the DataFrame and columns as the arguments.
    column_pairs = [('runtime', 'running_time'), ('budget_kaggle', 'budget_wiki'), ('revenue', 'box_office')]
    for kaggle, wiki in column_pairs:
        fill_missing_kaggle_data(movies_df, kaggle, wiki)

    # Filter the movies DataFrame for specific columns.
    movies_df.drop('video', axis=1, inplace=True)

    # Rename the columns in the movies DataFrame.
    column_order = [
        'imdb_id', 'id', 'title_kaggle', 'original_title', 'tagline',
        'belongs_to_collection', 'url', 'imdb_link', 'runtime', 'budget_kaggle',
        'revenue', 'release_date_kaggle', 'popularity', 'vote_average', 'vote_count',
        'genres', 'original_language', 'overview', 'spoken_languages', 'Country',
        'production_companies', 'production_countries', 'Distributor', 'Producer(s)', 'Director',
        'Starring', 'Cinematography', 'Editor(s)', 'Writer(s)', 'Composer(s)',
        'Based on'
    ]
    movies_df = movies_df.loc[:, column_order]
    movies_df.rename(
        {
            'id':'kaggle_id',
            'title_kaggle':'title',
            'url':'wikipedia_url',
            'budget_kaggle':'budget',
            'release_date_kaggle':'release_date',
            'Country':'country',
            'Distributor':'distributor',
            'Producer(s)':'producers',
            'Director':'director',
            'Starring':'starring',
            'Cinematography':'cinematography',
            'Editor(s)':'editors',
            'Writer(s)':'writers',
            'Composer(s)':'composers',
            'Based on':'based_on'   
        },
        axis='columns',
        inplace=True
    )

    # Transform and merge the ratings DataFrame.
    pd.to_datetime(ratings['timestamp'], unit='s')
   
    rating_counts = ratings\
        .groupby(['movieId', 'rating'], as_index=False)\
        .count()\
        .rename({'userId': 'count'}, axis=1)\
        .pivot(index='movieId', columns='rating', values='count')
    rating_counts.columns = [f'rating_{col}' for col in rating_counts]
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)
    
    # create connection to PostgreSQL database, add the movies_df DataFrame to the database, add the ratings data and print progress
    db_string = f'postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data'
    engine = create_engine(db_string)
    
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')
    rows_imported = 0
    start_time = time.time()
    for data in pd.read_csv(f'archive/ratings.csv', chunksize=1000000):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        data.to_sql(name='ratings', con=engine, if_exists='append')
        rows_imported += len(data)
        print(f'Done. {time.time() - start_time} total seconds elapsed')

In [8]:
# Create the path to your file directory and variables for the three files.
file_dir = 'Resources'
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia-movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'archive/ratings.csv'

In [9]:
# run the extract_transform_load() function with the necessary arguments
extract_transform_load(wiki_file, kaggle_file, ratings_file)



importing rows 0 to 1000000...Done. 33.85981488227844 total seconds elapsed
importing rows 1000000 to 2000000...Done. 66.59438300132751 total seconds elapsed
importing rows 2000000 to 3000000...Done. 99.23062181472778 total seconds elapsed
importing rows 3000000 to 4000000...Done. 132.9410858154297 total seconds elapsed
importing rows 4000000 to 5000000...Done. 166.07375192642212 total seconds elapsed
importing rows 5000000 to 6000000...Done. 199.82706308364868 total seconds elapsed
importing rows 6000000 to 7000000...Done. 232.17686080932617 total seconds elapsed
importing rows 7000000 to 8000000...Done. 264.8919370174408 total seconds elapsed
importing rows 8000000 to 9000000...Done. 298.626886844635 total seconds elapsed
importing rows 9000000 to 10000000...Done. 331.42740082740784 total seconds elapsed
importing rows 10000000 to 11000000...Done. 364.8576591014862 total seconds elapsed
importing rows 11000000 to 12000000...Done. 398.59411811828613 total seconds elapsed
importing row