In [1]:
import json
import pandas as pd
import numpy as np
import os
import re

from sqlalchemy import create_engine
import psycopg2

# 1 uncomment the # from config import db_password so this code is working.
from config import db_password

import time

In [2]:
print(db_password)

gobucks1


In [3]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    movie = dict(movie) #create a non-destructive copy
    alt_titles = {}
    # combine alternate titles into one list
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune-Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles

    # merge column names
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')

    return movie

In [4]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load(wiki_file, kaggle_file, ratings_file):
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.    
    kaggle_metadata = pd.read_csv(os.path.join('Archive','movies_metadata.csv'), low_memory=False)
    ratings = pd.read_csv(os.path.join('Archive','ratings.csv'))      

    # Open and read the Wikipedia data JSON file.
    with open(os.path.join('Archive','wikipedia-movies.json'), mode='r') as file:
        wiki_movies_raw = json.load(file)         
    
    # Write a list comprehension to filter out TV shows.
    wiki_tv = [tvshows for tvshows in wiki_movies_raw 
        if 'Television series' in tvshows]   

    # Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    clean_movies = [clean_movie(movie) for movie in wiki_movies_raw]       

    # Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(clean_movies)

    # Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)            
    except: 
        print("No link to extract")        

    #  Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]    

    # Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
    
    # Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)    

    # Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$\d+\.?\d*\s*[mb]illion'
    
    # Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$\d{1,3}(?:,\d{3})+'

    # Add the parse_dollars function.
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            # return value
            return value

        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value

        # otherwise, return NaN
        else:
            return np.nan       
        
    # Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)
    
    # Clean the budget column in the wiki_movies_df DataFrame.
    # First, we need to preprocess the budget data. Create a budget variable
    budget = wiki_movies_df['Budget'].dropna()
    
    # Convert any lists to strings:
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    
    # Then remove any values between a dollar sign and a hyphen (for budgets given in ranges):
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    
    # Use the same pattern matches that you created to parse the box office data, and apply them without modifications 
    # to the budget data. Then, look at what's left.
    matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE, na=False)
    matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE, na=False)
    budget[~matches_form_one & ~matches_form_two]
    
    # Another issue is discovered: citation references (the numbers in square brackets).
    # Remove those with a regular expression.
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    budget[~matches_form_one & ~matches_form_two]
    
    # Ready to parse the budget values
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    # Drop the original Budget column
    wiki_movies_df.drop('Budget', axis=1, inplace=True)    

    # Clean the release date column in the wiki_movies_df DataFrame.
    # Parse Release Date
    # Parsing the release date will follow a similar pattern to parsing box office and budget, but with different forms.

    # First, make a variable that holds the non-null values of Release date in the DataFrame, converting lists to strings:
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    
    # The forms we'll be parsing are:

    # 1 Full month name, one- to two-digit day, four-digit year (i.e., January 1, 2000)
    # 2 Four-digit year, two-digit month, two-digit day, with any separator (i.e., 2000-01-01)
    # 3 Full month name, four-digit year (i.e., January 2000)
    # 4 Four-digit year
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]?\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[0123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    
    # And then we can extract the dates with:
    release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)
    
    # Instead of creating our own function to parse the dates, we'll use the built-in to_datetime() method in Pandas. Since 
    # there are different date formats, set the infer_datetime_format option to True. The date formats we've targeted are 
    # among those that the to_datetime() function can recognize, which explains the infer_datetime_format=True argument below.
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)  
    
    # Clean the running time column in the wiki_movies_df DataFrame.
    # Parse Running Time
    # First, make a variable that holds the non-null values of Release date in the DataFrame, converting lists to strings:
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    running_time
    
    # It looks like most of the entries just look like "100 minutes." Let's see how many running times look exactly like 
    # that by using string boundaries.
    running_time.str.contains(r'^\d*\s*minutes$', flags=re.IGNORECASE, na=False).sum()
    
    # The above code returns 6,528 entries. Let's get a sense of what the other 366 entries look like.
    running_time[running_time.str.contains(r'^\d*\s*minutes$', flags=re.IGNORECASE, na=False) != True]
    
    # Let's make this more general by only marking the beginning of the string, and accepting other abbreviations of 
    # "minutes" by only searching up to the letter "m.
    running_time.str.contains(r'^\d*\s*m', flags=re.IGNORECASE, na=False).sum()
    
    # That accounts for 6,877 entries. The remaining 17 follow:
    running_time[running_time.str.contains(r'^\d*\s*m', flags=re.IGNORECASE, na=False) != True]
    
    # We can match all of the hour + minute patterns with one regular expression pattern. Our pattern follows:
    # 1 Start with one or more digits.
    # 2 Have an optional space after the digit and before the letter "h."
    # 3 Capture all the possible abbreviations of "hour(s)." To do this, we'll make every letter in "hours" optional except 
    #   the "h."
    # 4 Have an optional space after the "hours" marker.
    # 5 Have an optional number of digits for minutes.
    # As a pattern, this looks like "\d+\s*ho?u?r?s?\s*\d*".

    # We only want to extract digits, and we want to allow for both possible patterns. Therefore, we'll add capture 
    # groups around the \d instances as well as add an alternating character. Our code will look like the following.
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    
    # Unfortunately, this new DataFrame is all strings, we'll need to convert them to numeric values. Because we may have 
    # captured empty strings, we'll use the to_numeric() method and set the errors argument to 'coerce'. Coercing the 
    # errors will turn the empty strings into Not a Number (NaN), then we can use fillna() to change all the NaNs to zeros.
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    
    # Now we can apply a function that will convert the hour capture groups and minute capture groups to minutes if the 
    # pure minutes capture group is zero, and save the output to wiki_movies_df:
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    
    # Finally, we can drop Running time from the dataset with the following code:
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
    
    # Return three variables. The first is the wiki_movies_df DataFrame
    # return wiki_movies_df, kaggle_metadata, ratings     
     
    # 2. Clean the Kaggle metadata.
    
    # Remove bad data
    kaggle_metadata[~kaggle_metadata['adult'].isin(['True','False'])]
    
    # Since imbd_id is missing for adult films, only keep rows where adult is False, and then drop the "adult" column.
    # The following code will keep rows where the adult column is False, and then drop the adult column.
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')
    
    # Convert video by changing data type.
    # kaggle_metadata_df['video'] == 'True'
    # The above code creates the Boolean column we want. We just need to assign it back to video
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    
    # For the numeric columns, we can just use the to_numeric() method from Pandas. We'll make sure the errors= argument 
    # is set to 'raise', so we'll know if there's any data that can't be converted to numbers.
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    
    # Finally, we need to convert release_date to datetime. Luckily, Pandas has a built-in function for that as well: 
    # to_datetime(). 
    # Since release_date is in a standard format, to_datetime() will convert it without any fuss.
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
    
    # 3. Merged the two DataFrames into the movies DataFrame.
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

    # 4. Drop unnecessary columns from the merged DataFrame.
    # use the dataframe.drop() function
    movies_df.drop(columns=['title_wiki'])

    # 5. Add in the function to fill in the missing Kaggle data.
    # Add the fill_missing_kaggle_data() function that fills in the missing Kaggle data on the movies_df DataFrame. 
    # Used reference code from M_Fullerton "ETL_clean_kaggle_data.ipynb." Not familiar with function
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column], axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    # Used reference code from M_Fullerton "ETL_clean_kaggle_data.ipynb." Not familiar with function.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')
    movies_df

    # 7. Filter the movies DataFrame for specific columns.
    for col in movies_df.columns:
        lists_to_tuples = lambda x: tuple(x) if type(x) == list else x
        value_counts = movies_df[col].apply(lists_to_tuples).value_counts(dropna=False)
        num_values = len(value_counts)
    if num_values == 1:
        print(col)

    # 8. Rename the columns in the movies DataFrame for consistency.
    # Reference M_Fullerton "ETL_clean_wiki_movies.ipynb" from GitHub. Did not have complete code, missing column_names and
    # the reindex function.
    column_names = ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                        'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                        'genres','original_language','overview','spoken_languages','Country',
                        'production_companies','production_countries','Distributor',
                        'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                       ]
    movies_df = movies_df.reindex(columns=column_names)
    
    movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

    # 9. Transform and merge the ratings DataFrame.
    # use a groupby on the "movieId" and "rating" columns and take the 
    # count for each group.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count()
    
    # Rename the "userId" column to "count."
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1)
    
    # pivot this data so that movieId is the index, the columns will be 
    # all the rating values, and the rows will be the counts for each 
    # rating value.
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')
    
    #  rename the columns so they're easier to understand. We'll prepend 
    # rating_ to each column with a list comprehension
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    
    # We need to use a left merge, since we want to keep everything in 
    # movies_df:
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    
    # Not every movie got a rating for each rating level, there will be 
    # missing values instead of zeros. We have to fill those in ourselves, 
    # like this:
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

    # 2 Remove the return statement, return wiki_movies_df, movies_with_ratings_df, movies_df.
    # return wiki_movies_df, movies_with_ratings_df, movies_df   
    
    # 3 After Step 9, Transform and merge the ratings DataFrame, add the code to create the connection to the PostgreSQL 
    # database, 
    
    # Create the Database Engine - local server, the connection string will be as follows:
    # Used reference code from Raquely44 (Raquel Yates) "ETL_create_database.ipynb" on GitHub for code to complete db_string.
    # Instructor Nick Meyer helped with this string
    # db_string = f"postgresql://postgres:{db_password}@127.0.0.1:5432/movie_data"
    db_string = f"postgresql://postgres:{db_password}@localhost:5432/movies_db"

    # Create the database engine (to the PostgreSQL database)
    engine = create_engine(db_string)
    
    # Then add the movies_df DataFrame to a SQL database.
    # Use 'replace' for the if_exists parameter so that the movies_df DataFrame data won't be added to the table again.
    movies_df.to_sql(name='movies', if_exists='replace',con=engine)
    
    # 4 Before reading in the MovieLens rating CSV data, drop the ratings table in pgAdmin. 
    # data.to_sql(name='ratings', con=engine, if_exists='replace', index=False)
      
    # 5 Add the code that prints out the elapsed time to import each row. 
    # create a variable for the number of rows imported
    rows_imported = 0
    # get the start_time from time.time()
    # Used reference code from Raquely44 (Raquel Yates) "ETL_create_database.ipynb" on GitHub to complete for loop including
    # chunksize parameter and data.to_sql function con=engine parameter
    start_time = time.time()
    for data in pd.read_csv(ratings_file, chunksize=1000000):
        data.to_sql(name='ratings', con=engine, if_exists='replace', index=False)
    # print out the range of rows that are being imported
    # Used reference code from Raquely44 (Raquel Yates) "ETL_create_database.ipynb" on GitHub for code to complete print
    # function
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

    # increment the number of rows imported by the size of 'data'
        rows_imported += len(data)

    # print that the rows have finished importing
        print(f'Done. {time.time() - start_time} total seconds elapsed')
    

In [5]:
# 10. Create the path to your file directory and variables for the three files.
file_dir = '/Users/rfnichol/OneDrive - COOPER TIRE & RUBBER COMPANY/Personal/Data Analytics Boot Camp/Module_8_ETL/Module_8_files/Movies_ETL/archive'
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia.movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [6]:
# 11. Delivrable 4 step 6, set the three variables equal to the function created in D1.
# wiki_file, kaggle_file, ratings_file = 
extract_transform_load(wiki_file, kaggle_file, ratings_file)



importing rows 0 to 1000000...Done. 17.62169599533081 total seconds elapsed
importing rows 1000000 to 2000000...Done. 34.32362413406372 total seconds elapsed
importing rows 2000000 to 3000000...Done. 51.4937047958374 total seconds elapsed
importing rows 3000000 to 4000000...Done. 70.24476718902588 total seconds elapsed
importing rows 4000000 to 5000000...Done. 88.34394955635071 total seconds elapsed
importing rows 5000000 to 6000000...Done. 106.40009140968323 total seconds elapsed
importing rows 6000000 to 7000000...Done. 124.37980651855469 total seconds elapsed
importing rows 7000000 to 8000000...Done. 142.28150463104248 total seconds elapsed
importing rows 8000000 to 9000000...Done. 160.44074630737305 total seconds elapsed
importing rows 9000000 to 10000000...Done. 178.63142204284668 total seconds elapsed
importing rows 10000000 to 11000000...Done. 196.91586923599243 total seconds elapsed
importing rows 11000000 to 12000000...Done. 214.7011513710022 total seconds elapsed
importing ro

In [7]:
# Run a query on the PostgreSQL database that retreives the number of rows for the movies and ratings tables.
# Steps - see example code in Data Analytics Notes
# 1 Connect to PostgreSQL from Python
#     db_string = f"postgresql://postgre:{db_password}@127.0.0.1:5432/movie_data"

    # Create the database engine (to the PostgreSQL database)
#    engine = create_engine(db_string)
# 2 Define a PostgreSQL SELECT Query
# SELECT col1, col2,…colnN FROM postgresql_table WHERE id = 5
# 3 Get Cursor Object from Connection 
# Next, use a connection.cursor() method to create a Psycopg2 cursor object. This method creates a new 
# psycopg2.extensions.cursor object.
# 4 Execute the SELECT query using a execute() method
# Execute the select query using the cursor.execute() method.

# 5 Extract all rows from a result
# Use the fetchall() method of a cursor object to get all rows from a query result. it returns a list of row
# 6 Iterate each row
# Iterate a row list using a for loop and access each row individually (Access each row’s column data using a column 
# name or index number.)
# 7 Close the cursor object and database connection object
# use cursor.clsoe() and connection.clsoe() method to close open connections after your work completes.

