In [1]:
import os
from urllib.parse import quote_plus

import mysql.connector
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

#### 1. Setup and Configuration

In [2]:
# Pull connection settings from the environment (populated from a local .env
# file by load_dotenv) so that no credentials are written into the notebook.
load_dotenv()
MYSQL_USER = os.getenv('MYSQL_USER')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD')
MYSQL_HOST = os.getenv('MYSQL_HOST')
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE')
CSV_FILE = 'synthetic_gts_survey_data.csv'

# SQLAlchemy Connection String
# The user name and password are percent-encoded before being embedded in the
# URL: characters such as '@', ':', '/' or '#' in a password would otherwise be
# parsed as URL delimiters and produce a wrong host or credentials.
# `or ''` keeps this cell from raising when a variable is unset (os.getenv
# returns None); the connection attempt will then fail later with a database error.
DATABASE_URL = (
    f"mysql+mysqlconnector://{quote_plus(MYSQL_USER or '')}:{quote_plus(MYSQL_PASSWORD or '')}@"
    f"{MYSQL_HOST}/{MYSQL_DATABASE}"
)

#### 2. Transformation Function

In [3]:
def _standardize_labels(series, mapping):
    """Trim and canonicalize categorical labels, leaving missing cells missing.

    Args:
        series: Column of raw labels (may contain NaN/None).
        mapping: Dict of raw spelling -> canonical spelling. Keys are matched
            after trimming whitespace and ignoring case.

    Returns:
        A Series on the same index. Recognised labels are replaced by their
        canonical spelling, unrecognised labels are only whitespace-trimmed,
        and cells that were missing in ``series`` are NaN.
    """
    lookup = {str(raw).strip().lower(): canonical for raw, canonical in mapping.items()}
    stripped = series.astype(str).str.strip()
    canonical = stripped.str.lower().map(lookup)
    # Values without a canonical spelling keep their trimmed original text.
    standardized = canonical.where(canonical.notna(), stripped)
    # astype(str) renders missing cells as literal text such as 'nan'; put the
    # missing marker back so they are not stored as a fake category.
    return standardized.mask(series.isna())


def transform_data(df_raw):
    """
    Applies data quality control, standardization, and imputation.

    Steps:
      1. Missing values in the four score columns are filled with that
         column's median (rounded with Python's round(), i.e. half-to-even),
         the row is flagged ``is_valid = 0`` and a note is appended to
         ``processing_notes``. Score columns are then cast to int.
      2. ``gender`` and ``aid_provider`` are trimmed and mapped to canonical
         spellings (case-insensitive). Missing values stay missing.
      3. Remaining NaN values are replaced by None.

    Args:
        df_raw: Raw survey DataFrame. It is not modified; a copy is cleaned.

    Returns:
        The clean DataFrame (df_clean) restricted to the processed columns.

    Raises:
        ValueError: If a score column has missing values but no observed
            value to derive a median from.
        KeyError: If an expected column is absent from ``df_raw``.
    """
    df_clean = df_raw.copy()

    # Configuration (keys are matched case-insensitively after trimming)
    gender_map = {'Female': 'Female', 'Male': 'Male', 'male': 'Male', 'fEmAlE': 'Female'}
    aid_provider_map = {
        'UNHCR ': 'UNHCR', 'Intl Rescue Commitee': 'IRC',
        'WFP': 'WFP', 'UNICEF': 'UNICEF', 'NRC': 'NRC', 'ICRC': 'ICRC'
    }
    score_cols = ['aid_satisfaction', 'trust_in_aid_provider', 'communication_clarity', 'aid_fairness']

    # Initialize audit columns
    df_clean['processing_notes'] = ''
    df_clean['is_valid'] = 1 # MySQL BOOLEAN/TINYINT True is 1

    # 1. Imputation and Type Casting
    for col in score_cols:
        is_missing = df_clean[col].isna()
        if is_missing.any():
            # The median is only needed (and only computed) for columns with gaps.
            median = df_clean[col].median(skipna=True)
            if pd.isna(median):
                raise ValueError(
                    f"Cannot impute '{col}': the column has no non-missing values."
                )
            imputed_value = int(round(median))
            df_clean.loc[is_missing, col] = imputed_value
            df_clean.loc[is_missing, 'processing_notes'] += f"Imputed missing {col}. "
            df_clean.loc[is_missing, 'is_valid'] = 0 # Mark as less valid due to imputation
        # Force integer type (no NaN can remain at this point)
        df_clean[col] = df_clean[col].astype(int)

    # 2. Standardization
    df_clean['gender'] = _standardize_labels(df_clean['gender'], gender_map)
    df_clean['aid_provider'] = _standardize_labels(df_clean['aid_provider'], aid_provider_map)

    # 3. Final Cleanup (replace pandas NaN with standard Python None)
    df_clean = df_clean.replace({np.nan: None})

    processed_cols = ['response_id', 'survey_date', 'location', 'aid_provider', 'displacement_status',
                      'gender', 'age_group', 'aid_satisfaction', 'trust_in_aid_provider',
                      'communication_clarity', 'aid_fairness', 'feedback_comment', 'is_valid', 'processing_notes']

    return df_clean[processed_cols]

#### 3. ETL Main Function

In [4]:
def run_mysql_etl():
    """Executes the full ETL pipeline for both raw and processed tables.

    Reads CSV_FILE, cleans it with transform_data, and writes two MySQL
    tables through the engine built from DATABASE_URL: 'raw_responses'
    (data as extracted) and 'gts_processed_data' (cleaned data). Both tables
    are dropped and recreated on every run.

    Progress and failures are reported with print(). A missing CSV file or
    any database error aborts the run. The function returns None in every case.
    """

    # E - Extract
    print(f"1. Extracting data from {CSV_FILE}...")
    try:
        df_raw = pd.read_csv(CSV_FILE)
        # Keep only the calendar date (drops any time-of-day component).
        df_raw['survey_date'] = pd.to_datetime(df_raw['survey_date']).dt.date
    except FileNotFoundError:
        print(f"FATAL: CSV file '{CSV_FILE}' not found.")
        return

    # T - Transform (Phase 1.3)
    print("2. Applying data transformation and cleaning ...")
    df_clean = transform_data(df_raw)

    # L - Load (Phase 1.2 & 1.3)
    print("3. Connecting to MySQL and loading data...")
    engine = None
    try:
        engine = create_engine(DATABASE_URL)

        # Load Raw Data (Phase 1.2) - Allows NaNs/NULLs for audit
        df_raw_for_db = df_raw.replace({np.nan: None}) # Replace NaNs with None for SQL NULL
        df_raw_for_db.to_sql('raw_responses', con=engine, if_exists='replace', index=False, chunksize=1000)
        print("   -> Successfully loaded data into 'raw_responses'.")

        # Load Cleaned Data (Phase 1.3) - Guaranteed no score NULLs
        # Note: 'replace' drops the existing table and recreates it from the
        # DataFrame's inferred dtypes, so column types or constraints defined
        # on a previous version of the table are NOT carried over.
        df_clean.to_sql('gts_processed_data', con=engine, if_exists='replace', index=False, chunksize=1000)
        print("   -> Successfully loaded cleaned data into 'gts_processed_data'.")

    except Exception as e:
        # Deliberately broad: any connection or load problem is reported and the run stops.
        print(f"FATAL: Database connection or loading error: {e}")
        print("Please check your .env credentials, MySQL server status, and table schemas.")
        return
    finally:
        # Release pooled connections whether the load succeeded or failed, so
        # repeated runs in the same kernel do not accumulate open connections.
        if engine is not None:
            engine.dispose()

    print("\nETL Pipeline complete.")

# Run the pipeline when this code is executed as the main module (this also
# holds when the cell is run in a notebook kernel); importing it elsewhere
# only defines the functions.
if __name__ == "__main__":
    run_mysql_etl()

1. Extracting data from synthetic_gts_survey_data.csv...
2. Applying data transformation and cleaning ...
3. Connecting to MySQL and loading data...
   -> Successfully loaded data into 'raw_responses'.
   -> Successfully loaded cleaned data into 'gts_processed_data'.

ETL Pipeline complete.
