#### Upload Data from S3 to PostgreSQL

##### Importing Relevant Libraries

In [22]:
import boto3
import pandas as pd
import psycopg2
from io import StringIO
from sqlalchemy import create_engine, text
import glob
import os
from dotenv import load_dotenv
import logging
from typing import Dict, Any

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
# Silence every warning category for the rest of the session. Note that this
# also hides deprecation notices from pandas/SQLAlchemy.
warnings.filterwarnings('ignore')
import json

# Display: passing None removes the column limit, so wide DataFrames are
# printed with all of their columns.
pd.set_option('display.max_columns', None)

#### Data Pipeline

In [23]:
# Logging setup: INFO level, "<timestamp> - <level> - <message>" records,
# plus the module-level logger used by the pipeline below.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

class DataPipeline:
    """Copy five CSV datasets from S3 into PostgreSQL ``dim_*``/``fact_*`` tables.

    ``run_pipeline`` reads each CSV into a DataFrame, logs a few column
    ranges, optionally drops and recreates the target tables, creates
    indexes, appends the rows, and finally logs the row count of each table.

    Database settings are read from the environment variables ``DB_USER``,
    ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT`` and ``DB_NAME`` after
    ``load_dotenv()`` has been called.
    """

    def __init__(self):
        """Initialize the data pipeline with configurations."""
        load_dotenv()
        self.engine = self._create_db_connection()
        self.bucket = "finriskai"
        self.prefix = "datasets/"
        # Must come after bucket/prefix are set: the paths are built from them.
        self.files = self._get_s3_file_paths()

    def _create_db_connection(self):
        """Build the SQLAlchemy engine for the PostgreSQL database.

        Returns:
            The engine created from the ``DB_*`` environment variables.

        Raises:
            Exception: whatever ``create_engine`` raises; it is logged and
                re-raised unchanged.
        """
        try:
            # NOTE(review): the credentials are interpolated verbatim, so a
            # user name or password containing characters such as "@" or "/"
            # must already be URL-encoded in the environment.
            db_url = (
                f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@"
                f"{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
            )
            engine = create_engine(db_url)
            # SQLAlchemy engines connect lazily, so this message only confirms
            # that the engine object was built, not that the server answered.
            logger.info("Database connection established successfully")
            return engine
        except Exception as e:
            logger.error("Failed to create database connection: %s", e)
            raise

    def _get_s3_file_paths(self) -> Dict[str, str]:
        """Return the ``s3://`` URL of each dataset, keyed by dataset name."""
        base = f"s3://{self.bucket}/{self.prefix}"
        return {
            "applications": f"{base}credit_applications.csv",
            "bureau": f"{base}credit_bureau_data.csv",
            "profiles": f"{base}customer_profiles.csv",
            "predictions": f"{base}model_predictions.csv",
            "transactions": f"{base}transaction_data.csv",
        }

    def load_data_from_s3(self) -> Dict[str, pd.DataFrame]:
        """Read every dataset in ``self.files`` into a DataFrame.

        Returns:
            A dict mapping dataset name to the DataFrame read from its CSV.

        Raises:
            Exception: the first read failure is logged and re-raised, so a
                partially loaded result is never returned.
        """
        dataframes = {}
        # Presumably forwarded to the S3 filesystem backend; anon=False asks
        # for authenticated (non-anonymous) access -- TODO confirm.
        storage_options = {"anon": False}

        for name, path in self.files.items():
            try:
                logger.info("Loading %s from S3...", name)
                df = pd.read_csv(path, storage_options=storage_options)
                logger.info("Successfully loaded %s: %d rows", name, len(df))
                dataframes[name] = df
            except Exception as e:
                logger.error("Failed to load %s from S3: %s", name, e)
                raise

        return dataframes

    def analyze_data_ranges(self, dataframes: Dict[str, pd.DataFrame]) -> None:
        """Log the min/max of selected columns of ``profiles`` and ``bureau``.

        Datasets that are absent from ``dataframes`` are skipped. A dataset
        that is present but lacks one of the listed columns raises
        ``KeyError``.
        """
        # dataset name -> (label used in the log line, column name)
        range_columns = {
            "profiles": (
                ("Behavioral score", "behavioral_score"),
                ("Credit score", "credit_score"),
                ("Customer age", "customer_age"),
            ),
            "bureau": (
                ("Credit utilization", "credit_utilization"),
                ("Payment history", "payment_history"),
            ),
        }

        for dataset, columns in range_columns.items():
            if dataset not in dataframes:
                continue
            df = dataframes[dataset]
            logger.info("Data range analysis for %s:", dataset)
            for label, column in columns:
                logger.info(
                    "%s range: %s - %s", label, df[column].min(), df[column].max()
                )

    def drop_tables(self) -> None:
        """Drop all pipeline tables (``DROP TABLE IF EXISTS ... CASCADE``).

        All drops run inside one transaction. Failures are logged and
        re-raised.
        """
        # Fact tables and dim_bureau first, the referenced profile table last.
        tables_to_drop = [
            "fact_transactions",
            "fact_predictions",
            "fact_applications",
            "dim_bureau",
            "dim_customer_profiles",
        ]

        try:
            with self.engine.begin() as conn:
                for table in tables_to_drop:
                    logger.info("Dropping table: %s", table)
                    # Table names come from the fixed list above, never from
                    # external input, so interpolation is safe here.
                    conn.execute(text(f"DROP TABLE IF EXISTS {table} CASCADE;"))
                logger.info("All tables dropped successfully")
        except Exception as e:
            logger.error("Failed to drop tables: %s", e)
            raise

    def create_tables(self) -> None:
        """Create the five tables if they do not exist yet.

        ``dim_customer_profiles`` is keyed by ``customer_id``; every other
        table carries a foreign key to it with ``ON DELETE CASCADE``. All
        statements run in one transaction. Failures are logged and re-raised.
        """
        # dim_customer_profiles is listed first because the other tables
        # reference it.
        table_schemas = {
            "dim_customer_profiles": """
                CREATE TABLE IF NOT EXISTS dim_customer_profiles (
                    customer_id VARCHAR PRIMARY KEY,
                    customer_age INT,
                    annual_income NUMERIC(15,2),
                    employment_status VARCHAR(50),
                    account_tenure INT,
                    product_holdings INT,
                    relationship_value NUMERIC(15,2),
                    risk_segment VARCHAR(50),
                    behavioral_score NUMERIC(10,2),
                    credit_score INT,
                    city VARCHAR(100),
                    last_activity_date DATE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """,

            "dim_bureau": """
                CREATE TABLE IF NOT EXISTS dim_bureau (
                    customer_id VARCHAR PRIMARY KEY,
                    credit_score INT,
                    credit_history_length INT,
                    number_of_accounts INT,
                    total_credit_limit NUMERIC(15,2),
                    credit_utilization NUMERIC(6,3),
                    payment_history NUMERIC(6,3),
                    public_records INT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    CONSTRAINT fk_bureau_customer FOREIGN KEY (customer_id)
                        REFERENCES dim_customer_profiles(customer_id)
                        ON DELETE CASCADE
                );
            """,

            "fact_applications": """
                CREATE TABLE IF NOT EXISTS fact_applications (
                    application_id VARCHAR PRIMARY KEY,
                    customer_id VARCHAR,
                    application_date DATE,
                    loan_amount NUMERIC(15,2),
                    loan_purpose VARCHAR(50),
                    employment_status VARCHAR(50),
                    annual_income NUMERIC(15,2),
                    debt_to_income_ratio NUMERIC(6,3),
                    credit_score INT,
                    application_status VARCHAR(20),
                    default_flag INT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    CONSTRAINT fk_app_customer FOREIGN KEY (customer_id)
                        REFERENCES dim_customer_profiles(customer_id)
                        ON DELETE CASCADE
                );
            """,

            "fact_predictions": """
                CREATE TABLE IF NOT EXISTS fact_predictions (
                    prediction_id VARCHAR PRIMARY KEY,
                    model_version VARCHAR(50),
                    customer_id VARCHAR,
                    prediction_date DATE,
                    prediction_type VARCHAR(30),
                    risk_score NUMERIC(10,2),
                    fraud_probability NUMERIC(6,3),
                    model_features JSONB,
                    prediction_explanation TEXT,
                    business_decision VARCHAR(50),
                    actual_outcome VARCHAR(50),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    CONSTRAINT fk_pred_customer FOREIGN KEY (customer_id)
                        REFERENCES dim_customer_profiles(customer_id)
                        ON DELETE CASCADE
                );
            """,

            "fact_transactions": """
                CREATE TABLE IF NOT EXISTS fact_transactions (
                    transaction_id VARCHAR PRIMARY KEY,
                    customer_id VARCHAR,
                    transaction_date TIMESTAMP,
                    amount NUMERIC(15,2),
                    merchant_category VARCHAR(50),
                    transaction_type VARCHAR(30),
                    location VARCHAR(100),
                    device_info VARCHAR(200),
                    fraud_flag INT,
                    investigation_status VARCHAR(30),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    CONSTRAINT fk_txn_customer FOREIGN KEY (customer_id)
                        REFERENCES dim_customer_profiles(customer_id)
                        ON DELETE CASCADE
                );
            """
        }

        try:
            with self.engine.begin() as conn:
                for table_name, schema in table_schemas.items():
                    logger.info("Creating table: %s", table_name)
                    conn.execute(text(schema))
                logger.info("All tables created successfully with customer_id as key")
        except Exception as e:
            logger.error("Failed to create tables: %s", e)
            raise

    def create_indexes(self) -> None:
        """Create the customer/date/city indexes if they do not exist yet.

        All statements run in one transaction. Failures are logged and
        re-raised.
        """
        indexes = [
            "CREATE INDEX IF NOT EXISTS idx_applications_customer_id ON fact_applications(customer_id);",
            "CREATE INDEX IF NOT EXISTS idx_applications_date ON fact_applications(application_date);",
            "CREATE INDEX IF NOT EXISTS idx_predictions_customer_id ON fact_predictions(customer_id);",
            "CREATE INDEX IF NOT EXISTS idx_predictions_date ON fact_predictions(prediction_date);",
            "CREATE INDEX IF NOT EXISTS idx_transactions_customer_id ON fact_transactions(customer_id);",
            "CREATE INDEX IF NOT EXISTS idx_transactions_date ON fact_transactions(transaction_date);",
            "CREATE INDEX IF NOT EXISTS idx_profiles_city ON dim_customer_profiles(city);",
        ]

        try:
            with self.engine.begin() as conn:
                for index in indexes:
                    conn.execute(text(index))
                logger.info("Indexes created successfully")
        except Exception as e:
            logger.error("Failed to create indexes: %s", e)
            raise

    @staticmethod
    def _to_jsonb_text(value):
        """Convert one ``model_features`` cell into JSON text for the JSONB column.

        * ``None``/NaN (a missing CSV cell) becomes ``None`` so that SQL NULL
          is stored instead of the invalid JSON token ``NaN``.
        * A string starting with ``{`` is parsed as JSON first and, failing
          that, as a Python literal (e.g. a dict repr with single quotes),
          then re-serialised as JSON.
        * Anything else is serialised with ``json.dumps`` as-is.

        Raises:
            ValueError, SyntaxError: if a ``{``-prefixed string is neither
                valid JSON nor a Python literal.
        """
        # NaN is the only float that differs from itself.
        if value is None or (isinstance(value, float) and value != value):
            return None

        if isinstance(value, str) and value.startswith("{"):
            try:
                parsed = json.loads(value)
            except ValueError:
                # The data comes from a file, so it must never be executed:
                # literal_eval only accepts literals, unlike eval().
                import ast

                parsed = ast.literal_eval(value)
            return json.dumps(parsed)

        return json.dumps(value)

    def load_data_to_postgres(self, dataframes: Dict[str, pd.DataFrame]) -> None:
        """Append each DataFrame to its target table.

        The ``model_features`` column of ``predictions`` is converted to JSON
        text first (see ``_to_jsonb_text``). The conversion works on a copy,
        so the caller's DataFrame is left untouched.

        Args:
            dataframes: dataset name -> DataFrame; datasets that are missing
                are skipped.

        Raises:
            Exception: the first failure is logged and re-raised. Tables
                loaded before the failure keep their rows, because every
                ``to_sql`` call is a separate operation.
        """
        # Insertion order matters: dim_customer_profiles is referenced by the
        # foreign keys of every other table, so it has to be loaded first.
        table_mapping = {
            "profiles": "dim_customer_profiles",
            "bureau": "dim_bureau",
            "applications": "fact_applications",
            "predictions": "fact_predictions",
            "transactions": "fact_transactions",
        }

        try:
            for df_name, table_name in table_mapping.items():
                if df_name not in dataframes:
                    continue
                df = dataframes[df_name]

                if df_name == "predictions" and "model_features" in df.columns:
                    df = df.assign(
                        model_features=df["model_features"].apply(self._to_jsonb_text)
                    )

                logger.info("Loading %d records into %s", len(df), table_name)
                df.to_sql(
                    table_name,
                    con=self.engine,
                    if_exists="append",
                    index=False,
                    method="multi",
                    chunksize=1000,
                )
                logger.info("Successfully loaded %s", df_name)

        except Exception as e:
            logger.error("Failed to load data to PostgreSQL: %s", e)
            raise

    def validate_data_load(self) -> None:
        """Log the row count of every pipeline table.

        The counts are only reported; they are not compared with the number
        of rows that were read from S3. Failures are logged and re-raised.
        """
        validation_queries = {
            "dim_customer_profiles": "SELECT COUNT(*) as count FROM dim_customer_profiles;",
            "dim_bureau": "SELECT COUNT(*) as count FROM dim_bureau;",
            "fact_applications": "SELECT COUNT(*) as count FROM fact_applications;",
            "fact_predictions": "SELECT COUNT(*) as count FROM fact_predictions;",
            "fact_transactions": "SELECT COUNT(*) as count FROM fact_transactions;",
        }

        try:
            with self.engine.connect() as conn:
                for table, query in validation_queries.items():
                    result = conn.execute(text(query)).fetchone()
                    logger.info("%s: %s records loaded", table, result[0])
        except Exception as e:
            logger.error("Failed to validate data load: %s", e)
            raise

    def run_pipeline(self, force_recreate=True):
        """Execute the complete data pipeline.

        Args:
            force_recreate: when true (the default), all pipeline tables are
                dropped before being created again, which discards any rows
                they held. When false, existing tables are kept and the new
                rows are appended to them.

        Raises:
            Exception: any step failure is logged and re-raised.
        """
        try:
            logger.info("Starting data pipeline...")

            # Step 1: Load data from S3
            dataframes = self.load_data_from_s3()

            # Step 2: Analyze data ranges
            self.analyze_data_ranges(dataframes)

            # Step 3: Drop existing tables if force_recreate is True
            if force_recreate:
                logger.info("Force recreate enabled - dropping existing tables")
                self.drop_tables()

            # Step 4: Create tables
            self.create_tables()

            # Step 5: Create indexes
            self.create_indexes()

            # Step 6: Load data to PostgreSQL
            self.load_data_to_postgres(dataframes)

            # Step 7: Validate data load
            self.validate_data_load()

            logger.info("Data pipeline completed successfully!")

        except Exception as e:
            logger.error("Data pipeline failed: %s", e)
            raise


In [24]:
# Usage: build the pipeline (reads the DB_* settings and creates the engine)
# and run all steps once.
if __name__ == "__main__":
    pipeline = DataPipeline()
    # Set force_recreate=True to drop and recreate tables with new constraints.
    # This discards any rows already stored in those tables.
    pipeline.run_pipeline(force_recreate=True)

2025-08-16 21:52:06,529 - INFO - Database connection established successfully
2025-08-16 21:52:06,536 - INFO - Starting data pipeline...
2025-08-16 21:52:06,541 - INFO - Loading applications from S3...
2025-08-16 21:53:10,900 - INFO - Successfully loaded applications: 100000 rows
2025-08-16 21:53:10,902 - INFO - Loading bureau from S3...
2025-08-16 21:53:16,699 - INFO - Successfully loaded bureau: 25000 rows
2025-08-16 21:53:16,700 - INFO - Loading profiles from S3...
2025-08-16 21:53:29,553 - INFO - Successfully loaded profiles: 25000 rows
2025-08-16 21:53:29,555 - INFO - Loading predictions from S3...
2025-08-16 21:54:25,280 - INFO - Successfully loaded predictions: 50000 rows
2025-08-16 21:54:25,280 - INFO - Loading transactions from S3...
2025-08-16 21:55:09,661 - INFO - Successfully loaded transactions: 100000 rows
2025-08-16 21:55:09,663 - INFO - Data range analysis for profiles:
2025-08-16 21:55:09,685 - INFO - Behavioral score range: 2.19 - 927.24
2025-08-16 21:55:09,689 - INFO