# ETL: Amazon Products to PostgreSQL

This notebook loads Amazon Electronics metadata from `.jsonl` or `.jsonl.gz` files into PostgreSQL; compressed files are read directly.

**Supports:**
- `.jsonl.gz` - compressed files (reads directly, no unpacking needed)
- `.jsonl` - plain JSONL files

**Prerequisites:**
- PostgreSQL running via docker-compose (`make run-docker-compose`)
- Data file: `meta_Electronics.jsonl.gz` (or preprocessed version)

In [None]:
import gzip
import json
import math
import os
from pathlib import Path

import psycopg2
from dotenv import load_dotenv
from psycopg2.extras import execute_values

# Read ../../.env so the POSTGRES_* settings used by DB_CONFIG below can be
# picked up through os.getenv. The path is relative, so it depends on the
# directory the notebook is run from.
load_dotenv("../../.env")

## Configuration

Set the path to your data file. Supports both `.jsonl` and `.jsonl.gz` formats.

In [None]:
# Connection settings for the PostgreSQL instance. Everything except the host
# can be overridden through POSTGRES_* environment variables.
DB_CONFIG = {
    # "0.0.0.0" is a wildcard address for servers to listen on, not a
    # destination; connecting to it is platform-dependent, so use the loopback
    # host name instead.
    "host": "localhost",
    "port": os.getenv("POSTGRES_PORT", "5432"),
    "dbname": os.getenv("POSTGRES_DB", "amazon_products"),
    "user": os.getenv("POSTGRES_USER", "bootcamp"),
    "password": os.getenv("POSTGRES_PASSWORD", "bootcamp"),
}

# === CONFIGURE YOUR DATA FILE HERE ===
# Option 1: Raw .gz file (full dataset ~2.7M items) - reads directly from compressed
# DATA_FILE = "../../data/meta_Electronics.jsonl.gz"

# Option 2: Preprocessed sample (1000 items)
DATA_FILE = "../../data/meta_Electronics_2022_2023_with_category_ratings_100_sample_1000.jsonl.gz"

# Filtering options (set to None to load all)
MIN_RATING_COUNT = 100  # Only products with 100+ ratings (set None to disable)
MIN_YEAR = 2022         # Only products from 2022+ (set None to disable)
MAX_ROWS = None         # Limit rows (set None for all, e.g., 10000 for testing)

print(f"Data file: {DATA_FILE}")
print(f"Filters: min_ratings={MIN_RATING_COUNT}, min_year={MIN_YEAR}, max_rows={MAX_ROWS}")

## Connect to PostgreSQL

In [None]:
# Show the effective connection settings, masking the password so the
# credential is not written into the notebook's saved cell output.
{**DB_CONFIG, "password": "***"}

In [None]:
# Open one connection and one cursor that the remaining cells reuse.
# Autocommit is switched on, so no explicit commit() calls appear below.
conn = psycopg2.connect(**DB_CONFIG)
conn.autocommit = True
cursor = conn.cursor()

# Smoke test: ask the server for its version string and display it.
cursor.execute("SELECT version();")
(server_version,) = cursor.fetchone()
print(f"Connected to: {server_version}")

## Create Products Table

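The schema is designed for SQL filtering in hybrid queries (price, rating, and category filters). Note that the cell below drops and recreates the `products` table, so any previously loaded rows are removed.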

In [None]:
# DDL script for the products table. It starts by dropping any existing
# `products` table (with CASCADE), so re-running this cell discards rows that
# were loaded earlier. `asin` is the primary key, which the loader's
# ON CONFLICT (asin) clause relies on.
CREATE_TABLE_SQL = """
DROP TABLE IF EXISTS products CASCADE;

CREATE TABLE products (
    asin VARCHAR(20) PRIMARY KEY,
    parent_asin VARCHAR(20),
    title TEXT,
    price DECIMAL(10, 2),
    average_rating DECIMAL(3, 2),
    rating_number INTEGER,
    main_category VARCHAR(100),
    store VARCHAR(255),
    description TEXT,
    features JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Indexes for common query patterns
CREATE INDEX idx_products_rating ON products(average_rating);
CREATE INDEX idx_products_price ON products(price);
CREATE INDEX idx_products_category ON products(main_category);
CREATE INDEX idx_products_rating_number ON products(rating_number);
CREATE INDEX idx_products_parent_asin ON products(parent_asin);
"""

# The whole script (drop, create, indexes) is sent in a single execute() call.
cursor.execute(CREATE_TABLE_SQL)
print("Table 'products' created with indexes.")

## Load Data from JSONL / JSONL.GZ

Supports both compressed and uncompressed files.

In [None]:
def open_file(filepath):
    """Return a UTF-8 text-mode handle for *filepath*.

    Paths whose string form ends in ``.gz`` are opened through ``gzip``;
    anything else is opened as a regular file.
    """
    is_gzipped = str(filepath).endswith('.gz')
    opener = gzip.open if is_gzipped else open
    mode = 'rt' if is_gzipped else 'r'
    return opener(filepath, mode, encoding='utf-8')


def parse_price(price_val):
    """Extract a numeric price from the formats seen in the raw metadata.

    Args:
        price_val: ``None``, a number, or a string such as ``"$1,299.99"``.

    Returns:
        The price as a finite ``float``, or ``None`` when the value is missing,
        has an unsupported type, cannot be parsed, or is NaN/infinite.
    """
    if price_val is None:
        return None
    if isinstance(price_val, (int, float)):
        price = float(price_val)
    elif isinstance(price_val, str):
        # Drop the currency symbol and thousands separators before parsing.
        cleaned = price_val.replace('$', '').replace(',', '').strip()
        try:
            price = float(cleaned)
        except ValueError:
            return None
    else:
        return None
    # float() happily parses "nan", "inf" and "Infinity"; none of those is a
    # usable price, so treat them like any other unparseable value.
    return price if math.isfinite(price) else None


def extract_year(data):
    """Extract the year from ``details['Date First Available']``.

    The year is taken from the last four characters of the date string
    (surrounding whitespace is ignored first).

    Args:
        data: One parsed product record (a dict).

    Returns:
        The year as an ``int``, or ``None`` when ``details`` is missing or not
        a dict, the date is missing or not a string, or its last four
        characters are not a number.
    """
    details = data.get('details')
    # 'details' may be present but null (or some other non-dict value); the
    # previous `.get('details', {})` only covered the missing-key case.
    if not isinstance(details, dict):
        return None
    date_str = details.get('Date First Available')
    if not isinstance(date_str, str):
        return None
    try:
        return int(date_str.strip()[-4:])
    except ValueError:
        return None


def should_include(data, min_rating_count=None, min_year=None):
    """Decide whether a record survives the rating-count and year filters.

    A falsy threshold disables the corresponding filter. A missing or null
    ``rating_number`` counts as 0. Records whose year cannot be determined
    are kept by the year filter.
    """
    if min_rating_count:
        rating_count = data.get('rating_number') or 0
        if rating_count < min_rating_count:
            return False

    if not min_year:
        return True

    first_available = extract_year(data)
    too_old = bool(first_available) and first_available < min_year
    return not too_old


def load_to_postgres(
    filepath,
    cursor,
    batch_size=500,
    min_rating_count=None,
    min_year=None,
    max_rows=None
):
    """Load a JSONL/JSONL.GZ file into the products table with filtering.

    Args:
        filepath: Path to a ``.jsonl`` or ``.jsonl.gz`` file, one JSON object
            per line.
        cursor: Open psycopg2 cursor used for the inserts.
        batch_size: Number of rows buffered before each insert.
        min_rating_count: Passed to ``should_include``; falsy disables it.
        min_year: Passed to ``should_include``; falsy disables it.
        max_rows: Stop once this many rows have been sent to the database;
            falsy means no limit.

    Returns:
        A ``(total_loaded, total_processed, total_skipped)`` tuple.
        ``total_loaded`` counts rows sent to the database; a row whose asin
        already exists there is upserted and still counted. ``total_skipped``
        covers unparseable or non-object lines, filtered records, records
        without any asin, and rows superseded by a later row with the same
        asin inside one batch.
    """

    INSERT_SQL = """
        INSERT INTO products (
            asin, parent_asin, title, price, average_rating, 
            rating_number, main_category, store, description, features
        ) VALUES %s
        ON CONFLICT (asin) DO UPDATE SET
            title = EXCLUDED.title,
            price = EXCLUDED.price,
            average_rating = EXCLUDED.average_rating,
            rating_number = EXCLUDED.rating_number
    """

    # Keyed by asin so one batch never carries the same primary key twice:
    # a single INSERT ... ON CONFLICT DO UPDATE statement fails if it would
    # touch the same row a second time.
    batch = {}
    total_loaded = 0
    total_processed = 0
    total_skipped = 0

    with open_file(filepath) as f:
        for line in f:
            total_processed += 1

            if total_processed % 100000 == 0:
                print(f"Processed {total_processed:,} lines, loaded {total_loaded:,}, skipped {total_skipped:,}")

            try:
                data = json.loads(line.strip())
            except json.JSONDecodeError:
                total_skipped += 1
                continue

            # A line holding valid JSON that is not an object (list, number,
            # ...) cannot be a product record.
            if not isinstance(data, dict):
                total_skipped += 1
                continue

            # Apply filters
            if not should_include(data, min_rating_count, min_year):
                total_skipped += 1
                continue

            # Use parent_asin as fallback if asin is missing
            asin = data.get('asin') or data.get('parent_asin')
            if not asin:
                total_skipped += 1
                continue

            # Extract description (a list of fragments is joined with spaces)
            description = None
            raw_description = data.get('description')
            if raw_description:
                if isinstance(raw_description, list):
                    description = ' '.join(str(d) for d in raw_description)
                else:
                    description = str(raw_description)

            if asin in batch:
                # The later record wins; the one it replaces is never sent.
                total_skipped += 1
            batch[asin] = (
                asin,
                data.get('parent_asin'),
                data.get('title'),
                parse_price(data.get('price')),
                data.get('average_rating'),
                data.get('rating_number'),
                data.get('main_category'),
                data.get('store'),
                description,
                json.dumps(data.get('features', [])),
            )

            # Check the limit per row rather than per flushed batch, so
            # max_rows is honoured exactly even when it is smaller than
            # batch_size or not a multiple of it.
            limit_reached = bool(max_rows) and total_loaded + len(batch) >= max_rows
            if limit_reached or len(batch) >= batch_size:
                execute_values(cursor, INSERT_SQL, list(batch.values()))
                total_loaded += len(batch)
                batch = {}

                if limit_reached:
                    print(f"Reached max_rows limit: {max_rows}")
                    break

    # Insert remaining
    if batch:
        execute_values(cursor, INSERT_SQL, list(batch.values()))
        total_loaded += len(batch)

    return total_loaded, total_processed, total_skipped

In [None]:
# Report which file is about to be loaded and whether it is present.
source_path = Path(DATA_FILE)
print(f"Loading from: {DATA_FILE}")
print(f"File exists: {source_path.exists()}")
print()

# Run the load with the filters chosen in the configuration cell.
etl_filters = {
    "min_rating_count": MIN_RATING_COUNT,
    "min_year": MIN_YEAR,
    "max_rows": MAX_ROWS,
}
loaded, processed, skipped = load_to_postgres(DATA_FILE, cursor, **etl_filters)

# Summary of the three counters returned by the loader.
print()
print("=== ETL Complete ===")
for label, count in (("processed", processed), ("loaded", loaded), ("skipped", skipped)):
    print(f"Total {label}: {count:,}")

## Verify Data

In [None]:
# Row count of the freshly loaded table.
cursor.execute("SELECT COUNT(*) FROM products;")
(product_count,) = cursor.fetchone()
print(f"Total products in database: {product_count:,}")

In [None]:
# Fetch a handful of rows as a visual sanity check.
cursor.execute("""
    SELECT asin, title, price, average_rating, main_category 
    FROM products 
    LIMIT 5;
""")

for asin, title, price, rating, category in cursor.fetchall():
    # Keep long titles readable: cut at 60 characters and add an ellipsis.
    if title and len(title) > 60:
        title = title[:60] + '...'
    print(f"ASIN: {asin}")
    print(f"  Title: {title}")
    print(f"  Price: ${price}, Rating: {rating}, Category: {category}")
    print()

## Data Statistics

In [None]:
# Count products per main_category, largest groups first.
cursor.execute("""
    SELECT main_category, COUNT(*) as cnt 
    FROM products 
    GROUP BY main_category 
    ORDER BY cnt DESC
    LIMIT 20;
""")

print("Products by Category (top 20):")
for category, product_count in cursor.fetchall():
    print(f"  {category}: {product_count:,}")

In [None]:
# Price aggregates; MIN/MAX/AVG come back as NULL when no row has a price.
cursor.execute("""
    SELECT 
        MIN(price) as min_price,
        MAX(price) as max_price,
        AVG(price) as avg_price,
        COUNT(*) FILTER (WHERE price IS NOT NULL) as with_price
    FROM products;
""")

row = cursor.fetchone()
min_price, max_price, avg_price, with_price = row
print("Price Statistics:")
# Compare against None explicitly: an average of exactly 0 is still data and
# must not be reported as "No price data".
if avg_price is not None:
    print(f"  Min: ${min_price}, Max: ${max_price}, Avg: ${avg_price:.2f}")
else:
    print("  No price data")
print(f"  Products with price: {with_price:,}")

In [None]:
# Rating aggregates; every column is NULL when the table is empty or the
# underlying column has no values.
cursor.execute("""
    SELECT 
        MIN(average_rating) as min_rating,
        MAX(average_rating) as max_rating,
        AVG(average_rating) as avg_rating,
        MIN(rating_number) as min_reviews,
        MAX(rating_number) as max_reviews
    FROM products;
""")

row = cursor.fetchone()
min_rating, max_rating, avg_rating, min_reviews, max_reviews = row
print("Rating Statistics:")
# Guard the numeric format specs: formatting None with ':.2f' or ':,' raises
# TypeError, which used to crash this cell when nothing had been loaded.
if avg_rating is not None:
    print(f"  Rating range: {min_rating} - {max_rating}, Avg: {avg_rating:.2f}")
else:
    print("  No rating data")
if min_reviews is not None:
    print(f"  Review count range: {min_reviews:,} - {max_reviews:,}")
else:
    print("  No review count data")

## Cleanup

In [None]:
# Close the cursor first, then the connection it was created from.
for db_resource in (cursor, conn):
    db_resource.close()

for message in (
    "Database connection closed.",
    "",
    "You can now delete the source .gz file if needed - data is safely in PostgreSQL!",
):
    print(message)