In [1]:
import json
import pandas as pd
import numpy as np

import re

from sqlalchemy import create_engine
import psycopg2

from config import db_password

import time

In [2]:
#  Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    movie = dict(movie) #create a non-destructive copy
    return movie

In [3]:
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)

def extract_transform_load(wiki_file, kaggle_file, ratings_file):
    # Read in the kaggle metadata and MovieLens ratings CSV files as Pandas DataFrames.
    kaggle_metadata = pd.read_csv(f'{file_dir}movies_metadata.csv', low_memory=False)
    ratings = pd.read_csv(f'{file_dir}ratings.csv')

    # Open and read the Wikipedia data JSON file.
    with open(f'{file_dir}wikipedia-movies.json', mode='r') as file:
        wiki_movies_raw = json.load(file)
    
    # 3. Write a list comprehension to filter out TV shows.
    wiki_movies = [movie for movie in wiki_movies_raw
               if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]

    # 4. Write a list comprehension to iterate through the cleaned wiki movies list
    # and call the clean_movie function on each movie.
    clean_movies = [clean_movie(movie) for movie in wiki_movies]

    # 5. Read in the cleaned movies list from Step 4 as a DataFrame.
    wiki_movies_df = pd.DataFrame(clean_movies)

    # Write a try-except block to catch errors while extracting the IMDb ID using a regular expression string and
    #  dropping any imdb_id duplicates. If there is an error, capture and print the exception.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    except ValueError :
         print("ID doesnt match expression" )

    #  Write a list comprehension to keep the columns that don't have null values from the wiki_movies_df DataFrame.
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.8 ]
    # Saving new columns as the dataframe
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]


    # Create a variable that will hold the non-null values from the “Box office” column.
    box_office = wiki_movies_df['Box office'].dropna()
    
    # Convert the box office data created in Step 8 to string values using the lambda and join functions.
    box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)  

    # Removing all of the dashes (range) to only one number
    box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    
    # Write a regular expression to match the six elements of "form_one" of the box office data.
    form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
    # Write a regular expression to match the three elements of "form_two" of the box office data.
    form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'


    # Add the parse_dollars function.
    def parse_dollars(s):
        # if s is not a string, return NaN
        if type(s) != str:
            return np.nan

        # if input is of the form $###.# million
        if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " million"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a million
            value = float(s) * 10**6

            # return value
            return value

        # if input is of the form $###.# billion
        elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

            # remove dollar sign and " billion"
            s = re.sub('\$|\s|[a-zA-Z]','', s)

            # convert to float and multiply by a billion
            value = float(s) * 10**9

            # return value
            return value

        # if input is of the form $###,###,###
        elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

            # remove dollar sign and commas
            s = re.sub('\$|,','', s)

            # convert to float
            value = float(s)

            # return value
            return value

        # otherwise, return NaN
        else:
            return np.nan

        
    # Clean the box office column in the wiki_movies_df DataFrame.
    wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)
    
    
    # Clean the budget column in the wiki_movies_df DataFrame.
    
    #Dropping null values 
    budget = wiki_movies_df['Budget'].dropna()
    # Turning column data in to strings 
    budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
    # removing range brackets 
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    budget = budget.str.replace(r'\[\d+\]\s*', '')
    #Clean budget column 
    wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)


    # Clean the release date column in the wiki_movies_df DataFrame.
    #Dropping null values 
    release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    # Creating expression forms to match dates data
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    #Clean release date row 
    wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)
   
    

    # Clean the running time column in the wiki_movies_df DataFrame.
    
    # Dropping null values from running time row 
    running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)
    # extracting hours and minutues record based on pattern
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    # Changing all extracted digits in to integers 
    running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    # Clean row 
    wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)
    # Dropping old running time column
    wiki_movies_df.drop('Running time', axis=1, inplace=True)
    
     
    # 2. Clean the Kaggle metadata.
    # Keep all of the kaggle data that do not have adult content and then drop the adult column
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    # Change budget coulumn to integers 
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    # Change Id column to numeric values 
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    # Change popularity column to numeric values 
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')                               
    # change release data column in to datetime datatypes
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])                                
    # 3. Merged the two DataFrames into the movies DataFrame.

    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])
    movies_df

    # 4. Drop unnecessary columns from the merged DataFrame.
    movies_df.drop(columns=['title_wiki','release_date_wiki','Language'], inplace=True)

    # 5. Add in the function to fill in the missing Kaggle data.
    def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
        df[kaggle_column] = df.apply(
            lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column]
            , axis=1)
        df.drop(columns=wiki_column, inplace=True)

    # 6. Call the function in Step 5 with the DataFrame and columns as the arguments.
    fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # 7. Filter the movies DataFrame for specific columns.
    for col in movies_df.columns:
        lists_to_tuples = lambda x: tuple(x) if type(x) == list else x
        value_counts = movies_df[col].apply(lists_to_tuples).value_counts(dropna=False)
        num_values = len(value_counts)
        if num_values == 1:
            print(col)
            
#     movies_df['video'].value_counts(dropna=False)

    # 8. Rename the columns in the movies DataFrame.
    # Reorder columns 
    movies_df = movies_df.loc[:, ['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Starring','Cinematography','Based on'
                      ]]
    
#     'Distributor', 'Producer(s)', 'Director', 'Editor(s)', 'Writer(s)',\n       'Composer(s)'],\n  
    movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)
    
    # 9. Transform and merge the ratings DataFrame.
    # grouping movieid and rating column and rename it it count b
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                        .rename({'userId':'count'}, axis=1)
    # pivot this data so that movieId is the index, the columns will be all the rating values, and the rows will be the counts for each rating value
    rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')
    # prepend rating_ to each column with a list comprehension
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    # using left merge to keep everything in movied dataframe
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')
    # Fill in missing values with zero 
    movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)
   

    # connection string
    db_string = f"postgresql+psycopg2://postgres:{db_password}@localhost:5439/movie_data"
    
    # creating new engine 
    engine = create_engine(db_string)
    
   # Saving movies dataframe to SQL
    movies_df.to_sql(name='movies', con=engine, if_exists= 'replace')

    
    #prints out the elapsed time to import each row
    rows_imported = 0
    # get the start_time from time.time()
    start_time = time.time()
    for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        data.to_sql(name='ratings', con=engine, if_exists='append')
        rows_imported += len(data)

        # add elapsed time to final print out
        print(f'Done. {time.time() - start_time} total seconds elapsed')
    
#     # Set the three variables equal to the function created in D1.
#     wiki_file, kaggle_file, ratings_file = extract_transform_load()

In [4]:
# 10. Create the path to your file directory and variables for the three files.
file_dir = '/Users/caseymarescot/Documents/Class/Movies-ETL/Resources/
# The Wikipedia data
wiki_file = f'{file_dir}/wikipedia.movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}/movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}/ratings.csv'

In [5]:
# 11. pass in the variables for the files created in Step 10 of Deliverable 3 in the function created in Deliverable 1
extract_transform_load(wiki_file, kaggle_file, ratings_file)

  budget = budget.str.replace(r'\[\d+\]\s*', '')


video
importing rows 0 to 1000000...Done. 36.33817195892334 total seconds elapsed
importing rows 1000000 to 2000000...Done. 71.71258091926575 total seconds elapsed
importing rows 2000000 to 3000000...Done. 107.43261003494263 total seconds elapsed
importing rows 3000000 to 4000000...Done. 143.218416929245 total seconds elapsed
importing rows 4000000 to 5000000...Done. 178.82450795173645 total seconds elapsed
importing rows 5000000 to 6000000...Done. 214.75322580337524 total seconds elapsed
importing rows 6000000 to 7000000...Done. 250.29533791542053 total seconds elapsed
importing rows 7000000 to 8000000...Done. 288.0577619075775 total seconds elapsed
importing rows 8000000 to 9000000...Done. 325.6234059333801 total seconds elapsed
importing rows 9000000 to 10000000...Done. 362.42015409469604 total seconds elapsed
importing rows 10000000 to 11000000...Done. 398.7495069503784 total seconds elapsed
importing rows 11000000 to 12000000...Done. 435.3513340950012 total seconds elapsed
importi

In [6]:
# 12. Set the DataFrames from the return statement equal to the file names in Step 11. 
wiki_movies_df = wiki_file
movies_with_ratings_df = kaggle_file
movies_df = ratings_file

In [8]:
# 13. Check the wiki_movies_df DataFrame. 
wiki_movies_df

'/Users/caseymarescot/Documents/Class//wikipedia.movies.json'

In [None]:
# 14. Check the movies_with_ratings_df DataFrame.
movies_with_ratings_df.head()

In [None]:
# 15. Check the movies_df DataFrame. 
movies_df.head()