In [2]:
import json
import pandas as pd
import numpy as np

import re

from config import db_password

from sqlalchemy import create_engine
import psycopg2
import time

In [3]:
def clean_movie(movie):

    movie = dict(movie) #Non-destructive copy of local variable

    alternative_titles = {} #Create dict of all alternative title keys/values

    #Loop through list of alternative title keys
    for key in ['Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French',
                'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally', 'Mandarin',
                'McCune–Reischauer', 'Original title', 'Polish', 'Revised Romanization',
                'Romanized', 'Russian', 'Simplified', 'Traditional', 'Yiddish']:
    
    #If key exists, remove key-value pair and add to alternative titles dict
        if key in movie:

            alternative_titles[key] = movie[key]

            movie.pop(key)

        if len(alternative_titles) > 0:

            movie['alternative_titles'] = alternative_titles

    #Merge column names

    def change_column_name(old_name, new_name):

        if old_name in movie:

            movie[new_name] = movie.pop(old_name)

    change_column_name('Adaptation by', 'Writer(s)')

    change_column_name('Country of origin', 'Country')

    change_column_name('Directed by', 'Director')

    change_column_name('Distributed by', 'Distributor')

    change_column_name('Edited by', 'Editor(s)')

    change_column_name('Length', 'Running time')

    change_column_name('Original release', 'Release date')

    change_column_name('Music by', 'Composer(s)')

    change_column_name('Produced by', 'Producer(s)')

    change_column_name('Producer', 'Producer(s)')

    change_column_name('Productioncompanies ', 'Production company(s)')

    change_column_name('Productioncompany ', 'Production company(s)')

    change_column_name('Released', 'Release Date')

    change_column_name('Release Date', 'Release date')

    change_column_name('Screen story by', 'Writer(s)')

    change_column_name('Screenplay by', 'Writer(s)')

    change_column_name('Story by', 'Writer(s)')

    change_column_name('Theme music composer', 'Composer(s)')

    change_column_name('Written by', 'Writer(s)')

    return movie

In [4]:
def file_read_clean():

    # Reads in the three data files

    kaggle_metadata = pd.read_csv(kaggle_file, low_memory=False)

    ratings = pd.read_csv(ratings_file)

    with open(wiki_file, mode='r') as file:

        wiki_movies_raw = json.load(file)

    # Write a list comprehension the filters out TV shows from wiki_movies_raw

    wiki_movies = [movie for movie in wiki_movies_raw

              if('Director' in movie or 'Directed by' in movie) 

                  and 'imdb_link' in movie
                  
                  and 'No. of episodes' not in movie]

    # Write another list comprehension to iterate through cleaned wiki movies

    clean_wiki_movies = [clean_movie(movie) for movie in wiki_movies]

    # Read in cleaned movies as a DataFrame
    
    clean_wiki_movies_df = pd.DataFrame(clean_wiki_movies)

    # Use a try/except block to extract imdb ids while dropping any duplicated ids. Print out error type if error occurs

    try:
        clean_wiki_movies_df['imdb_id'] = clean_wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')

        clean_wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)

    except Exception as e:
        print(e)

    # Keeping columns in which null values makes up less than 90% of the data for that column
    
    kept_columns = [column for column in clean_wiki_movies_df.columns if clean_wiki_movies_df[column].isnull().sum() < len(clean_wiki_movies_df) * 0.9]

    wiki_movies_df = clean_wiki_movies_df[kept_columns]

    wiki_movies_df.head()

    box_office = wiki_movies_df['Box office'].dropna()

    # Convert box office data to strings to apply regular expressions
    
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

    # Match elements from first form of writing out box office data

    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'

    # Match elements from second form of writing out box office data

    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'

    # Create function to parse strings using regex, remove certain values, and convert value to a float

    def parse_dollars(s):

        #If s is not a string, return NaN
        if type(s) != str:
            return np.nan

        #If input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            #Remove dollar sign and ' million'
            s = re.sub('\$|\s|[a-zA-Z]', '', s)

            #Convert to float and multiply by a million
            value = float(s) * 10**6

            #Return value
            return value

        #If input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            #Remove dollar sign and ' billion'
            s = re.sub('\$|\s|[a-zA-Z]', '', s)

            #Convert to float and multipy by a billion
            value = float(s) * 10**9

            #Return value
            return value

        #If input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            #Remove dollar sign and commas
            s = re.sub('\$|,','', s)

            #Convert to float
            value = float(s)

            #Return value
            return value

        #Otherwise, return NaN
        else:
            
            return np.nan

    # Cleaning box office data 

    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    wiki_movies_df.drop('Box office', axis=1, inplace=True)

    # Cleaning budget data

    budget = wiki_movies_df['Budget'].dropna()

    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)

    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)

    budget = budget.str.replace(r'\[\d+\]s*', '')

    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

    wiki_movies_df.drop('Budget', axis=1, inplace=True)

    # Cleaning release date data

    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'

    date_form_two = r'\d{4}.[01]\d.[0123]\d'

    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'

    date_form_four = r'\d{4}'

    release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)

    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)

    wiki_movies_df.drop('Release date', axis=1, inplace=True)

    # Cleaning running time data

    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')

    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)

    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)

    wiki_movies_df.drop('Running time', axis=1, inplace=True)

    # Cleaning kaggle data

    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult', axis='columns')

    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'

    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)

    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')

    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')

    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

    # Merge wiki movies and kaggle data
    
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki', '_kaggle'])

    # Drop uncessary columns
    
    movies_df.drop(columns=['title_wiki', 'release_date_wiki', 'Language', 'Production company(s)'], inplace=True)

    # Function used to fill missing kaggle data with wiki data
    
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):

        df[kaggle_column] = df.apply(

            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]

            , axis=1)

        df.drop(columns=wiki_column, inplace=True)

    # Assign arguments for fill missing data function

    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')

    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')

    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # Include the necessary columns

    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]

    # Rename columns 

    movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

    
    ratings_counts = ratings.groupby(['movieId', 'rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId', columns='rating', values='count')

    ratings_counts.columns = ['rating_' + str(col) for col in ratings_counts.columns]

    movies_with_ratings_df = pd.merge(movies_df, ratings_counts, left_on='kaggle_id', right_index=True, how='left')

    movies_with_ratings_df[ratings_counts.columns] = movies_with_ratings_df[ratings_counts.columns].fillna(0)

    db_string = f'postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data'

    engine = create_engine(db_string)

    movies_df.to_sql(name='movies', con=engine)

    #Create variable for number of rows imported
    rows_imported = 0

    #Get the start time
    start_time = time.time()

    for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):

        #Print out range of rows that are being imported
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

        data.to_sql(name='ratings', con=engine, if_exists='append')

        #Increment number of rows imported by the size of 'data'
        rows_imported += len(data)

        #Get the elapsed time
        #Print that the rows have finished importing
        print(f'Done. {time.time() - start_time} total seconds elapsed')



In [5]:
file_dir = 'C:/Users/Ryan/Documents/BootCamp/SQL/Movies_ETL/Raw_Data_Files/'

kaggle_file = f'{file_dir}movies_metadata.csv'

ratings_file = f'{file_dir}ratings.csv'

wiki_file = f'{file_dir}wikipedia-movies.json'

In [6]:
wiki_file, kaggle_file, ratings_file = file_read_clean() 

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-ve

importing rows 0 to 1000000...Done. 23.11893367767334 total seconds elapsed
importing rows 1000000 to 2000000...Done. 47.41886901855469 total seconds elapsed
importing rows 2000000 to 3000000...Done. 70.43714833259583 total seconds elapsed
importing rows 3000000 to 4000000...Done. 94.61476159095764 total seconds elapsed
importing rows 4000000 to 5000000...Done. 118.08230495452881 total seconds elapsed
importing rows 5000000 to 6000000...Done. 142.01696705818176 total seconds elapsed
importing rows 6000000 to 7000000...Done. 167.06726384162903 total seconds elapsed
importing rows 7000000 to 8000000...Done. 191.75387573242188 total seconds elapsed
importing rows 8000000 to 9000000...Done. 220.7012062072754 total seconds elapsed
importing rows 9000000 to 10000000...Done. 245.7932629585266 total seconds elapsed
importing rows 10000000 to 11000000...Done. 272.2202513217926 total seconds elapsed
importing rows 11000000 to 12000000...Done. 296.37959575653076 total seconds elapsed
importing ro

TypeError: cannot unpack non-iterable NoneType object