<a href="https://colab.research.google.com/github/alfonsoayalapaloma/etl/blob/main/03/ETL101_lab03.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL_101_202509
![MIT License](https://img.shields.io/badge/license-MIT-blue.svg)

![Course Banner](https://github.com/alfonsoayalapaloma/etl/blob/main/03/docs/images/logo.png?raw=1)

## Overview

Has sido seleccionado como Ingeniero de Datos en Netflix. Tu misión es llenar las tablas Movie, Genre y MovieGenre en la base de datos "Movies" a partir del archivo "imdb_top_1000.csv".  Se debe transformar el campo Genre, pues contiene data separada por comas, adicionalmente, se debe llenar la tabla MovieGenre con las llaves de Movie y de Genre. Para esto deberas correr varios "Pipelines".

## Instrucciones
1.   Examina el archivo "imdb_top_1000.csv" usando Excel o un editor de texto.
2.   Examina el archivo "movies.db" usando DBeaver. Examina la data de las tablas.
3.   Carga el archivo "movies.db" al entorno colab.
4.   Corre los pipelines de este notebook.
5.   Descarga el archivo "movies.db" modificado de colab.
6.   Examina de nuevo el archivo usando DBeaver. Observa los cambios.


## PipeLines a usar
1.   Cargue de la tabla Genre (Generos).
2.   Cargue de la table Movies
3.   Cargue de la tabla MovieGenre


## Pipeline MOVIES

Extraccion

In [None]:
import pandas as pd
import sqlite3
import requests
from io import StringIO

URL="https://raw.githubusercontent.com/alfonsoayalapaloma/datasets/refs/heads/main/imdb_top_1000.csv"
DB_NAME="movies.db"

# Step 1: Extraction
def extract_movie_from_url(url: str) -> pd.DataFrame:
    response = requests.get(url)
    response.raise_for_status()  # Raise error if download fails
    csv_data = StringIO(response.text)
    df = pd.read_csv(csv_data)
    return df


Transformacion

In [None]:
# Step 2: Transformation
def transform_movie(df: pd.DataFrame) -> pd.DataFrame:
    # Clean column names
    df.columns = df.columns.str.strip().str.replace('_', '')

    # Convert types
    df['ReleasedYear'] = pd.to_numeric(df['ReleasedYear'], errors='coerce')
    df['IMDBRating'] = pd.to_numeric(df['IMDBRating'], errors='coerce')
    df['Metascore'] = pd.to_numeric(df['Metascore'], errors='coerce')
    df['NoofVotes'] = pd.to_numeric(df['NoofVotes'], errors='coerce')

    # Drop rows with missing Gross
    df = df.dropna(subset=['Gross'])

    return df

Loading (Cargue) a Almacenamiento

In [None]:
import sqlite3

# Step 3: Loading
def load_movie(df: pd.DataFrame, db_name: str = ':memory:') -> sqlite3.Connection:
    conn = sqlite3.connect(db_name)
    df.to_sql('Movie', conn, if_exists='append', index=False)
    #print(f"DataFrame inserted successfully into the {db_name} SQLite database.")
    return conn


### Pipeline

In [None]:
# ETL Pipeline Runner
def run_etl_pipeline_movie(url: str):
    process_name = "Movie"

    print(f"🔍 Extracting data...{process_name}")
    raw_df = extract_movie_from_url(url)

    print(f"🧼 Transforming data...{process_name}")
    clean_df = transform_movie(raw_df)

    print(f"📦 Loading data into DB...{process_name}")
    conn = load_movie(clean_df, DB_NAME)

    print(f"✅ ETL pipeline completed. {process_name}")
    return conn  # You can now query this connection

## Pipeline GENRE

In [None]:
# Step 1: Extraction
# Already done

# Step 2: Transformation
def transform_genre(df: pd.DataFrame) -> pd.DataFrame:
    # Split and explode the Genre column
    df_genres = df['Genre'].str.split(',').explode().str.strip()
    # Drop duplicates and reset index
    unique_genres = pd.DataFrame({'Genre': df_genres.drop_duplicates().reset_index(drop=True)})
    #print(unique_genres.columns)
    #print(unique_genres)
    unique_genres.columns=['GenreName']
    return unique_genres


# Step 3: Loading
def load_genre(df: pd.DataFrame, db_name: str = ':memory:') -> sqlite3.Connection:
    conn = sqlite3.connect(db_name)
    df.to_sql('Genre', conn, if_exists='append', index=False)
    #print(f"DataFrame inserted successfully into the {db_name} SQLite database.")
    return conn

def run_etl_pipeline_genre(url: str):
    process_name="Genre"
    print(f"🔍 Extracting data...{process_name}")
    raw_df = extract_movie_from_url(url)

    print(f"🧼 Transforming data...{process_name}")
    unique_genres = transform_genre(raw_df)

    print(f"📦 Loading data into DB...{process_name}")
    conn = load_genre(unique_genres, DB_NAME)

    print(f"✅ ETL pipeline completed. {process_name}")
    return conn  # You can now query this connection


## Pipeline MovieGenre

In [None]:
def extract_movie_from_db(table_name: str, db_name: str = ':memory:') -> pd.DataFrame:
    conn = sqlite3.connect(db_name)
    df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
    return df

def get_genre_id(genre_name):
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()
    cursor.execute("SELECT GenreId FROM Genre WHERE GenreName = ?", (genre_name,))
    result = cursor.fetchone()
    return result[0] if result else None

def transform_moviegenre( df: pd.DataFrame) -> pd.DataFrame:
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()

    # Normalize GenreName into rows
    df_expanded = df.assign(Genre=df['Genre'].str.split(',')).explode('Genre')
    df_expanded['Genre'] = df_expanded['Genre'].str.strip()
    df_expanded['GenreId'] = df_expanded['Genre'].apply(get_genre_id)
    df_clean = df_expanded.dropna(subset=['GenreId'])
    return df_clean


def load_moviegenre(df: pd.DataFrame, db_name: str = ':memory:') -> sqlite3.Connection:
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    # Insert into MovieGenre table
    for _, row in df.iterrows():
        cursor.execute(
            "INSERT INTO MovieGenre (MovieId, GenreId) VALUES (?, ?)",
            (row['MovieId'], int(row['GenreId']))
        )
    conn.commit()
    return conn


def load_to_staging(df: pd.DataFrame, db_name: str = ':memory:') -> sqlite3.Connection:
    conn = sqlite3.connect(db_name)
    df.to_sql('STGMovie', conn, if_exists='replace', index=False)
    return conn

# ELT Pipeline Runner
def run_etl_pipeline_moviegenre(url: str):
    process_name="MovieGenre"
    print(f"🔍 Extracting data...{process_name}")
    raw_df = extract_movie_from_db( "Movie", DB_NAME)

    print(f"📦 Loading data into Staging...{process_name}")
    df = transform_moviegenre(raw_df)

    print(f"🧼 Transforming data...{process_name}")
    return load_moviegenre(df, DB_NAME)

    print(f"✅ ETL pipeline completed. {process_name}")
    return conn  # You can now query this connection

## Correr todos los Pipelines

In [None]:
conn = run_etl_pipeline_genre(URL)
conn = run_etl_pipeline_movie(URL)
conn = run_etl_pipeline_moviegenre(URL)

🔍 Extracting data...Genre
🧼 Transforming data...Genre
📦 Loading data into DB...Genre
✅ ETL pipeline completed. Genre
🔍 Extracting data...Movie
🧼 Transforming data...Movie
📦 Loading data into DB...Movie
✅ ETL pipeline completed. Movie
🔍 Extracting data...MovieGenre
📦 Loading data into Staging...MovieGenre
🧼 Transforming data...MovieGenre


In [None]:
df_moviegenre = pd.read_sql_query("SELECT * FROM MovieGenre", conn)
df_moviegenre

Unnamed: 0,MovieGenreId,MovieId,GenreId
0,1,1,1
1,2,2,2
2,3,2,1
3,4,3,3
4,5,3,2
...,...,...,...
42175,42176,4985,17
42176,42177,4985,19
42177,42178,4986,1
42178,42179,4986,8


In [None]:
df_count = pd.read_sql_query("SELECT count(*) FROM Movie", conn)
df_count

Unnamed: 0,count(*)
0,4986


Pipeline

In [None]:
df=pd.read_sql_query("SELECT * FROM Movie", conn)

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4986 entries, 0 to 4985
Data columns (total 17 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   MovieId       4986 non-null   int64  
 1   PosterLink    4986 non-null   object 
 2   SeriesTitle   4986 non-null   object 
 3   ReleasedYear  4980 non-null   float64
 4   Certificate   4710 non-null   object 
 5   Runtime       4986 non-null   object 
 6   Genre         4986 non-null   object 
 7   IMDBRating    4986 non-null   float64
 8   Overview      4986 non-null   object 
 9   MetaScore     4500 non-null   float64
 10  Director      4986 non-null   object 
 11  Star1         4986 non-null   object 
 12  Star2         4986 non-null   object 
 13  Star3         4986 non-null   object 
 14  Star4         4986 non-null   object 
 15  NoOfVotes     4986 non-null   int64  
 16  Gross         4986 non-null   object 
dtypes: float64(3), int64(2), object(12)
memory usage: 662.3+ KB


In [None]:
df_genres=pd.read_sql_query("SELECT * FROM Genre", conn)
df_genres

Unnamed: 0,GenreId,GenreName
0,1,Drama
1,2,Crime
2,3,Action
3,4,Adventure
4,5,Biography
...,...,...
121,122,Music
122,123,Horror
123,124,Musical
124,125,Film-Noir
