# Olist ETL Pipeline - Extract and Load to Supabase PostgreSQL

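This notebook builds a one-time ETL pipeline. It extracts the CSV files from marketing_funnel.zip and sales.zip, loads them into two separate PostgreSQL schemas (olist_marketing_data_set and olist_sales_data_set), and then adds foreign key constraints within each schema and across the two.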

## Cell 1: Import Required Libraries

In [28]:
import pandas as pd
import psycopg2
import psycopg2.extras
import zipfile
import os
from io import StringIO
from dotenv import load_dotenv
import logging
from typing import Dict, List, Tuple

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("Libraries imported successfully!")

Libraries imported successfully!


## Cell 2: Database Configuration and Connection Setup

In [29]:
# Load environment variables
load_dotenv()

# Database connection parameters
USER = os.getenv("user")
PASSWORD = os.getenv("password")
HOST = os.getenv("host")
PORT = os.getenv("port")
DBNAME = os.getenv("dbname")

# # Create .env file template if it doesn't exist
# if not os.path.exists('.env'):
#     with open('.env', 'w') as f:
#         f.write("""# Database Configuration
# user=postgres
# password=<your-database-password>
# host=db.<your-project-ref>.supabase.co
# port=5432
# dbname=postgres
# """)
#     print("Created .env template file. Please update with your credentials.")
# else:
#     print("Environment variables loaded!")
#     print(f"Host: {HOST}")
#     print(f"Database: {DBNAME}")
#     print(f"User: {USER}")

In [30]:
PORT

'6543'
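Cell 2 reads five lowercase variables (user, password, host, port, dbname). When one is absent, `os.getenv` quietly returns `None`, and the problem only surfaces later as a connection error inside `psycopg2.connect`. The following is a minimal fail-fast check that assumes the variable names stay as above. `missing_db_settings` is a hypothetical helper and is not part of the notebook.

```python
import os
from typing import List, Mapping, Optional

# The five variable names read in Cell 2 via os.getenv().
REQUIRED_DB_SETTINGS = ("user", "password", "host", "port", "dbname")


def missing_db_settings(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required settings that are unset or blank (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_DB_SETTINGS if not (env.get(name) or "").strip()]


# Usage, right after load_dotenv():
# missing = missing_db_settings()
# if missing:
#     raise RuntimeError(f"Missing database settings in .env: {', '.join(missing)}")
```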

## Cell 3: Define Schema Names and Sales Table Structures

In [31]:
# Define schema mappings
sales_schema = "olist_sales_data_set"
marketing_schema = "olist_marketing_data_set"

# Define table structures for sales data
sales_tables = {
    'olist_customers_dataset': {
        'columns': {
            'customer_id': 'TEXT PRIMARY KEY',
            'customer_unique_id': 'TEXT',
            'customer_zip_code_prefix': 'INTEGER',
            'customer_city': 'TEXT',
            'customer_state': 'TEXT'
        }
    },
    'olist_geolocation_dataset': {
        'columns': {
            'geolocation_zip_code_prefix': 'INTEGER',
            'geolocation_lat': 'REAL',
            'geolocation_lng': 'REAL',
            'geolocation_city': 'TEXT',
            'geolocation_state': 'TEXT'
        }
    },
    'olist_order_items_dataset': {
        'columns': {
            'order_id': 'TEXT',
            'order_item_id': 'INTEGER',
            'product_id': 'TEXT',
            'seller_id': 'TEXT',
            'shipping_limit_date': 'TIMESTAMP',
            'price': 'REAL',
            'freight_value': 'REAL'
        }
    },
    'olist_order_payments_dataset': {
        'columns': {
            'order_id': 'TEXT',
            'payment_sequential': 'INTEGER',
            'payment_type': 'TEXT',
            'payment_installments': 'INTEGER',
            'payment_value': 'REAL'
        }
    },
    'olist_order_reviews_dataset': {
        'columns': {
            'review_id': 'TEXT PRIMARY KEY',
            'order_id': 'TEXT',
            'review_score': 'INTEGER',
            'review_comment_title': 'TEXT',
            'review_comment_message': 'TEXT',
            'review_creation_date': 'TIMESTAMP',
            'review_answer_timestamp': 'TIMESTAMP'
        }
    },
    'olist_orders_dataset': {
        'columns': {
            'order_id': 'TEXT PRIMARY KEY',
            'customer_id': 'TEXT',
            'order_status': 'TEXT',
            'order_purchase_timestamp': 'TIMESTAMP',
            'order_approved_at': 'TIMESTAMP',
            'order_delivered_carrier_date': 'TIMESTAMP',
            'order_delivered_customer_date': 'TIMESTAMP',
            'order_estimated_delivery_date': 'TIMESTAMP'
        }
    },
    'olist_products_dataset': {
        'columns': {
            'product_id': 'TEXT PRIMARY KEY',
            'product_category_name': 'TEXT',
            'product_name_lenght': 'INTEGER',
            'product_description_lenght': 'INTEGER',
            'product_photos_qty': 'INTEGER',
            'product_weight_g': 'INTEGER',
            'product_length_cm': 'INTEGER',
            'product_height_cm': 'INTEGER',
            'product_width_cm': 'INTEGER'
        }
    },
    'olist_sellers_dataset': {
        'columns': {
            'seller_id': 'TEXT PRIMARY KEY',
            'seller_zip_code_prefix': 'INTEGER',
            'seller_city': 'TEXT',
            'seller_state': 'TEXT'
        }
    },
    'product_category_name_translation': {
        'columns': {
            'product_category_name': 'TEXT PRIMARY KEY',
            'product_category_name_english': 'TEXT'
        }
    }
}

print(f"Defined {len(sales_tables)} sales tables")

Defined 9 sales tables
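Each entry in `sales_tables` is turned into DDL by `create_table()` in Cell 6, which joins the `column type` pairs into a single `CREATE TABLE` statement. Rendering that string without a database connection is a cheap way to review the definitions. The sketch below reproduces the notebook's string-building logic. `build_create_table_sql` is a hypothetical name.

```python
from typing import Dict


def build_create_table_sql(schema: str, table_name: str, columns: Dict[str, str]) -> str:
    """Render the same CREATE TABLE statement that create_table() executes."""
    # Dicts preserve insertion order, so columns appear in the order they were defined.
    columns_def = ", ".join(f"{col} {dtype}" for col, dtype in columns.items())
    return f"CREATE TABLE {schema}.{table_name} ({columns_def})"


translation = {
    "product_category_name": "TEXT PRIMARY KEY",
    "product_category_name_english": "TEXT",
}
print(build_create_table_sql("olist_sales_data_set", "product_category_name_translation", translation))
# CREATE TABLE olist_sales_data_set.product_category_name_translation (product_category_name TEXT PRIMARY KEY, product_category_name_english TEXT)
```

Looping over `sales_tables.items()` and passing `spec['columns']` prints all nine statements for review.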


## Cell 4: Define Marketing Tables and Foreign Keys

In [32]:
# Define table structures for marketing data
marketing_tables = {
    'olist_closed_deals_dataset': {
        'columns': {
            'mql_id': 'TEXT',
            'seller_id': 'TEXT',
            'sdr_id': 'TEXT',
            'sr_id': 'TEXT',
            'won_date': 'DATE',
            'business_segment': 'TEXT',
            'lead_type': 'TEXT',
            'lead_behaviour_profile': 'TEXT',
            'has_company': 'BOOLEAN',
            'has_gtin': 'BOOLEAN',
            'average_stock': 'TEXT',
            'business_type': 'TEXT',
            'declared_product_catalog_size': 'REAL',
            'declared_monthly_revenue': 'REAL'
        }
    },
    'olist_marketing_qualified_leads_dataset': {
        'columns': {
            'mql_id': 'TEXT PRIMARY KEY',
            'first_contact_date': 'DATE',
            'landing_page_id': 'TEXT',
            'origin': 'TEXT'
        }
    }
}

# Define foreign key relationships
foreign_keys = {
    'sales': [
        ('olist_orders_dataset', 'customer_id', 'olist_customers_dataset', 'customer_id'),
        ('olist_order_items_dataset', 'order_id', 'olist_orders_dataset', 'order_id'),
        ('olist_order_items_dataset', 'product_id', 'olist_products_dataset', 'product_id'),
        ('olist_order_items_dataset', 'seller_id', 'olist_sellers_dataset', 'seller_id'),
        ('olist_order_payments_dataset', 'order_id', 'olist_orders_dataset', 'order_id'),
        ('olist_order_reviews_dataset', 'order_id', 'olist_orders_dataset', 'order_id'),
        ('olist_products_dataset', 'product_category_name', 'product_category_name_translation', 'product_category_name')
    ],
    'marketing': [
        ('olist_closed_deals_dataset', 'mql_id', 'olist_marketing_qualified_leads_dataset', 'mql_id')
    ],
    'cross_schema': [
        ('olist_marketing_data_set.olist_closed_deals_dataset', 'seller_id', 'olist_sales_data_set.olist_sellers_dataset', 'seller_id')
    ]
}

print(f"Defined {len(marketing_tables)} marketing tables and {sum(len(v) for v in foreign_keys.values())} foreign key relationships")

Defined 2 marketing tables and 9 foreign key relationships
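A foreign key can only point at columns that carry a PRIMARY KEY or UNIQUE constraint. PostgreSQL rejects anything else when the constraint is added. The table definitions record constraints inline in their type strings, so this rule can be checked before any SQL runs. The sketch below assumes the `(table, column, referenced_table, referenced_column)` tuple layout used in `foreign_keys`, and the helper name is hypothetical. Schema-qualified names such as those in `foreign_keys['cross_schema']` are not keys of `sales_tables`, so pass one schema's list at a time.

```python
from typing import Dict, List, Tuple

# (table, column, referenced_table, referenced_column), as in foreign_keys above
ForeignKey = Tuple[str, str, str, str]


def fks_without_unique_target(tables: Dict[str, dict], fks: List[ForeignKey]) -> List[ForeignKey]:
    """Return FKs whose referenced column is not declared PRIMARY KEY or UNIQUE in `tables`.

    PostgreSQL only accepts a foreign key whose referenced column carries a
    primary-key or unique constraint, so anything returned here will fail.
    """
    problems = []
    for fk in fks:
        _, _, ref_table, ref_col = fk
        ref_type = tables.get(ref_table, {}).get("columns", {}).get(ref_col, "").upper()
        if "PRIMARY KEY" not in ref_type and "UNIQUE" not in ref_type:
            problems.append(fk)
    return problems
```

With the definitions above, `fks_without_unique_target(sales_tables, foreign_keys['sales'])` should return an empty list, because all seven sales relationships target a primary key. A pair pointing at `olist_geolocation_dataset.geolocation_zip_code_prefix` would be flagged.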


## Cell 5: Database Connection and Schema Creation Functions

In [33]:
def connect_database():
    """Establish connection to PostgreSQL database."""
    try:
        connection = psycopg2.connect(
            user=USER,
            password=PASSWORD,
            host=HOST,
            port=PORT,
            database=DBNAME
        )
        logger.info("Database connection successful!")
        return connection
    except Exception as e:
        logger.error(f"Failed to connect to database: {e}")
        return None

def create_schemas(connection):
    """Create schemas for sales and marketing data."""
    try:
        cursor = connection.cursor()
        cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {sales_schema}")
        cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {marketing_schema}")
        connection.commit()
        cursor.close()
        logger.info("Schemas created successfully!")
        return True
    except Exception as e:
        logger.error(f"Failed to create schemas: {e}")
        return False

print("Database connection functions defined!")

Database connection functions defined!
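Schema and table names are spliced into SQL with f-strings, as in `CREATE SCHEMA IF NOT EXISTS {sales_schema}`. That is only safe while the names are trusted constants. psycopg2 provides `psycopg2.sql.Identifier` for proper quoting. A lighter, dependency-free alternative is to accept only plain lowercase identifiers within PostgreSQL's default 63-byte limit. The sketch below does that, and `require_plain_identifier` is a hypothetical helper.

```python
import re

# Unquoted PostgreSQL identifiers start with a letter or underscore and continue with
# letters, digits or underscores. Only lowercase is accepted here because PostgreSQL
# folds unquoted names to lowercase anyway. Reserved words (e.g. "select") still pass.
_PLAIN_IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]*")
MAX_IDENTIFIER_BYTES = 63  # default NAMEDATALEN (64) minus one


def require_plain_identifier(name: str) -> str:
    """Return name unchanged if it is safe to splice into SQL unquoted; raise ValueError otherwise.

    PostgreSQL truncates longer names with only a NOTICE; raising is a deliberately
    stricter choice here.
    """
    if not _PLAIN_IDENTIFIER.fullmatch(name):
        raise ValueError(f"not a plain SQL identifier: {name!r}")
    if len(name.encode("utf-8")) > MAX_IDENTIFIER_BYTES:
        raise ValueError(f"identifier longer than {MAX_IDENTIFIER_BYTES} bytes: {name!r}")
    return name
```

For example, `cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {require_plain_identifier(sales_schema)}")` fails loudly instead of running unexpected SQL.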


## Cell 6: Table Creation and Data Processing Functions

In [34]:
def check_table_exists(connection, schema: str, table_name: str) -> bool:
    """Check if a table exists in the specified schema."""
    try:
        cursor = connection.cursor()
        check_query = """
        SELECT EXISTS (
            SELECT FROM information_schema.tables 
            WHERE table_schema = %s AND table_name = %s
        )
        """
        cursor.execute(check_query, (schema, table_name))
        exists = cursor.fetchone()[0]
        cursor.close()
        return exists
    except Exception as e:
        logger.error(f"Failed to check if table exists: {e}")
        return False

def get_user_choice_for_existing_table(schema: str, table_name: str) -> str:
    """Get user input for handling existing tables."""
    print(f"\n⚠️  Table {schema}.{table_name} already exists!")
    print("Choose an option:")
    print("1. DROP - Drop the existing table and recreate (data will be lost)")
    print("2. TRUNCATE - Keep table structure but clear all data (CASCADE also empties tables that reference it)")
    print("3. SKIP - Skip processing this table (keep existing data)")
    print("4. DROP ALL - Drop all existing tables without asking again")
    print("5. TRUNCATE ALL - Truncate all existing tables without asking again (CASCADE can empty tables loaded earlier in this run)")
    print("6. SKIP ALL - Skip all existing tables without asking again")
    
    while True:
        choice = input("Enter your choice (1-6): ").strip()
        if choice in ['1', '2', '3', '4', '5', '6']:
            return choice
        print("Invalid choice. Please enter 1, 2, 3, 4, 5, or 6.")

def create_table(connection, schema: str, table_name: str, columns: Dict[str, str], 
                user_choice_cache: Dict[str, str] = None) -> bool:
    """Create a table with specified columns, handling existing tables based on user choice."""
    try:
        # Initialize cache if not provided
        if user_choice_cache is None:
            user_choice_cache = {}
        
        # Check if table exists
        table_exists = check_table_exists(connection, schema, table_name)
        
        if table_exists:
            # Check if we have a cached choice for all tables
            if 'all_choice' in user_choice_cache:
                choice = user_choice_cache['all_choice']
            else:
                # Get user choice for this table
                choice = get_user_choice_for_existing_table(schema, table_name)
                
                # Cache the choice if it's an "ALL" option
                if choice in ['4', '5', '6']:
                    user_choice_cache['all_choice'] = choice
            
            cursor = connection.cursor()
            
            if choice in ['1', '4']:  # DROP or DROP ALL
                logger.info(f"Dropping existing table {schema}.{table_name}")
                cursor.execute(f"DROP TABLE IF EXISTS {schema}.{table_name} CASCADE")
                
            elif choice in ['2', '5']:  # TRUNCATE or TRUNCATE ALL
                logger.info(f"Truncating existing table {schema}.{table_name}")
                cursor.execute(f"TRUNCATE TABLE {schema}.{table_name} CASCADE")
                connection.commit()
                cursor.close()
                logger.info(f"Table {schema}.{table_name} truncated successfully!")
                return True
                
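            elif choice in ['3', '6']:  # SKIP or SKIP ALL
                logger.info(f"Skipping existing table {schema}.{table_name}")
                cursor.close()
                # Return False so extract_and_load_zip() skips loading as well; returning
                # True here would append the CSV rows to the existing table after a single SKIP.
                return False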
            
            cursor.close()
        
        # Create the table (either it didn't exist or user chose to drop it)
        cursor = connection.cursor()
        columns_def = ', '.join([f"{col} {dtype}" for col, dtype in columns.items()])
        create_query = f"CREATE TABLE {schema}.{table_name} ({columns_def})"
        cursor.execute(create_query)
        connection.commit()
        cursor.close()
        logger.info(f"Table {schema}.{table_name} created successfully!")
        return True
        
    except Exception as e:
        logger.error(f"Failed to create table {schema}.{table_name}: {e}")
        return False

print("Table creation functions defined!")

Table creation functions defined!
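`create_table()` mixes three concerns: prompting with `input()`, remembering a "... ALL" answer in `user_choice_cache`, and running DROP/TRUNCATE. Moving the decision into a pure function makes it testable. It also lets an unattended run, where `input()` would block, supply a fixed policy. The sketch below is a hypothetical refactoring. The function name and the `policy` parameter are illustrative and are not part of the original notebook.

```python
from typing import Callable, Dict, Optional

# Menu digits from get_user_choice_for_existing_table() mapped to actions.
MENU_ACTIONS = {"1": "drop", "2": "truncate", "3": "skip",
                "4": "drop", "5": "truncate", "6": "skip"}
APPLY_TO_ALL = {"4", "5", "6"}


def resolve_existing_table_action(table: str, cache: Dict[str, str],
                                  ask: Callable[[str], str],
                                  policy: Optional[str] = None) -> str:
    """Return 'drop', 'truncate' or 'skip' for a table that already exists.

    policy is a hypothetical non-interactive override (for example read from an
    environment variable); when it is set, ask() is never called.
    """
    if policy is not None:
        if policy not in MENU_ACTIONS.values():
            raise ValueError(f"unknown policy {policy!r}")
        return policy
    if "all_choice" in cache:          # an earlier "... ALL" answer applies to every table
        return MENU_ACTIONS[cache["all_choice"]]
    choice = ask(table)                # e.g. a wrapper around input()
    if choice in APPLY_TO_ALL:
        cache["all_choice"] = choice
    return MENU_ACTIONS[choice]
```

Passing `ask=lambda t: get_user_choice_for_existing_table(schema, t)` keeps the interactive behaviour. Passing `policy="skip"` gives a run that never prompts.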


## Cell 7: Data Cleaning and Processing Functions

In [35]:
def advanced_clean_dataframe(df: pd.DataFrame, column_definitions: Dict[str, str], table_name: str) -> pd.DataFrame:
    """Coerce columns to their declared SQL types, drop duplicate primary-key rows, and trim text columns."""
    try:
        df_cleaned = df.copy()
        logger.info(f"Starting advanced cleaning for {table_name} with {len(df_cleaned)} rows")
        
        # Handle timestamp columns
        timestamp_columns = [col for col, dtype in column_definitions.items() 
                           if 'TIMESTAMP' in dtype.upper() or 'DATE' in dtype.upper()]
        
        for col in timestamp_columns:
            if col in df_cleaned.columns:
                # errors='coerce' turns unparseable values, including the string 'NaT', into NaT
                df_cleaned[col] = pd.to_datetime(df_cleaned[col], errors='coerce')
        
        # Handle boolean columns
        boolean_columns = [col for col, dtype in column_definitions.items() 
                         if 'BOOLEAN' in dtype.upper()]
        
        for col in boolean_columns:
            if col in df_cleaned.columns:
                # Convert to proper boolean values
                df_cleaned[col] = df_cleaned[col].astype(str).str.lower()
                df_cleaned[col] = df_cleaned[col].map({
                    'true': True, 't': True, '1': True, 'yes': True, 'y': True,
                    'false': False, 'f': False, '0': False, 'no': False, 'n': False
                })
        
        # Handle numeric columns with potential string values
        numeric_columns = [col for col, dtype in column_definitions.items() 
                         if any(num_type in dtype.upper() for num_type in ['INTEGER', 'REAL', 'FLOAT'])]
        
        for col in numeric_columns:
            if col in df_cleaned.columns:
                # Convert to numeric, handling errors
                df_cleaned[col] = pd.to_numeric(df_cleaned[col], errors='coerce')
        
        # Remove duplicates for tables with primary keys
        primary_key_columns = [col for col, dtype in column_definitions.items() 
                             if 'PRIMARY KEY' in dtype.upper()]
        
        if primary_key_columns and table_name != 'olist_geolocation_dataset':
            # Geolocation is intentionally never deduplicated (it has no primary key defined anyway)
            initial_count = len(df_cleaned)
            df_cleaned = df_cleaned.drop_duplicates(subset=primary_key_columns, keep='first')
            final_count = len(df_cleaned)
            
            if initial_count != final_count:
                logger.info(f"Removed {initial_count - final_count} duplicate rows based on primary key(s): {primary_key_columns}")
        
        # Clean text columns - remove extra whitespace
        text_columns = [col for col, dtype in column_definitions.items() 
                       if 'TEXT' in dtype.upper()]
        
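        for col in text_columns:
            if col in df_cleaned.columns:
                # Strip surrounding whitespace but keep missing values missing:
                # astype(str) alone would turn NaN into the literal string 'nan'
                missing = df_cleaned[col].isna()
                df_cleaned[col] = df_cleaned[col].astype(str).str.strip().mask(missing)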
        
        logger.info(f"Advanced cleaning completed for {table_name}. Final rows: {len(df_cleaned)}")
        return df_cleaned
        
    except Exception as e:
        logger.error(f"Error in advanced cleaning for {table_name}: {e}")
        return df

def insert_dataframe(connection, df: pd.DataFrame, schema: str, table_name: str, batch_size: int = 1000):
    """Insert DataFrame into PostgreSQL table with batch processing and error handling."""
    try:
        if df.empty:
            logger.warning(f"No data to insert for {schema}.{table_name}")
            return
        
        cursor = connection.cursor()
        
        # Get column names from DataFrame
        columns = list(df.columns)
        placeholders = ', '.join(['%s'] * len(columns))
        
        # Create insert query
        insert_query = f"""
        INSERT INTO {schema}.{table_name} ({', '.join(columns)})
        VALUES ({placeholders})
        ON CONFLICT DO NOTHING
        """
        
        # Convert DataFrame to list of tuples for batch insert
        total_rows = len(df)
        inserted_rows = 0
        
        logger.info(f"Inserting {total_rows} rows into {schema}.{table_name} in batches of {batch_size}")
        
        for i in range(0, total_rows, batch_size):
            batch_df = df.iloc[i:i + batch_size]
            
            # Convert rows to tuples, mapping NaN/NaT/None to None (SQL NULL).
            # itertuples() is much faster than iterrows() and keeps each column's own type.
            records = [
                tuple(None if pd.isna(value) else value for value in row)
                for row in batch_df.itertuples(index=False, name=None)
            ]
            
            try:
                cursor.executemany(insert_query, records)
                connection.commit()
                inserted_rows += len(records)
                
                if i + batch_size < total_rows:
                    logger.info(f"Inserted {inserted_rows}/{total_rows} rows...")
                    
            except Exception as batch_error:
                connection.rollback()
                logger.error(f"Error inserting batch {i}-{i+batch_size} for {table_name}: {batch_error}")
                
                # Try inserting records one by one to identify problematic rows
                for j, record in enumerate(records):
                    try:
                        cursor.execute(insert_query, record)
                        connection.commit()
                        inserted_rows += 1
                    except Exception as row_error:
                        connection.rollback()
                        logger.warning(f"Skipped problematic row {i+j} in {table_name}: {row_error}")
        
        cursor.close()
        logger.info(f"✅ Successfully inserted {inserted_rows}/{total_rows} rows into {schema}.{table_name}")
        
    except Exception as e:
        logger.error(f"Failed to insert data into {schema}.{table_name}: {e}")

def extract_and_load_zip(connection, zip_path: str, schema: str, table_definitions: Dict):
    """Extract CSV files from zip and load into database with enhanced error handling and user choice."""
    try:
        # Initialize user choice cache to remember "ALL" decisions
        user_choice_cache = {}
        
        with zipfile.ZipFile(zip_path, 'r') as zip_file:
            csv_files = [f for f in zip_file.namelist() if f.endswith('.csv')]
            
            logger.info(f"Found {len(csv_files)} CSV files to process in {zip_path}")
            
            for csv_file in csv_files:
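                # Use the member's base name so CSVs stored in a folder inside the zip still match
                table_name = os.path.splitext(os.path.basename(csv_file))[0]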
                
                if table_name in table_definitions:
                    logger.info(f"Processing {csv_file}...")
                    
                    try:
                        # Create table with user choice handling
                        table_created = create_table(
                            connection, 
                            schema, 
                            table_name, 
                            table_definitions[table_name]['columns'],
                            user_choice_cache
                        )
                        
                        # If table was skipped, continue to next table
                        if not table_created:
                            logger.warning(f"Skipped table creation for {table_name}")
                            continue
                        
                        # Skip loading when the user chose SKIP ALL for existing tables
                        if ('all_choice' in user_choice_cache and user_choice_cache['all_choice'] in ['3', '6']):
                            logger.info(f"Skipping data loading for {table_name} (user choice)")
                            continue
                        
                        # Read CSV data
                        with zip_file.open(csv_file) as file:
                            df = pd.read_csv(file)
                            
                        logger.info(f"Read {len(df)} rows from {csv_file}")
                        
                        # Enhanced data cleaning and preparation
                        df_cleaned = advanced_clean_dataframe(df, table_definitions[table_name]['columns'], table_name)
                        
                        logger.info(f"After cleaning: {len(df_cleaned)} rows for {table_name}")
                        
                        # Insert data unless the user chose SKIP ALL (truncated tables are reloaded)
                        if ('all_choice' not in user_choice_cache or 
                            user_choice_cache['all_choice'] not in ['3', '6']):
                            insert_dataframe(connection, df_cleaned, schema, table_name)
                        
                        logger.info(f"✅ Successfully processed {table_name}")
                        
                    except Exception as table_error:
                        logger.error(f"❌ Failed to process table {table_name}: {table_error}")
                        # Continue with other tables instead of stopping entire process
                        continue
                        
                else:
                    logger.warning(f"Table definition not found for {table_name}")
                    
        logger.info(f"🎉 Completed processing {schema} schema!")
        
    except Exception as e:
        logger.error(f"Failed to extract and load zip {zip_path}: {e}")

def get_user_choice_for_orphaned_records(orphan_count: int, description: str) -> str:
    """Get user input for handling orphaned records that violate foreign key constraints."""
    print(f"\n⚠️  Found {orphan_count} orphaned records: {description}")
    print("Choose an option:")
    print("1. REMOVE - Remove orphaned records (recommended)")
    print("2. SKIP - Skip creating this foreign key constraint")
    
    while True:
        choice = input("Enter your choice (1-2): ").strip()
        if choice in ['1', '2']:
            return choice
        print("Invalid choice. Please enter 1 or 2.")

def validate_foreign_keys(connection):
    """Validate foreign key relationships and fix data integrity issues."""
    try:
        cursor = connection.cursor()
        
        # Check and fix missing product categories
        logger.info("🔍 Checking product category references...")
        
        # Find missing categories in translation table
        check_query = """
        SELECT DISTINCT p.product_category_name 
        FROM olist_sales_data_set.olist_products_dataset p
        LEFT JOIN olist_sales_data_set.product_category_name_translation t
        ON p.product_category_name = t.product_category_name
        WHERE t.product_category_name IS NULL 
        AND p.product_category_name IS NOT NULL
        """
        
        cursor.execute(check_query)
        missing_categories = cursor.fetchall()
        
        if missing_categories:
            logger.info(f"Found {len(missing_categories)} missing product categories")
            
            # Add missing categories to translation table
            for (category,) in missing_categories:
                insert_query = """
                INSERT INTO olist_sales_data_set.product_category_name_translation 
                (product_category_name, product_category_name_english) 
                VALUES (%s, %s)
                ON CONFLICT (product_category_name) DO NOTHING
                """
                # Use the Portuguese name as English translation for missing entries
                cursor.execute(insert_query, (category, category))
                logger.info(f"Added missing category: {category}")
            
            connection.commit()
            logger.info("✅ Fixed missing product categories")
        else:
            logger.info("✅ All product categories have translations")
        
        # Check marketing schema relationships
        logger.info("🔍 Checking marketing schema references...")
        
        # Find orphaned closed deals (mql_id not in qualified leads)
        orphan_query = """
        SELECT COUNT(*) 
        FROM olist_marketing_data_set.olist_closed_deals_dataset cd
        LEFT JOIN olist_marketing_data_set.olist_marketing_qualified_leads_dataset mql
        ON cd.mql_id = mql.mql_id
        WHERE mql.mql_id IS NULL AND cd.mql_id IS NOT NULL
        """
        
        cursor.execute(orphan_query)
        orphan_count = cursor.fetchone()[0]
        
        if orphan_count > 0:
            choice = get_user_choice_for_orphaned_records(
                orphan_count, 
                "marketing deals with mql_id not in qualified leads table"
            )
            
            if choice == '1':  # Remove orphaned records
                delete_query = """
                DELETE FROM olist_marketing_data_set.olist_closed_deals_dataset
                WHERE mql_id NOT IN (
                    SELECT mql_id FROM olist_marketing_data_set.olist_marketing_qualified_leads_dataset
                    WHERE mql_id IS NOT NULL
                ) AND mql_id IS NOT NULL
                """
                cursor.execute(delete_query)
                connection.commit()
                logger.info(f"✅ Removed {orphan_count} orphaned marketing deals")
            else:
                logger.info("⚠️ Skipping marketing FK constraint due to orphaned records")
                # Store flag to skip this FK
                cursor.execute("CREATE TEMP TABLE IF NOT EXISTS skip_fks (fk_name TEXT)")
                cursor.execute("INSERT INTO skip_fks VALUES ('marketing_mql_fk')")
                connection.commit()
        else:
            logger.info("✅ All marketing deals have valid MQL references")
        
        # Check cross-schema relationships (marketing seller_id -> sales seller_id)
        logger.info("🔍 Checking cross-schema seller references...")
        
        cross_schema_query = """
        SELECT COUNT(*) 
        FROM olist_marketing_data_set.olist_closed_deals_dataset cd
        LEFT JOIN olist_sales_data_set.olist_sellers_dataset s
        ON cd.seller_id = s.seller_id
        WHERE s.seller_id IS NULL AND cd.seller_id IS NOT NULL
        """
        
        cursor.execute(cross_schema_query)
        missing_sellers = cursor.fetchone()[0]
        
        if missing_sellers > 0:
            choice = get_user_choice_for_orphaned_records(
                missing_sellers, 
                "marketing deals with seller_id not in sales sellers table"
            )
            
            if choice == '1':  # Remove orphaned records
                delete_query = """
                DELETE FROM olist_marketing_data_set.olist_closed_deals_dataset
                WHERE seller_id NOT IN (
                    SELECT seller_id FROM olist_sales_data_set.olist_sellers_dataset
                    WHERE seller_id IS NOT NULL
                ) AND seller_id IS NOT NULL
                """
                cursor.execute(delete_query)
                rows_deleted = cursor.rowcount
                connection.commit()
                logger.info(f"✅ Removed {rows_deleted} marketing deals with invalid seller_ids")
            else:
                logger.info("⚠️ Skipping cross-schema FK constraint due to orphaned records")
                # Store flag to skip this FK
                cursor.execute("CREATE TEMP TABLE IF NOT EXISTS skip_fks (fk_name TEXT)")
                cursor.execute("INSERT INTO skip_fks VALUES ('cross_schema_seller_fk')")
                connection.commit()
        else:
            logger.info("✅ All marketing deals reference valid sellers")
        
        cursor.close()
        return True
        
    except Exception as e:
        logger.error(f"Failed to validate foreign keys: {e}")
        return False
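
# --- Sketch (hypothetical helper, not used by the pipeline) ---------------------
# validate_foreign_keys() finds orphans after loading, using
#   LEFT JOIN parent ... WHERE parent.key IS NULL AND child.key IS NOT NULL.
# The same anti-join can run on the DataFrames before insert_dataframe(), for
# example to find closed deals whose mql_id is absent from the MQL table.
# NULL keys count as valid, which mirrors the IS NOT NULL filter in the SQL.
import pandas as pd  # already imported in Cell 1; repeated so the sketch stands alone


def find_orphans(child: pd.DataFrame, child_col: str,
                 parent: pd.DataFrame, parent_col: str) -> pd.DataFrame:
    """Return the rows of `child` whose non-null key does not appear in `parent`."""
    key = child[child_col]
    return child[key.notna() & ~key.isin(parent[parent_col].dropna())]

# Example (hypothetical DataFrame names):
# orphans = find_orphans(closed_deals_df, "seller_id", sellers_df, "seller_id")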

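def should_skip_fk(connection, fk_identifier: str) -> bool:
    """Check if a foreign key should be skipped based on user choices during validation.

    The flags live in the temp table skip_fks, which exists only if the user chose
    SKIP for some check. When it is missing, the failed SELECT aborts the current
    transaction, so roll back before returning; otherwise every following
    ALTER TABLE fails with "current transaction is aborted". Temp tables are per
    session, so behind a transaction-mode pooler the flags may not persist either.
    """
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM skip_fks WHERE fk_name = %s", (fk_identifier,))
            return cursor.fetchone()[0] > 0
    except psycopg2.Error:
        connection.rollback()
        return False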

def create_foreign_keys(connection):
    """Create foreign key constraints with enhanced error handling and validation."""
    try:
        # First validate and fix data integrity issues
        logger.info("🔧 Validating and fixing data integrity...")
        if not validate_foreign_keys(connection):
            logger.error("Data validation failed, skipping FK creation")
            return
        
        # Sales schema foreign keys - each in its own transaction
        logger.info("Creating sales schema foreign keys...")
        
        sales_fks = [
            ('olist_orders_dataset', 'customer_id', 'olist_customers_dataset', 'customer_id'),
            ('olist_order_items_dataset', 'order_id', 'olist_orders_dataset', 'order_id'),
            ('olist_order_items_dataset', 'product_id', 'olist_products_dataset', 'product_id'),
            ('olist_order_items_dataset', 'seller_id', 'olist_sellers_dataset', 'seller_id'),
            ('olist_order_payments_dataset', 'order_id', 'olist_orders_dataset', 'order_id'),
            ('olist_order_reviews_dataset', 'order_id', 'olist_orders_dataset', 'order_id'),
            ('olist_products_dataset', 'product_category_name', 'product_category_name_translation', 'product_category_name'),
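            # Geolocation FKs are disabled: geolocation_zip_code_prefix is neither a PRIMARY KEY
            # nor UNIQUE (each prefix has many coordinate rows), so PostgreSQL rejects them with
            # "there is no unique constraint matching given keys for referenced table".
            # ('olist_customers_dataset', 'customer_zip_code_prefix', 'olist_geolocation_dataset', 'geolocation_zip_code_prefix'),
            # ('olist_sellers_dataset', 'seller_zip_code_prefix', 'olist_geolocation_dataset', 'geolocation_zip_code_prefix'),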
        ]
        
        for parent_table, parent_col, child_table, child_col in sales_fks:
            try:
                cursor = connection.cursor()
                fk_name = f"fk_{parent_table}_{child_table}_{parent_col}"
                alter_query = f"ALTER TABLE {sales_schema}.{parent_table} ADD CONSTRAINT {fk_name} FOREIGN KEY ({parent_col}) REFERENCES {sales_schema}.{child_table}({child_col})"
                cursor.execute(alter_query)
                connection.commit()
                cursor.close()
                logger.info(f"✅ Created FK: {fk_name}")
            except Exception as e:
                connection.rollback()
                if cursor:
                    cursor.close()
                logger.warning(f"⚠️ Failed to create FK {fk_name}: {e}")
        
        # Marketing schema foreign keys
        logger.info("Creating marketing schema foreign keys...")
        
        if not should_skip_fk(connection, 'marketing_mql_fk'):
            marketing_fks = [
                ('olist_closed_deals_dataset', 'mql_id', 'olist_marketing_qualified_leads_dataset', 'mql_id')
            ]
            
            for table, column, ref_table, ref_column in marketing_fks:
                # Defined before the try block so the except branch can always log the name
                fk_name = f"fk_{table}_{ref_table}_{column}"
                alter_query = (
                    f"ALTER TABLE {marketing_schema}.{table} ADD CONSTRAINT {fk_name} "
                    f"FOREIGN KEY ({column}) REFERENCES {marketing_schema}.{ref_table}({ref_column})"
                )
                try:
                    # The with-block closes the cursor even when execute() raises
                    with connection.cursor() as cursor:
                        cursor.execute(alter_query)
                    connection.commit()
                    logger.info(f"✅ Created FK: {fk_name}")
                except Exception as e:
                    connection.rollback()
                    logger.warning(f"⚠️ Failed to create FK {fk_name}: {e}")
        else:
            logger.info("⏭️ Skipped marketing schema FK (user choice)")
        
        # Cross-schema foreign keys
        logger.info("Creating cross-schema foreign keys...")
        
        if not should_skip_fk(connection, 'cross_schema_seller_fk'):
            # Fully qualified table names, built from the schema variables defined in Cell 3
            cross_schema_fks = [
                (f"{marketing_schema}.olist_closed_deals_dataset", 'seller_id', f"{sales_schema}.olist_sellers_dataset", 'seller_id')
            ]
            
            for table, column, ref_table, ref_column in cross_schema_fks:
                # Defined before the try block so the except branch can always log the name
                fk_name = f"fk_cross_{table.split('.')[-1]}_{ref_table.split('.')[-1]}_{column}"
                alter_query = (
                    f"ALTER TABLE {table} ADD CONSTRAINT {fk_name} "
                    f"FOREIGN KEY ({column}) REFERENCES {ref_table}({ref_column})"
                )
                try:
                    # The with-block closes the cursor even when execute() raises
                    with connection.cursor() as cursor:
                        cursor.execute(alter_query)
                    connection.commit()
                    logger.info(f"✅ Created cross-schema FK: {fk_name}")
                except Exception as e:
                    connection.rollback()
                    logger.warning(f"⚠️ Failed to create cross-schema FK {fk_name}: {e}")
        else:
            logger.info("⏭️ Skipped cross-schema FK (user choice)")
        
        logger.info("🎉 Foreign key creation process completed!")
        
    except Exception as e:
        logger.error(f"Failed in foreign key creation process: {e}")

print("Enhanced ETL processing and foreign key functions with geolocation FKs and orphaned record handling defined!")

Enhanced ETL processing and foreign key functions with geolocation FKs and orphaned record handling defined!
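The constraint names built in the cell above concatenate both table names and the column, and some exceed PostgreSQL's default identifier limit of 63 bytes (`NAMEDATALEN - 1`). For example, `fk_olist_products_dataset_product_category_name_translation_product_category_name` is 81 characters. PostgreSQL truncates such names with a NOTICE instead of failing, so the stored name differs from the one logged. A minimal sketch of a hypothetical `make_fk_name` helper that keeps names within the limit by truncating and appending a short hash, so two long names with a shared prefix stay distinct; it assumes ASCII identifiers, as used throughout this notebook:

```python
import hashlib

MAX_IDENTIFIER_LEN = 63  # PostgreSQL default: NAMEDATALEN (64) minus 1


def make_fk_name(table: str, ref_table: str, column: str, prefix: str = "fk") -> str:
    """Build a foreign-key constraint name that fits PostgreSQL's identifier limit."""
    name = f"{prefix}_{table}_{ref_table}_{column}"
    # Assumes ASCII identifiers, so character length equals byte length
    if len(name) <= MAX_IDENTIFIER_LEN:
        return name
    # Deterministic 8-character suffix derived from the full name keeps truncated names unique
    digest = hashlib.sha1(name.encode("utf-8")).hexdigest()[:8]
    keep = MAX_IDENTIFIER_LEN - len(digest) - 1
    return f"{name[:keep]}_{digest}"
```

Using it in place of the inline f-strings would also make the name written to the log match the name PostgreSQL actually stores.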


## Cell 8: Establish Database Connection

In [36]:
# Establish database connection
print("🔌 Connecting to database...")
connection = connect_database()

if connection:
    print("✅ Database connection established!")
    print(f"Connected to: {HOST}:{PORT}/{DBNAME}")
else:
    print("❌ Failed to establish database connection!")
    print("Please check your .env file and database credentials.")

🔌 Connecting to database...


2025-06-11 23:02:31,951 - INFO - Database connection successful!


✅ Database connection established!
Connected to: aws-0-eu-west-1.pooler.supabase.com:6543/postgres
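When the connection fails, Cell 8 only points at the `.env` file in general. A small pre-flight check can name the problem first: the hypothetical `missing_db_settings` helper below lists which of the five keys read in Cell 2 (`user`, `password`, `host`, `port`, `dbname`) are unset or blank. It takes a plain mapping, so it can be fed `os.environ` after `load_dotenv()` or a dictionary in a test.

```python
import os
from typing import List, Mapping

# Key names exactly as read with os.getenv() in Cell 2
REQUIRED_DB_KEYS = ("user", "password", "host", "port", "dbname")


def missing_db_settings(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required connection keys that are unset or contain only whitespace."""
    return [key for key in REQUIRED_DB_KEYS if not (env.get(key) or "").strip()]
```

Calling `missing_db_settings()` right after `load_dotenv()` and printing the result before `connect_database()` turns a generic connection failure into a list of the keys to fill in.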


## Cell 9: Create Database Schemas

In [37]:
if connection:
    success = create_schemas(connection)
    if success:
        print("✅ Schemas created successfully!")
    else:
        print("❌ Failed to create schemas!")
else:
    print("❌ No database connection available!")

2025-06-11 23:02:32,584 - INFO - Schemas created successfully!


✅ Schemas created successfully!


## Cell 10: Load Sales Data

In [38]:
# Process sales data
sales_zip_path = "../Resources/data/sales.zip"

if os.path.exists(sales_zip_path) and connection:
    logger.info("Processing sales data...")
    extract_and_load_zip(connection, sales_zip_path, sales_schema, sales_tables)
    print("✅ Sales data processing completed!")
else:
    if not os.path.exists(sales_zip_path):
        print(f"❌ Sales zip file not found: {sales_zip_path}")
    if not connection:
        print("❌ No database connection available!")

2025-06-11 23:02:32,596 - INFO - Processing sales data...
2025-06-11 23:02:32,603 - INFO - Found 9 CSV files to process in ../Resources/data/sales.zip
2025-06-11 23:02:32,604 - INFO - Processing olist_customers_dataset.csv...



⚠️  Table olist_sales_data_set.olist_customers_dataset already exists!
Choose an option:
1. DROP - Drop the existing table and recreate (data will be lost)
2. TRUNCATE - Keep table structure but clear all data
3. SKIP - Skip processing this table (keep existing data)
4. DROP ALL - Drop all existing tables without asking again
5. TRUNCATE ALL - Truncate all existing tables without asking again
6. SKIP ALL - Skip all existing tables without asking again


2025-06-11 23:02:42,160 - INFO - Skipping existing table olist_sales_data_set.olist_customers_dataset
2025-06-11 23:02:42,161 - INFO - Skipping data loading for olist_customers_dataset (user choice)
2025-06-11 23:02:42,162 - INFO - Processing olist_geolocation_dataset.csv...
2025-06-11 23:02:42,381 - INFO - Skipping existing table olist_sales_data_set.olist_geolocation_dataset
2025-06-11 23:02:42,382 - INFO - Skipping data loading for olist_geolocation_dataset (user choice)
2025-06-11 23:02:42,383 - INFO - Processing olist_order_items_dataset.csv...
2025-06-11 23:02:42,591 - INFO - Skipping existing table olist_sales_data_set.olist_order_items_dataset
2025-06-11 23:02:42,592 - INFO - Skipping data loading for olist_order_items_dataset (user choice)
2025-06-11 23:02:42,593 - INFO - Processing olist_order_payments_dataset.csv...
2025-06-11 23:02:42,743 - INFO - Skipping existing table olist_sales_data_set.olist_order_payments_dataset
2025-06-11 23:02:42,744 - INFO - Skipping data loading

✅ Sales data processing completed!
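Each time a target table already exists, Cells 10 and 11 stop at the six-option menu shown above, which blocks unattended re-runs. The prompt logic lives in a function defined earlier in the notebook and is not shown here, so the following is only a sketch under the assumption that the menu maps to the six printed options: a hypothetical `resolve_table_action` turns one answer into an action and remembers any "... ALL" answer so that later tables are not asked again.

```python
from typing import Optional, Tuple

# Menu numbers as printed by the existing-table prompt: (action, applies to all later tables)
CHOICES = {
    "1": ("DROP", False),
    "2": ("TRUNCATE", False),
    "3": ("SKIP", False),
    "4": ("DROP", True),
    "5": ("TRUNCATE", True),
    "6": ("SKIP", True),
}


def resolve_table_action(choice: str, remembered: Optional[str]) -> Tuple[str, Optional[str]]:
    """Return (action for this table, action to remember for the remaining tables)."""
    if remembered is not None:
        # An earlier "... ALL" answer applies without asking again
        return remembered, remembered
    try:
        action, applies_to_all = CHOICES[choice.strip()]
    except KeyError:
        raise ValueError(f"invalid choice {choice!r}; expected 1-6") from None
    return action, (action if applies_to_all else None)
```

A caller would prompt (or read a preset default) only while the remembered action is `None`, then pass the returned value on to the next table.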


## Cell 11: Load Marketing Data

In [39]:
# Process marketing data
marketing_zip_path = "../Resources/data/marketing_funnel.zip"

if os.path.exists(marketing_zip_path) and connection:
    logger.info("Processing marketing data...")
    extract_and_load_zip(connection, marketing_zip_path, marketing_schema, marketing_tables)
    print("✅ Marketing data processing completed!")
else:
    if not os.path.exists(marketing_zip_path):
        print(f"❌ Marketing zip file not found: {marketing_zip_path}")
    if not connection:
        print("❌ No database connection available!")

2025-06-11 23:02:43,539 - INFO - Processing marketing data...
2025-06-11 23:02:43,541 - INFO - Found 2 CSV files to process in ../Resources/data/marketing_funnel.zip
2025-06-11 23:02:43,543 - INFO - Processing olist_closed_deals_dataset.csv...



⚠️  Table olist_marketing_data_set.olist_closed_deals_dataset already exists!
Choose an option:
1. DROP - Drop the existing table and recreate (data will be lost)
2. TRUNCATE - Keep table structure but clear all data
3. SKIP - Skip processing this table (keep existing data)
4. DROP ALL - Drop all existing tables without asking again
5. TRUNCATE ALL - Truncate all existing tables without asking again
6. SKIP ALL - Skip all existing tables without asking again


2025-06-11 23:03:04,132 - INFO - Dropping existing table olist_marketing_data_set.olist_closed_deals_dataset
2025-06-11 23:03:04,844 - INFO - Table olist_marketing_data_set.olist_closed_deals_dataset created successfully!
2025-06-11 23:03:05,044 - INFO - Read 842 rows from olist_closed_deals_dataset.csv
2025-06-11 23:03:05,046 - INFO - Starting advanced cleaning for olist_closed_deals_dataset with 842 rows
2025-06-11 23:03:05,161 - INFO - Advanced cleaning completed for olist_closed_deals_dataset. Final rows: 842
2025-06-11 23:03:05,163 - INFO - After cleaning: 842 rows for olist_closed_deals_dataset
2025-06-11 23:03:05,164 - INFO - Inserting 842 rows into olist_marketing_data_set.olist_closed_deals_dataset in batches of 1000
2025-06-11 23:05:35,742 - INFO - ✅ Successfully inserted 842/842 rows into olist_marketing_data_set.olist_closed_deals_dataset
2025-06-11 23:05:35,754 - INFO - ✅ Successfully processed olist_closed_deals_dataset
2025-06-11 23:05:35,755 - INFO - Processing olist_ma


⚠️  Table olist_marketing_data_set.olist_marketing_qualified_leads_dataset already exists!
Choose an option:
1. DROP - Drop the existing table and recreate (data will be lost)
2. TRUNCATE - Keep table structure but clear all data
3. SKIP - Skip processing this table (keep existing data)
4. DROP ALL - Drop all existing tables without asking again
5. TRUNCATE ALL - Truncate all existing tables without asking again
6. SKIP ALL - Skip all existing tables without asking again


2025-06-11 23:05:49,059 - INFO - Truncating existing table olist_marketing_data_set.olist_marketing_qualified_leads_dataset
2025-06-11 23:05:49,458 - INFO - Table olist_marketing_data_set.olist_marketing_qualified_leads_dataset truncated successfully!
2025-06-11 23:05:49,499 - INFO - Read 8000 rows from olist_marketing_qualified_leads_dataset.csv
2025-06-11 23:05:49,500 - INFO - Starting advanced cleaning for olist_marketing_qualified_leads_dataset with 8000 rows
2025-06-11 23:05:49,536 - INFO - Advanced cleaning completed for olist_marketing_qualified_leads_dataset. Final rows: 8000
2025-06-11 23:05:49,537 - INFO - After cleaning: 8000 rows for olist_marketing_qualified_leads_dataset
2025-06-11 23:05:49,537 - INFO - Inserting 8000 rows into olist_marketing_data_set.olist_marketing_qualified_leads_dataset in batches of 1000
2025-06-11 23:08:48,944 - INFO - Inserted 1000/8000 rows...
2025-06-11 23:11:46,413 - INFO - Inserted 2000/8000 rows...
2025-06-11 23:14:46,595 - INFO - Inserted 30

✅ Marketing data processing completed!
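The marketing load is slow: the log shows about 2.5 minutes for 842 rows and about 3 minutes per 1,000-row batch, which suggests that each row still costs a round trip to the Supabase pooler. psycopg2 documents that `cursor.executemany()` runs one statement per row, so batching it does not reduce round trips. `psycopg2.extras.execute_values`, available through the Cell 1 import, instead sends each page of rows as a single multi-row `INSERT`. A sketch of a hypothetical `bulk_insert` helper, assuming the loader can pass it a cleaned DataFrame whose columns match the target table; the pure-pandas `dataframe_to_rows` step maps missing values to `None` so they are stored as SQL `NULL`:

```python
from typing import List, Tuple

import pandas as pd


def dataframe_to_rows(df: pd.DataFrame) -> List[Tuple]:
    """Convert a DataFrame to plain Python tuples, mapping NaN/NaT/None to None (SQL NULL)."""
    return [
        tuple(None if pd.isna(value) else value for value in record)
        for record in df.itertuples(index=False, name=None)
    ]


def bulk_insert(connection, schema: str, table: str, df: pd.DataFrame, page_size: int = 1000) -> int:
    """Send one multi-row INSERT per page of `page_size` rows; return the number of rows sent."""
    # Imported here so dataframe_to_rows() works without a database driver installed
    from psycopg2 import sql
    from psycopg2.extras import execute_values

    # Identifier() quotes schema, table and column names instead of pasting them into the SQL
    query = sql.SQL("INSERT INTO {}.{} ({}) VALUES %s").format(
        sql.Identifier(schema),
        sql.Identifier(table),
        sql.SQL(", ").join(map(sql.Identifier, df.columns)),
    )
    rows = dataframe_to_rows(df)
    with connection.cursor() as cursor:
        execute_values(cursor, query, rows, page_size=page_size)
    connection.commit()
    return len(rows)
```

Quoted identifiers are case-sensitive in PostgreSQL; this is harmless here because every table and column name in the notebook is lowercase.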


## Cell 12: Create Foreign Key Relationships

In [40]:
if connection:
    logger.info("Creating foreign key relationships...")
    create_foreign_keys(connection)
    print("✅ Foreign key relationships created!")
else:
    print("❌ No database connection available!")

2025-06-11 23:29:55,751 - INFO - Creating foreign key relationships...
2025-06-11 23:29:55,754 - INFO - 🔧 Validating and fixing data integrity...
2025-06-11 23:29:55,758 - INFO - 🔍 Checking product category references...
2025-06-11 23:29:56,238 - INFO - ✅ All product categories have translations
2025-06-11 23:29:56,239 - INFO - 🔍 Checking marketing schema references...
2025-06-11 23:29:56,420 - INFO - ✅ All marketing deals have valid MQL references
2025-06-11 23:29:56,422 - INFO - 🔍 Checking cross-schema seller references...



⚠️  Found 462 orphaned records: marketing deals with seller_id not in sales sellers table
Choose an option:
1. REMOVE - Remove orphaned records (recommended)
2. SKIP - Skip creating this foreign key constraint


2025-06-11 23:30:27,527 - INFO - ✅ Removed 462 marketing deals with invalid seller_ids
2025-06-11 23:30:27,529 - INFO - Creating sales schema foreign keys...

2025-06-11 23:30:32,031 - INFO - Creating marketing schema foreign keys...

2025-06-11 23:30:32,689 - INFO - Creating cross-schema foreign keys...

2025-06-11 23:30:33,439 - INFO - 🎉 Foreign key creation process completed!


✅ Foreign key relationships created!
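Cell 12 found 462 marketing deals whose `seller_id` has no match in `olist_sellers_dataset` and removed them from the database. The same anti-join can be run in pandas on the source data, which makes it possible to inspect the orphans instead of deleting them. A minimal sketch using `DataFrame.merge` with `indicator=True`; the function name `find_orphans` is hypothetical:

```python
import pandas as pd


def find_orphans(child: pd.DataFrame, parent: pd.DataFrame, fk: str, pk: str) -> pd.DataFrame:
    """Return rows of `child` whose non-null `fk` value has no match in `parent[pk]`."""
    # Deduplicate parent keys so the left merge cannot multiply child rows
    keys = parent[[pk]].drop_duplicates().rename(columns={pk: fk})
    merged = child.merge(keys, on=fk, how="left", indicator=True)
    # NULL foreign keys are not checked by PostgreSQL, so they are not orphans
    orphaned = merged["_merge"].eq("left_only") & merged[fk].notna()
    return merged.loc[orphaned, child.columns].reset_index(drop=True)
```

A left merge keeps every child row and labels the unmatched ones `left_only`. Rows with a null key are excluded because a foreign-key constraint does not check rows whose referencing column is NULL.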


## Cell 13: Verify Data Load and Close Connection

In [41]:
if connection:
    cursor = connection.cursor()
    
    # Check sales schema tables
    print("\n📊 Sales Schema Table Counts:")
    for table_name in sales_tables.keys():
        try:
            cursor.execute(f"SELECT COUNT(*) FROM {sales_schema}.{table_name}")
            count = cursor.fetchone()[0]
            print(f"  {table_name}: {count:,} rows")
        except Exception as e:
            # A failed query aborts the transaction; roll back so the remaining counts can run
            connection.rollback()
            print(f"  {table_name}: Error - {e}")
    
    # Check marketing schema tables
    print("\n📊 Marketing Schema Table Counts:")
    for table_name in marketing_tables.keys():
        try:
            cursor.execute(f"SELECT COUNT(*) FROM {marketing_schema}.{table_name}")
            count = cursor.fetchone()[0]
            print(f"  {table_name}: {count:,} rows")
        except Exception as e:
            connection.rollback()
            print(f"  {table_name}: Error - {e}")
    
    cursor.close()
    connection.close()
    print("\n✅ ETL Pipeline completed successfully!")
    print("📝 Database connection closed.")
else:
    print("❌ No database connection to verify!")


📊 Sales Schema Table Counts:
  olist_customers_dataset: 99,441 rows
  olist_geolocation_dataset: 1,000,163 rows
  olist_order_items_dataset: 112,650 rows
  olist_order_payments_dataset: 103,886 rows
  olist_order_reviews_dataset: 98,410 rows
  olist_orders_dataset: 99,441 rows
  olist_products_dataset: 32,951 rows
  olist_sellers_dataset: 3,095 rows
  product_category_name_translation: 73 rows

📊 Marketing Schema Table Counts:
  olist_closed_deals_dataset: 380 rows
  olist_marketing_qualified_leads_dataset: 8,000 rows

✅ ETL Pipeline completed successfully!
📝 Database connection closed.
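Cell 13 prints only the loaded counts, not what the source archives contained. A sketch of a hypothetical `csv_row_counts` helper that counts data rows per CSV inside a zip using the `csv` module, so a quoted field that spans several lines is counted as one row, plus a `compare_counts` helper that reports tables whose source and loaded counts differ. It assumes UTF-8 files with one header row. A difference is not necessarily an error; for example, rows removed by cleaning or by the orphan handling in Cell 12 will show up here.

```python
import csv
import io
import os
import zipfile
from typing import IO, Dict, Union


def csv_row_counts(zip_source: Union[str, IO[bytes]]) -> Dict[str, int]:
    """Map each CSV in the archive (file name without extension) to its number of data rows."""
    counts: Dict[str, int] = {}
    with zipfile.ZipFile(zip_source) as archive:
        for member in archive.namelist():
            if not member.lower().endswith(".csv"):
                continue
            with archive.open(member) as raw, io.TextIOWrapper(raw, encoding="utf-8", newline="") as text:
                reader = csv.reader(text)
                next(reader, None)  # skip the header row
                table = os.path.splitext(os.path.basename(member))[0]
                counts[table] = sum(1 for row in reader if row)
    return counts


def compare_counts(source: Dict[str, int], loaded: Dict[str, int]) -> Dict[str, int]:
    """Return source-minus-loaded differences for tables whose counts disagree."""
    return {
        table: source[table] - loaded.get(table, 0)
        for table in source
        if source[table] != loaded.get(table, 0)
    }
```

The `loaded` mapping can be collected from the same `SELECT COUNT(*)` queries Cell 13 runs, before the connection is closed.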


## Summary and Next Steps

🎉 **ETL Pipeline Summary:**

✅ **Created two PostgreSQL schemas:**
   - `olist_sales_data_set` (9 tables)
   - `olist_marketing_data_set` (2 tables)

✅ **Loaded data from:**
   - `Resources/data/sales.zip`
   - `Resources/data/marketing_funnel.zip`

✅ **Established foreign key relationships:**
   - Within sales schema (7 relationships)
   - Within marketing schema (1 relationship)
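   - Cross-schema: `olist_closed_deals_dataset.seller_id` → `olist_sellers_dataset.seller_id` (after removing 462 closed deals with no matching seller, leaving 380 of 842)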
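PostgreSQL only accepts a foreign key whose referenced columns carry a `PRIMARY KEY` or `UNIQUE` constraint. `olist_geolocation_dataset` holds 1,000,163 rows while a five-digit zip-code prefix has at most 100,000 distinct values, so `geolocation_zip_code_prefix` must repeat, and the two zip-prefix foreign keys attempted in `create_foreign_keys` can only succeed against a table with one row per prefix. A sketch of building such a lookup with pandas before loading; averaging the coordinates and taking the first city and state per prefix is an assumption, as are the column names `geolocation_lat`, `geolocation_lng`, `geolocation_city` and `geolocation_state` (taken from the public Olist CSV):

```python
import pandas as pd


def build_zip_prefix_lookup(geo: pd.DataFrame) -> pd.DataFrame:
    """Collapse geolocation rows to exactly one row per zip-code prefix."""
    return (
        geo.groupby("geolocation_zip_code_prefix", as_index=False)
        .agg(
            # Assumed aggregation: mean coordinates, first city/state seen for the prefix
            geolocation_lat=("geolocation_lat", "mean"),
            geolocation_lng=("geolocation_lng", "mean"),
            geolocation_city=("geolocation_city", "first"),
            geolocation_state=("geolocation_state", "first"),
        )
    )
```

The result could be loaded as a separate table with `geolocation_zip_code_prefix` as its primary key. Customer or seller prefixes missing from it would still need the same orphan handling as the cross-schema seller key.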

🔍 **Key Relationships:**
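   - Marketing leads can be traced to sellers through closed deals (`mql_id` → `seller_id`)
   - Sellers can be linked to their orders and products through order items
   - Orders contain items, payments, and reviews
   - Products link to translated category names through `product_category_name_translation`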
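The lead-to-seller chain above corresponds to two joins: leads to closed deals on `mql_id`, then closed deals to order items on `seller_id`. A sketch of a query along that path that counts sold items per lead origin; the `origin` column is assumed from the public Olist marketing-qualified-leads CSV and is not defined in this section. The helper uses only standard DB-API calls (`cursor()`, `execute()`, `fetchall()`, `close()`), so it can take a psycopg2 connection; Cell 13 closes the one from Cell 8, so call `connect_database()` again first.

```python
# Assumed columns: mql_id and origin (leads), mql_id and seller_id (closed deals),
# seller_id and order_id (order items)
LEAD_TO_SALES_SQL = """
SELECT mql.origin,
       COUNT(DISTINCT cd.seller_id) AS sellers,
       COUNT(oi.order_id)           AS items_sold
FROM olist_marketing_data_set.olist_marketing_qualified_leads_dataset AS mql
JOIN olist_marketing_data_set.olist_closed_deals_dataset AS cd
  ON cd.mql_id = mql.mql_id
JOIN olist_sales_data_set.olist_order_items_dataset AS oi
  ON oi.seller_id = cd.seller_id
GROUP BY mql.origin
ORDER BY items_sold DESC
"""


def items_sold_by_lead_origin(connection):
    """Run the lead -> deal -> seller -> order-item join and return all result rows."""
    cursor = connection.cursor()
    try:
        cursor.execute(LEAD_TO_SALES_SQL)
        return cursor.fetchall()
    finally:
        cursor.close()
```

Inner joins drop leads that never became a deal and sellers with no order items, so the counts describe only converted leads with sales.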

📈 **Ready for analysis!**

**To run this pipeline:**
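1. Update the `.env` file with your Supabase credentials (`user`, `password`, `host`, `port`, `dbname`)  
2. Run cells sequentially from top to bottom; Cells 10 and 11 prompt for an action when a target table already exists, and Cell 12 prompts before removing orphaned records
3. Monitor the output for errors and ⚠️ warnings: a foreign key that cannot be created is logged as a warning and does not stop the pipeline
4. Verify data counts in the final cell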

**Database Schemas Created:**
- `olist_sales_data_set`: Complete e-commerce transaction data
- `olist_marketing_data_set`: Marketing funnel and lead data
- A cross-schema foreign key links marketing closed deals to sales sellers, so funnel and transaction data can be joined