In [13]:
import json
import os
import re
import time

import numpy as np
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

from config import db_password

In [14]:
# Normalize one raw Wikipedia movie record before it goes into a DataFrame.
def clean_movie(movie):
    """Return a cleaned copy of a single Wikipedia movie record.

    The argument is copied into a new dict, so the caller's record is left
    untouched. Two passes are then applied to the copy:

    * Alternate-title keys (language variants, "Also known as", ...) are
      moved into a nested ``alt_titles`` dict, which is only added when at
      least one such key is present.
    * Keys that label the same infobox field differently are renamed to one
      label. Renames run in the listed order, so one rename can feed another
      ("Released" -> "Release Date" -> "Release date"). Renaming onto a key
      that already exists overwrites that key's value in place; otherwise the
      renamed key is moved to the end of the dict.

    Args:
        movie: Mapping of infobox label -> value for one movie.

    Returns:
        dict: The cleaned record.
    """
    record = dict(movie)

    # Keys holding alternate titles, collected in this order.
    title_variant_keys = (
        'Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French',
        'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally',
        'Mandarin', 'McCune–Reischauer', 'Original title', 'Polish',
        'Revised Romanization', 'Romanized', 'Russian',
        'Simplified', 'Traditional', 'Yiddish',
    )
    grouped_titles = {key: record.pop(key) for key in title_variant_keys if key in record}
    if grouped_titles:
        record['alt_titles'] = grouped_titles

    # (old label, unified label) pairs, applied top to bottom.
    label_renames = (
        ('Adaptation by', 'Written by'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Original release', 'Release date'),
        ('Music by', 'Composer(s)'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release Date'),
        ('Release Date', 'Release date'),
        ('Screen story by', 'Written by'),
        ('Screenplay by', 'Written by'),
        ('Story by', 'Written by'),
        # NOTE(review): 'Music by' was already renamed to 'Composer(s)' above,
        # so theme composers land in a separate 'Music by' key — confirm intent.
        ('Theme music composer', 'Music by'),
    )
    for old_label, unified_label in label_renames:
        if old_label in record:
            record[unified_label] = record.pop(old_label)

    return record

In [15]:
# ---------------------------------------------------------------------------
# Extract-transform-load pipeline for the Wikipedia, Kaggle, and MovieLens data.
# ---------------------------------------------------------------------------

# Infobox keys whose presence marks a Wikipedia record as a TV series, not a film.
_TV_SHOW_KEYS = ('Television series', 'No. of episodes', 'No. of seasons', 'Seasons')

# Dollar amounts written as "$123.4 million" / "$1.2 billion".
_FORM_ONE = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
# Dollar amounts written as "$123,456,789" ("," or "." as the thousands
# separator) that are not immediately followed by " million"/" billion".
_FORM_TWO = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
# Either dollar form, as a single capture group for Series.str.extract.
_DOLLAR_PATTERN = f'({_FORM_ONE}|{_FORM_TWO})'

# Columns of the final movies table, in output order, mapped to their output names.
_OUTPUT_COLUMNS = {
    'imdb_id': 'imdb_id',
    'id': 'kaggle_id',
    'title_kaggle': 'title',
    'original_title': 'original_title',
    'tagline': 'tagline',
    'belongs_to_collection': 'belongs_to_collection',
    'url': 'wikipedia_url',
    'imdb_link': 'imdb_link',
    'runtime': 'runtime',
    'budget_kaggle': 'budget',
    'revenue': 'revenue',
    'release_date_kaggle': 'release_date',
    'popularity': 'popularity',
    'vote_average': 'vote_average',
    'vote_count': 'vote_count',
    'genres': 'genres',
    'original_language': 'original_language',
    'overview': 'overview',
    'spoken_languages': 'spoken_languages',
    'Country': 'country',
    'production_companies': 'production_companies',
    'production_countries': 'production_countries',
    'Distributor': 'distributor',
    'Producer(s)': 'producers',
    'Director': 'director',
    'Starring': 'starring',
    'Cinematography': 'cinematography',
    'Editor(s)': 'editors',
    'Written by': 'writers',
    # clean_movie stores "Music by" infobox values under 'Composer(s)'.
    'Composer(s)': 'composers',
    'Based on': 'based_on',
}


def _join_if_list(value):
    """Collapse a list-valued infobox cell into one space-separated string."""
    return ' '.join(value) if isinstance(value, list) else value


def _is_movie(row):
    """Return True for records that name a director and carry no TV-series keys."""
    has_director = 'Director' in row or 'Directed by' in row
    return has_director and not any(key in row for key in _TV_SHOW_KEYS)


def _parse_dollars(s):
    """Convert one extracted dollar string to a float number of dollars.

    Returns NaN for non-strings and for strings matching neither dollar form.
    """
    if not isinstance(s, str):
        return np.nan
    if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
        # Drop "$", whitespace and the word, leaving e.g. "123.4".
        return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**6
    if re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):
        return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**9
    if re.match(_FORM_TWO, s, flags=re.IGNORECASE):
        # Every "," or "." in this form is a thousands separator, so strip both
        # (stripping only "," made float() fail on "$1.234.567").
        return float(re.sub(r'[$,.\s]', '', s))
    return np.nan


def _transform_wiki(raw):
    """Build the cleaned, de-duplicated Wikipedia movies DataFrame.

    Adds ``imdb_id``, ``box_office`` and ``budget`` (dollars) and
    ``running_time`` (minutes), and drops the raw ``Running time`` column.
    """
    wiki_movies_df = pd.DataFrame([clean_movie(row) for row in raw if _is_movie(row)])

    try:
        # Allow 8-digit IDs so newer titles are not truncated into a wrong 7-digit ID.
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7,8})', expand=False)
    except Exception as error:
        print(f'Could not extract IMDb IDs from imdb_link: {error!r}')
        raise
    # Rows without an IMDb ID can never join to Kaggle, and pandas merges NaN
    # keys with NaN keys, so they would pair with any Kaggle rows lacking an ID.
    wiki_movies_df = wiki_movies_df.dropna(subset=['imdb_id']).drop_duplicates(subset='imdb_id')

    # Keep only the columns that are less than 90% null.
    mostly_populated = wiki_movies_df.isnull().mean() < 0.9
    wiki_movies_df = wiki_movies_df.loc[:, mostly_populated].copy()

    box_office = wiki_movies_df['Box office'].dropna().map(_join_if_list)
    wiki_movies_df['box_office'] = (box_office.str.extract(_DOLLAR_PATTERN, flags=re.IGNORECASE)[0]
                                    .map(_parse_dollars))

    budget = (wiki_movies_df['Budget'].dropna().map(_join_if_list)
              # Collapse ranges such as "$5–7 million" to their upper bound ("$7 million").
              .str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
              # Strip citation markers such as "[3]"; regex=True is required on pandas >= 2.0.
              .str.replace(r'\[\d+\]\s*', '', regex=True))
    wiki_movies_df['budget'] = (budget.str.extract(_DOLLAR_PATTERN, flags=re.IGNORECASE)[0]
                                .map(_parse_dollars))

    # The Wikipedia 'Release date' column is not parsed: the output table
    # uses the Kaggle release date (release_date_kaggle).

    running_time = wiki_movies_df['Running time'].dropna().map(_join_if_list)
    # Columns 0/1 capture "H hours M" forms; column 2 captures plain "M min" forms.
    parts = (running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
             .apply(lambda col: pd.to_numeric(col, errors='coerce'))
             .fillna(0))
    wiki_movies_df['running_time'] = (parts[0] * 60 + parts[1]).where(parts[2] == 0, parts[2])
    return wiki_movies_df.drop(columns='Running time')


def _transform_kaggle(kaggle_metadata_df):
    """Drop adult titles and convert Kaggle metadata columns to proper dtypes.

    ``release_date_kaggle`` is added as a datetime copy of ``release_date``.
    """
    non_adult = kaggle_metadata_df[kaggle_metadata_df['adult'] == 'False'].drop(columns='adult')
    return non_adult.assign(
        budget=lambda df: df['budget'].astype(int),
        id=lambda df: pd.to_numeric(df['id'], errors='raise'),
        popularity=lambda df: pd.to_numeric(df['popularity'], errors='raise'),
        release_date_kaggle=lambda df: pd.to_datetime(df['release_date']),
        video=lambda df: df['video'] == 'True',
    )


def _fill_missing(df, kaggle_column, wiki_column):
    """Return a copy of ``df`` where zeros in ``kaggle_column`` take the ``wiki_column`` value.

    NaN in the Kaggle column is left as is. ``wiki_column`` is dropped from the result.
    """
    kaggle_values = df[kaggle_column]
    filled = kaggle_values.where(kaggle_values != 0, df[wiki_column])
    return df.assign(**{kaggle_column: filled}).drop(columns=wiki_column)


def _merge_movies(wiki_movies_df, kaggle_df):
    """Inner-join Wikipedia and Kaggle movies on imdb_id and shape the output table."""
    movies_df = pd.merge(wiki_movies_df, kaggle_df, on='imdb_id', suffixes=('_wiki', '_kaggle'))
    for kaggle_column, wiki_column in (('runtime', 'running_time'),
                                       ('budget_kaggle', 'budget_wiki'),
                                       ('revenue', 'box_office')):
        movies_df = _fill_missing(movies_df, kaggle_column, wiki_column)
    # The explicit selection also discards unneeded columns (title_wiki,
    # Language, Production company(s), video, ...).
    return movies_df.loc[:, list(_OUTPUT_COLUMNS)].rename(columns=_OUTPUT_COLUMNS)


def _merge_ratings(movies_df, ratings):
    """Return ``movies_df`` with one ``rating_<value>`` count column per rating value.

    Counts are grouped from ``ratings`` by ``movieId``/``rating`` (the ``userId``
    column supplies the count), joined on ``kaggle_id``, and missing counts become 0.
    """
    rating_counts = (ratings.groupby(['movieId', 'rating'], as_index=False).count()
                     .rename({'userId': 'count'}, axis=1)
                     .pivot(index='movieId', columns='rating', values='count'))
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    movie_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    movie_ratings_df[rating_counts.columns] = movie_ratings_df[rating_counts.columns].fillna(0)
    return movie_ratings_df


def _load(movies_df, ratings_file, db_string, chunksize=1000000):
    """Write ``movies_df`` and the raw ratings CSV (in chunks) to the database."""
    engine = create_engine(db_string)
    movies_df.to_sql(name='movies', con=engine, if_exists='replace')

    rows_imported = 0
    start_time = time.time()
    for chunk_index, data in enumerate(pd.read_csv(ratings_file, chunksize=chunksize)):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        # Replace on the first chunk so re-running the pipeline does not append
        # a second copy of every rating.
        data.to_sql(name='ratings', con=engine, if_exists='replace' if chunk_index == 0 else 'append')
        rows_imported += len(data)
        print(f'Done. {time.time() - start_time} total seconds elapsed')


def etl(x, y, z):
    """Extract the three movie data sources, transform them, and load them into PostgreSQL.

    Args:
        x: Path to the Wikipedia movies JSON file.
        y: Path to the Kaggle movie metadata CSV file.
        z: Path to the MovieLens ratings CSV file.

    Writes the ``movies`` table (replaced) and the ``ratings`` table (replaced
    by the first chunk, then appended chunk by chunk) to the ``movie_data``
    database, printing progress for each ratings chunk. Returns None.
    """
    # Parameter names are kept for existing callers. The paths previously came
    # from notebook globals, which silently ignored the arguments.
    wiki_file, kaggle_file, ratings_file = x, y, z

    # Extract. JSON text is UTF-8 by specification; do not rely on the platform default.
    with open(wiki_file, mode='r', encoding='utf-8') as file:
        raw = json.load(file)
    kaggle_metadata_df = pd.read_csv(kaggle_file, low_memory=False)

    # Transform. The per-movie rating summary (_merge_ratings) is never written
    # to the database, so the full ratings file is not read into memory here.
    movies_df = _merge_movies(_transform_wiki(raw), _transform_kaggle(kaggle_metadata_df))

    # Load.
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5433/movie_data"
    _load(movies_df, ratings_file, db_string)


In [16]:
# Locate the three source files. Set the MOVIES_ETL_DIR environment variable to
# use another data directory; otherwise the original classwork path is used.
file_dir = os.environ.get('MOVIES_ETL_DIR', "C:\\Users\\Frank Bucalo\\Desktop\\classwork\\Movies-ETL\\")
# File names are appended directly to file_dir, so it must end with a separator.
if not file_dir.endswith(('/', '\\')):
    file_dir += os.sep
# The Wikipedia data
wiki_file = f'{file_dir}wikipedia-movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}ratings.csv'

In [17]:
# Run the whole pipeline: extract the Wikipedia, Kaggle and MovieLens files,
# transform them, and load the results into PostgreSQL.
etl(
    wiki_file,
    kaggle_file,
    ratings_file,
)



importing rows 0 to 1000000...Done. 30.329548120498657 total seconds elapsed
importing rows 1000000 to 2000000...Done. 58.83858513832092 total seconds elapsed
importing rows 2000000 to 3000000...Done. 88.95331358909607 total seconds elapsed
importing rows 3000000 to 4000000...Done. 118.40760803222656 total seconds elapsed
importing rows 4000000 to 5000000...Done. 145.9919455051422 total seconds elapsed
importing rows 5000000 to 6000000...Done. 176.54497170448303 total seconds elapsed
importing rows 6000000 to 7000000...Done. 205.55298566818237 total seconds elapsed
importing rows 7000000 to 8000000...Done. 235.4390001296997 total seconds elapsed
importing rows 8000000 to 9000000...Done. 274.8316102027893 total seconds elapsed
importing rows 9000000 to 10000000...Done. 311.17630434036255 total seconds elapsed
importing rows 10000000 to 11000000...Done. 343.3491988182068 total seconds elapsed
importing rows 11000000 to 12000000...Done. 374.2883985042572 total seconds elapsed
importing ro