## PostgreSQL schema for movies, users, and watch sessions 

In [1]:
pip install pandas sqlalchemy psycopg2-binary

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import argparse
import pandas as pd
import psycopg2
from psycopg2 import sql
from io import StringIO
from datetime import datetime

In [3]:
API_KEY = "cb14257b9a7d0a1f02dc36ad0043b14a"
BASE_URL = "https://api.themoviedb.org/3"

In [8]:
def get_popular_movies(page=1):
    url = f"{BASE_URL}/movie/popular"
    params = {
        "api_key": API_KEY,
        "language": "en-US",
        "page": page
    }
    response = requests.get(url, params=params)

    if response.status_code == 200:
        data = response.json()
        return data.get("results", [])
    else:
        print(f"Error: {response.status_code}, {response.text}")
        return []

all_movies = []
for p in range(1, 350):
    movies = get_popular_movies(page=p) or []
    all_movies.extend(movies)

df = pd.DataFrame(all_movies)

df.head()

Unnamed: 0,adult,backdrop_path,genre_ids,id,original_language,original_title,overview,popularity,poster_path,release_date,title,video,vote_average,vote_count
0,False,/iZLqwEwUViJdSkGVjePGhxYzbDb.jpg,"[878, 53]",755898,en,War of the Worlds,Will Radford is a top analyst for Homeland Sec...,1106.8877,/yvirUYrva23IudARHn3mMGVxWqM.jpg,2025-07-29,War of the Worlds,False,4.248,408
1,False,/ZtcGMc204JsNqfjS9lU6udRgpo.jpg,"[28, 18]",911430,en,F1,Racing legend Sonny Hayes is coaxed out of ret...,812.4176,/9PXZIUsSDh4alB80jheWX4fhZmy.jpg,2025-06-25,F1,False,7.791,1481
2,False,/538U9snNc2fpnOmYXAPUh3zn31H.jpg,"[28, 12, 53]",575265,en,Mission: Impossible - The Final Reckoning,Ethan Hunt and team continue their search for ...,616.3361,/z53D72EAOxGRqdr7KXXWp9dJiDe.jpg,2025-05-17,Mission: Impossible - The Final Reckoning,False,7.241,1405
3,False,/eU7IfdWq8KQy0oNd4kKXS0QUR08.jpg,"[878, 12, 28]",1061474,en,Superman,"Superman, a journalist in Metropolis, embarks ...",625.4189,/ombsmhYUqR4qqOLOxAyr5V8hbyv.jpg,2025-07-09,Superman,False,7.562,2600
4,False,/xk0ck8qmYmevisTmphWIDm1g43p.jpg,"[53, 28, 35]",1151334,en,Eenie Meanie,A former teenage getaway driver gets dragged b...,585.1001,/12Va3oO3oYUdOd75zM57Nx1976a.jpg,2025-08-21,Eenie Meanie,False,6.742,64


In [9]:
df.shape

(6980, 14)

In [3]:
import os

os.environ["DATABASE_URL"] = "postgresql+psycopg2://USER:PASSWORD@HOST:5432/moviesdb"

In [5]:
"""
Clean ETL structure: Extract → Transform → Load into PostgreSQL
"""
# ---------------- Extract ----------------
def extract(input_path: str) -> pd.DataFrame:
    print(f"Reading input CSV: {input_path}")
    df = pd.read_csv(input_path)
    return df

# ---------------- Transform ----------------
def transform(df: pd.DataFrame) -> pd.DataFrame:
    print("Transforming data...")
    df = df.copy()
    # Standardize column names
    df.columns = [c.strip().lower().replace(' ', '_') for c in df.columns]
    # Trim string columns
    for col in df.select_dtypes(include=['object']).columns:
        df[col] = df[col].str.strip()
    # Add ingestion timestamp
    df['ingested_at'] = datetime.utcnow()
    return df

# --------------- Load ----------------
def load(df: pd.DataFrame, table: str, database_url: str):
    print("Loading to Postgres...")
    conn = psycopg2.connect(database_url)
    cur = conn.cursor()
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)

    columns = [sql.Identifier(c) for c in df.columns]
    copy_sql = sql.SQL("COPY {} ({}) FROM STDIN WITH CSV").format(
        sql.Identifier(table), sql.SQL(', ').join(columns)
    )

    try:
        cur.copy_expert(copy_sql.as_string(conn), buffer)
        conn.commit()
        print(f"Loaded {len(df)} rows into {table}")
    except Exception as e:
        conn.rollback()
        print("Error during load:", e)
        raise
    finally:
        cur.close()
        conn.close()

# ---------------- Orchestration ----------------
def run_etl(input_path: str, table: str, database_url: str, dry_run: bool = False):
    df = extract(input_path)
    df = transform(df)
    if dry_run:
        print("Dry-run mode: showing first 5 transformed rows, not loading:")
        print(df.head())
    else:
        load(df, table, database_url)
        print("ETL complete")

# ---------------- CLI ----------------
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Simple ETL to Postgres")
    parser.add_argument("--input-path", required=True, help="Path to input CSV file")
    parser.add_argument("--table", help="Target Postgres table name (not required in dry-run)")
    parser.add_argument("--db-url", help="Postgres connection URL (overrides env DATABASE_URL)")
    parser.add_argument("--dry-run", action="store_true", help="Run only extract+transform without loading")
    args = parser.parse_args()

    db_url = args.db_url or os.environ.get("DATABASE_URL")
    if not args.dry_run and not db_url:
        raise ValueError("No database connection string found. Set --db-url or DATABASE_URL env var.")

    if not args.dry_run and not args.table:
        raise ValueError("Target table is required unless running in dry-run mode.")

    run_etl(args.input_path, args.table, db_url, dry_run=args.dry_run)
 

usage: ipykernel_launcher.py [-h] --input-path INPUT_PATH [--table TABLE] [--db-url DB_URL]
                             [--dry-run]
ipykernel_launcher.py: error: the following arguments are required: --input-path


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
