## Challenge 8 - Deliverable 4
Add the movies_df DataFrame and MovieLens rating CSV data to a SQL database.

In [11]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

import time

from config import db_password 

# Show every DataFrame column when printing (None removes the column limit)
pd.options.display.max_columns = None

In [12]:
#  Add the clean movie function that takes in the argument, "movie".

def clean_movie(movie):
    """Return a cleaned copy of one scraped movie record.

    Alternate-language title fields are gathered into a single ``alt_titles``
    dictionary, and synonymous column names are renamed to one canonical name.
    The record passed in is left unmodified.
    """
    # Keys that hold an alternate title, in the order they are collected
    alt_title_keys = (
        'Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French',
        'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally',
        'Mandarin', 'McCune–Reischauer', 'Original title', 'Polish',
        'Revised Romanization', 'Romanized', 'Russian',
        'Simplified', 'Traditional', 'Yiddish',
    )

    # (old name, canonical name) pairs, applied strictly in this order;
    # a later rename onto the same canonical name overwrites the earlier value
    renames = (
        ('Adaptation by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Original release', 'Release date'),
        ('Music by', 'Composer(s)'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release Date'),
        ('Release Date', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Written by', 'Writer(s)'),
    )

    # Work on a shallow copy so the caller's record is not touched
    record = dict(movie)

    # Move every alternate-title field out of the record into one dictionary
    found_titles = {
        title_key: record.pop(title_key)
        for title_key in alt_title_keys
        if title_key in record
    }
    if found_titles:
        record['alt_titles'] = found_titles

    # Fold synonymous columns into their canonical names
    for source_name, target_name in renames:
        if source_name in record:
            record[target_name] = record.pop(source_name)

    return record

In [13]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load(wiki_file, kaggle_file, ratings_file):
    """Extract, clean and merge the movie data, then load it into PostgreSQL.

    The Wikipedia JSON and the Kaggle metadata CSV are cleaned and inner-joined
    on ``imdb_id`` into a movies DataFrame, which replaces the ``movies`` table
    of the local ``movie_data`` database. The ratings CSV is then streamed into
    the ``ratings`` table in chunks while progress and elapsed time are printed.

    Args:
        wiki_file: Path to the Wikipedia movies JSON file.
        kaggle_file: Path to the Kaggle movie metadata CSV file.
        ratings_file: Path to the MovieLens ratings CSV file.

    Returns:
        None. The results are written to the database.
    """

    # Read in the Kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(kaggle_file, low_memory=False)
    ratings = pd.read_csv(ratings_file)

    # Open and read the Wikipedia data JSON file into a list of dictionaries.
    # The encoding is stated explicitly so the result does not depend on the
    # platform's default text encoding.
    with open(wiki_file, mode='r', encoding='utf-8') as file:
        wiki_movies_raw = json.load(file)

    # Filter out TV shows (records that carry an episode count).
    wiki_movies = [movie for movie in wiki_movies_raw
                   if 'No. of episodes' not in movie]

    # Clean every remaining record and build a DataFrame from the result.
    clean_movies = [clean_movie(movie) for movie in wiki_movies]
    wiki_movies_df = pd.DataFrame(clean_movies)

    # Extract the IMDb ID from the IMDb link and drop duplicate IDs.
    # Any error is captured and printed rather than raised.
    try:
        # Extract the imdb_id from the imdb_link using RegEx
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        # Drop duplicate IMDB IDs
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)

    except Exception as e:
        print(f'Exception raised imdb_id parsing.  Exception is {e}.')

    # Keep only the columns in which fewer than 90% of the values are null.
    # Author's note: the instructions read as "keep every column with at least
    # one non-null value", but the 90% condition from the module was applied
    # to keep the column list short.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns
                            if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]

    # Take an explicit copy: the frame is mutated below (new columns, in-place
    # drops), which on a plain column selection triggers SettingWithCopy warnings.
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep].copy()

    # Non-null values from the "Box office" column.
    box_office = wiki_movies_df['Box office'].dropna()

    # Values stored as lists are joined into a single string.
    box_office = box_office.apply(lambda x: ' '.join(x) if isinstance(x, list) else x)

    # "form_one": $123.4 million / billion
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'

    # "form_two": $123,456,789 (comma or period used as the thousands separator)
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'

    def parse_dollars(s):
        """Convert an extracted dollar string into a float; NaN if unparseable."""

        # Anything that is not a string cannot be parsed
        if not isinstance(s, str):
            return np.nan

        # Input of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # Strip the dollar sign, whitespace and letters, leaving the number
            s = re.sub(r'\$|\s|[a-zA-Z]', '', s)

            # Convert to float and multiply by a million
            return float(s) * 10**6

        # Input of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # Strip the dollar sign, whitespace and letters, leaving the number
            s = re.sub(r'\$|\s|[a-zA-Z]', '', s)

            # Convert to float and multiply by a billion
            return float(s) * 10**9

        # Input of the form $###,###,### (or $###.###.###)
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # Strip the dollar sign, whitespace and BOTH kinds of thousands
            # separator the pattern accepts; leaving the periods in would make
            # float() fail on "$1.234.567" or misread "$1.234" as 1.234.
            s = re.sub(r'[\$,\.\s]', '', s)

            # Convert to float
            return float(s)

        # Otherwise, return NaN
        else:
            return np.nan

    # Clean the box office column in the wiki_movies_df DataFrame.
    # For ranges, drop everything between the dollar sign and the dash.
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    wiki_movies_df['box_office'] = box_office.str.extract(
        f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    # The raw column is no longer needed now that a parsed one exists
    wiki_movies_df.drop('Box office', axis=1, inplace=True)

    # Clean the budget column in the wiki_movies_df DataFrame.
    # Drop null values and convert any lists to strings
    budget = wiki_movies_df['Budget'].dropna().apply(lambda x: ' '.join(x) if isinstance(x, list) else x)
    # Remove any values between a dollar sign and a hyphen (for budgets given in ranges)
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    # Remove the citation references such as [1]
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    wiki_movies_df['budget'] = budget.str.extract(
        f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    # The original 'Budget' column is intentionally kept (it appears in the
    # challenge screenshot).

    # Clean the release date column in the wiki_movies_df DataFrame.
    # Drop null values and convert any lists to strings
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if isinstance(x, list) else x)
    # Month dd, yyyy
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    # yyyy-mm-dd and yyyy/mm/dd
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    # Month yyyy
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    # yyyy
    date_form_four = r'\d{4}'

    # Extract the dates with RegEx and convert them to datetimes.
    # NOTE(review): `infer_datetime_format` is deprecated in newer pandas
    # releases; confirm against the installed version before upgrading.
    wiki_movies_df['release_date'] = pd.to_datetime(
        release_date.str.extract(
            f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0],
        infer_datetime_format=True)
    # The original 'Release date' column is intentionally kept.

    # Clean the running time column in the wiki_movies_df DataFrame.
    # Non-null values of 'Running time', with lists converted to strings
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if isinstance(x, list) else x)
    # Capture groups: 0 = hours, 1 = minutes after the hours, 2 = minutes only
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    # Convert to numeric; anything unparseable becomes NaN and is then set to 0
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    # Express everything in minutes: use hours*60 + minutes unless a
    # minutes-only value was captured
    wiki_movies_df['running_time'] = running_time_extract.apply(
        lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    wiki_movies_df.drop('Running time', axis=1, inplace=True)

    # 2. Clean the Kaggle metadata.
    # Keep only rows whose 'adult' flag is the string 'False', then drop the flag
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult', axis='columns')

    # Set the datatype of video to Boolean
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'

    # Convert the numeric columns. astype(int) and errors='raise' both fail
    # loudly if any value cannot be converted.
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')

    # Convert the release date to a datetime
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

    # Convert the ratings timestamp, which is in Unix epoch seconds
    ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')

    # 3. Merge the two DataFrames into the movies DataFrame.
    # Inner join on imdb_id; duplicate column names receive the given suffixes
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki', '_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki', 'release_date_wiki', 'Language', 'Production company(s)'], inplace=True)

    # 5. Helper that fills in missing Kaggle data from the Wikipedia column.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        """Replace 0 values in kaggle_column with wiki_column, then drop wiki_column."""

        # Replace any 0 value in the Kaggle column with the wiki column value
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column],
            axis=1)

        # Drop the wiki column after the replacement has been done
        df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # 7. Filter the movies DataFrame for specific columns.
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                        'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                        'genres','original_language','overview','spoken_languages','Country',
                        'production_companies','production_countries','Distributor',
                        'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                        ]]

    # 8. Rename the columns in the movies DataFrame.
    movies_df.rename({'id':'kaggle_id',
                    'title_kaggle':'title',
                    'url':'wikipedia_url',
                    'budget_kaggle':'budget',
                    'release_date_kaggle':'release_date',
                    'Country':'country',
                    'Distributor':'distributor',
                    'Producer(s)':'producers',
                    'Director':'director',
                    'Starring':'starring',
                    'Cinematography':'cinematography',
                    'Editor(s)':'editors',
                    'Writer(s)':'writers',
                    'Composer(s)':'composers',
                    'Based on':'based_on'
                    }, axis='columns', inplace=True)

    # 9. Transform and merge the ratings DataFrame.
    # NOTE: the merged result below is computed but not written to the
    # database or returned by this version of the function.

    # Count the ratings per (movieId, rating) pair, then pivot so that movieId
    # is the index, each rating value is a column and the cells are the counts.
    rating_counts = ratings.groupby(['movieId', 'rating'], as_index=False).count() \
                    .rename({'userId': 'count'}, axis=1) \
                    .pivot(index='movieId', columns='rating', values='count')

    # Prepend rating_ to each column name so the columns are easier to understand
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

    # Left merge the rating counts onto the movies table
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')

    # Movies without any ratings get 0 in every rating-count column
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    # Deliverable 4, Step 3
    # Connect to the PostgreSQL movie_data DB
    db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"

    # Create the database engine
    engine = create_engine(db_string)

    try:
        # Save the movies_df DataFrame to the SQL table "movies", replacing
        # the table if it already exists
        movies_df.to_sql(name='movies', con=engine, if_exists='replace')

        # Number of ratings rows imported so far
        rows_imported = 0

        # Start time used for the elapsed-time report
        start_time = time.time()

        # Stream the ratings CSV in chunks rather than writing it in one go
        for data in pd.read_csv(ratings_file, chunksize=1000000):

            # Print out the range of rows that are being imported
            print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

            # The first chunk replaces any existing table; later chunks append
            data.to_sql(name='ratings', con=engine,
                        if_exists='replace' if rows_imported == 0 else 'append')

            # Increment the number of rows imported by the size of 'data'
            rows_imported += len(data)

            # Print that the rows have finished importing
            print('Done.')

        # Add elapsed time to the final print out
        print(f'Done. {time.time() - start_time} total seconds elapsed')
    finally:
        # Release the engine's pooled connections even if the load fails
        engine.dispose()

In [14]:
# 10. Directory holding the source data, and the path of each input file.
file_dir = "Resources"
# Wikipedia movie data (JSON)
wiki_file = "/".join([file_dir, "wikipedia-movies.json"])
# Kaggle movie metadata (CSV)
kaggle_file = "/".join([file_dir, "movies_metadata.csv"])
# MovieLens rating data (CSV)
ratings_file = "/".join([file_dir, "ratings.csv"])

In [15]:
# 11. Run the ETL function on the three source files.
#     Nothing is assigned here because the function has no return statement.
extract_transform_load(
    wiki_file=wiki_file,
    kaggle_file=kaggle_file,
    ratings_file=ratings_file,
)

importing rows 0 to 1000000...Done.
importing rows 1000000 to 2000000...Done.
importing rows 2000000 to 3000000...Done.
importing rows 3000000 to 4000000...Done.
importing rows 4000000 to 5000000...Done.
importing rows 5000000 to 6000000...Done.
importing rows 6000000 to 7000000...Done.
importing rows 7000000 to 8000000...Done.
importing rows 8000000 to 9000000...Done.
importing rows 9000000 to 10000000...Done.
importing rows 10000000 to 11000000...Done.
importing rows 11000000 to 12000000...Done.
importing rows 12000000 to 13000000...Done.
importing rows 13000000 to 14000000...Done.
importing rows 14000000 to 15000000...Done.
importing rows 15000000 to 16000000...Done.
importing rows 16000000 to 17000000...Done.
importing rows 17000000 to 18000000...Done.
importing rows 18000000 to 19000000...Done.
importing rows 19000000 to 20000000...Done.
importing rows 20000000 to 21000000...Done.
importing rows 21000000 to 22000000...Done.
importing rows 22000000 to 23000000...Done.
importing row