In [1]:
import json
import pandas as pd
import numpy as np
import re
import sqlalchemy
from sqlalchemy import create_engine
from config import db_password
import time

In [50]:
# ETL pipeline: extract the Wikipedia (JSON), Kaggle metadata (CSV) and ratings (CSV)
# sources, clean and merge them, then load the results into a SQL database.

# Default directory holding the source files. This absolute, machine-specific path is
# kept for backward compatibility; pass `file_dir` to etl_auto() to use another location.
DEFAULT_DATA_DIR = 'C:/Users/shula/OneDrive/Desktop/UCB_Data_052020/UCB-202005-downloads/data'

# Wikipedia keys that hold alternative titles; they are gathered into one 'alt_titles' dict.
_ALT_TITLE_KEYS = ['Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French',
                   'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally',
                   'Mandarin', 'McCune–Reischauer', 'Original title', 'Polish',
                   'Revised Romanization', 'Romanized', 'Russian',
                   'Simplified', 'Traditional', 'Yiddish']

# (old key, canonical key) renames, applied in this order. Order matters: several keys
# map onto 'Writer(s)' (a later one overwrites an earlier one), and 'Released' is first
# renamed to 'Release Date' and then to 'Release date'.
_COLUMN_RENAMES = [
    ('Adaptation by', 'Writer(s)'),
    ('Country of origin', 'Country'),
    ('Directed by', 'Director'),
    ('Distributed by', 'Distributor'),
    ('Edited by', 'Editor(s)'),
    ('Length', 'Running time'),
    ('Original release', 'Release date'),
    ('Music by', 'Composer(s)'),
    ('Produced by', 'Producer(s)'),
    ('Producer', 'Producer(s)'),
    ('Productioncompanies ', 'Production company(s)'),
    ('Productioncompany ', 'Production company(s)'),
    ('Released', 'Release Date'),
    ('Release Date', 'Release date'),
    ('Screen story by', 'Writer(s)'),
    ('Screenplay by', 'Writer(s)'),
    ('Story by', 'Writer(s)'),
    ('Theme music composer', 'Composer(s)'),
    ('Written by', 'Writer(s)'),
]

# Dollar amounts: form one is '$12.3 million' / '$1 billion', form two is '$123,456,789'
# (',' or '.' as thousands separator, not followed by 'million'/'billion').
_MONEY_FORM_ONE = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
_MONEY_FORM_TWO = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
_MONEY_PATTERN = f'({_MONEY_FORM_ONE}|{_MONEY_FORM_TWO})'
# Replaces a range such as '$1–2 million' with its upper end ('$2 million').
_MONEY_RANGE_PATTERN = r'\$.*[-—–](?![a-z])'

# Release dates, tried in order: 'July 4, 1990', '1990-07-04', 'July 1990', '1990'.
# Days may have one or two digits.
_MONTHS = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)'
_DATE_FORM_ONE = _MONTHS + r'\s[0123]?\d,\s\d{4}'
_DATE_FORM_TWO = r'\d{4}.[01]\d.[0123]\d'
_DATE_FORM_THREE = _MONTHS + r'\s\d{4}'
_DATE_FORM_FOUR = r'\d{4}'
_DATE_PATTERN = f'({_DATE_FORM_ONE}|{_DATE_FORM_TWO}|{_DATE_FORM_THREE}|{_DATE_FORM_FOUR})'

# Columns kept in the final movies table, in output order, and their SQL-friendly names.
_FINAL_COLUMNS = ['imdb_id', 'id', 'title_kaggle', 'original_title', 'tagline',
                  'belongs_to_collection', 'url', 'imdb_link',
                  'runtime', 'budget_kaggle', 'revenue', 'release_date_kaggle',
                  'popularity', 'vote_average', 'vote_count',
                  'genres', 'original_language', 'overview', 'spoken_languages', 'Country',
                  'production_companies', 'production_countries', 'Distributor',
                  'Producer(s)', 'Director', 'Starring', 'Cinematography', 'Editor(s)',
                  'Writer(s)', 'Composer(s)', 'Based on']
_FINAL_RENAMES = {'id': 'kaggle_id',
                  'title_kaggle': 'title',
                  'url': 'wikipedia_url',
                  'budget_kaggle': 'budget',
                  'release_date_kaggle': 'release_date',
                  'Country': 'country',
                  'Distributor': 'distributor',
                  'Producer(s)': 'producers',
                  'Director': 'director',
                  'Starring': 'starring',
                  'Cinematography': 'cinematography',
                  'Editor(s)': 'editors',
                  'Writer(s)': 'writers',
                  'Composer(s)': 'composers',
                  'Based on': 'based_on'}


def _clean_movie(movie):
    """Return a cleaned shallow copy of one Wikipedia movie record (a dict).

    Alternative-title keys are moved into a nested 'alt_titles' dict (only added when at
    least one is present) and similar keys are renamed per _COLUMN_RENAMES. The input
    dict is not modified.
    """
    movie = dict(movie)
    alt_titles = {key: movie.pop(key) for key in _ALT_TITLE_KEYS if key in movie}
    if alt_titles:
        movie['alt_titles'] = alt_titles
    for old_name, new_name in _COLUMN_RENAMES:
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    return movie


def _parse_dollars(s):
    """Convert one extracted dollar string (see _MONEY_PATTERN) to a float.

    Returns NaN for non-strings and for strings matching neither money form.
    """
    if not isinstance(s, str):
        return np.nan
    if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
        return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**6
    if re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):
        return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10**9
    if re.match(_MONEY_FORM_TWO, s, flags=re.IGNORECASE):
        # Form two accepts ',' or '.' as thousands separator, so strip both; leaving the
        # dots in made float() fail on values like '$1.234.567'.
        return float(re.sub(r'[$,.\s]', '', s))
    return np.nan


def _extract_dollars(text):
    """Extract the first dollar amount from each string of ``text`` and parse it to float."""
    return text.str.extract(_MONEY_PATTERN, flags=re.IGNORECASE)[0].apply(_parse_dollars)


def _join_lists(series):
    """Drop missing values and join list-valued cells into one space-separated string."""
    return series.dropna().apply(lambda x: ' '.join(x) if isinstance(x, list) else x)


def _to_datetime_mixed(values):
    """Parse date strings that come in several formats; unparseable entries become NaT."""
    if int(pd.__version__.split('.')[0]) >= 2:
        # pandas >= 2 infers a single format from the first element unless told the
        # input is mixed, which would reject most of these values.
        return pd.to_datetime(values, format='mixed', errors='coerce')
    return pd.to_datetime(values, errors='coerce')


def _transform_wiki(raw):
    """Build the cleaned Wikipedia movies DataFrame from the list of raw JSON records.

    Keeps records that name a director, have an IMDb link and are not TV series; derives
    'imdb_id', numeric 'box_office'/'budget', 'release_date' and 'running_time' (minutes).
    """
    wiki_movies = [movie for movie in raw
                   if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]
    wiki_movies_df = pd.DataFrame([_clean_movie(movie) for movie in wiki_movies])

    # imdb_id ('tt' + 7 digits taken from the IMDb link) is the join key with Kaggle.
    wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})', expand=False)
    wiki_movies_df = wiki_movies_df.drop_duplicates(subset='imdb_id')

    # Keep only the columns that are less than 90% null.
    mostly_filled = [column for column in wiki_movies_df.columns
                     if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df.loc[:, mostly_filled]

    box_office = _join_lists(wiki_movies_df['Box office'])
    budget = _join_lists(wiki_movies_df['Budget'])
    release_date = _join_lists(wiki_movies_df['Release date'])
    running_time = _join_lists(wiki_movies_df['Running time'])

    box_office = box_office.str.replace(_MONEY_RANGE_PATTERN, '$', regex=True)
    wiki_movies_df['box_office'] = _extract_dollars(box_office)

    budget = budget.str.replace(_MONEY_RANGE_PATTERN, '$', regex=True)
    # Strip citation markers like '[3]'. regex=True is required: pandas >= 2.0 treats
    # the pattern as a literal string by default.
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    wiki_movies_df['budget'] = _extract_dollars(budget)

    wiki_movies_df['release_date'] = _to_datetime_mixed(release_date.str.extract(_DATE_PATTERN)[0])

    # Running time in minutes: '<h> hours <m>' -> h*60 + m, otherwise '<n> m...' -> n.
    parts = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    parts = parts.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    hours_as_minutes = parts[0] * 60 + parts[1]
    wiki_movies_df['running_time'] = hours_as_minutes.where(parts[2] == 0, parts[2])

    return wiki_movies_df.drop(columns=['Box office', 'Budget', 'Running time'])


def _transform_kaggle(kaggle_metadata):
    """Drop adult titles and convert the Kaggle metadata columns to proper dtypes."""
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult', axis='columns')
    return kaggle_metadata.assign(
        video=kaggle_metadata['video'] == 'True',
        budget=kaggle_metadata['budget'].astype(int),
        id=pd.to_numeric(kaggle_metadata['id'], errors='raise'),
        popularity=pd.to_numeric(kaggle_metadata['popularity'], errors='raise'),
        release_date=pd.to_datetime(kaggle_metadata['release_date']),
    )


def _transform_ratings(ratings):
    """Pivot the ratings into one row per movieId with a 'rating_<value>' count column each."""
    rating_counts = (ratings.groupby(['movieId', 'rating'], as_index=False).count()
                     .rename({'userId': 'count'}, axis=1)
                     .pivot(index='movieId', columns='rating', values='count'))
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    return rating_counts


def _merge_wiki_kaggle(wiki_movies_df, kaggle_metadata):
    """Inner-join Wikipedia and Kaggle data on imdb_id and reduce to the final columns."""
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=('_wiki', '_kaggle'))

    # Rows whose Wikipedia and Kaggle release dates are decades apart are presumably
    # mismatched joins; drop them.
    if {'release_date_wiki', 'release_date_kaggle'} <= set(movies_df.columns):
        outliers = ((movies_df['release_date_wiki'] > '1996-01-01')
                    & (movies_df['release_date_kaggle'] < '1965-01-01'))
        movies_df = movies_df[~outliers]
    movies_df = movies_df.drop(columns=['title_wiki', 'release_date_wiki', 'Language',
                                        'Production company(s)'], errors='ignore')

    # Treat 0 in the Kaggle column as missing and fall back to the Wikipedia value.
    for kaggle_column, wiki_column in [('runtime', 'running_time'),
                                       ('budget_kaggle', 'budget_wiki'),
                                       ('revenue', 'box_office')]:
        kaggle_values = movies_df[kaggle_column]
        movies_df = movies_df.assign(**{kaggle_column: kaggle_values.mask(kaggle_values == 0,
                                                                          movies_df[wiki_column])})
        movies_df = movies_df.drop(columns=wiki_column)

    # Drop columns holding a single distinct value (lists made hashable for counting).
    def hashable(x):
        return tuple(x) if isinstance(x, list) else x
    constant_columns = [col for col in movies_df.columns
                        if len(movies_df[col].apply(hashable).value_counts(dropna=False)) == 1]
    movies_df = movies_df.drop(columns=constant_columns)

    # Select in a fixed order; skip any column removed above because it was constant.
    kept_columns = [col for col in _FINAL_COLUMNS if col in movies_df.columns]
    return movies_df.loc[:, kept_columns].rename(columns=_FINAL_RENAMES)


def _load_to_sql(movies_df, ratings_path, db_string, if_exists, chunksize=1000000):
    """Write ``movies_df`` to table 'movies' and stream the ratings CSV into table 'ratings'.

    The first write to each table uses ``if_exists``; later ratings chunks append. A failed
    movies upload, or a ratings chunk rejected with ValueError, is reported and skipped;
    other errors propagate.
    """
    engine = create_engine(db_string)
    try:
        try:
            movies_df.to_sql(name='movies', con=engine, if_exists=if_exists)
            print('Upload movies_df to sql')
        except Exception as exc:  # best effort: report and still attempt the ratings load
            print(f'failed to upload: {exc}')

        rows_read = 0
        rows_imported = 0
        start_time = time.time()
        for chunk_number, data in enumerate(pd.read_csv(ratings_path, chunksize=chunksize)):
            print(f'importing rows {rows_read} to {rows_read + len(data)}...', end='')
            rows_read += len(data)
            mode = if_exists if chunk_number == 0 else 'append'
            try:
                data.to_sql(name='ratings', con=engine, if_exists=mode)
                rows_imported += len(data)
                print(f'Done.{time.time() - start_time} total seconds elapsed')
            except ValueError as exc:
                print(f'failed to upload: {exc}')
        print(f'Upload ratings to sql ({rows_imported} of {rows_read} rows)')
    finally:
        engine.dispose()


def etl_auto(wiki, kaggle, rating, file_dir=DEFAULT_DATA_DIR, db_string=None,
             if_exists='replace'):
    """Extract, transform and load the Wikipedia, Kaggle metadata and ratings data.

    Reads ``{file_dir}/{kaggle}.csv``, ``{file_dir}/{rating}.csv`` and
    ``{file_dir}/{wiki}.json``, cleans and merges them, writes the merged movies table
    to SQL table 'movies' and streams the ratings CSV into table 'ratings' in chunks.

    Args:
        wiki: base name (no extension) of the Wikipedia movies JSON file.
        kaggle: base name of the Kaggle movies metadata CSV file.
        rating: base name of the ratings CSV file.
        file_dir: directory that contains the three files.
        db_string: SQLAlchemy database URL. Defaults to the local PostgreSQL database
            'movie_data' on 127.0.0.1:5432, user 'postgres', password ``db_password``.
        if_exists: 'replace' (default) rewrites both tables so re-running does not
            duplicate rows; 'append' adds to existing tables.

    Returns:
        The merged movies DataFrame with 'rating_<value>' count columns (missing counts
        filled with 0). Only the movies table without rating counts is written to SQL.

    Raises:
        FileNotFoundError: if one of the source files is missing.
        KeyError: if an expected source column is absent.
    """
    from urllib.parse import quote_plus

    # Extract
    kaggle_metadata = pd.read_csv(f'{file_dir}/{kaggle}.csv', low_memory=False)
    print('kaggle data converted')
    ratings = pd.read_csv(f'{file_dir}/{rating}.csv', low_memory=False)
    print('rating data converted')
    # JSON is UTF-8; say so explicitly so the platform's default codec is not used.
    with open(f'{file_dir}/{wiki}.json', mode='r', encoding='utf-8') as f:
        raw = json.load(f)
    print('wiki data converted')

    # Transform
    wiki_movies_df = _transform_wiki(raw)
    print('wiki data transformed')
    kaggle_metadata = _transform_kaggle(kaggle_metadata)
    print('kaggle data transformed')
    rating_counts = _transform_ratings(ratings)
    print('rating data transformed')

    # Merge
    movies_df = _merge_wiki_kaggle(wiki_movies_df, kaggle_metadata)
    print('merged wiki with kaggle')
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id',
                                      right_index=True, how='left')
    rating_columns = list(rating_counts.columns)
    movies_with_ratings_df[rating_columns] = movies_with_ratings_df[rating_columns].fillna(0)
    print('merged with rating')

    # Load
    if db_string is None:
        # 'postgresql://' is required (SQLAlchemy 1.4+ rejects 'postgres://'); the password
        # is URL-quoted so characters such as '@' or '/' cannot break the URL.
        db_string = (f'postgresql://postgres:{quote_plus(str(db_password))}'
                     '@127.0.0.1:5432/movie_data')
    _load_to_sql(movies_df, f'{file_dir}/{rating}.csv', db_string, if_exists)
    return movies_with_ratings_df

In [47]:
# Run the whole pipeline on the Wikipedia JSON, Kaggle metadata CSV and ratings CSV.
etl_auto(wiki='wikipedia_movies', kaggle='movies_metadata', rating='ratings')

kaggle data converted
rating data converted
wiki data converted
wiki data transformed
kaggle data transformed
rating data transformed
merged wiki with kaggle
merged with rating
Upload movies_df to sql
importing rows 0 to 1000000...Done.267.135372877121 total seconds elapsed
importing rows 1000000 to 2000000...Done.504.3378608226776 total seconds elapsed
importing rows 2000000 to 3000000...Done.753.531227350235 total seconds elapsed
importing rows 3000000 to 4000000...Done.1001.0689928531647 total seconds elapsed
importing rows 4000000 to 5000000...Done.1237.8944487571716 total seconds elapsed
importing rows 5000000 to 6000000...Done.1452.0176339149475 total seconds elapsed
importing rows 6000000 to 7000000...Done.1661.0025672912598 total seconds elapsed
importing rows 7000000 to 8000000...Done.1876.6237833499908 total seconds elapsed
importing rows 8000000 to 9000000...Done.2087.4797134399414 total seconds elapsed
importing rows 9000000 to 10000000...Done.2301.4453315734863 total secon