In [108]:
# CHALLENGE MODULE 8
#   Create a function that takes in three arguments:
#       Wikipedia data
#       Kaggle metadata
#       MovieLens rating data (from Kaggle)

#  Use the code from your Jupyter Notebook so that the function performs all of the transformation steps.
#  Remove any exploratory data analysis and redundant code.

#  Add the load steps from the Jupyter Notebook to the function. You’ll need to remove the existing data from SQL,
#  but keep the empty tables.

#  Check that the function works correctly on the current Wikipedia and Kaggle data.

#  Document any assumptions that are being made.


In [109]:
# Import Dependencies
import json
import pandas as pd
import numpy as np
import os
import time
from sqlalchemy import create_engine
import re

# Import Database Password
from config import db_password

In [110]:
# Set file path directory
file_dir = r'C:/Users/dritter/Berkeley/Movies-ETL/Resources/'

In [111]:
# Open and load Wikepedia JSON
with open(f'{file_dir}/wikipedia.movies.json', mode='r') as file:
    wiki_movies_raw = json.load(file)

In [112]:
# Read Kaggle metadata and ratings
kaggle_metadata = pd.read_csv(f'{file_dir}movies_metadata.csv')
ratings = pd.read_csv(f'{file_dir}ratings.csv')

  interactivity=interactivity, compiler=compiler, result=result)


In [113]:
# Create dataframe for wiki movies
wiki_movies_df = pd.DataFrame(wiki_movies_raw)

In [114]:
# List of wiki movies
wiki_movies = [movie for movie in wiki_movies_raw
               if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]

In [115]:
# Function to clean data
def clean_movie(movie):
    movie = dict(movie) #create a non-destructive copy
    alt_titles = {}
    # combine alternate titles into one list
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune-Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles

    # merge column names
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')

    return movie

In [116]:
# Create clean movie list and create dataframe
clean_movies = [clean_movie(movie) for movie in wiki_movies]
wiki_movies_df = pd.DataFrame(clean_movies)

In [117]:
# Drop duplicates from clean data
wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)

In [118]:
# Define columns to retain
wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]

In [119]:
# Drop NA values
box_office = wiki_movies_df['Box office'].dropna() 

In [120]:
# Find non-strings
box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

In [121]:
# Variable for form $123.4 million/billion.”
form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'

In [122]:
# Variable for form “$123,456,789.”
form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'

In [123]:
# Compare the variables in forms
matches_form_one = box_office.str.contains(form_one, flags=re.IGNORECASE)
matches_form_two = box_office.str.contains(form_two, flags=re.IGNORECASE)

In [124]:
# Search for string with dollar sign and ends with hyphen - replace with dollar sign
box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)

In [125]:
# Extract and convert box office values
def parse_dollars(s):
    # if s is not a string, return NaN
    if type(s) != str:
        return np.nan

    # if input is of the form $###.# million
    if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

        # remove dollar sign and " million"
        s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a million
        value = float(s) * 10**6

        # return value
        return value

    # if input is of the form $###.# billion
    elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

        # remove dollar sign and " billion"
        s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a billion
        value = float(s) * 10**9

        # return value
        return value

    # if input is of the form $###,###,###
    elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

        # remove dollar sign and commas
        s = re.sub('\$|,','', s)

        # convert to float
        value = float(s)

        # return value
        return value

    # otherwise, return NaN
    else:
        return np.nan

In [126]:
# Extract values from box office
wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

In [127]:
# Drop temporary box office column
wiki_movies_df.drop('Box office', axis=1, inplace=True)

In [128]:
# Create budget variable
budget = wiki_movies_df['Budget'].dropna()

In [129]:
# Convert lists to strings
budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)

In [130]:
# Remove values between dollar sign and hyphen
budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)

In [131]:
# Parse box office data
matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE)
matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE)

In [132]:
# remove citation references
budget = budget.str.replace(r'\[\d+\]\s*', '')

In [133]:
# Parse budget values
wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

In [134]:
# Drop budget column
wiki_movies_df.drop('Budget', axis=1, inplace=True)

In [135]:
# Parse Release date
release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

In [136]:
# Parse Full month name - Four digit year, Full month name, four digit year
date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}'
date_form_two = r'\d{4}.[01]\d.[123]\d'
date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
date_form_four = r'\d{4}'

In [137]:
# Parse Release date
wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)

In [138]:
# Parse Running time - create variable of non-nulls and convert lists to strings
running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

In [139]:
# Extract Values
running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')

In [140]:
# Convert to numeric values
running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)

In [141]:
# Convert the hour capture groups and minute capture groups to minutes if the pure minutes capture group is zero
wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)

In [142]:
# Drop Running Time
wiki_movies_df.drop('Running time', axis=1, inplace=True)

In [143]:
# Merge wiki movies and kaggle data
movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

In [144]:
# Drop error merging two
movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)

In [145]:
# Drop title_wiki, release_date_wiki, Language, and Production company(s) columns.
movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

In [146]:
# Create function filling in missing data for column pair then drop redundant column
def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
    df[kaggle_column] = df.apply(
        lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
        , axis=1)
    df.drop(columns=wiki_column, inplace=True)

In [147]:
# Create loop
for col in movies_df.columns:
    lists_to_tuples = lambda x: tuple(x) if type(x) == list else x
    value_counts = movies_df[col].apply(lists_to_tuples).value_counts(dropna=False)
    num_values = len(value_counts)
    if num_values == 1:
        print(col)

adult


In [148]:
# Re-order columns
movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]

In [149]:
# Rename columns for consistency
movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

In [150]:
# group movieID and ratings - rename for userID to count
rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')

In [151]:
# Rename columns
rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

In [152]:
# convert ID to int64
movies_df['kaggle_id'] = movies_df['kaggle_id'].astype(np.int64)

In [153]:
# Merge data with left join
movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')

In [154]:
# fill NA values
movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

In [155]:
# Create database string
db_string = f"postgres://postgres:{db_password}@127.0.0.1:5432/movie_data"

In [156]:
# Create database engine
engine = create_engine(db_string)

In [158]:
# Import movie data
movies_df.to_sql(name='movies', con=engine)

In [107]:
# Import and print results
rows_imported = 0
# get the start_time from time.time()
for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):

    print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
    data.to_sql(name='ratings', con=engine, if_exists='append')
    rows_imported += len(data)

    # add elapsed time to final print out
    print(f'Done.')

importing rows 0 to 1000000...Done.
importing rows 1000000 to 2000000...Done.
importing rows 2000000 to 3000000...Done.
importing rows 3000000 to 4000000...Done.
importing rows 4000000 to 5000000...Done.
importing rows 5000000 to 6000000...Done.
importing rows 6000000 to 7000000...Done.
importing rows 7000000 to 8000000...Done.
importing rows 8000000 to 9000000...Done.
importing rows 9000000 to 10000000...Done.
importing rows 10000000 to 11000000...Done.
importing rows 11000000 to 12000000...Done.
importing rows 12000000 to 13000000...Done.
importing rows 13000000 to 14000000...Done.
importing rows 14000000 to 15000000...Done.
importing rows 15000000 to 16000000...Done.
importing rows 16000000 to 17000000...Done.
importing rows 17000000 to 18000000...Done.
importing rows 18000000 to 19000000...Done.
importing rows 19000000 to 20000000...Done.
importing rows 20000000 to 21000000...Done.
importing rows 21000000 to 22000000...Done.
importing rows 22000000 to 23000000...Done.
importing row