In [1]:
import json
import pandas as pd
import numpy as np
import re
from sqlalchemy import create_engine 
import psycopg2
from config import db_password
import time

In [2]:
# Define a variable for the directory that's holding the data
file_dir = "C:/Users/krumb/Classwork/Movies_ETL"

In [3]:
kaggle_metadata = pd.read_csv(f"{file_dir}/Data/movies_metadata.csv")
ratings = pd.read_csv(f"{file_dir}/Data/ratings.csv")

  interactivity=interactivity, compiler=compiler, result=result)


In [4]:
# To open the file in the directory instead of needing to type out entire directory every time
# If I move files, I only need to update the file_dir rather than 
f"{file_dir}wikipedia.movies.json"

'C:/Users/krumb/Classwork/Movies_ETLwikipedia.movies.json'

In [5]:
# Load the JSON file into a list of dictionaries
with open(f"{file_dir}/Data/wikipedia.movies.json", mode="r") as file:
    wiki_movies_raw = json.load(file)

In [6]:
# Create a fiter expression only for movies with a director and IMDb link
wiki_movies = [movie for movie in wiki_movies_raw
                if ("Director" in movie or "Directed by" in movie) 
                and "imdb_link" in movie
                and "No. of episodes" not in movie]

In [7]:
# Handle alternative titles by creating an empty dictionary
# loop through all alt. title keys
# if they exist, remove the key-value pair and add to alt. titles dict.
def clean_movie(movie):
    movie = dict(movie) # create a non-destructive copy
    alt_titles = {}
    # combine alternate titles into one list
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune–Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles 

    # merger column names that mean the same thing
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')

    return movie

In [8]:
# Make a list of cleanned movies with a list comprehension
try:
    clean_movies = [clean_movie(movie) for movie in wiki_movies]
    wiki_movies_df = pd.DataFrame(clean_movies)
except:
    print ("Unable to transform wikipedia data into DataFrame.")

In [9]:
# "(tt\d{7}}" the {7} says to match the last thing exactly 7 times
# doing this to get each individual movie with an IMDb 
# need to put an 'r' in front of the quotes because there is a backslash being used
wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)

In [10]:
# # Find null values
[[column,wiki_movies_df[column].isnull().sum()] for column in wiki_movies_df.columns]

[['url', 0],
 ['year', 0],
 ['imdb_link', 0],
 ['title', 1],
 ['Based on', 4852],
 ['Starring', 184],
 ['Narrated by', 6752],
 ['Cinematography', 691],
 ['Release date', 32],
 ['Running time', 139],
 ['Country', 236],
 ['Language', 244],
 ['Budget', 2295],
 ['Box office', 1548],
 ['Director', 0],
 ['Distributor', 357],
 ['Editor(s)', 548],
 ['Composer(s)', 518],
 ['Producer(s)', 202],
 ['Production company(s)', 1678],
 ['Writer(s)', 199],
 ['Genre', 6923],
 ['Original language(s)', 6875],
 ['Original network', 6908],
 ['Executive producer(s)', 6936],
 ['Production location(s)', 6986],
 ['Picture format', 6969],
 ['Audio format', 6972],
 ['Voices of', 7031],
 ['Followed by', 7024],
 ['Created by', 7023],
 ['Preceded by', 7023],
 ['Suggested by', 7032],
 ['alt_titles', 7012],
 ['Recorded', 7031],
 ['Venue', 7032],
 ['Label', 7031],
 ['Animation by', 7031],
 ['Color process', 7032],
 ['imdb_id', 0]]

In [12]:
# How to get rid of columns where 90% of the values are null
wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]

In [13]:
# BOX OFFICE
# Box office data should be numeric. Only want to look at rows that has defined data (aka drop missing values) 
box_office = wiki_movies_df["Box office"].dropna()

In [14]:
# Can see that some are stored as lists. We want to use a join() function and use a space as our joining character
box_office = box_office.apply(lambda x: " ".join(x) if type(x) == list else x)

In [15]:
# Create a regular expression to catch all of the box office values
# Fixing pattern matches
#   Some values have spaces in between the dollar sign and the number...just need to add '\s*' after the $
#   When million is misspelled. Just need to make the second i optional
form_one = r"\$\s*\d+\.?\d*\s*[mb]illi?on"

In [16]:
# Count how many box office values match our first form
# To ignore whether letters are upper or lowercase, add the argument 'flags' and set it equal to 're.IGNORECASE'
box_office.str.contains(form_one, flags=re.IGNORECASE).sum()

3903

In [17]:
# Next, create a reg. ex. that matches the form '$123,456,789'
# Fixing pattern matches
#   Some values have spaces in between the dollar sign and the number...just need to add '\s*'
#   Some values use a period as a thousands separator, not a comma, add in '[,\.]'
#   Need to add a negatice lookahead because we don't want to catch values like '1.234 billion'
form_two = r"\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)"
box_office.str.contains(form_two, flags=re.IGNORECASE).sum()

1559

In [18]:
# Want to see if any values aren't descibed by either form, and if any value is described by both
matches_form_one = box_office.str.contains(form_one, flags=re.IGNORECASE)
matches_form_two = box_office.str.contains(form_two, flags=re.IGNORECASE)

In [19]:
# Solve values given as a range
box_office = box_office.str.replace(r"\$.*[---](?![a-z])", "$", regex=True)

In [20]:
# Extract and convert the box office values. Only want to extract the parts of the string that match. Use 'str.extract()'
box_office.str.extract(f"({form_one}|{form_two})")

Unnamed: 0,0
0,$21.4 million
1,$2.7 million
2,"$57,718,089"
3,"$7,331,647"
4,"$6,939,946"
...,...
7070,$19.4 million
7071,$41.9 million
7072,$76.1 million
7073,$38.4 million


In [21]:
# Need a function to turn the extracted values into a numeric value
def parse_dollars(s):
    # if s is not a string, return NaN
    if type(s) != str:
        return np.nan

    # if input is of the form $###.# million
    if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

        # remove dollar sign and " million"
        s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a million
        value = float(s) * 10**6

        # return value
        return value

    # if input is of the form $###.# billion
    elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

        # remove dollar sign and " billion"
        s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a billion
        value = float(s) * 10**9

        # return value
        return value

    # if input is of the form $###,###,###
    elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

        # remove dollar sign and commas
        s = re.sub('\$|,','', s)

        # convert to float
        value = float(s)

        # return value
        return value

    # otherwise, return NaN
    else:
        return np.nan


In [22]:
# Now we can parse the box office values to numeric values
#   Extract values from box_office using str.extract and apply parse_dollars to the first column of the dataframe
wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

In [23]:
# No longer need the box office column, so we may drop it 
wiki_movies_df.drop('Box office', axis=1, inplace=True)

In [24]:
# BUDGET
# Create a budget variable
budget = wiki_movies_df['Budget'].dropna()

In [25]:
# Convert lists to strings
budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)

In [26]:
# Remove values between a $ and a hyphen
budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)

In [27]:
# Parse the budget data
matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE)
matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE)

In [28]:
# Remove citation references
budget = budget.str.replace(r'\[\d+\]\s*', '')

In [29]:
# Can now parse the budget values
wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

In [30]:
# Drop the original budget column
wiki_movies_df.drop('Budget', axis=1, inplace=True)

In [31]:
# RELEASE DATE
# Make a variable that holds non-null values of Release date in the dataframe, converting lists to strings
release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

In [32]:
# What we'll be parsing:
#   January 1, 2000....form_one
#   2000-01-01....form_two
#   January 2000....form_three
#   four-digit year....form_four
date_form_one = r"(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}"
date_form_two = r"\d{4}.[01]\d.[123]\d"
date_form_three = r"(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}"
date_form_four = r"\d{4}"

In [33]:
# Use the built-in to_datetime() method in Pandas
# There are different date formats, want to set the 'infer_datetime_format' to 'True'
wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)

In [34]:
# RUNNING TIME
# Make a variable that holds the non-null values of Running time in the dataframe
running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

In [35]:
# Need to match all the hour + minute patterns with one regex. Must:
#   Start with one digit
#   Have an optional space after the digit and before "h"
#   Capture all abbreviations of "hour(s)". Need to make every letter optional except the "h"
#   Have an optional space after hours marker
#   Have optional number of digits for minutes
#   CODE = \d+\s*ho?u?r?s?\s*\d*
#   Want to only extract digits, so add capture groups () around the digits. Also, add in the or statement | for the other form
running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')

In [36]:
# The dataframe is all strings, so we need to convert to numeric values. Use 'to_numeric()'
# Set the errors arguement to 'coerce', this will turn empty strings to NaN
# We can then use 'fillna()' to change all NaN's to zero 
running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)

In [37]:
# Apply a function that will convert the hour and minute capture groups to minutes
wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)

In [38]:
# Can now drop Running time from the dataset
wiki_movies_df.drop('Running time', axis=1, inplace=True)

In [39]:
# Keep rows where adult column is False, and drop adult column
kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')

In [40]:
# Convert data types
kaggle_metadata['video'] == 'True'

0        False
1        False
2        False
3        False
4        False
         ...  
45461    False
45462    False
45463    False
45464    False
45465    False
Name: video, Length: 45454, dtype: bool

In [41]:
# Assign  it back to video
kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'

In [42]:
# For numeric columns: use to_numeric(), and make sure errors= is set to 'raise' so we know of any data that can't be converted to numbers
kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')

In [43]:
# Convert release_date to datetime
kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

In [44]:
# RATINGS DATA
# Use null_counts with option True 
ratings.info(null_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 26024289 entries, 0 to 26024288
Data columns (total 4 columns):
userId       26024289 non-null int64
movieId      26024289 non-null int64
rating       26024289 non-null float64
timestamp    26024289 non-null int64
dtypes: float64(1), int64(3)
memory usage: 794.2 MB


In [45]:
# Use to_datetime() 'unix' as the time unit in seconds(s)
pd.to_datetime(ratings['timestamp'], unit='s')

0          2015-03-09 22:52:09
1          2015-03-09 23:07:15
2          2015-03-09 22:52:03
3          2015-03-09 22:52:26
4          2015-03-09 22:52:36
                   ...        
26024284   2009-10-31 23:26:04
26024285   2009-10-31 23:33:52
26024286   2009-10-31 23:29:24
26024287   2009-11-01 00:06:30
26024288   2009-10-31 23:30:58
Name: timestamp, Length: 26024289, dtype: datetime64[ns]

In [46]:
# Dates look good, so assign to timestamp column
ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')

In [47]:
# MERGING DATA
# Print list of columns so we can see what is redundant
movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

In [48]:
# Competing data:
# Wiki                     Movielens                Resolution
#--------------------------------------------------------------------------
# title_wiki               title_kaggle            Drop wikipedia
# running_time             runtime                 Keep kaggle data, but fill in zeros with wikipedia data                
# budget_wiki              budget_kaggle           Keep kaggle data, but fill in zeros with wikipedia data
# box_office               revenue                 Keep kaggle data, but fill in zeros with wikipedia data
# release_date_wiki        release_date_kaggle
# Language                 original_language       Drop wikipedia
# Production company(s)    production_companies    

In [49]:
# PUT IT ALL TOGETHER
movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

In [50]:
# Make a function that fills in missing data for a column pair, then drops the redundant column
def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
    df[kaggle_column] = df.apply(
        lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
        , axis=1)
    df.drop(columns=wiki_column, inplace=True)

In [51]:
# Call the new function for the columns we will be filling in that have zeros
fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

In [52]:
# Reorder the rows
movies_df = movies_df[['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]

In [53]:
# Rename columns
movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

In [54]:
rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')

In [55]:
rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

In [56]:
# Use a left merge
movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')

In [57]:
movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

In [58]:
# START DATA LOAD
db_string = f"postgres://postgres:{db_password}@127.0.0.1:5432/movie_data"

In [59]:
# Create a database engine 
engine = create_engine(db_string)

In [60]:
# IMPORT MOVIE DATA
movies_df.to_sql(name='movies', con=engine, if_exists="replace")

In [None]:
# LOAD IN THE RATINGS DATA
# Create variable for number of rows imported
rows_imported = 0
# get the start_time from time.time()
start_time = time.time()

for data in pd.read_csv(f'{file_dir}/Data/ratings.csv', chunksize=100000):
    print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
    data.to_sql(name='ratings', con=engine, if_exists='append')
    rows_imported += len(data)

    # add elapsed time to final print out
    print(f'Done. {time.time() - start_time} total seconds elapsed')

importing rows 0 to 100000...Done. 13.348315477371216 total seconds elapsed
importing rows 100000 to 200000...Done. 25.272439002990723 total seconds elapsed
importing rows 200000 to 300000...Done. 37.27848505973816 total seconds elapsed
importing rows 300000 to 400000...Done. 48.7588574886322 total seconds elapsed
importing rows 400000 to 500000...Done. 60.501654624938965 total seconds elapsed
importing rows 500000 to 600000...Done. 72.109934091568 total seconds elapsed
importing rows 600000 to 700000...Done. 84.25073885917664 total seconds elapsed
importing rows 700000 to 800000...Done. 96.81627798080444 total seconds elapsed
importing rows 800000 to 900000...Done. 109.11094546318054 total seconds elapsed
importing rows 900000 to 1000000...Done. 120.97537970542908 total seconds elapsed
importing rows 1000000 to 1100000...Done. 132.7213840484619 total seconds elapsed
importing rows 1100000 to 1200000...Done. 144.3063883781433 total seconds elapsed
importing rows 1200000 to 1300000...Do