In [1]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2
from config import password
# from config import db_password

import time

In [2]:
# Directory holding the three raw input files.
file_dir = 'Input'
# Paths, in order: Wikipedia movie data (JSON), Kaggle metadata (CSV),
# MovieLens rating data (CSV).
wiki_file, kaggle_file, ratings_file = (
    f'{file_dir}/{source_name}'
    for source_name in ('wikipedia_movies.json', 'movies_metadata.csv', 'ratings.csv')
)

In [3]:
# 1. Define the clean_movie function, which takes a single raw movie record as its "movies" argument.
def clean_movie(movies):
    """Return a cleaned copy of one raw movie record.

    The input mapping is copied with ``dict(movies)`` and never modified.
    Two clean-up steps are applied to the copy:

    1. Every alternate-title key that is present is removed and collected
       under a single ``'alt_titles'`` dictionary (added only when at least
       one such key was found).
    2. Column names are renamed to a common set of names. Renames run in the
       listed order; when several old names map to the same new name, the
       later one overwrites the value stored by the earlier one.

    Args:
        movies: mapping describing one movie.

    Returns:
        dict: the cleaned copy.
    """
    movie = dict(movies)  # shallow copy so the caller's record is left untouched

    alt_title_keys = ['Also known as','Arabic','Cantonese','Chinese','French',
                      'Hangul','Hebrew','Hepburn','Japanese','Literally',
                      'Mandarin','McCune–Reischauer','Original title','Polish',
                      'Revised Romanization','Romanized','Russian',
                      'Simplified','Traditional','Yiddish']
    # Move every alternate-title field that is present into one nested dict.
    alt_titles = {key: movie.pop(key) for key in alt_title_keys if key in movie}
    if alt_titles:
        movie['alt_titles'] = alt_titles

    def change_column_name(old_name, new_name):
        """Rename old_name to new_name in the working copy, if present."""
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)

    # Order matters: 'Released' is first renamed to 'Release Date', which the
    # next entry then renames to 'Release date'.
    renames = [
        ('Adaptation by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Original release', 'Release date'),
        ('Music by', 'Composer(s)'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release Date'),
        ('Release Date', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Written by', 'Writer(s)'),
    ]
    for old_name, new_name in renames:
        change_column_name(old_name, new_name)

    return movie

In [4]:
def load_dataset(*file):
    """Load one or more data files into pandas DataFrames.

    A path containing the substring "csv" is read with ``pd.read_csv``.
    Any other path is opened as JSON holding a list of movie records; only
    records that have a director ('Director' or 'Directed by'), have an
    'imdb_link', and have no 'No. of episodes' key are kept, and each kept
    record is passed through ``clean_movie`` before the DataFrame is built.

    Args:
        *file: one or more file paths.

    Returns:
        A single DataFrame when exactly one path is given, a list of
        DataFrames (same order as the paths) when several are given, and
        None when called with no paths.
    """
    frames = []
    for f in file:
        if "csv" in f:
            df = pd.read_csv(f, low_memory=False)
        else:
            with open(f, mode='r') as x:
                raw_records = json.load(x)
            kept_records = [clean_movie(record) for record in raw_records
                            if ('Director' in record or 'Directed by' in record)
                            and ('imdb_link' in record)
                            and ('No. of episodes' not in record)]
            df = pd.DataFrame(kept_records)
        # Collect instead of returning here, so every requested file is loaded
        # rather than only the first one.
        frames.append(df)

    if not frames:
        return None
    if len(frames) == 1:
        return frames[0]
    return frames

In [5]:
# define function for parsing budget column data
def parse_dollars(x):
    """Convert a dollar-amount string into a float number of dollars.

    Recognised forms (matched at the start of the string, case-insensitive):

    * ``$<number> million`` / ``$<number> billion`` (also the misspellings
      "millon" / "billon") -> number scaled by 1e6 / 1e9.
    * ``$123,456,789`` or ``$123.456.789`` -> the digits with the thousands
      separators removed. Because ``.`` followed by exactly three digits is
      treated as a thousands separator here, ``$1.234`` parses as 1234.0.

    Only the matched amount is converted, so text following it (for example
    a trailing note) does not cause a ValueError.

    Args:
        x: value to parse; anything that is not a str yields NaN.

    Returns:
        float: the parsed amount, or ``np.nan`` when nothing matches.
    """
    if not isinstance(x, str):
        return np.nan

    # "$1.2 million" / "$3 billion": capture the number and the scale word.
    scaled = re.match(r'\$\s*(\d+\.?\d*)\s*(milli?on|billi?on)', x, flags=re.IGNORECASE)
    if scaled:
        multiplier = 1000000 if scaled.group(2)[0] in 'mM' else 1000000000
        return float(scaled.group(1)) * multiplier

    # "$1,234,567" / "$1.234.567": keep only the digits of the matched amount.
    grouped = re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illi?on)', x, flags=re.IGNORECASE)
    if grouped:
        return float(re.sub(r'\D', '', grouped.group(0)))

    return np.nan

In [6]:
def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
    """Fill zero entries of a Kaggle column from the matching Wikipedia column.

    For every row, the value in ``kaggle_column`` is replaced by the value in
    ``wiki_column`` when the Kaggle value equals 0; otherwise it is kept.
    ``wiki_column`` is then dropped. ``df`` is modified in place and nothing
    is returned.

    Args:
        df: DataFrame containing both columns.
        kaggle_column: name of the column to fill.
        wiki_column: name of the column supplying replacement values.
    """
    def pick_value(record):
        # A Kaggle value of 0 is treated as "missing".
        kaggle_value = record[kaggle_column]
        if kaggle_value == 0:
            return record[wiki_column]
        return kaggle_value

    filled = df.apply(pick_value, axis=1)
    df[kaggle_column] = filled
    df.drop(wiki_column, axis=1, inplace=True)

In [9]:
# Load each source file into its own explicitly named DataFrame. The Wikipedia
# JSON is also filtered and passed through clean_movie inside load_dataset.
# Plain assignments are used instead of writing through globals() so the
# definitions of these names are visible to readers and tooling.
files_to_load = [wiki_file, kaggle_file, ratings_file]
file_names = ["wiki_movies_df", "kaggle_meta", "ratings"]
wiki_movies_df = load_dataset(wiki_file)
kaggle_meta = load_dataset(kaggle_file)
ratings = load_dataset(ratings_file)

In [10]:
# Transform step: clean the Wikipedia frame, clean the Kaggle frame, merge them
# on imdb_id, then attach per-movie rating counts from the MovieLens ratings.
# Produces movies_df and movies_with_ratings_df. Note that wiki_movies_df,
# kaggle_meta and ratings are modified/reassigned here, so this cell expects
# freshly loaded inputs.
try:
    # ---- Wikipedia: IMDb IDs, duplicates, sparse columns ----
    # grab only the IMDb ID ("tt" + 7 digits) from each link
    wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
    # drop duplicates based on the IMDb ID column
    wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    # keep a column only when its null count is below 90% of the row count
    columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    # .copy() so the column assignments and drops below act on an independent
    # frame rather than on a slice of the original
    wiki_movies_df = wiki_movies_df[columns_to_keep].copy()

    # ---- Wikipedia: box office ----
    # drop null values from Box office
    box_office = wiki_movies_df['Box office'].dropna()
    # if a value is a list, join its items with a space, else keep the original
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)
    # regexes capturing the two most common formats; "illi?on" also accepts the
    # "millon"/"billon" misspelling, consistent with form_two and parse_dollars
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illi?on)'
    # collapse value ranges ("$1-2 million"): everything from '$' through the dash becomes '$'
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    # extract the first match of either form and convert it with parse_dollars
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    # drop original Box office column
    wiki_movies_df.drop('Box office', axis=1, inplace=True)

    # ---- Wikipedia: budget ----
    # drop null values
    budget = wiki_movies_df['Budget'].dropna()
    # join lists with a space if a list is present, else keep the original value
    budget = budget.apply(lambda x: ' '.join(x) if type(x) == list else x)
    # collapse value ranges the same way as for box office
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    # remove bracketed numeric references such as "[2]"
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    # capture budget values with the same regexes used for box office values
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    # drop original column
    wiki_movies_df.drop('Budget', axis=1, inplace=True)

    # ---- Wikipedia: release date ----
    # drop null values and join lists if they are present, else keep value
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    # date formats as regex
    # Month DD, YYYY  (optional whitespace after the comma, so both
    # "July 11, 1990" and "July 11,1990" are captured as full dates)
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{1,2},\s*\d{4}'
    # YYYY (various separator) MM (various separator) DD
    date_form_two = r'\d{4}[,|.|-|\:|\\|\s|/]\d{2}[,|.|-|\:|\\|\s|/]\d{2}'
    # Month YYYY
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    # YYYY
    date_form_four = r'\d{4}'
    # capture the first date in any of the above formats and convert to datetime
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)[0], infer_datetime_format=True)

    # ---- Wikipedia: running time ----
    # drop null values and join lists if they exist, else keep original value
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    # capture groups: 0 = hours, 1 = minutes following the hours, 2 = minutes-only form
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    # convert the captured strings to numbers; anything unparsable or missing becomes 0
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    # hours*60 + minutes when the minutes-only group is 0, else the minutes-only value
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: float(row[0]*60 + row[1]) if row[2] == 0 else float(row[2]), axis=1)
    # drop original column
    wiki_movies_df.drop('Running time', axis=1, inplace=True)

    # ---- Kaggle metadata ----
    # keep only rows whose 'adult' value is the string 'False', then drop that column
    kaggle_meta = kaggle_meta[kaggle_meta['adult'] == 'False'].drop('adult',axis='columns')
    # turn the 'video' strings into booleans
    kaggle_meta['video'] = kaggle_meta['video'] == 'True'
    # set budget column as integer
    kaggle_meta['budget'] = kaggle_meta['budget'].astype(int)
    # change ID column to numeric
    kaggle_meta['id'] = pd.to_numeric(kaggle_meta['id'], errors='raise')
    # change popularity to numeric
    kaggle_meta['popularity'] = pd.to_numeric(kaggle_meta['popularity'], errors='raise')
    # change release date to datetime
    kaggle_meta['release_date'] = pd.to_datetime(kaggle_meta['release_date'])

    # ---- Merge Wikipedia and Kaggle ----
    # inner merge on imdb_id; overlapping column names get _wiki / _kaggle suffixes
    movies_df = pd.merge(wiki_movies_df, kaggle_meta, on='imdb_id', suffixes=['_wiki','_kaggle'])
    # where the Kaggle value is 0, take the Wikipedia value instead, then drop the Wikipedia column
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')
    # filter movies to specific columns, in output order
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                   'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                   'genres','original_language','overview','spoken_languages','Country',
                   'production_companies','production_countries','Distributor',
                   'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                  ]]
    # rename columns to their final names
    movies_df.rename({'id':'kaggle_id',
                      'title_kaggle':'title',
                      'url':'wikipedia_url',
                      'budget_kaggle':'budget',
                      'release_date_kaggle':'release_date',
                      'Country':'country',
                      'Distributor':'distributor',
                      'Producer(s)':'producers',
                      'Director':'director',
                      'Starring':'starring',
                      'Cinematography':'cinematography',
                      'Editor(s)':'editors',
                      'Writer(s)':'writers',
                      'Composer(s)':'composers',
                      'Based on':'based_on'
                     }, axis='columns', inplace=True)

    # ---- Ratings ----
    # count ratings per (movieId, rating), then pivot to one row per movie and one column per rating value
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
            .rename({'userId':'count'}, axis=1) \
            .pivot(index='movieId',columns='rating', values='count')
    # convert ratings timestamp (seconds) to datetime
    ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')
    # rename columns to represent rating value
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    rating_columns = ["rating_0.5", "rating_1.0", "rating_1.5", "rating_2.0", "rating_2.5", "rating_3.0", "rating_3.5", "rating_4.0", "rating_4.5", "rating_5.0"]
    # fill empty counts with 0
    rating_counts = rating_counts[rating_columns].fillna(0)
    # left-merge rating counts onto movies_df for the final dataframe
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    # movies without any ratings get 0 counts; assign the result back instead of
    # calling fillna(inplace=True) on a column selection, which may act on a copy
    movies_with_ratings_df[rating_columns] = movies_with_ratings_df[rating_columns].fillna(0)
    movies_with_ratings_df['belongs_to_collection'] = movies_with_ratings_df['belongs_to_collection'].fillna("No")
except Exception as error:
    # Report the problem, then re-raise: the cells below depend on the frames
    # built here, so continuing would only fail later with a less useful error.
    print(error)
    raise
    
    

In [11]:
# 20. Preview the first five rows of the movies_with_ratings_df DataFrame.
movies_with_ratings_df.head(5)

Unnamed: 0,imdb_id,kaggle_id,title,original_title,tagline,belongs_to_collection,wikipedia_url,imdb_link,runtime,budget,...,rating_0.5,rating_1.0,rating_1.5,rating_2.0,rating_2.5,rating_3.0,rating_3.5,rating_4.0,rating_4.5,rating_5.0
0,tt0098987,9548,The Adventures of Ford Fairlane,The Adventures of Ford Fairlane,Kojak. Columbo. Dirty Harry. Wimps.,No,https://en.wikipedia.org/wiki/The_Adventures_o...,https://www.imdb.com/title/tt0098987/,104.0,49000000.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,tt0098994,25501,"After Dark, My Sweet","After Dark, My Sweet",All they risked was everything.,No,"https://en.wikipedia.org/wiki/After_Dark,_My_S...",https://www.imdb.com/title/tt0098994/,114.0,6000000.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,tt0099005,11856,Air America,Air America,The few. The proud. The totally insane.,No,https://en.wikipedia.org/wiki/Air_America_(film),https://www.imdb.com/title/tt0099005/,112.0,35000000.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,tt0099012,8217,Alice,Alice,,No,https://en.wikipedia.org/wiki/Alice_(1990_film),https://www.imdb.com/title/tt0099012/,102.0,12000000.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,tt0099018,25943,Almost an Angel,Almost an Angel,Who does he think he is?,No,https://en.wikipedia.org/wiki/Almost_an_Angel,https://www.imdb.com/title/tt0099018/,95.0,25000000.0,...,3.0,0.0,3.0,2.0,5.0,26.0,37.0,46.0,16.0,11.0


In [12]:
# 21. List the column names of movies_with_ratings_df to confirm they are correct.
list(movies_with_ratings_df.columns)

['imdb_id',
 'kaggle_id',
 'title',
 'original_title',
 'tagline',
 'belongs_to_collection',
 'wikipedia_url',
 'imdb_link',
 'runtime',
 'budget',
 'revenue',
 'release_date',
 'popularity',
 'vote_average',
 'vote_count',
 'genres',
 'original_language',
 'overview',
 'spoken_languages',
 'country',
 'production_companies',
 'production_countries',
 'distributor',
 'producers',
 'director',
 'starring',
 'cinematography',
 'editors',
 'writers',
 'composers',
 'based_on',
 'rating_0.5',
 'rating_1.0',
 'rating_1.5',
 'rating_2.0',
 'rating_2.5',
 'rating_3.0',
 'rating_3.5',
 'rating_4.0',
 'rating_4.5',
 'rating_5.0']

In [13]:
# Load step: write movies_df to the 'movies' table and the ratings CSV to the
# 'ratings' table of the Movies_Data PostgreSQL database.
connection = F"postgresql://postgres:{password}@127.0.0.1:5432/Movies_Data"
engine = create_engine(connection)
# replace any existing 'movies' table so the cell can be re-run
movies_df.to_sql(name='movies', con=engine, if_exists='replace')

# Upload the ratings file in chunks of 1,000,000 rows, timing each chunk.
rows_imported = 0
start_time = time.time()
for data in pd.read_csv(ratings_file, chunksize=1000000):
    round_time = time.time()
    print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
    # The first chunk recreates the table (so re-runs start clean); every later
    # chunk is appended. Using 'replace' for all chunks would leave only the
    # final chunk in the table.
    write_mode = 'replace' if rows_imported == 0 else 'append'
    data.to_sql(name='ratings', con=engine, if_exists=write_mode)
    rows_imported += len(data)
    print(f'Chunk Done. {time.time() - round_time} total seconds elapsed')

print(f'Done. {time.time() - start_time} total seconds elapsed for entire upload.')


importing rows 0 to 1000000...Chunk Done. 48.866738080978394 total seconds elapsed
importing rows 1000000 to 2000000...Chunk Done. 47.790443658828735 total seconds elapsed
importing rows 2000000 to 3000000...Chunk Done. 47.639814138412476 total seconds elapsed
importing rows 3000000 to 4000000...Chunk Done. 47.94582724571228 total seconds elapsed
importing rows 4000000 to 5000000...Chunk Done. 49.34446907043457 total seconds elapsed
importing rows 5000000 to 6000000...Chunk Done. 48.45144557952881 total seconds elapsed
importing rows 6000000 to 7000000...Chunk Done. 47.260180711746216 total seconds elapsed
importing rows 7000000 to 8000000...Chunk Done. 46.02908897399902 total seconds elapsed
importing rows 8000000 to 9000000...Chunk Done. 46.38875198364258 total seconds elapsed
importing rows 9000000 to 10000000...Chunk Done. 53.09155035018921 total seconds elapsed
importing rows 10000000 to 11000000...Chunk Done. 57.98240351676941 total seconds elapsed
importing rows 11000000 to 1200