In [3]:
import os
from urllib.parse import quote_plus

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.sql import text # IMPORTANT FIX for TRUNCATE

# --- Configuration (Reads from .env) ---
load_dotenv()

# Warn early if any connection setting is absent; otherwise the URL below would
# silently contain the literal text "None" and only fail later at connect time.
_missing_env = [
    name
    for name in ('MYSQL_USER', 'MYSQL_PASSWORD', 'MYSQL_HOST', 'MYSQL_DATABASE')
    if os.getenv(name) is None
]
if _missing_env:
    print(f"⚠️ WARNING: missing .env setting(s): {', '.join(_missing_env)}")

# User name and password are percent-encoded so characters such as '@', ':',
# '/' or '#' in the credentials cannot break the URL structure. SQLAlchemy
# decodes them again when parsing the URL.
DB_URL = (
    f"mysql+mysqlconnector://{quote_plus(os.getenv('MYSQL_USER', ''))}:"
    f"{quote_plus(os.getenv('MYSQL_PASSWORD', ''))}@{os.getenv('MYSQL_HOST')}/"
    f"{os.getenv('MYSQL_DATABASE')}"
)

# Source CSV file -> destination MySQL table.
FILE_TO_TABLE_MAP = {
    'telecom_cdr_data.csv': 'cdr_data',
    'telecom_billing_data.csv': 'billing_data',
    'telecom_tariff_reference.csv': 'tariff_reference',
}


def _read_source_csv(file_path, table_name):
    """Read *file_path* into a DataFrame shaped for loading into *table_name*.

    For 'tariff_reference' the first CSV column is read as the index and named
    'rate_plan_id'. It is then moved back into a regular column, unless a
    column of that name already exists, so that ``to_sql(index=False)`` still
    writes it instead of silently dropping it.
    """
    if table_name != 'tariff_reference':
        return pd.read_csv(file_path)

    df = pd.read_csv(file_path, index_col=0)
    df.index.name = 'rate_plan_id'
    if 'rate_plan_id' not in df.columns:
        df = df.reset_index()
    return df


def load_data_to_mysql(file_path, table_name, engine):
    """Load a CSV file into a MySQL table and report the outcome.

    'tariff_reference' is written with ``if_exists='replace'``: pandas drops
    and recreates the table from the DataFrame, so any PRIMARY KEY previously
    defined on it is not preserved. Every other table must already exist. It
    is TRUNCATEd and then appended to, which keeps whatever schema it has.

    Errors are printed rather than raised.

    Args:
        file_path: Path to the source CSV file.
        table_name: Destination table. It is interpolated directly into SQL,
            so it must come from a trusted source (e.g. FILE_TO_TABLE_MAP).
        engine: SQLAlchemy engine used to open the connection.

    Returns:
        True if the data was written and committed. False if the file was
        missing, the DataFrame was empty (load skipped), or any error occurred.
    """
    print(f"\n-> Starting import for '{file_path}' into table '{table_name}'...")

    if_exists_mode = 'replace' if table_name == 'tariff_reference' else 'append'

    try:
        # 1. Read CSV into a pandas DataFrame.
        df = _read_source_csv(file_path, table_name)

        if df.empty:
            print(f"   ⚠️ WARNING: DataFrame for '{table_name}' is empty. Skipping load.")
            return False

        print(f"   File check: {len(df):,} records read successfully into DataFrame.")

        # 2. Parse timestamps so to_sql writes a datetime column, not text.
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'])

        # 3. Write to MySQL. engine.begin() commits when the block exits
        #    normally, and rolls back and closes the connection on error.
        with engine.begin() as conn:
            if if_exists_mode == 'append':
                # Empty the table first so re-runs do not duplicate rows.
                # NOTE: MySQL's TRUNCATE commits implicitly and cannot be
                # rolled back, so if to_sql fails below the table stays empty.
                conn.execute(text(f"TRUNCATE TABLE {table_name}"))
                print(f"   Cleared existing data in '{table_name}' for clean append.")

            # index=False: the DataFrame index is never written (rate_plan_id
            # was already moved into a column by _read_source_csv). Writing an
            # index makes pandas declare a key on it, which the original author
            # reports triggered MySQL error 1170 for the tariff table.
            df.to_sql(
                name=table_name,
                con=conn,
                if_exists=if_exists_mode,
                index=False,
                chunksize=10000,
            )

    except FileNotFoundError:
        print(f"   ❌ FATAL ERROR: File not found at '{file_path}'. Check your working directory.")
        return False
    except Exception as e:
        print(f"   ❌ UNEXPECTED ERROR loading data into '{table_name}': {e}")
        return False

    print(f"   ✅ Success: Loaded {len(df):,} records into '{table_name}' using mode '{if_exists_mode}' and COMMITTED.")
    return True


# 2. Main Execution

engine = None
try:
    engine = create_engine(DB_URL)

    # Open and immediately release one connection so that bad credentials or an
    # unreachable host fail here, before any table is truncated or replaced.
    with engine.connect():
        print(f"\nConnection to MySQL database '{os.getenv('MYSQL_DATABASE')}' successful!")

    # Load all defined files. load_data_to_mysql prints its own errors; a return
    # value of exactly False marks a failed or skipped load (None is treated as
    # success, for compatibility with versions that return nothing).
    failed_tables = []
    for file, table in FILE_TO_TABLE_MAP.items():
        if load_data_to_mysql(file, table, engine) is False:
            failed_tables.append(table)

    print("\n" + "="*70)
    if failed_tables:
        print(f"LOAD INCOMPLETE: {len(failed_tables)} table(s) not loaded: {', '.join(failed_tables)}. Review the errors above.")
    else:
        print("ALL DATA GENERATED, LOADED, AND COMMITTED. Proceed to Leakage Detection.")
    print("="*70)

except Exception as e:
    print(f"\nA CRITICAL CONNECTION ERROR OCCURRED: {e}")
finally:
    # Close idle pooled connections. The engine remains usable afterwards
    # (SQLAlchemy creates a fresh pool on next use), so later cells may reuse it.
    if engine is not None:
        engine.dispose()


Connection to MySQL database 'safaricom_ra' successful!

-> Starting import for 'telecom_cdr_data.csv' into table 'cdr_data'...
   File check: 50,000 records read successfully into DataFrame.
   Cleared existing data in 'cdr_data' for clean append.
   ✅ Success: Loaded 50,000 records into 'cdr_data' using mode 'append' and COMMITTED.

-> Starting import for 'telecom_billing_data.csv' into table 'billing_data'...
   File check: 49,250 records read successfully into DataFrame.
   Cleared existing data in 'billing_data' for clean append.
   ✅ Success: Loaded 49,250 records into 'billing_data' using mode 'append' and COMMITTED.

-> Starting import for 'telecom_tariff_reference.csv' into table 'tariff_reference'...
   File check: 4 records read successfully into DataFrame.
   ✅ Success: Loaded 4 records into 'tariff_reference' using mode 'replace' and COMMITTED.

ALL DATA GENERATED, LOADED, AND COMMITTED. Proceed to Leakage Detection.
