In [1]:
#import dependencies
import numpy as np
import pandas as pd
import json
import re

from sqlalchemy import create_engine

from config import db_password
import time

In [2]:
# Folder that every input file name below is resolved against (joined by plain string concatenation).
DATA_PATH = "data/"
# Input file names. The ETL assumes refreshed downloads keep the same names and formats.
(wikipedia_filename,
 kaggle_meta_filname,
 kaggle_rating_filename) = ("wikipedia.movies.json",
                            "movies_metadata.csv",
                            "ratings.csv")

In [3]:
# Create a ETL Process function with 3 data sources
def ETL_Movies_Process(wikipedia_filename,kaggle_meta_filname,kaggle_rating_filename):
    """Extract, transform and load the movie data sets into PostgreSQL.

    Sources (each file name is resolved relative to the module-level ``DATA_PATH``):
        wikipedia_filename     -- JSON list of scraped Wikipedia movie records.
        kaggle_meta_filname    -- Kaggle movie-metadata CSV.
        kaggle_rating_filename -- Kaggle ratings CSV, streamed in chunks of 1,000,000 rows.

    The Wikipedia records and the Kaggle metadata are cleaned, typed, inner-joined on
    ``imdb_id`` and written to the ``movies`` table. The ratings CSV is written as-is
    to the ``ratings`` table. Both tables are replaced in the ``movie_data`` database
    on 127.0.0.1:5432 (password taken from ``db_password``). Progress is printed to
    stdout; returns None.
    """
    #####################################################################
    #Select only movies and not TV Show
    def filter_movies(wiki_movies_raw_list):
        """Return the raw Wikipedia records that look like movies.

        A record is kept when it is a dict with a director ('Director' or
        'Directed by') and an 'imdb_link', and without 'No. of episodes'
        (the key that marks TV shows).
        """
        return [movie for movie in wiki_movies_raw_list
                if isinstance(movie, dict)
                and ('Director' in movie or 'Directed by' in movie)
                and 'imdb_link' in movie
                and 'No. of episodes' not in movie]
    ######################################################################
    #Normalize each movie to the same set of attributes
    def clean_movie(movie):
        """Return a normalized copy of one Wikipedia movie record, or None on error.

        * Alternate-title keys (languages, 'Also known as', ...) are moved into a
          nested ``alt_titles`` dict, which is added only when non-empty.
        * Synonymous infobox keys are renamed to one canonical column name.
        The caller's dict is not modified.
        """
        try:
            movie = dict(movie) #create a non-destructive copy
            #move every alternate-title attribute into one nested dictionary
            alt_titles = {}
            for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                        'Hangul','Hebrew','Hepburn','Japanese','Literally',
                        'Mandarin','McCune–Reischauer','Original title','Polish',
                        'Revised Romanization','Romanized','Russian',
                        'Simplified','Traditional','Yiddish']:
                if key in movie:
                    alt_titles[key] = movie.pop(key)
            if alt_titles:
                movie['alt_titles'] = alt_titles

            # Merge synonymous column names. The renames run in this order, so chained
            # renames work ('Released' -> 'Release Date' -> 'Release date').
            # NOTE(review): when several source keys map to the same target (e.g. the
            # 'Writer(s)' or 'Producer(s)' groups), a later key overwrites the earlier value.
            # The 'Productioncompan... ' keys (no inner space, trailing space) are kept
            # verbatim; presumably they match the raw scraped keys -- verify against data.
            for old_name, new_name in [('Adaptation by', 'Writer(s)'),
                                       ('Country of origin', 'Country'),
                                       ('Directed by', 'Director'),
                                       ('Distributed by', 'Distributor'),
                                       ('Edited by', 'Editor(s)'),
                                       ('Length', 'Running time'),
                                       ('Original release', 'Release date'),
                                       ('Music by', 'Composer(s)'),
                                       ('Produced by', 'Producer(s)'),
                                       ('Producer', 'Producer(s)'),
                                       ('Productioncompanies ', 'Production company(s)'),
                                       ('Productioncompany ', 'Production company(s)'),
                                       ('Released', 'Release Date'),
                                       ('Release Date', 'Release date'),
                                       ('Screen story by', 'Writer(s)'),
                                       ('Screenplay by', 'Writer(s)'),
                                       ('Story by', 'Writer(s)'),
                                       ('Theme music composer', 'Composer(s)'),
                                       ('Written by', 'Writer(s)')]:
                if old_name in movie:
                    movie[new_name] = movie.pop(old_name)
            return movie
        except Exception as err:
            # best effort: report and skip this record instead of aborting the ETL
            print("<ERROR>","clean_movie",movie,err)
            return None
    #######################################################################
    def movies_dataframe(wiki_movies_filename):
        """Load the Wikipedia JSON file and return the filtered, cleaned movies as a DataFrame."""
        # JSON files are UTF-8; don't depend on the platform's default encoding
        with open(f"{DATA_PATH}{wiki_movies_filename}", mode='r', encoding='utf-8') as file:
            wiki_movies_raw_list = json.load(file)
        #select only movies
        wiki_movies = filter_movies(wiki_movies_raw_list)
        #clean_movie returns None for a record it failed to normalize; skip those
        clean_wiki_movies = [cleaned for cleaned in map(clean_movie, wiki_movies) if cleaned is not None]
        return pd.DataFrame(clean_wiki_movies)
    #######################################################################
    def parse_dollars(s):
        """Convert a dollar string to a float; return NaN when it cannot be parsed.

        Accepted forms (case-insensitive):
            "$123.4 million" / "$123.4 millon"  -> 123.4 * 10**6
            "$1.2 billion"   / "$1.2 billon"    -> 1.2 * 10**9
            "$123,456,789" or "$123.456.789"     -> 123456789.0
        Non-strings and any other text give NaN.
        """
        if not isinstance(s, str):
            return np.nan
        try:
            # $###.# million: strip "$", whitespace and letters, then scale
            if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
                return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**6
            # $###.# billion
            if re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):
                return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**9
            # $###,###,### -- ',' and '.' are both thousands separators in this form,
            # so both must be removed (previously "$1.234.567" failed float() -> NaN)
            grouped = re.match(r'\$\s*(\d{1,3}(?:[,\.]\d{3})+)(?!\s[mb]illion)', s, flags=re.IGNORECASE)
            if grouped:
                return float(re.sub(r'[,\.]', '', grouped.group(1)))
            # otherwise, return NaN
            return np.nan
        except ValueError as err:
            print("<Error>","parse_dollars", s, err)
            return np.nan
    #######################################################################
    def parse_to_number(dataframe, oldcolumn_name, newcolumn_name):
        """Parse the money strings in ``dataframe[oldcolumn_name]`` into floats.

        Adds ``dataframe[newcolumn_name]`` (NaN where missing/unparseable) and drops
        ``oldcolumn_name``; ``dataframe`` is mutated in place. On any error (e.g. the
        column does not exist) the error is printed and the frame is left unchanged.
        """
        #range such as "$38.9–40.3 million": "$38.9–" collapses to "$", keeping the upper bound
        range_pattern = r'\$.*[-—–](?![a-z])'
        #"$20.0 million" or "$ 1.02 billion" (no digit grouping allowed before million/billion)
        form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
        #"$123,456,789" or "$1.234.567" (groups of three digits, not followed by million/billion)
        form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
        try:
            #drop missing values and flatten list values into one string
            column_series = dataframe[oldcolumn_name].dropna()
            column_series = column_series.apply(lambda x: ' '.join(x) if type(x) == list else x)
            column_series = column_series.str.replace(range_pattern, '$', regex=True)
            #remove citation markers such as "[9]"
            column_series = column_series.str.replace(r'\[\d+\]\s*', '', regex=True)
            #extract the first recognised amount and convert it
            dataframe[newcolumn_name] = column_series.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
            dataframe.drop(oldcolumn_name, axis=1, inplace=True)
        except Exception as err:
            print("<Error>","parse_to_number",oldcolumn_name,newcolumn_name,err)
    #######################################################################
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        """Replace 0 in ``df[kaggle_column]`` with ``df[wiki_column]``, then drop ``wiki_column``.

        Only exact zeros are replaced; NaN in the Kaggle column is left as is.
        Mutates ``df`` in place.
        """
        # vectorised equivalent of the former row-wise apply (which also failed on an empty frame)
        df[kaggle_column] = df[kaggle_column].mask(df[kaggle_column] == 0, df[wiki_column])
        df.drop(columns=wiki_column, inplace=True)
    #######################################################################
    def tranform_wiki_movies(wikipedia_filename):
        """Load the Wikipedia movies and return a typed, de-duplicated DataFrame.

        Adds ``imdb_id``, ``box_office``, ``budget``, ``release_date`` and
        ``running_time`` (minutes), and drops the raw columns they were parsed from.
        """
        wiki_movies_df = movies_dataframe(wikipedia_filename)

        #imdb_id is extracted from imdb_link (pattern tt1234567). Newer ids have 8 digits,
        #so take every digit instead of truncating to 7, which could merge distinct movies.
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7,})', expand=False)
        #rows without an id cannot be joined; pd.merge would match NaN keys to NaN keys
        wiki_movies_df = wiki_movies_df.dropna(subset=['imdb_id']).drop_duplicates(subset='imdb_id')

        #keep only columns that are less than 90% null
        wiki_columns_to_keep = [column for column in wiki_movies_df.columns
                                if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
        #.copy(): the steps below add/drop columns in place, which must not act on a slice
        wiki_movies_df = wiki_movies_df[wiki_columns_to_keep].copy()

        #money strings -> numbers
        parse_to_number(wiki_movies_df,'Box office', 'box_office')
        parse_to_number(wiki_movies_df,'Budget', 'budget')

        #date patterns
        # May 17, 1990 / May 7, 1990
        date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
        # 1990.03.24 / 1990-03-05 (any single separator character)
        date_form_two = r'\d{4}.[01]\d.[0123]\d'
        # July 1998
        date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
        # 2018
        date_form_four = r'\d{4}'

        #transform Release date to a uniform datetime
        release_date = wiki_movies_df['Release date'].dropna()
        release_date = release_date.apply(lambda x: ' '.join(x) if type(x) == list else x)
        release_date = release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')
        # NOTE(review): infer_datetime_format is deprecated in pandas 2.x, which also rejects
        # mixed formats without format='mixed'; this relies on pandas 1.x behaviour.
        wiki_movies_df['release_date'] = pd.to_datetime(release_date[0], infer_datetime_format=True)
        wiki_movies_df.drop('Release date', axis=1, inplace=True)

        #transform running time to a number of minutes:
        #groups 0/1 = hours/minutes ("1 hr 30"), group 2 = minutes only ("102 min")
        running_time = wiki_movies_df['Running time'].dropna()
        running_time = running_time.apply(lambda x: ' '.join(x) if type(x) == list else x)
        running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
        running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
        wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
        wiki_movies_df.drop('Running time', axis=1, inplace=True)

        return wiki_movies_df
    #######################################################################
    def tranform_kaggle_metadata(kaggle_meta_filname):
        """Load the Kaggle metadata CSV, drop adult titles and convert column types.

        Unparseable budget/id/popularity/release_date values become NaN/NaT
        (errors='coerce') instead of raising.
        """
        kaggle_metadata_df = pd.read_csv(f'{DATA_PATH}{kaggle_meta_filname}',low_memory=False)
        #keep non-adult rows ('adult' is compared as a string), then drop the column
        kaggle_metadata_df = kaggle_metadata_df.loc[kaggle_metadata_df['adult'] == 'False'].drop('adult',axis='columns')

        kaggle_metadata_df['video'] = kaggle_metadata_df['video'] == 'True'
        kaggle_metadata_df['budget'] = pd.to_numeric(kaggle_metadata_df['budget'],errors='coerce')
        kaggle_metadata_df['id'] = pd.to_numeric(kaggle_metadata_df['id'], errors='coerce')
        kaggle_metadata_df['popularity'] = pd.to_numeric(kaggle_metadata_df['popularity'], errors='coerce')
        kaggle_metadata_df['release_date'] = pd.to_datetime(kaggle_metadata_df['release_date'], errors='coerce')

        return kaggle_metadata_df
    #######################################################################

    #Extract and transform each movie source
    wiki_movies_df = tranform_wiki_movies(wikipedia_filename)
    kaggle_metadata_df = tranform_kaggle_metadata(kaggle_meta_filname)

    ######################################################################

    #Merge on imdb_id (inner join); overlapping column names get _wiki / _kaggle suffixes
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata_df, on='imdb_id', suffixes=['_wiki','_kaggle'])
    #Drop rows whose release dates disagree wildly (after 1996 on Wikipedia but before
    #1965 on Kaggle) -- presumably mismatched records
    movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)
    #Drop the Wikipedia columns that duplicate Kaggle information
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)
    #Fill Kaggle zeros from the Wikipedia column, then drop the Wikipedia column
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    #Project to the final column set and give the columns database-friendly names
    movies_df = movies_df[['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                           'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                           'genres','original_language','overview','spoken_languages','Country',
                           'production_companies','production_countries','Distributor',
                           'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                          ]].rename(columns={'id':'kaggle_id',
                                             'title_kaggle':'title',
                                             'url':'wikipedia_url',
                                             'budget_kaggle':'budget',
                                             'release_date_kaggle':'release_date',
                                             'Country':'country',
                                             'Distributor':'distributor',
                                             'Producer(s)':'producers',
                                             'Director':'director',
                                             'Starring':'starring',
                                             'Cinematography':'cinematography',
                                             'Editor(s)':'editors',
                                             'Writer(s)':'writers',
                                             'Composer(s)':'composers',
                                             'Based on':'based_on'
                                            })

    #####################################################

    #Save to Database. SQLAlchemy >= 1.4 only accepts the "postgresql://" dialect name.
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
    engine = create_engine(db_string)

    #Replace the movies table
    movies_df.to_sql(name='movies', con=engine,if_exists='replace')
    print("Total rows export to SQL:",len(movies_df))
    print("Export Movies dataframe was successfull")

    #Stream the ratings CSV into the ratings table, one chunk at a time
    rows_imported = 0
    start_time = time.time()
    is_first_read = True
    for data in pd.read_csv(f'{DATA_PATH}{kaggle_rating_filename}', chunksize=1000000):

        # print out the range of rows that are being imported
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

        #the first chunk replaces the table; later chunks append to it
        if is_first_read:
            data.to_sql(name='ratings', con=engine, if_exists='replace')
            is_first_read=False
        else:
            data.to_sql(name='ratings', con=engine, if_exists='append')

        rows_imported += len(data)

        # add elapsed time to final print out
        print(f'Done. {time.time() - start_time} total seconds elapsed')

    print("ETL was completed")

In [4]:
# Run the full extract-transform-load pipeline over the three configured input files.
ETL_Movies_Process(
    wikipedia_filename=wikipedia_filename,
    kaggle_meta_filname=kaggle_meta_filname,
    kaggle_rating_filename=kaggle_rating_filename,
)

Total rows export to SQL: 6051
Export Movies dataframe was successfull
importing rows 0 to 1000000...Done. 77.92512369155884 total seconds elapsed
importing rows 1000000 to 2000000...Done. 158.05686569213867 total seconds elapsed
importing rows 2000000 to 3000000...Done. 236.97258973121643 total seconds elapsed
importing rows 3000000 to 4000000...Done. 320.8134112358093 total seconds elapsed
importing rows 4000000 to 5000000...Done. 403.5784981250763 total seconds elapsed
importing rows 5000000 to 6000000...Done. 492.5768892765045 total seconds elapsed
importing rows 6000000 to 7000000...Done. 576.3061325550079 total seconds elapsed
importing rows 7000000 to 8000000...Done. 656.3751928806305 total seconds elapsed
importing rows 8000000 to 9000000...Done. 738.3340067863464 total seconds elapsed
importing rows 9000000 to 10000000...Done. 824.957765340805 total seconds elapsed
importing rows 10000000 to 11000000...Done. 909.865225315094 total seconds elapsed
importing rows 11000000 to 120

In [5]:
    #def tranform_kaggle_ratings(kaggle_rating_filename):
    ##Load ratings to data frame
    #kaggle_ratings_df = pd.read_csv(f'{DATA_PATH}{kaggle_rating_filename}')
    ##convert to datetime for timestamp
    #kaggle_ratings_df['timestamp'] = pd.to_datetime(kaggle_ratings_df['timestamp'], unit='s', errors='coerce') 
    #return kaggle_ratings_df

In [6]:
    ##perform this merge in SQL instead (faster)
    ##Merge data on kaggle_id with left_on
    ##create the pivot table with index is movieId
    #kaggle_ratings_df = kaggle_ratings_df.groupby(['movieId','rating'], as_index=False).count() \
    #                .rename({'userId':'count'}, axis=1) \
    #                .pivot(index='movieId',columns='rating', values='count')
    ##Create Column names
    #rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    ## merge two data frame on column and index
    #movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    ## fill all n/a rating counts with zero
    #movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)