# 🎬 IMDb Movies ETL Pipeline

This notebook performs an Extract-Transform-Load (ETL) process on IMDb movie data, cleaning and loading it into a PostgreSQL database.

**Steps Covered:**
1. 📥 Extract data from TSV/CSV files
2. 🧹 Clean and transform the data
3. 🗃️ Normalize genres into a separate table
4. 💾 Load into PostgreSQL using SQLAlchemy


In [None]:
# Core
import pandas as pd
import numpy as np

# SQLAlchemy
from sqlalchemy import create_engine, Column, String, Integer, Float, Boolean, ForeignKey, BigInteger
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.dialects.postgresql import BIGINT, INTEGER, FLOAT, BOOLEAN, TEXT

## 📥 Extract Data

Load the relevant IMDb datasets into Pandas for cleaning and transformation.


In [28]:
# Reading the datasets
title_basics_db = pd.read_csv(r"C:\Users\Hen\Documents\Projects\ETL\title.basics.tsv\title.basics.tsv", sep="\t")
title_ratings_db = pd.read_csv(r"C:\Users\Hen\Documents\Projects\ETL\title.ratings.tsv\title.ratings.tsv", sep="\t")
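
The IMDb dumps mark missing values with the literal `\N`. As a minimal sketch, assuming a small in-memory sample instead of the real file, `read_csv` can map that marker to `NaN` at load time and read every column as text, which leaves type decisions to the transform step. The `sample_tsv` string and the `basics` name are hypothetical.

```python
import io
import pandas as pd

# Hypothetical three-row sample in the same layout as title.basics.tsv
sample_tsv = (
    "tconst\ttitleType\tprimaryTitle\tstartYear\truntimeMinutes\tgenres\n"
    "tt0000009\tmovie\tMiss Jerry\t1894\t45\tRomance\n"
    "tt0000502\tmovie\tBohemios\t1905\t100\t\\N\n"
    "tt0000001\tshort\tCarmencita\t1894\t1\tDocumentary,Short\n"
)

basics = pd.read_csv(
    io.StringIO(sample_tsv),
    sep="\t",
    na_values="\\N",  # treat IMDb's \N marker as missing
    dtype=str,        # read everything as text; convert types later
)
print(basics["genres"].isna().tolist())
```

With the marker handled here, a separate `replace` pass over the frame would no longer be needed.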



## 🧹 Transform Data

This section handles:
- Renaming columns
- Dropping irrelevant data
- Converting data types
- Handling nulls and invalid values
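
The four steps listed above can be sketched on a toy frame. This is an illustration under assumed data, not the notebook's exact cells: the `raw` rows are hypothetical, and `"Drama"` in `runtimeMinutes` stands in for a malformed value.

```python
import numpy as np
import pandas as pd

# Hypothetical raw rows; "\\N" is the IMDb missing-value marker
raw = pd.DataFrame({
    "tconst": ["tt0000009", "tt0000502", "tt0000001"],
    "titleType": ["movie", "movie", "short"],
    "startYear": ["1894", "\\N", "1894"],
    "runtimeMinutes": ["45", "Drama", "1"],
    "endYear": ["\\N", "\\N", "\\N"],
})

clean = (
    raw[raw["titleType"] == "movie"]         # keep movies only
    .drop(columns=["endYear", "titleType"])  # drop irrelevant data
    .replace("\\N", np.nan)                  # handle nulls
    .rename(columns={
        "tconst": "movie_id",
        "startYear": "year",
        "runtimeMinutes": "runtime_minutes",
    })
)

# Convert types; unparseable values become <NA> instead of raising
for col in ["year", "runtime_minutes"]:
    clean[col] = pd.to_numeric(clean[col], errors="coerce").astype("Int64")

print(clean)
```

`errors="coerce"` is a design choice: it trades a hard failure for a missing value, so it is worth counting the nulls afterwards.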


In [29]:
# Keep only rows whose titleType is "movie"; .copy() avoids SettingWithCopyWarning on later assignments
title_basics_db = title_basics_db[title_basics_db['titleType'] == "movie"].copy()
title_basics_db.head()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
8,tt0000009,movie,Miss Jerry,Miss Jerry,0,1894,\N,45,Romance
144,tt0000147,movie,The Corbett-Fitzsimmons Fight,The Corbett-Fitzsimmons Fight,0,1897,\N,100,"Documentary,News,Sport"
498,tt0000502,movie,Bohemios,Bohemios,0,1905,\N,100,\N
570,tt0000574,movie,The Story of the Kelly Gang,The Story of the Kelly Gang,0,1906,\N,70,"Action,Adventure,Biography"
587,tt0000591,movie,The Prodigal Son,L'enfant prodigue,0,1907,\N,90,Drama


In [30]:
# Replace IMDb's \N missing-value marker with np.nan
title_basics_db = title_basics_db.replace(r"\N", np.nan)



In [31]:
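# Converting column types
title_basics_db['startYear'] = title_basics_db['startYear'].astype('Int64')
# astype(bool) would turn NaN and the string '0' into True, so compare numerically instead
title_basics_db['isAdult'] = pd.to_numeric(title_basics_db['isAdult'], errors='coerce').eq(1)
# runtime_minutes is an Integer column in the target table; malformed values become <NA>
title_basics_db['runtimeMinutes'] = pd.to_numeric(title_basics_db['runtimeMinutes'], errors='coerce').astype('Int64')
title_basics_db['genres'] = title_basics_db['genres'].str.split(",")
title_ratings_db['averageRating'] = title_ratings_db['averageRating'].astype('float')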

In [32]:
# Merge both datasets into one on the 'tconst' column (inner join: movies without ratings are dropped)
df = title_basics_db.merge(title_ratings_db, on='tconst', how='inner')
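
An inner join silently drops every movie that has no rating row. A minimal sketch, using hypothetical toy frames, of two checks that `DataFrame.merge` supports: `validate` to assert that `tconst` is unique on both sides, and an outer join with `indicator=True` to count what the inner join would discard.

```python
import pandas as pd

# Hypothetical toy data
basics = pd.DataFrame({
    "tconst": ["tt1", "tt2", "tt3"],
    "primaryTitle": ["A", "B", "C"],
})
ratings = pd.DataFrame({
    "tconst": ["tt1", "tt3", "tt9"],
    "averageRating": [7.1, 5.4, 6.0],
    "numVotes": [120, 45, 10],
})

# validate raises pandas.errors.MergeError if tconst repeats on either side
merged = basics.merge(ratings, on="tconst", how="inner", validate="one_to_one")

# The _merge column records whether each key came from left, right, or both
audit = basics.merge(ratings, on="tconst", how="outer", indicator=True)
counts = audit["_merge"].value_counts().to_dict()
print(counts)
```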

In [33]:
# Drop endYear, which is not used downstream
df = df.drop(columns='endYear')

In [34]:
# Select and rename relevant columns
df = df[[
    'tconst', 'primaryTitle', 'isAdult',
    'startYear', 'runtimeMinutes', 'averageRating',
    'genres', 'numVotes'
]].rename(columns={
    'tconst': 'movie_id',
    'primaryTitle': 'title',
    'isAdult': 'is_adult',
    'startYear': 'year',
    'runtimeMinutes': 'runtime_minutes',
    'averageRating': 'average_rating',
    'numVotes': 'num_votes'
})

# Split into final tables
df_movies = df.drop(columns=['genres'])  
df_genres = df[['movie_id', 'genres']] 

## 🎭 Normalize Genres

In [35]:
# Explode the genres lists so each (movie_id, genre) pair gets its own row
df_genres = df[['movie_id', 'genres']].explode('genres').rename(columns={'genres': 'genre'})

# Drop empty or null genres
df_genres = df_genres[df_genres['genre'].notna() & (df_genres['genre'].str.strip() != '')]

# Reset index
df_genres = df_genres.reset_index(drop=True)
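
To make the normalization concrete, here is a small sketch on three hypothetical rows. It follows the same split, explode, and filter sequence as the cells above, with `dropna` standing in for the null filter.

```python
import pandas as pd

# Hypothetical sample: one single-genre movie, one multi-genre movie, one with no genre
movies = pd.DataFrame({
    "movie_id": ["tt0000009", "tt0000147", "tt0000502"],
    "genres": ["Romance", "Documentary,News,Sport", None],
})
movies["genres"] = movies["genres"].str.split(",")

genres = (
    movies.explode("genres")              # one row per list element
    .rename(columns={"genres": "genre"})
    .dropna(subset=["genre"])             # movies without a genre contribute no rows
    .reset_index(drop=True)
)
print(genres)
```

The movie with three genres yields three rows that share one `movie_id`, which is what lets `movie_genres` reference `movies` through a foreign key.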

In [36]:
# Creating sqlalchemy engine and session objects
engine = create_engine("postgresql://postgres:221004@localhost:5432/imdb_movies")

Session = sessionmaker(bind=engine)

session = Session()
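
The connection string above embeds the database password in the notebook. One possible alternative, sketched here under assumptions, is to build the URL from environment variables with SQLAlchemy's `URL.create`. The variable names `PGUSER`, `PGPASSWORD`, `PGHOST`, `PGPORT` and `PGDATABASE` are an assumed convention, not something the notebook defines.

```python
import os
from sqlalchemy.engine import URL

# Environment variable names are assumptions; defaults mirror the notebook's settings
db_url = URL.create(
    drivername="postgresql",
    username=os.environ.get("PGUSER", "postgres"),
    password=os.environ.get("PGPASSWORD"),
    host=os.environ.get("PGHOST", "localhost"),
    port=int(os.environ.get("PGPORT", "5432")),
    database=os.environ.get("PGDATABASE", "imdb_movies"),
)

# hide_password=True masks the secret when the URL is printed or logged
rendered = db_url.render_as_string(hide_password=True)
print(rendered)
```

The resulting `db_url` object can be passed to `create_engine` in place of the literal string.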

## 💾 Load to PostgreSQL

Load the cleaned and transformed data into the PostgreSQL database. Passing `method='multi'` to `to_sql` packs multiple rows into each `INSERT` statement.


In [37]:
# Defining the tables
Base = declarative_base()

class Movie(Base):
    __tablename__ = 'movies'

    movie_id = Column(String, primary_key=True)
    title = Column(String)
    is_adult = Column(Boolean)
    year = Column(Integer)
    runtime_minutes = Column(Integer)
    average_rating = Column(Float)
    num_votes = Column(BigInteger)  # matches the BIGINT dtype passed to to_sql

class MovieGenre(Base):
    __tablename__ = 'movie_genres'

    id = Column(Integer, primary_key=True, autoincrement=True)
    movie_id = Column(String, ForeignKey('movies.movie_id'))
    genre = Column(String)


In [38]:
# Creating the tables with constraints
Base.metadata.create_all(engine)
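
`create_all` only creates tables that do not exist yet, so re-running the cell is safe, but it will not alter a table that is already there. A rough sketch of checking the result with SQLAlchemy's inspector follows. In-memory SQLite stands in for PostgreSQL so the example runs without a server, and the models are trimmed copies of the ones above.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Movie(Base):
    __tablename__ = "movies"
    movie_id = Column(String, primary_key=True)
    title = Column(String)

class MovieGenre(Base):
    __tablename__ = "movie_genres"
    id = Column(Integer, primary_key=True, autoincrement=True)
    movie_id = Column(String, ForeignKey("movies.movie_id"))
    genre = Column(String)

# Assumption: SQLite in place of PostgreSQL, for illustration only
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Base.metadata.create_all(engine)  # second call is a no-op: existing tables are skipped

inspector = inspect(engine)
tables = sorted(inspector.get_table_names())
fks = inspector.get_foreign_keys("movie_genres")
print(tables, fks[0]["referred_table"])
```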


In [39]:
# Loading the data into the tables
df_movies.to_sql(
    'movies',
    engine,
    if_exists='append',
    index=False,
    dtype={
        'movie_id': TEXT(),
        'title': TEXT(),
        'is_adult': BOOLEAN(),
        'year': INTEGER(),
        'runtime_minutes': INTEGER(),
        'average_rating': FLOAT(),
        'num_votes': BIGINT()
    },
    method='multi'
)

df_genres.to_sql(
    'movie_genres',
    engine,
    if_exists='append',
    index=False,
    dtype={
        'movie_id': TEXT(),
        'genre': TEXT()
    },
    method='multi'
)

563974
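
The load calls above use `method='multi'` without a `chunksize`, so pandas writes all rows of a frame in a single batch. A hedged sketch of splitting the load into smaller batches is shown below. In-memory SQLite stands in for PostgreSQL, the `rows` frame is synthetic, and the batch size of 250 is arbitrary.

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Assumption: SQLite in place of PostgreSQL so the sketch runs without a server
engine = create_engine("sqlite:///:memory:")

# Synthetic data shaped like df_genres
rows = pd.DataFrame({
    "movie_id": [f"tt{i:07d}" for i in range(1000)],
    "genre": ["Drama"] * 1000,
})

rows.to_sql(
    "movie_genres",
    engine,
    if_exists="append",
    index=False,
    method="multi",
    chunksize=250,  # rows per INSERT statement; tune for the target database
)

with engine.connect() as conn:
    loaded = conn.execute(text("SELECT COUNT(*) FROM movie_genres")).scalar()
print(loaded)
```

Smaller batches keep each statement and its parameter list bounded, which tends to matter more as the frame grows.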