In [None]:
import os
import csv
import datetime
import pandas as pd
from sqlalchemy import create_engine, text

# ---------------------------------------------------------
# CONFIGURATION & CONSTANTS
# ---------------------------------------------------------
DATABASE_URL = "mysql+pymysql://root:password@localhost:3306/scholarship_db"
ENGINE = create_engine(DATABASE_URL)

FILES = {
    "students": "students.csv",
    "academic": "academic_records.csv",
    "financial": "financial_records.csv",
    "ranking": "eligibility_ranking.csv"
}

# ---------------------------------------------------------
# FILE & INITIALIZATION FUNCTIONS
# ---------------------------------------------------------

def initialize_csv_files():
    """Ensures all required CSV files exist with correct headers."""
    headers = {
        FILES["students"]: ['student_id', 'first_name', 'last_name', 'fullname', 'gender', 
                            'date_of_birth', 'email', 'phone', 'nin', 'department', 'created_at'],
        FILES["academic"]: ['student_id', 'grade_point_system', 'cgpa', 'courses_passed', 'courses_failed'],
        FILES["financial"]: ['student_id', 'family_income', 'dependents', 'is_orphan']
    }
    for file_path, header in headers.items():
        if not os.path.exists(file_path):
            with open(file_path, "w", newline="") as f:
                csv.writer(f).writerow(header)

def get_last_student_id() -> int:
    """Retrieves the highest existing student ID to avoid collisions."""
    if not os.path.exists(FILES["students"]):
        return 0
    try:
        df = pd.read_csv(FILES["students"])
        return int(df["student_id"].max()) if not df.empty else 0
    except:
        return 0

def is_duplicate_nin(nin: str) -> bool:
    """Checks if a National Identity Number already exists in the local record."""
    if not os.path.exists(FILES["students"]):
        return False
    df = pd.read_csv(FILES["students"])
    return str(nin) in df['nin'].astype(str).values

# ---------------------------------------------------------
# DATA COLLECTION FUNCTIONS
# ---------------------------------------------------------

def collect_student_data() -> bool:
    """
    Interactively collects student data and writes to local CSV files.
    Returns:
        bool: True if data was added, False if the user declined.
    """
    choice = input("Do you want to enter new student data? (yes/no): ").lower()
    if choice != "yes":
        return False

    student_id = get_last_student_id() + 1
    
    print("\n--- Enter Student Personal Info ---")
    first_name = input("First name: ").strip().title()
    last_name = input("Last name: ").strip().title()
    
    gender = ""
    while gender not in ["male", "female"]:
        gender = input("Gender (male/female): ").strip().lower()

    dob = ""
    while not dob:
        dob_input = input("Date of birth (MM/DD/YYYY): ")
        try:
            dob = datetime.datetime.strptime(dob_input, "%m/%d/%Y").strftime("%Y-%m-%d")
        except ValueError:
            print("⚠ Invalid format. Please use MM/DD/YYYY.")

    email = input("Email: ").strip().lower()
    phone = input("Phone (11 digits): ").strip()
    
    nin = ""
    while not nin:
        nin_input = input("NIN (11 digits): ").strip()
        if nin_input.isdigit() and len(nin_input) == 11:
            if not is_duplicate_nin(nin_input):
                nin = nin_input
            else:
                print("⚠ NIN already exists!")
        else:
            print("⚠ NIN must be 11 digits.")

    dept = input("Department: ").strip().title()
    
    print("\n--- Enter Academic Info ---")
    gpa_system = input("Point grading system (4 or 5): ")
    cgpa = float(input("CGPA: "))
    passed = int(input("Courses passed: "))
    failed = int(input("Courses failed: "))

    print("\n--- Enter Financial Info ---")
    income = float(input("Average monthly family income: "))
    dependents = int(input("Number of dependents: "))
    orphan = input("Is the student an orphan? (yes/no): ").strip().lower().title()

    # Saving to files
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(FILES["students"], "a", newline="") as f:
        csv.writer(f).writerow([student_id, first_name, last_name, f"{first_name} {last_name}", 
                                gender, dob, email, phone, nin, dept, timestamp])
    with open(FILES["academic"], "a", newline="") as f:
        csv.writer(f).writerow([student_id, gpa_system, cgpa, passed, failed])
    with open(FILES["financial"], "a", newline="") as f:
        csv.writer(f).writerow([student_id, income, dependents, orphan])

    print("✓ Student record saved successfully!\n")
    return True

# ---------------------------------------------------------
# ETL PROCESS FUNCTIONS
# ---------------------------------------------------------

def extract_data() -> tuple:
    """Extracts data from local CSV storage."""
    students = pd.read_csv(FILES["students"])
    academics = pd.read_csv(FILES["academic"])
    financials = pd.read_csv(FILES["financial"])
    return students, academics, financials

def transform_data(students_df, academic_df, financial_df):
    """Cleans data and merges sets to ensure relational integrity."""
    students_df = students_df.drop_duplicates(subset=["nin"], keep="last")
    students_df["date_of_birth"] = pd.to_datetime(students_df["date_of_birth"]).dt.strftime("%Y-%m-%d")
    
    # Inner join ensures we only process records that exist in the primary student table
    academic_df = academic_df.merge(students_df[["student_id", "nin"]], on="student_id", how="inner")
    financial_df = financial_df.merge(students_df[["student_id", "nin"]], on="student_id", how="inner")
    
    return students_df, academic_df, financial_df

def calculate_eligibility_score(acad_row, fin_row) -> dict:
    """Applies scholarship business logic to calculate scores and status."""
    score = 0
    reasons = []

    # Normalizing CGPA to a 5.0 scale for fairness
    raw_cgpa = float(acad_row["cgpa"])
    norm_cgpa = (raw_cgpa / 4 * 5) if int(acad_row["grade_point_system"]) == 4 else raw_cgpa
    
    if norm_cgpa >= 4.5: 
        score += 40
        reasons.append("High Academic Performance")
    
    if int(acad_row["courses_failed"]) == 0:
        score += 20
        reasons.append("Clean Academic Record")

    if float(fin_row["family_income"]) < 50000:
        score += 20
        reasons.append("Significant Financial Need")

    status = "Highly Eligible" if score >= 75 else "Eligible" if score >= 50 else "Not Eligible"
    
    return {"score": score, "status": status, "reasons": ", ".join(reasons)}

def load_data_to_db(students, academics, financials):
    """Loads transformed data into the SQL database and updates rankings."""
    with ENGINE.connect() as conn:
        for i, row in students.iterrows():
            # Check if student exists
            existing_id = conn.execute(
                text("SELECT student_id FROM students WHERE nin = :nin"), 
                {"nin": row["nin"]}
            ).scalar()

            if not existing_id:
                conn.execute(text("""
                    INSERT INTO students (first_name, last_name, fullname, gender, date_of_birth, email, phone, nin, department, created_at)
                    VALUES (:first_name, :last_name, :fullname, :gender, :date_of_birth, :email, :phone, :nin, :department, :created_at)
                """), row.to_dict())
                conn.commit()
                existing_id = conn.execute(text("SELECT student_id FROM students WHERE nin = :nin"), {"nin": row["nin"]}).scalar()

            # Upsert Records
            acad_data = academics.iloc[i].to_dict()
            fin_data = financials.iloc[i].to_dict()
            acad_data['student_id'] = fin_data['student_id'] = existing_id

            conn.execute(text("""
                INSERT INTO academic_records (student_id, grade_point_system, cgpa, courses_passed, courses_failed)
                VALUES (:student_id, :grade_point_system, :cgpa, :courses_passed, :courses_failed)
                ON DUPLICATE KEY UPDATE cgpa=:cgpa, courses_failed=:courses_failed
            """), acad_data)

            conn.execute(text("""
                INSERT INTO financial_records (student_id, family_income, dependents, is_orphan)
                VALUES (:student_id, :family_income, :dependents, :is_orphan)
                ON DUPLICATE KEY UPDATE family_income=:family_income, dependents=:dependents
            """), fin_data)

            # Eligibility Sync
            elig = calculate_eligibility_score(academics.iloc[i], financials.iloc[i])
            conn.execute(text("""
                INSERT INTO scholarship_eligibility (student_id, fullname, score, status, reasons)
                VALUES (:student_id, :fullname, :score, :status, :reasons)
                ON DUPLICATE KEY UPDATE score=:score, status=:status, reasons=:reasons
            """), {"student_id": existing_id, "fullname": row["fullname"], **elig})
            conn.commit()

        update_rankings_table(conn)

def update_rankings_table(conn):
    """Refreshes the ranking table in SQL and exports to CSV."""
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS eligibility_ranking (
            student_id INT PRIMARY KEY, fullname VARCHAR(255), score FLOAT, 
            status VARCHAR(50), eligibility_rank INT
        )
    """))
    
    results = conn.execute(text("""
        SELECT student_id, fullname, score, status FROM scholarship_eligibility ORDER BY score DESC
    """)).fetchall()
    
    ranking_list = []
    for rank, row in enumerate(results, 1):
        item = {**row._asdict(), "eligibility_rank": rank}
        ranking_list.append(item)
        conn.execute(text("""
            INSERT INTO eligibility_ranking (student_id, fullname, score, status, eligibility_rank)
            VALUES (:student_id, :fullname, :score, :status, :eligibility_rank)
            ON DUPLICATE KEY UPDATE eligibility_rank=:eligibility_rank, score=:score
        """), item)
    
    conn.commit()
    pd.DataFrame(ranking_list).to_csv(FILES["ranking"], index=False)

# ---------------------------------------------------------
# MAIN EXECUTION
# ---------------------------------------------------------

def run_pipeline():
    """Orchestrates the full data ingestion and ETL process."""
    initialize_csv_files()
    
    while collect_student_data():
        if input("Add another student? (yes/no): ").lower() != "yes":
            break

    try:
        print("\n--- Starting ETL Pipeline ---")
        raw_s, raw_a, raw_f = extract_data()
        clean_s, clean_a, clean_f = transform_data(raw_s, raw_a, raw_f)
        load_data_to_db(clean_s, clean_a, clean_f)
        print("✅ ETL Pipeline and Rankings Updated Successfully!")
    except Exception as e:
        print(f"❌ Error during ETL: {e}")

if __name__ == "__main__":
    run_pipeline()