# Streaming ETL Pipeline

This notebook executes the full ETL process using the project’s Python modules.

The workflow includes:

- **Extract:** load the local Netflix demo dataset  
- **Transform:** clean and normalize the movie entries  
- **Load:** apply SQL schema and insert data into Neon PostgreSQL  
- **Validate:** verify database content with SQL queries  

In [None]:
# ============================================================
# 1. INITIAL SETUP
# ============================================================

# Load environment variables and verify that database credentials are available.

import os
import pandas as pd
from dotenv import load_dotenv

load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL")

if not DATABASE_URL:
    raise ValueError("DATABASE_URL not found in .env")

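# Confirm the variable is set without echoing the password into the notebook output:
# only the part after the last "@" (host, database, query string) is printed.
print("DATABASE_URL loaded for:", DATABASE_URL.split("@")[-1])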

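Echoing a connection URL as a cell's last expression stores the password in the saved `.ipynb` output. If you want to display more than the host, a minimal sketch using only the standard library's `urllib.parse` can redact the password first. `redact_url` is a hypothetical helper, not part of the project.

```python
from urllib.parse import urlsplit, urlunsplit


def redact_url(url: str) -> str:
    """Return the URL with any password replaced by '***' (hypothetical helper)."""
    parts = urlsplit(url)
    if parts.password is None:
        return url  # no password present, nothing to hide
    # Keep host[:port] exactly as written (everything after the last "@").
    host = parts.netloc.rpartition("@")[2]
    netloc = f"{parts.username}:***@{host}"
    return urlunsplit(parts._replace(netloc=netloc))


print(redact_url("postgresql://alice:s3cret@db.example.com:5432/neondb?sslmode=require"))
# → postgresql://alice:***@db.example.com:5432/neondb?sslmode=require
```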

In [None]:
# ============================================================
# 2. HELPER FUNCTION (RUN ETL SCRIPTS)
# ============================================================

# To keep the notebook readable, a small helper runs any script in `etl/` by name.
# The notebook only orchestrates the steps; the actual logic lives in separate modules.

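import os
import subprocess
import sys

# Assumes the notebook is started from a subdirectory of the project (e.g. notebooks/),
# so the project root is the parent of the current working directory.
PROJECT_ROOT = os.path.dirname(os.getcwd())

def run(script):
    """Run etl/<script> with the notebook's interpreter and stop the notebook on failure."""
    script_path = os.path.join(PROJECT_ROOT, "etl", script)
    print(f"\n▶ Running: {script_path}")

    result = subprocess.run(
        [sys.executable, script_path],
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )

    print(result.stdout)
    if result.stderr.strip():
        print("stderr:", result.stderr)

    # A non-zero exit code means the step failed; raise so that later steps
    # (e.g. loading data) do not run against a broken database state.
    if result.returncode != 0:
        raise RuntimeError(f"{script} exited with code {result.returncode}")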

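Because each ETL script runs in its own process, the notebook can only learn that a step failed from the process exit code. The sketch below assumes the pipeline should stop at the first non-zero exit code and runs a list of commands in order. `run_steps` and the demo commands are hypothetical; `python -c` one-liners stand in for the real scripts.

```python
import subprocess
import sys


def run_steps(steps):
    """Run (name, argv) steps in order and stop at the first non-zero exit code.

    Returns (completed_names, failed_name_or_None). Hypothetical helper.
    """
    completed = []
    for name, argv in steps:
        result = subprocess.run(argv, capture_output=True, text=True)
        if result.returncode != 0:
            # Report the failure and skip every later step.
            print(f"FAILED {name} (exit {result.returncode}): {result.stderr.strip()}")
            return completed, name
        print(f"OK {name}")
        completed.append(name)
    return completed, None


# Stand-in commands; in the notebook each argv would be
# [sys.executable, os.path.join(PROJECT_ROOT, "etl", "<script>.py")].
demo = [
    ("test_connection", [sys.executable, "-c", "print('connected')"]),
    ("reset_schema", [sys.executable, "-c", "import sys; sys.exit(3)"]),
    ("etl_load_movies", [sys.executable, "-c", "print('loaded')"]),
]
done, failed = run_steps(demo)
print(done, failed)  # → ['test_connection'] reset_schema
```

Stopping early matters most for the destructive steps: there is no point loading data after the schema reset has failed.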

In [None]:
# ============================================================
# 3. TEST DATABASE CONNECTION
# ============================================================

# We begin by confirming that Neon PostgreSQL is reachable, so that a wrong
# credential or a network problem surfaces here rather than midway through the load.

run("test_connection.py")

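The contents of `test_connection.py` are not shown in this notebook. One common shape for such a check is a `SELECT 1` round trip with a few retries, which can help when a serverless database such as Neon needs a moment to resume from an idle state. A minimal sketch, assuming any DB-API 2.0 `connect` callable: the standard-library `sqlite3` stands in for PostgreSQL so the example runs locally, and with `psycopg2` you would pass `lambda: psycopg2.connect(DATABASE_URL)` instead. `check_connection` is a hypothetical name.

```python
import sqlite3
import time


def check_connection(connect, retries=3, delay=1.0):
    """Return True if a `SELECT 1` round trip succeeds within `retries` attempts."""
    for attempt in range(1, retries + 1):
        try:
            conn = connect()
            try:
                cur = conn.cursor()
                cur.execute("SELECT 1")
                return cur.fetchone()[0] == 1
            finally:
                conn.close()
        except Exception as exc:  # driver-specific error classes vary, so catch broadly
            print(f"Attempt {attempt}/{retries} failed: {exc}")
            if attempt < retries:
                time.sleep(delay)  # fixed pause before the next attempt
    return False


print(check_connection(lambda: sqlite3.connect(":memory:")))  # → True
```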

In [None]:
# ============================================================
# 4. CLEAN & TRANSFORM DATASET
# ============================================================

# We clean the raw Netflix dataset and prepare the final structure used in the analytics pipeline.

run("clean_local_netflix_csv.py")

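`clean_local_netflix_csv.py` is not reproduced here, so its exact rules are unknown. As an illustration only, a typical cleaning pass over a Netflix-style titles table trims whitespace, drops rows without a title, normalizes labels, coerces years to integers, and removes duplicates. The column names `title`, `type`, and `release_year` are assumptions, not the project's actual schema.

```python
import pandas as pd


def clean_titles(raw: pd.DataFrame) -> pd.DataFrame:
    """Illustrative cleaning pass; column names are assumed, not taken from the project."""
    df = raw.copy()
    # Trim surrounding whitespace in the text columns.
    for col in ("title", "type"):
        df[col] = df[col].str.strip()
    # Drop rows with no usable title (missing or blank after trimming).
    df = df.loc[df["title"].notna() & (df["title"] != "")].copy()
    # Normalize categorical labels, e.g. "MOVIE" -> "Movie".
    df["type"] = df["type"].str.title()
    # Coerce years to nullable integers; unparseable values become <NA>.
    df["release_year"] = pd.to_numeric(df["release_year"], errors="coerce").astype("Int64")
    # Remove exact duplicate rows and renumber the index.
    return df.drop_duplicates().reset_index(drop=True)


raw = pd.DataFrame({
    "title": [" Inception ", "Inception", "", "Roma"],
    "type": ["movie", "Movie", "Movie", " MOVIE"],
    "release_year": ["2010", "2010", "2015", "n/a"],
})
print(clean_titles(raw))
```

Here the blank title is dropped and the two "Inception" rows collapse into one once whitespace and capitalization are normalized, which is why trimming runs before de-duplication.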

In [None]:
# ============================================================
# 5. RESET DATABASE SCHEMA
# ============================================================

# Before loading new data, we drop all existing tables so that every run starts from a
# clean, reproducible state. Note that this deletes any data already stored in them.

run("reset_schema.py")

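The statements inside `reset_schema.py` are not shown. One idempotent way to express a reset is `DROP TABLE IF EXISTS`, which succeeds whether or not the table is present, so the notebook can be re-run safely. A sketch using the standard-library `sqlite3` as a local stand-in; the `TABLES` list is an assumption. On PostgreSQL, `DROP TABLE IF EXISTS movies CASCADE` would additionally drop dependent objects such as views.

```python
import sqlite3

TABLES = ["movies"]  # assumed list of tables owned by the pipeline


def reset_schema(conn, tables=TABLES):
    """Drop each pipeline table if it exists; safe to call repeatedly."""
    with conn:  # commits on success, rolls back on error
        for table in tables:
            # Identifiers cannot be bound as query parameters, so names must come
            # from a trusted, hard-coded list like TABLES, never from user input.
            conn.execute(f'DROP TABLE IF EXISTS "{table}"')


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE movies (id INTEGER PRIMARY KEY, title TEXT)")
reset_schema(conn)
reset_schema(conn)  # second call is a no-op rather than an error
print(conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'").fetchall())  # → []
```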


In [None]:
# ============================================================
# 6. APPLY SQL SCHEMA (DDL)
# ============================================================

# We now create the `movies` table using the DDL defined in `sql/000_schema.sql`.

run("apply_schema.py")

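Applying DDL from a file amounts to reading the script and executing it against an open connection. A sketch under stated assumptions: the column list below is invented for the demo and is not the content of `sql/000_schema.sql`, and `sqlite3.Connection.executescript` stands in for the project's PostgreSQL driver. With `psycopg2`, a multi-statement script without parameters can usually be passed to a single `cursor.execute` call.

```python
import sqlite3
import tempfile
from pathlib import Path

# Hypothetical DDL for the demo; the real columns live in sql/000_schema.sql.
DEMO_DDL = """
CREATE TABLE IF NOT EXISTS movies (
    show_id      TEXT PRIMARY KEY,
    title        TEXT NOT NULL,
    release_year INTEGER
);
"""


def apply_schema(conn, path):
    """Execute every statement in the SQL file at `path`."""
    sql = Path(path).read_text(encoding="utf-8")
    conn.executescript(sql)  # sqlite3-specific: runs several statements at once


with tempfile.TemporaryDirectory() as tmp:
    schema_file = Path(tmp) / "000_schema.sql"
    schema_file.write_text(DEMO_DDL, encoding="utf-8")

    conn = sqlite3.connect(":memory:")
    apply_schema(conn, schema_file)
    # PRAGMA table_info yields (cid, name, type, notnull, default, pk) per column.
    print([row[1] for row in conn.execute("PRAGMA table_info(movies)")])
    # → ['show_id', 'title', 'release_year']
```

Keeping the DDL in a versioned `.sql` file rather than letting the loader infer types means column types and constraints stay under explicit control.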

In [None]:
# ============================================================
# 7. LOAD CLEANED DATA INTO NEON
# ============================================================

# The cleaned dataset is inserted into the Neon PostgreSQL database using SQLAlchemy.

run("etl_load_movies.py")

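The load step is described as using SQLAlchemy. A common pattern with pandas is `DataFrame.to_sql` with `if_exists="append"`: because the table already exists from the DDL step, appending keeps the column types and constraints defined there, whereas `if_exists="replace"` would drop that table and let pandas infer a new one. This sketch assumes a pandas-based loader (the real `etl_load_movies.py` may differ) and passes a standard-library `sqlite3` connection, which pandas accepts directly, in place of a SQLAlchemy engine for Neon. `load_movies` is a hypothetical name.

```python
import sqlite3

import pandas as pd


def load_movies(df, con, table="movies", chunksize=1000):
    """Append rows to an existing table and return how many rows the table now holds."""
    # "append" keeps the table created by the DDL step;
    # "replace" would drop it and let pandas infer column types instead.
    df.to_sql(table, con, if_exists="append", index=False, chunksize=chunksize)
    return pd.read_sql(f"SELECT COUNT(*) AS n FROM {table}", con)["n"].iloc[0]


con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE movies (show_id TEXT PRIMARY KEY, title TEXT NOT NULL, release_year INTEGER)"
)

cleaned = pd.DataFrame({
    "show_id": ["s1", "s2"],
    "title": ["Inception", "Roma"],
    "release_year": [2010, 2018],
})
print(load_movies(cleaned, con))  # → 2
```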

In [None]:
# ============================================================
# 8. VALIDATE FINAL TABLE
# ============================================================

# Finally, we query the database to confirm that the `movies` table exists and contains data (first 20 rows shown).

from sqlalchemy import create_engine
import pandas as pd

engine = create_engine(DATABASE_URL)

df = pd.read_sql("SELECT * FROM movies LIMIT 20;", engine)
df

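`SELECT * ... LIMIT 20` confirms that rows exist, but not that all of them arrived. A slightly stronger check, sketched here as an assumption rather than part of the project, compares the total row count with the number expected from the transform step (the conclusion below reports 7,973 rows) and counts rows with a missing or blank title. The `title` column and the `validate_movies` helper are hypothetical, and `sqlite3` again stands in for Neon; the same two `COUNT(*)` queries are valid PostgreSQL.

```python
import sqlite3


def validate_movies(conn, expected_rows):
    """Return a dict of simple data-quality checks for the movies table."""
    total = conn.execute("SELECT COUNT(*) FROM movies").fetchone()[0]
    missing_titles = conn.execute(
        "SELECT COUNT(*) FROM movies WHERE title IS NULL OR TRIM(title) = ''"
    ).fetchone()[0]
    return {
        "row_count": total,
        "row_count_ok": total == expected_rows,
        "missing_titles": missing_titles,
    }


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE movies (show_id TEXT PRIMARY KEY, title TEXT)")
conn.executemany("INSERT INTO movies VALUES (?, ?)", [("s1", "Inception"), ("s2", " ")])
report = validate_movies(conn, expected_rows=2)
print(report)  # → {'row_count': 2, 'row_count_ok': True, 'missing_titles': 1}
```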

# CONCLUSION
The ETL pipeline executed successfully:

- Extract: dataset loaded from local CSV  
- Transform: cleaned into 7,973 normalized rows  
- Load: stored into Neon PostgreSQL  
- Validate: confirmed via SQL query  

This notebook demonstrates a modular ETL workflow: the notebook only orchestrates the steps, while each stage lives in its own script under `etl/`.
