In [1]:
import json
import os
import re
import time

import numpy as np
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

from config import db_password

In [2]:
# Add clean movie function

def clean_movie(movie):
    """Return a normalised shallow copy of one scraped movie record.

    Two clean-ups are applied to the copy:

    1. Every alternate-title / language key listed in ``alt_langs`` that is
       present is removed from the record and gathered into a single
       ``'alt_titles'`` dict (only added when at least one was found).
    2. Synonymous column names are collapsed onto one canonical name.

    Args:
        movie: Mapping of field name -> value for a single movie.

    Returns:
        dict: A new dict; the input mapping is not modified.
    """
    movie = dict(movie)  # copy so the caller's record stays untouched

    alt_langs = ['Also known as','Arabic', 'Cantonese', 'Chinese', 'French', 'Gwoyeu Romatzyh', 'Hangul',
                 'Hebrew', 'Hepburn', 'Hanyu Pinyin', 'Hokkien POJ', 'Jyutping','Japanese',
                 'Literally','Literal Meaning', 'Mandarin', 'McCune–Reischauer', 'Original title',
                 'Polish',  'Revised Romanization', 'Romanized', 'Russian', 'Simplified', 'Simplified Chinese',
                 'Traditional', 'Traditional Chinese','Wide-Giles','Yale Romanization','Yiddish']

    # Pull the alternate titles out of the record, keeping alt_langs order.
    alt_titles = {lang: movie.pop(lang) for lang in alt_langs if lang in movie}
    if alt_titles:
        movie['alt_titles'] = alt_titles

    # (old name, canonical name) pairs. Order matters: several old names map
    # to the same target, and a later rename overwrites an earlier one
    # (e.g. 'Released' -> 'Release Date' -> 'Release date').
    renames = (
        ('Adaptation by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Engine', 'Engine(s)'),
        ('Duration', 'Running time'),
        ('Length', 'Running time'),
        ('Original release', 'Release date'),
        ('Music by', 'Composer(s)'),
        ('Parent', 'Parents(s)'),
        ('Preceded by', 'Predecessor'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release Date'),
        ('Replaced by', 'Replaced'),
        ('Release Date', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Written by', 'Writer(s)'),
    )
    for old_name, new_name in renames:
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)

    return movie
    

In [8]:
#  Add the function that takes in the 3 data files

def extract_transform_load(wiki_file, kaggle_file, ratings_file):
    """Run the movie ETL: read three source files, clean/merge, load to PostgreSQL.

    Steps, in order:
      1. Read the Kaggle metadata CSV, the ratings CSV and the Wikipedia JSON.
      2. Keep Wikipedia entries that have a director and an IMDb link and no
         'No. of episodes' key, then normalise each with ``clean_movie``.
      3. Parse box office, budget, release date and running time into numeric /
         datetime columns.
      4. Clean the Kaggle metadata, inner-merge it with the Wikipedia data on
         ``imdb_id`` and fill zero-valued Kaggle fields from Wikipedia.
      5. Pivot the ratings into per-movie counts for each rating value and
         left-merge them onto the movies.
      6. Write the ``movies`` and ``movies_with_ratings`` tables (replacing any
         existing ones) and append the raw ratings to ``ratings`` in chunks.

    Args:
        wiki_file: Path to the Wikipedia movies JSON file.
        kaggle_file: Path to the Kaggle movie metadata CSV file.
        ratings_file: Path to the ratings CSV file. It is read once in full for
            the rating counts and once in chunks for the database import.

    Returns:
        None. Results are written to the ``movie_data`` PostgreSQL database at
        127.0.0.1:5432 using the module-level ``db_password``.
    """

    # ------------------------------------------------------------------
    # Extract
    # ------------------------------------------------------------------
    kaggle_metadata = pd.read_csv(kaggle_file, low_memory=False)
    ratings = pd.read_csv(ratings_file)

    with open(wiki_file, mode='r') as file:
        wiki_movies_raw = json.load(file)

    # ------------------------------------------------------------------
    # Transform: Wikipedia data
    # ------------------------------------------------------------------
    # Keep entries with a director and an IMDb link; drop anything that has
    # an episode count (TV shows).
    wiki_movies = [movie for movie in wiki_movies_raw
                   if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]

    clean_movies = [clean_movie(movie) for movie in wiki_movies]
    wiki_movies_df = pd.DataFrame(clean_movies)

    # Extract the IMDb id from the link and drop duplicate ids. Failures are
    # reported but deliberately do not abort the run here.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    except Exception as e:
        print(e)

    # Keep only the columns that are less than 90% null. Copy so the column
    # assignments below write to an independent frame, not a slice.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns
                            if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep].copy()

    # Dollar-amount patterns shared by the box office and budget parsing:
    #   form_one: "$123.4 million" / "$1.2 billion"
    #   form_two: "$123,456,789" (comma or period used as thousands separator)
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'

    def parse_dollars(s):
        """Convert an extracted dollar string to a float, or NaN if unparseable."""
        if type(s) != str:
            return np.nan

        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
            s = re.sub(r'\$|\s|[a-zA-Z]', '', s)
            return float(s) * 10**6

        if re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):
            s = re.sub(r'\$|\s|[a-zA-Z]', '', s)
            return float(s) * 10**9

        if re.match(form_two, s, flags=re.IGNORECASE):
            s = re.sub(r'\$|,', '', s)
            # form_two also accepts '.' between digit groups. More than one
            # '.' cannot be a decimal point, so treat them as thousands
            # separators instead of letting float() raise ValueError.
            if s.count('.') > 1:
                s = s.replace('.', '')
            return float(s)

        return np.nan

    # --- Box office ---
    box_office = wiki_movies_df['Box office'].dropna()
    # Some entries are lists of strings; join them into one string.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)
    # Collapse ranges such as "$4-5 million" to the upper bound.
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    wiki_movies_df['box_office'] = box_office.str.extract(
        f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    # --- Budget ---
    budget = wiki_movies_df['Budget'].dropna()
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    # Collapse ranges, then strip citation references such as "[3]".
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    # regex=True is required: newer pandas treats the pattern as a literal
    # string by default, which would leave the citations in place.
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    wiki_movies_df['budget'] = budget.str.extract(
        f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    # --- Release date ---
    release_date = wiki_movies_df['Release date'].dropna().apply(
        lambda x: ' '.join(x) if type(x) == list else x)

    # One regex per accepted date layout, tried from most to least specific.
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'

    wiki_movies_df['release_date'] = pd.to_datetime(
        release_date.str.extract(
            f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0],
        infer_datetime_format=True)

    # --- Running time ---
    running_time = wiki_movies_df['Running time'].dropna().apply(
        lambda x: ' '.join(x) if type(x) == list else x)
    # Groups 0/1 capture "<h> hours <m>", group 2 captures "<m> m...".
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(
        lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    # Use hours*60 + minutes when no plain-minutes value was captured.
    wiki_movies_df['running_time'] = running_time_extract.apply(
        lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)

    # ------------------------------------------------------------------
    # Transform: Kaggle metadata
    # ------------------------------------------------------------------
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult', axis='columns')
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

    # ------------------------------------------------------------------
    # Merge Wikipedia + Kaggle
    # ------------------------------------------------------------------
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki', '_kaggle'])

    # The raw text columns have been replaced by their parsed versions.
    movies_df.drop(columns=['Box office', 'Budget', 'Release date', 'Running time'], inplace=True)
    # Drop rows where the two sources disagree wildly on the release date.
    movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index, inplace=True)
    movies_df.drop(columns=['title_wiki', 'release_date_wiki', 'Language', 'Production company(s)'], inplace=True)

    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        """Replace zero Kaggle values with the Wikipedia value, then drop the Wikipedia column."""
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column],
            axis=1)
        df.drop(columns=wiki_column, inplace=True)

    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # Reorder columns
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]

    # Rename columns
    movies_df.rename({'id':'kaggle_id',
                      'title_kaggle':'title',
                      'url':'wikipedia_url',
                      'budget_kaggle':'budget',
                      'release_date_kaggle':'release_date',
                      'Country':'country',
                      'Distributor':'distributor',
                      'Producer(s)':'producers',
                      'Director':'director',
                      'Starring':'starring',
                      'Cinematography':'cinematography',
                      'Editor(s)':'editors',
                      'Writer(s)':'writers',
                      'Composer(s)':'composers',
                      'Based on':'based_on'
                     }, axis='columns', inplace=True)

    # ------------------------------------------------------------------
    # Transform: ratings
    # ------------------------------------------------------------------
    # Count rows per (movieId, rating), then pivot to one row per movie with
    # one "rating_<value>" column per rating value.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')

    # Movies without a given rating value get a count of 0 instead of NaN.
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    # ------------------------------------------------------------------
    # Load
    # ------------------------------------------------------------------
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
    engine = create_engine(db_string)

    movies_df.to_sql(name='movies', con=engine, if_exists='replace')
    movies_with_ratings_df.to_sql(name='movies_with_ratings', con=engine, if_exists='replace')

    # The ratings file is imported in chunks of one million rows.
    # NOTE(review): chunks are appended, so re-running this function adds the
    # same rows to an existing 'ratings' table again — confirm that is intended.
    rows_imported = 0
    start_time = time.time()
    # Read from the ratings_file argument rather than a notebook-level global
    # so the function works with whatever path the caller passes in.
    for data in pd.read_csv(ratings_file, chunksize=1000000):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        data.to_sql(name='ratings', con=engine, if_exists='append')
        rows_imported += len(data)

        # add elapsed time to final print out
        print(f'Done. {time.time() - start_time} total seconds elapsed')
    

In [9]:
# Create the path to file directory and variables for the three files. 

# Directory holding the three source files. Defaults to the original local
# Resources folder; set the MOVIES_ETL_DATA_DIR environment variable to run the
# notebook on another machine without editing this cell.
# os.path.join(..., '') guarantees a trailing path separator, so the plain
# string concatenations below stay valid for any override value.
file_dir = os.path.join(
    os.environ.get(
        'MOVIES_ETL_DATA_DIR',
        '/Users/ellenrafferty/Desktop/AnalysisProjects/Movies-ETL/Resources/',
    ),
    '',
)

# Wikipedia movie data (JSON)
wiki_file = f'{file_dir}wikipedia-movies.json'

# Kaggle movie metadata (CSV)
kaggle_file = f'{file_dir}movies_metadata.csv'

# Ratings data (CSV)
ratings_file = f'{file_dir}ratings.csv'

In [10]:
# Run the full ETL pipeline on the three source files configured above.
extract_transform_load(
    wiki_file=wiki_file,
    kaggle_file=kaggle_file,
    ratings_file=ratings_file,
)



importing rows 0 to 1000000...Done. 37.99850416183472 total seconds elapsed
importing rows 1000000 to 2000000...Done. 72.71053504943848 total seconds elapsed
importing rows 2000000 to 3000000...Done. 108.5547239780426 total seconds elapsed
importing rows 3000000 to 4000000...Done. 144.83230304718018 total seconds elapsed
importing rows 4000000 to 5000000...Done. 179.67507910728455 total seconds elapsed
importing rows 5000000 to 6000000...Done. 214.65306687355042 total seconds elapsed
importing rows 6000000 to 7000000...Done. 251.33330821990967 total seconds elapsed
importing rows 7000000 to 8000000...Done. 287.2317581176758 total seconds elapsed
importing rows 8000000 to 9000000...Done. 326.62478709220886 total seconds elapsed
importing rows 9000000 to 10000000...Done. 361.45641899108887 total seconds elapsed
importing rows 10000000 to 11000000...Done. 397.23253107070923 total seconds elapsed
importing rows 11000000 to 12000000...Done. 435.4626190662384 total seconds elapsed
importing 