In [1]:
import ast
import json
import os
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text

In [2]:
def rel_1M (original_df, col):
    """Split a one-to-many column into a parent table and a foreign-key mapping.

    Each non-null cell of ``col`` is expected to hold the string form of a single
    Python dict literal with the keys ``id``, ``name``, ``poster_path`` and
    ``backdrop_path``. Rows whose ``col`` or ``imdb_id`` value is missing are skipped.

    Parameters
    ----------
    original_df : pandas.DataFrame
        Source frame; must contain ``col`` and ``imdb_id``.
    col : str
        Name of the column to extract.

    Returns
    -------
    tuple of (pandas.DataFrame, pandas.DataFrame)
        1) Parent rows (``id``, ``name``, ``poster_path``, ``backdrop_path``),
           de-duplicated on ``id``.
        2) Mapping rows (``id``, ``imdb_id``) used to fill the foreign key,
           de-duplicated on the pair.
    """
    parent_cols = ['id', 'name', 'poster_path', 'backdrop_path']
    # Keep only rows where both the complex column and the movie id are present
    processing_df = original_df[[col, 'imdb_id']].dropna()

    parent_data = []
    update_data = []
    # zip over the columns instead of label lookups so the source index does not matter
    for col_str, imdb_id in zip(processing_df[col], processing_df['imdb_id']):
        # The cells are Python literals, not JSON. ast.literal_eval parses them safely and
        # keeps apostrophes / double quotes inside names intact (a ' -> " swap breaks them).
        col_dict = ast.literal_eval(col_str)
        # Missing values (None) are stored as '' to match the previous output
        parent_row = {key: ('' if col_dict[key] is None else col_dict[key]) for key in parent_cols}
        parent_data.append(parent_row)
        # The necessary entry for the foreign key in the movie table
        update_data.append({'id': parent_row['id'], 'imdb_id': imdb_id})

    # Explicit columns keep drop_duplicates working even when nothing was extracted
    parent_data_df = pd.DataFrame(parent_data, columns=parent_cols).drop_duplicates(subset=['id'])
    update_data_df = pd.DataFrame(update_data, columns=['id', 'imdb_id']).drop_duplicates(subset=['id', 'imdb_id'])

    return parent_data_df, update_data_df

In [3]:
def rel_MM (original_df, col):
    """Split a many-to-many column into a parent table and an association table.

    Each string cell of ``col`` is expected to hold the string form of a Python list
    of dicts, each dict having at least ``id`` and ``name`` keys. Non-string cells
    (e.g. NaN for a missing value) are skipped.

    Parameters
    ----------
    original_df : pandas.DataFrame
        Source frame; must contain ``col`` and ``imdb_id``.
    col : str
        Name of the column to extract.

    Returns
    -------
    tuple of (pandas.DataFrame, pandas.DataFrame)
        1) Parent table (``id``, ``name``), de-duplicated on ``id``.
        2) Association table (``id``, ``imdb_id``), de-duplicated on the pair.
    """
    parent_data = []
    associate_data = []
    # zip over the columns instead of label lookups so any index (not just 0..n-1) works
    for col_str, imdb_id in zip(original_df[col], original_df['imdb_id']):
        # Missing cells are not strings (NaN is a float); there is nothing to extract
        if not isinstance(col_str, str):
            continue
        # The cells are Python literals, not JSON: ast.literal_eval keeps apostrophes and
        # double quotes inside names intact, which a ' -> " swap + json.loads does not.
        for row in ast.literal_eval(col_str):
            # Parent table row
            parent_data.append({'id': row['id'], 'name': row['name']})
            # Association table row (parent id <-> movie id)
            associate_data.append({'id': row['id'], 'imdb_id': imdb_id})

    # Explicit columns keep drop_duplicates working even when nothing was extracted
    parent_data_df = pd.DataFrame(parent_data, columns=['id', 'name']).drop_duplicates(subset=['id'])
    associate_data_df = pd.DataFrame(associate_data, columns=['id', 'imdb_id']).drop_duplicates(subset=['id', 'imdb_id'])

    return parent_data_df, associate_data_df

In [4]:
# Raw movie metadata, relative to the notebook's working directory
csv_file = "Data/movies_metadata.csv"

# Load every column; later cells pick out the pieces they need
movies_alldata_df = pd.read_csv(filepath_or_buffer=csv_file)

In [5]:
# Connection settings come from the environment so the password does not have to live in
# the notebook. The fallbacks reproduce the original local-development values.
# TODO: drop the fallback password once MOVIES_DB_PASSWORD is set everywhere.
db_user = os.environ.get("MOVIES_DB_USER", "postgres")
db_password = os.environ.get("MOVIES_DB_PASSWORD", "password")
db_host = os.environ.get("MOVIES_DB_HOST", "localhost:5432")
db_name = os.environ.get("MOVIES_DB_NAME", "movies_db")
# quote_plus escapes characters such as '@' or ':' that would otherwise corrupt the URL
rds_connection_string = f"{db_user}:{quote_plus(db_password)}@{db_host}/{db_name}"
engine = create_engine(f'postgresql://{rds_connection_string}')

In [6]:
# One-to-many columns. Each spec names the source column, the parent table it is
# appended to, and the name the generic 'id' column gets in that table.
cols_1M_lst = [{'col': 'belongs_to_collection', 'table': 'collection', 'primary': 'collection_id'}]

for spec in cols_1M_lst:
    # update_df stays in the namespace: the movies cell uses it to fill the foreign key
    parent_df, update_df = rel_1M(movies_alldata_df, spec['col'])

    key_rename = {'id': spec['primary']}
    parent_df = parent_df.rename(columns=key_rename)
    parent_df.to_sql(
        name=spec['table'],
        con=engine,
        if_exists='append',
        index=False,
    )

In [7]:
# Save movies information
# Movie-level columns that go straight into the `movie` table
movie_cols = ['imdb_id', 'adult', 'budget', 'homepage', 'original_language', 'original_title',
              'overview', 'popularity', 'poster_path', 'release_date', 'revenue', 'runtime',
              'status', 'tagline', 'title', 'vote_average', 'vote_count']

# Dedup in case a movie appears more than once, then round popularity to 4 decimals.
# .assign builds a new frame instead of writing a column into a slice of
# movies_alldata_df (which can trigger SettingWithCopyWarning).
movies_df = (
    movies_alldata_df[movie_cols]
    .drop_duplicates(subset=['imdb_id'])
    .assign(popularity=lambda frame: frame['popularity'].round(4))
)
# Write data frame to the table
movies_df.to_sql(name='movie', con=engine, if_exists='append', index=False)

# Stage the movie -> collection mapping (update_df, produced by the 1-to-M cell) in a
# working table, then copy it into movie.collection_id.
update_df.to_sql(name='w_collec', con=engine, if_exists='replace', index=False)
# engine.begin() commits the UPDATE and closes the connection on exit; text() wraps the
# raw SQL, which SQLAlchemy 2.x requires (plain strings were deprecated in 1.4).
with engine.begin() as connection:
    result = connection.execute(text(
        "update movie m "
        "   set collection_id = (select id "
        "                          from w_collec w "
        "                         where w.imdb_id = m.imdb_id)"
    ))

In [8]:
# Many-to-many columns. Each spec names the source column, the parent lookup table,
# the name the generic 'id' column gets in the database, and the association table.
cols_MM_lst = [
    {'col': 'genres', 'table': 'genre', 'primary': 'genre_id', 'assoc': 'film_genre'},
    {'col': 'production_companies', 'table': 'production_company', 'primary': 'company_id', 'assoc': 'film_production_company'},
    {'col': 'production_countries', 'table': 'country', 'primary': 'country_id', 'assoc': 'film_country'},
    {'col': 'spoken_languages', 'table': 'spoken_languages', 'primary': 'lang_id', 'assoc': 'film_spoken_languages'}
]

for spec in cols_MM_lst:
    # Progress marker: one line per parent table
    print(spec['table'])
    key_rename = {'id': spec['primary']}
    parent_df, associate_df = rel_MM(movies_alldata_df, spec['col'])

    # Parent lookup table first, then the movie association table
    parent_df = parent_df.rename(columns=key_rename)
    parent_df.to_sql(name=spec['table'], con=engine, if_exists='append', index=False)

    associate_df = associate_df.rename(columns=key_rename)
    associate_df.to_sql(name=spec['assoc'], con=engine, if_exists='append', index=False)
    

genre
production_company
country
spoken_languages


In [9]:
# !jupyter nbconvert --to script --output "Movies_ETL" Movies_ETL.ipynb