**IMPORT LIBRARY**

In [2944]:
import pandas as pd
import numpy as np
from decimal import Decimal
import psycopg2
from sqlalchemy import create_engine, Table, MetaData

**ETL PROCESS**

EXTRACT

In [2945]:
# create the variable containing dataset url/ directory from local device
url = 'https://docs.google.com/spreadsheets/d/18-TvHgDGvZCmx95DmdzxfuL-MMLYAJENzvzG6-2EXkc/export?format=csv'

# extract the dataset and load it into pandas dataframe
df = pd.read_csv(url)

TRANSFORMATION

Data Cleaning

In [2946]:
# standardize the columns name into lowercase
df.columns = df.columns.str.lower()

# change the separator into '-'
df = df.rename(columns={'one-line':'one_line'})

In [2947]:
# remove the certain character from year column
df['year'] = df['year'].str.replace('(','').str.replace(')','')

# split year column by '-' into production year and release year columns
df[['production_year', 'release_year']] = df['year'].str.split('-', expand=True)

# get 4 first character from production year to exclude the '-'
df['production_year'] = df['production_year'].str.slice(start=0, stop=4)

# change blank value into NaN in production_year and release_year columns
df['production_year'] = df['production_year'].replace({'': np.nan})
df['release_year'] = df['release_year'].replace({None: np.nan})

  df['year'] = df['year'].str.replace('(','').str.replace(')','')


In [2948]:
# remove certain character from dataframe
df = df.replace('\n', '', regex=True)
df['votes'] = df['votes'].replace(',', '', regex=True)

In [2949]:
# getting the director name and actor name by splitting the stars column
df["director"] = df["stars"].str.split("|").str.get(0)
df['actor'] = df['stars'].str.split("s:").str.get(1)

# change the data that don't contain the director name in NaN
df.loc[~df['director'].str.contains('Director', na=False), 'director'] = np.nan

# get the director name after cleaning the data
df["director"] = df["director"].str.split(":").str.get(1)

In [2950]:
# change the data type of actor and director columns into string
df['actor'] = df['actor'].astype(str)
df['director'] = df['director'].astype(str)

In [2951]:
# trimming all data in dataframe
df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)

In [2952]:
# remove year and star columns
df = df.drop(columns=['year', 'stars'])

# remove '#' character in movies column
df['movies'] = df['movies'].str.replace('#','')

# sort the dataframe based on movies name
df = df.sort_values(by='movies')

In [2953]:
# get gross value number
df["gross"] = df["gross"].str.split("$").str.get(1)
df["gross"] = df["gross"].str.split("M").str.get(0)

In [2954]:
# change these following numeric columns from float into decimal
df['rating'] = df['rating'].apply(lambda x: Decimal(x) if pd.notnull(x) else x)
df['votes'] = df['votes'].apply(lambda x: Decimal(x) if pd.notnull(x) else x)
df['runtime'] = df['runtime'].apply(lambda x: Decimal(x) if pd.notnull(x) else x)
df['gross'] = df['gross'].apply(lambda x: Decimal(x) if pd.notnull(x) else x)

# multiply the gross column by 1_000_000 based on removed 'M'
df["gross"] *= 1_000_000

In [2955]:
# normalize the redundant data by aggregating some columns
# numeric columns
df['rating'] = df.groupby('movies')['rating'].transform('mean')
df['runtime'] = df.groupby('movies')['runtime'].transform('mean')
df['votes'] = df.groupby('movies')['votes'].transform('sum').replace({0: np.nan})
df['gross'] = df.groupby('movies')['gross'].transform('sum').replace({0: np.nan})
# string columns
df['one_line'] = df.groupby('movies')['one_line'].transform(lambda x: x.loc[x.str.len().idxmax()])
df['director'] = df.groupby('movies')['director'].transform(lambda x: x.loc[x.str.len().idxmax()])
df['actor'] = df.groupby('movies')['actor'].transform(lambda x: x.loc[x.str.len().idxmax()])

# round up the decimal columns
df["rating"] = df["rating"].apply(lambda x: round(x,1) if pd.notnull(x) else x)
df["votes"] = df["votes"].apply(lambda x: round(x,1) if pd.notnull(x) else x)
df["runtime"] = df["runtime"].apply(lambda x: round(x,1) if pd.notnull(x) else x)

# remove all duplicate data after being normalized
df = df.drop_duplicates()

# print(len(df))
# df.head()
# df[df['movies']=='Rhythm + Flow']

In [2956]:
# create movie_id based on normalized movie name column
df['movie_id'] = df.groupby('movies').ngroup() + 1

In [2957]:
# change nan data into NaN/ NULL format
df['director'] = df['director'].replace({'nan': np.nan})
df['actor'] = df['actor'].replace({'nan': np.nan})
df['one_line'] = df['one_line'].replace({'nan': np.nan})

In [2958]:
# create new currency column for every filled value in gross column
df.loc[df['gross'].notnull(), 'currency'] = "$"

Create Dataframe Based on Conceptual Data Model

In [2959]:
# create movie_facts dataframe from cleaned dataframe
movie_facts = df[['movie_id', 'votes', 'runtime', 'gross', 'currency']]
# change numeric column data type into numeric so it can be read by PostgreSQL as numeric data type
movie_facts['votes'] = pd.to_numeric(df['votes'], errors='coerce')
movie_facts['gross'] = pd.to_numeric(df['gross'], errors='coerce')

# create movie_dim dataframe from cleaned dataframe
movie_dim = df[['movie_id', 'movies', 'production_year', 'release_year', 'rating', 'one_line']]
# change numeric column data type into numeric so it can be read by PostgreSQL as numeric data type
movie_dim['production_year'] = pd.to_numeric(df['production_year'], errors='coerce', downcast='integer')
movie_dim['release_year'] = pd.to_numeric(df['release_year'], errors='coerce', downcast='integer')

# movie_dim.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  movie_facts['votes'] = pd.to_numeric(df['votes'], errors='coerce')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  movie_facts['gross'] = pd.to_numeric(df['gross'], errors='coerce')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  movie_dim['production_year'] = pd.to_numeric(df['production_year'], err

Create Bridge Tables

In [2960]:
# create brigde table movie_actor by splitting the data by ',' and transpose it into rows
movie_actor = df.set_index('movie_id').actor.str.split(',', expand=True).stack().reset_index(level=1, drop=True).rename('actor').to_frame()

# change index into column
movie_actor = movie_actor.reset_index(drop=False)

# trim the data after getting splitted
movie_actor = movie_actor.apply(lambda x: x.str.strip() if x.dtype == "object" else x)

# create actor_id based on actor
movie_actor['actor_id'] = movie_actor.groupby('actor').ngroup() + 1

# movie_actor.head()

In [2961]:
# create brigde table movie_director by splitting the data by ',' and transpose it into rows
movie_director = df.set_index('movie_id').director.str.split(',', expand=True).stack().reset_index(level=1, drop=True).rename('director').to_frame()

# change index into column
movie_director = movie_director.reset_index(drop=False)

# trim the data after getting splitted
movie_director = movie_director.apply(lambda x: x.str.strip() if x.dtype == "object" else x)

# create director_id based on director
movie_director['director_id'] = movie_director.groupby('director').ngroup() + 1

# movie_director.head()

In [2962]:
# create brigde table movie_genre by splitting the data by ',' and transpose it into rows
movie_genre = df.set_index('movie_id').genre.str.split(',', expand=True).stack().reset_index(level=1, drop=True).rename('genre').to_frame()

# change index into column
movie_genre = movie_genre.reset_index(drop=False)

# trim the data after getting splitted
movie_genre = movie_genre.apply(lambda x: x.str.strip() if x.dtype == "object" else x)

# create director_id based on genre
movie_genre['genre_id'] = movie_genre.groupby('genre').ngroup() + 1

# movie_genre.head()

Create Look Up Table/ Master Data

In [2963]:
# take actor_id and actor name from bridge table
actor_dim = movie_actor[['actor_id', 'actor']]

# remove duplicate so it can be used as master data
actor_dim = actor_dim.drop_duplicates()

# print(len(actor_dim))
# actor_dim.head()

In [2964]:
# take director_id and director name from bridge table
director_dim = movie_director[['director_id', 'director']]

# remove duplicate so it can be used as master data
director_dim = director_dim.drop_duplicates()

# print(len(director_dim))
# director_dim.head()

In [2965]:
# take genre_id and genre name from bridge table
genre_dim = movie_genre[['genre_id', 'genre']]

# remove duplicate so it can be used as master data
genre_dim = genre_dim.drop_duplicates()

# print(len(genre_dim))
# genre_dim.head()

Finalizing Bridge Tables

In [2966]:
# take only movie_id and genre_id in movie_genre bridge table
movie_genre = movie_genre[['movie_id', 'genre_id']]

# movie_genre.head()

In [2967]:
# take only movie_id and actor_id in movie_actor bridge table
movie_actor = movie_actor[['movie_id', 'actor_id']]

# movie_actor.head()

In [2968]:
# take only movie_id and director_id in movie_director bridge table
movie_director = movie_director[['movie_id', 'director_id']]

# movie_director.head()

**LOAD**

In [2969]:
# Connect to the PostgreSQL database
conn = psycopg2.connect(
    host="localhost",
    user="postgres",
    password="1234",
    dbname="cadit"
)

# Create an SQLAlchemy engine to connect to the database
engine = create_engine('postgresql://postgres:1234@localhost:5432/cadit')

# Load these following dataframe into each table in PostgreSQL
# use append in 'if_exists' to insert new data/ streaming
movie_facts.to_sql("movie_facts", engine, schema = 'techTest', if_exists='replace', index=False)
movie_dim.to_sql("movie_dim", engine, schema = 'techTest', if_exists='replace', index=False)
actor_dim.to_sql("actor_dim", engine, schema = 'techTest', if_exists='replace', index=False)
director_dim.to_sql("director_dim", engine, schema = 'techTest', if_exists='replace', index=False)
genre_dim.to_sql("genre_dim", engine, schema = 'techTest', if_exists='replace', index=False)
movie_director.to_sql("movie_director", engine, schema = 'techTest', if_exists='replace', index=False)
movie_actor.to_sql("movie_actor", engine, schema = 'techTest', if_exists='replace', index=False)
movie_genre.to_sql("movie_genre", engine, schema = 'techTest', if_exists='replace', index=False)

587