## ETL Product Data Pipeline
In this notebook, we will build a robust ETL pipeline to process product data from an API. The pipeline will:
* Extract product data from an API endpoint and combine the data from all pages into a single pandas DataFrame.
* Transform the data to standardize column names, handle nested, duplicate, or missing values, and calculate additional metrics such as the discounted price.
* Load the cleaned data into a PostgreSQL database using a staging-to-main table pattern.
* Log all steps and errors for traceability and debugging.

### 1. Importing Required Libraries


In [1]:
import logging
import pandas as pd
import requests
from sqlalchemy import create_engine, text

### 2. Logging

In [2]:
# Send INFO-and-above records from the root logger to a log file in the
# current working directory.
# force=True (Python 3.8+) removes any handlers already attached to the root
# logger first. Without it, basicConfig() silently does nothing when something
# (e.g. some notebook kernels or test runners) has configured logging before
# this cell runs, and no record would ever reach the log file.
logging.basicConfig(
    filename="etl_product_pipeline.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    force=True,
)
logging.info("ETL pipeline started")

## 3. Defining Functions


### 3.1 Extract Function

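* Fetches paginated data from a REST API with a configurable page size (`limit`) and page count (`max_pages`)
* Uses the `products` key of the response if it exists, otherwise the full JSON response
* Stops early once the total record count has been reached
* Skips empty pages and continues fetching
* Stops at the first request error, keeping the records fetched so far
* Combines all records into a single raw DataFrame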

In [3]:
def extract(base_url, key="products", limit=30, max_pages=20):
    """Fetch paginated records from a REST API and return them as a DataFrame.

    Pages are requested over one ``requests.Session`` with the query
    parameters ``limit=<limit>`` and ``skip=<page * limit>`` for
    ``page`` in ``range(max_pages)``.

    Records on each page are taken from:
      * ``body[key]`` when the JSON body is an object that contains ``key``;
      * the body itself when it is a JSON array;
      * otherwise the whole body, treated as a single record.

    Empty pages are skipped and fetching continues. Fetching stops early once
    the number of collected records reaches the response's ``total`` field
    (when present and non-zero). It also stops at the first HTTP/network or
    JSON-decoding error; that error is logged, not raised, and the records
    fetched so far are returned.

    Args:
        base_url: Endpoint URL; pagination parameters are added as query
            parameters.
        key: Name of the field in the JSON object that holds the page's records.
        limit: Number of records requested per page.
        max_pages: Maximum number of pages to request.

    Returns:
        A DataFrame with one row per record (empty if nothing was fetched).
    """
    all_data = []

    with requests.Session() as session:
        for page in range(max_pages):
            try:
                # params= also works when base_url already has a query string.
                response = session.get(
                    base_url,
                    params={"limit": limit, "skip": page * limit},
                    timeout=10,
                )
                response.raise_for_status()
                data = response.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError: invalid JSON on requests versions whose decode
                # error is not a RequestException subclass.
                logging.error(f"Error on page {page}: {e}")
                break

            # Only a JSON object can carry `key` / `total`; a JSON array body
            # is the list of records itself.
            if isinstance(data, dict):
                page_data = data.get(key, data)
                total = data.get("total")
            else:
                page_data, total = data, None

            if not page_data:
                logging.debug(f"Empty page {page}, continuing")
                continue

            # Without this, a dict would be extended key-by-key (its field
            # names would become "records"); treat it as one record instead.
            if not isinstance(page_data, list):
                page_data = [page_data]

            all_data.extend(page_data)
            logging.info(f"Page {page}: {len(page_data)} records")

            # Stop if we've fetched all records
            if total and len(all_data) >= total:
                logging.info(f"All {total} records fetched")
                break

    df = pd.DataFrame(all_data)
    logging.info(f"Extract completed: {len(df)} records fetched")
    return df


### 3.2 Transform Function
* Renames `discountPercentage` to `discount_percentage`.
* Explodes `reviews` into rows and extracts `reviewer_rating` and `reviewer_name`.
* Drops rows with missing critical fields and duplicate reviews.
* Keeps only rows with a positive `price` and a `discount_percentage` between 0 and 100, then calculates `price_with_discount` (rounded to 2 decimals).
* Converts column types and reorders for consistency.
* Returns a clean, standardized DataFrame.

In [4]:
def transform(df):
    """Flatten and clean raw product data into one row per (product, reviewer).

    Steps:
      * rename ``discountPercentage`` to ``discount_percentage``;
      * explode the ``reviews`` list so each review becomes its own row, and
        extract ``reviewer_rating`` / ``reviewer_name`` from each review dict;
      * drop rows missing ``id`` or ``title``, rows missing price or review
        data, and duplicate ``(id, reviewer_name)`` pairs (first one kept);
      * keep only rows with ``price > 0`` and ``0 <= discount_percentage <= 100``;
      * compute ``price_with_discount`` rounded to 2 decimals;
      * cast column types and select a fixed column order.

    Args:
        df: Raw DataFrame (as returned by ``extract``). It must contain the
            columns id, title, category, price, discountPercentage, brand and
            reviews. Review entries that are dicts are read via their
            ``rating`` and ``reviewerName`` keys; anything else counts as
            missing review data.

    Returns:
        A new DataFrame with columns id, title, category, price,
        discount_percentage, brand, reviewer_rating, reviewer_name,
        price_with_discount and a fresh 0..n-1 index. The input is not modified.
    """
    # Work on a copy so the caller's DataFrame is never modified.
    df = df.copy()

    df = df.rename(columns={"discountPercentage": "discount_percentage"})

    # One row per review; an empty review list yields a single NaN row.
    df = df.explode("reviews", ignore_index=True)

    # Non-dict entries (e.g. the NaN from an empty list) map to None and are
    # dropped below.
    df["reviewer_rating"] = df["reviews"].apply(lambda x: x.get("rating") if isinstance(x, dict) else None)
    df["reviewer_name"] = df["reviews"].apply(lambda x: x.get("reviewerName") if isinstance(x, dict) else None)
    df = df.drop(columns=["reviews"])

    # id and title are both required: a row missing either is dropped. A NULL
    # id in particular would break the (id, reviewer_name) key used by load().
    df = df.dropna(subset=["id", "title"])
    df = df.dropna(subset=["price", "reviewer_rating", "reviewer_name"])
    # Keep one row per upsert key so a single INSERT ... ON CONFLICT statement
    # never has to update the same target row twice.
    df = df.drop_duplicates(subset=["id", "reviewer_name"])

    # Explicit copy: the boolean-filtered frame gets a new column next.
    df = df.loc[(df["price"] > 0) & (df["discount_percentage"].between(0, 100))].copy()

    df["price_with_discount"] = (df["price"] * (1 - df["discount_percentage"] / 100)).round(2)

    # Cast only the columns that are kept (the product-level `rating` column is
    # discarded by the selection below, so it is not cast).
    df = df.astype({
        "price": float,
        "discount_percentage": float,
        "reviewer_rating": int,
        "reviewer_name": str,
        "price_with_discount": float,
    })

    df = df[["id", "title", "category", "price", "discount_percentage",
             "brand", "reviewer_rating", "reviewer_name", "price_with_discount"]]

    return df.reset_index(drop=True)
    


### 3.3 Load Function
* Loads the DataFrame into a staging table (replacing any existing staging table of the same name).
* Inserts data from the staging table into the main table, updating existing rows on conflict (`id` + `reviewer_name`).
* Automatically sets the `created_at` and `updated_at` timestamps.
* Runs all steps in a single transaction, so the load is all-or-nothing.
* Optionally drops the staging table after the load (`drop_staging`, default: `True`).

In [5]:
def load(df, sql_connection, table_name, drop_staging=True, schema="etl_schema"):
    """Upsert *df* into ``{schema}.{table_name}`` through a staging table.

    Steps:
      * write the DataFrame to ``{schema}.{table_name}_staging``, replacing any
        existing table of that name;
      * merge it into the main table with
        ``INSERT ... ON CONFLICT (id, reviewer_name) DO UPDATE``.

    Timestamps:
      * new rows get ``created_at`` and ``updated_at`` set to
        ``CURRENT_TIMESTAMP``;
      * existing rows have every non-key data column and ``updated_at``
        refreshed, while ``created_at`` is left unchanged.

    All statements run inside one ``sql_connection.begin()`` block, so a
    failure rolls back both the staging write and the merge.

    Args:
        df: Rows to load. Every column must also exist in the main table.
            The main table must additionally have ``created_at`` /
            ``updated_at`` columns and a unique constraint on
            ``(id, reviewer_name)`` for the ON CONFLICT clause.
        sql_connection: SQLAlchemy engine (or anything whose ``.begin()``
            yields a connection accepted by ``DataFrame.to_sql``).
        table_name: Name of the main table inside *schema*.
        drop_staging: Drop the staging table after the merge (default True).
        schema: Schema holding both tables (default ``"etl_schema"``).

    Note:
        Schema, table and column names are interpolated into the SQL
        unquoted, so they must come from trusted code, never from user input.
    """
    if df.empty:
        # Nothing to merge; a frame without columns would also produce
        # invalid SQL below.
        logging.warning(f"No records to load into {schema}.{table_name}; skipping")
        return

    staging_table = f"{table_name}_staging"
    conflict_columns = ("id", "reviewer_name")
    conflict_target = ", ".join(conflict_columns)

    # Column lists for the dynamic SQL.
    data_columns = ", ".join(df.columns)
    # Refresh every non-key data column plus updated_at on conflict;
    # created_at is deliberately left out so it keeps the original value.
    update_set = ", ".join(
        [f"{col} = EXCLUDED.{col}" for col in df.columns if col not in conflict_columns]
        + ["updated_at = CURRENT_TIMESTAMP"]
    )

    with sql_connection.begin() as conn:
        # Load to staging
        df.to_sql(staging_table, conn, if_exists="replace", index=False, schema=schema)
        logging.info(f"Loaded {len(df)} records to {schema}.{staging_table}")

        # Merge staging into main table with conflict handling
        merge_query = text(f"""
            INSERT INTO {schema}.{table_name} (
                {data_columns}, created_at, updated_at
            )
            SELECT {data_columns}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
            FROM {schema}.{staging_table}
            ON CONFLICT ({conflict_target})
            DO UPDATE SET {update_set};
        """)
        result = conn.execute(merge_query)
        logging.info(f"Inserted data to {schema}.{table_name} ({result.rowcount} rows affected)")

        # Drop staging
        if drop_staging:
            conn.execute(text(f"DROP TABLE IF EXISTS {schema}.{staging_table}"))
            logging.info("Dropped staging table")


### 4. Usage 

In [6]:
import os

# PostgreSQL connection. Set ETL_DATABASE_URL to override the URL so that
# credentials do not have to be stored in the notebook; the literal below is
# only the local-development fallback.
engine = create_engine(
    os.environ.get(
        "ETL_DATABASE_URL",
        "postgresql://etl_user:sifre@localhost:5432/etl_pipeline",
    )
)

try:
    # Extract
    df = extract("https://dummyjson.com/products")
    logging.info(f"Extract completed: {len(df)} raw records")

    # Transform
    df_t = transform(df)
    logging.info(f"Transform completed: {len(df_t)} clean records")

    # Load
    load(df_t, engine, table_name="products", drop_staging=True)
    logging.info("ETL Pipeline completed successfully!")

except Exception as e:
    # exc_info=True writes the full traceback to the log file, not just the
    # message; the exception is still re-raised so the cell fails visibly.
    logging.critical(f"Pipeline failed: {e}", exc_info=True)
    raise
finally:
    # Release pooled connections whether the run succeeded or failed.
    engine.dispose()
    logging.info("Database connection closed")