# Environment Used:

1. Python 3.11.5
2. Required libraries: 
    pandas==1.5.3, 
    numpy==1.24.3, 
    scikit-learn==1.2.2, 
    torch==2.1.1, 
    pytorch-tabnet==4.0.0, 
    matplotlib==3.7.2, 
    seaborn==0.12.2, 
    catboost (version not pinned)

We recommend ensuring your environment matches our experimental setup (same Python version and library versions) to facilitate running the code.
Therefore, you can directly install via pip using the Jupyter notebook below:

In [None]:
"""
Code Summary:
Installs specific versions of required libraries (pandas, numpy, torch, tabnet, catboost) to ensure environment reproducibility.
"""
# NOTE: every package below is pinned to an exact version except catboost,
# which pip resolves to whatever release is current at install time.
!pip install pandas==1.5.3 numpy==1.24.3 scikit-learn==1.2.2 torch==2.1.1 pytorch-tabnet==4.0.0 matplotlib==3.7.2 seaborn==0.12.2 catboost





## 1. Data Processing 
import the required libraries

In [None]:
"""
Code Summary:
Imports necessary Python libraries for data manipulation, machine learning (CatBoost, TabNet), and visualization.
"""
import pandas as pd
import numpy as np
import os
from catboost import CatBoostClassifier, Pool
from sklearn.model_selection import train_test_split,RandomizedSearchCV
import matplotlib.pyplot as plt
import seaborn as sns
import time
import torch
from pytorch_tabnet.tab_model import TabNetClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
import time
import json
import random
import re
import itertools

Defines file paths for the raw input data, the codebook, and the processed output file.

In [None]:
# File locations for the data-processing stage.
codebook_path = 'codebook/FinalCodeBook.csv'            # variable metadata
input_csv = 'dataProcessing/Original.csv'               # raw input data
output_csv = 'dataProcessing/Processed_Original.csv'    # processed output

### Load Codebook and Parse Metadata
Loads the codebook, cleans column names, and identifies target variables.

In [None]:


print("Loading codebook...")
# Decoding as latin-1 turns a UTF-8 byte-order mark into the three characters
# 'ï»¿' at the start of the first header; they are removed after trimming.
codebook = pd.read_csv(codebook_path, encoding='latin-1')
trimmed_headers = codebook.columns.str.strip()
codebook.columns = trimmed_headers.str.replace('^ï»¿', '', regex=True)

# The variables listed in the codebook are the columns we want to keep.
if 'Variable' not in codebook.columns:
    print("Warning: 'Variable' column not found by name. Using first column.")
    variable_column = codebook.iloc[:, 0]
else:
    variable_column = codebook['Variable']
target_vars = variable_column.tolist()
print(f"Found {len(target_vars)} target variables in codebook.")

# Filled by a later cell with {column name: [values that mean "missing"]}.
# A scheme such as "9: Omitted or invalid; Sysmis: Not administered"
# contributes the numeric code 9.
missing_map = {}


Defines a helper function to parse the missing value schemes (e.g., '9: Omitted') from the codebook.

In [None]:
def parse_missing_scheme(scheme_str):
    """Extract the missing-value codes from a codebook scheme string.

    The scheme is a ';'-separated list of "Value: Label" entries, e.g.
    "9: Omitted or invalid; Sysmis: Not administered".

    Args:
        scheme_str: The raw scheme cell from the codebook; may be NaN/None.

    Returns:
        A list with one item per "Value: Label" entry, in order of appearance.
        Numeric values are returned as int when they are whole numbers and as
        float otherwise; non-numeric values are kept as strings, except
        'Sysmis' (any letter case), which is dropped. Entries without a ':'
        are ignored. An empty list is returned for NaN/None input.
    """
    if pd.isna(scheme_str):
        return []

    codes = []
    for entry in str(scheme_str).split(';'):
        token, separator, _label = entry.strip().partition(':')
        if not separator:
            # Not a "Value: Label" entry.
            continue
        token = token.strip()
        try:
            number = float(token)
        except ValueError:
            # Non-numeric code: keep it verbatim unless it is the Sysmis marker.
            if token.lower() != 'sysmis':
                codes.append(token)
            continue
        codes.append(int(number) if number.is_integer() else number)
    return codes


Iterates through the codebook to build a dictionary mapping variables to their specific missing values.

In [None]:
# Resolve the variable-name column with the same rule used when the codebook
# was loaded: prefer the 'Variable' header, otherwise use the first column.
# Indexing 'Variable' unconditionally would raise KeyError in the fallback case.
variable_col = 'Variable' if 'Variable' in codebook.columns else codebook.columns[0]
scheme_col = 'Missing Scheme Detailed: SPSS'

if scheme_col in codebook.columns:
    # Walk the two columns in lockstep; only variables that actually define
    # missing codes get an entry in missing_map.
    for var, scheme in zip(codebook[variable_col], codebook[scheme_col]):
        vals = parse_missing_scheme(scheme)
        if vals:
            missing_map[var] = vals
else:
    print(f"Warning: '{scheme_col}' column not found in codebook; no missing values will be recoded.")

print("Missing value schemes parsed.")



Sets up parameters for chunked processing and detects the CSV header encoding.


In [None]:
chunksize = 50000   # rows per chunk; adjust based on memory
first_chunk = True  # the first chunk written creates the file and its header row

print(f"Processing {input_csv} in chunks of {chunksize}...")

# Read only the header row to learn the available column names.
# utf-8-sig is tried first so that a BOM is handled; latin-1 is the fallback.
try:
    header_only = pd.read_csv(input_csv, nrows=0, encoding='utf-8-sig')
except UnicodeDecodeError:
    print("utf-8-sig failed, trying latin-1 for header...")
    header_only = pd.read_csv(input_csv, nrows=0, encoding='latin-1')
header = header_only.columns.tolist()

print(f"Detected header columns: {header[:5]}...")


Identifies columns to keep: IDs, weights, and target variables, removing duplicates.

In [None]:
# Classify header columns in a single pass: identifiers start with 'ID';
# columns whose name contains 'WGT' or 'JK' are kept alongside them.
id_vars = []
weight_vars = []
for col in header:
    if col.startswith('ID'):
        id_vars.append(col)
    if 'WGT' in col or 'JK' in col:
        weight_vars.append(col)

# Candidate columns in priority order, restricted to those present in the CSV.
# dict.fromkeys drops repeats while preserving first-seen order.
candidates = id_vars + weight_vars + target_vars
keep_cols = list(dict.fromkeys(c for c in candidates if c in header))

print(f"Keeping {len(keep_cols)} columns: {keep_cols[:10]} ...")


Processes the large CSV in chunks: filters columns, handles missing values based on the codebook, and saves to a new file.

In [None]:
processed_count = 0
# Reset here as well as in the setup cell: if only this cell is re-run, a stale
# first_chunk=False would make every chunk append to the previous output file
# and silently duplicate all rows.
first_chunk = True

try:
    # Probe one row to choose an encoding that can decode the file.
    # Only a decoding failure should trigger the latin-1 fallback; any other
    # error (e.g. missing file) is reported by the outer handler.
    encoding = 'utf-8-sig'
    try:
        pd.read_csv(input_csv, nrows=1, encoding=encoding)
    except UnicodeDecodeError:
        encoding = 'latin-1'

    with pd.read_csv(input_csv, chunksize=chunksize, usecols=keep_cols, encoding=encoding, low_memory=False) as reader:
        for chunk in reader:
            # Replace the codebook-defined missing codes with NaN, column by column.
            for col in chunk.columns:
                missing_codes = missing_map.get(col)
                if not missing_codes:
                    continue
                mask = chunk[col].isin(missing_codes)
                if mask.any():
                    chunk.loc[mask, col] = np.nan

            # The first chunk creates the file with a header; later chunks append.
            chunk.to_csv(
                output_csv,
                index=False,
                mode='w' if first_chunk else 'a',
                header=first_chunk,
            )

            processed_count += len(chunk)
            first_chunk = False
            print(f"Processed {processed_count} rows...")

    print(f"Done! Processed data saved to {output_csv}")

except Exception as e:
    print(f"An error occurred: {e}")


## 2. Data Merging & Target Definition (2merge_fail_data.py)

Defines file paths for the Data Merging phase (Processed data + Original Results).

In [None]:
# File locations for the merge stage.
result_csv_path = 'dataProcessing/OriginalResult.csv'          # student result data
processed_csv_path = 'dataProcessing/Processed_Original.csv'   # output of the processing stage
output_csv_path = 'dataProcessing/Final.csv'                   # merged dataset written by this stage


Loads the processed survey data and the student result data (Math scores).

In [None]:
print("Loading datasets...")
# Load processed data
df_processed = pd.read_csv(processed_csv_path, low_memory=False)
print(f"Processed data loaded: {len(df_processed)} rows.")

# Load result data
# Only the student keys (IDCNTRY, IDSTUD) and the five Math plausible values
# are needed to determine fail status.
key_cols = ['IDCNTRY', 'IDSTUD']
pv_cols = ['BSMMAT01', 'BSMMAT02', 'BSMMAT03', 'BSMMAT04', 'BSMMAT05']
cols_to_use = key_cols + pv_cols
df_result = pd.read_csv(result_csv_path, usecols=cols_to_use, low_memory=False)
print(f"Result data loaded: {len(df_result)} rows.")

# Deduplicate result data
# OriginalResult.csv contains multiple rows per student (e.g. linked to different teachers).
# Keep the first row per (IDCNTRY, IDSTUD) pair (scores are invariant for the student).
# The explicit .copy() makes the result an independent frame, so adding columns
# below does not raise SettingWithCopyWarning.
df_result_unique = df_result.drop_duplicates(subset=key_cols).copy()
print(f"Unique students in result data: {len(df_result_unique)}")

# Define Fail Standard
# Standard: Average Math Plausible Value < 400 (Below Low International Benchmark)
print("Calculating Fail status...")
# Row-wise mean; pandas skips NaN values, so the mean is NaN only when all
# five plausible values are missing.
df_result_unique['Math_Mean'] = df_result_unique[pv_cols].mean(axis=1)


Calculates the 'Fail' status: Average Math Plausible Value < 400 indicates failure (1), otherwise pass (0).

In [None]:
# Create Fail column
# 1 = Fail (Mean Score < 400)
# 0 = Pass (Mean Score >= 400)
# <NA> = no score available (Math_Mean is NaN)
# A plain `NaN < 400` comparison is False, which would silently label students
# without any plausible value as "Pass"; they are kept as missing instead.
# The nullable Int64 dtype keeps the labels as integers 0/1 next to <NA>.
math_mean = df_result_unique['Math_Mean']
df_result_unique['Fail'] = (math_mean < 400).astype('Int64').mask(math_mean.isna())

print(f"Fail counts:\n{df_result_unique['Fail'].value_counts()}")

# Merge
print("Merging datasets...")
# Left merge to keep all students in the processed file
# Using IDCNTRY and IDSTUD as keys
df_merged = pd.merge(df_processed,
                     df_result_unique[['IDCNTRY', 'IDSTUD', 'Math_Mean', 'Fail']],
                     on=['IDCNTRY', 'IDSTUD'],
                     how='left')


Checks for students without scores, warns if any, and saves the final merged dataset.

In [None]:
# Check for unmerged students
# Students whose 'Fail' value is missing after the left merge had no usable score.
missing_scores = df_merged['Fail'].isna().sum()
if missing_scores > 0:
    print(f"Warning: {missing_scores} students in Processed_Original.csv did not have scores in OriginalResult.csv")

# Write the merged dataset; if the target cannot be written because of a
# permission error, write to a sibling '<name>_new.csv' file instead.
try:
    df_merged.to_csv(output_csv_path, index=False)
except PermissionError:
    print(f"Error: Could not save to {output_csv_path}. File might be open.")
    temp_output = output_csv_path.replace('.csv', '_new.csv')
    df_merged.to_csv(temp_output, index=False)
    print(f"Saved to {temp_output} instead.")
else:
    print(f"Merged data saved to {output_csv_path}")


Reloads the codebook to prepare for updating it with the new 'Fail' variable.

In [None]:
# Update Codebook with Fail definition
codebook_path = r'codebook/FinalCodeBook.csv'
print(f"Updating codebook at {codebook_path}...")

# Prefer utf-8-sig (handles a BOM); fall back to latin-1 only when the bytes
# are not valid UTF-8. Other failures (missing file, malformed CSV) are real
# errors and are allowed to surface instead of being retried blindly.
try:
    codebook_df = pd.read_csv(codebook_path, encoding='utf-8-sig')
except UnicodeDecodeError:
    codebook_df = pd.read_csv(codebook_path, encoding='latin-1')

# Clean column names to ensure matching (whitespace, and a BOM that was
# decoded as latin-1 and therefore appears as 'ï»¿')
codebook_df.columns = codebook_df.columns.str.strip().str.replace('^ï»¿', '', regex=True)
var_col = codebook_df.columns[0] # Assuming first column is Variable


Adds the 'Fail' variable definition and metadata to the codebook if it is not already present.

In [None]:
# Append the derived variable only once, so re-running the cell does not
# duplicate the row.
fail_already_listed = 'Fail' in codebook_df[var_col].astype(str).values
if fail_already_listed:
    print("'Fail' variable already exists in Codebook.")
else:
    # Metadata for the new row, keyed by column position in the codebook.
    fail_metadata = {
        0: 'Fail',                                      # Variable
        1: 'Student Failure Status (Math Mean < 400)',  # Label
        2: 'Derived',                                   # Question Location
        3: 'Nominal',                                   # Level
        4: '1',                                         # Width
        5: '0',                                         # Decimals
        6: '0',                                         # Range Minimum
        7: '1',                                         # Range Maximum
        8: '0: Pass; 1: Fail',                          # Value Scheme Detailed
        13: 'Derived',                                  # Domain
        14: 'D',                                        # Variable Class
        15: 'Derived from BSMMAT01-05',                 # Comment
    }

    # Start from an all-empty row so every codebook column is present.
    new_row = dict.fromkeys(codebook_df.columns, '')
    for position, value in fail_metadata.items():
        new_row[codebook_df.columns[position]] = value

    # Append the row and persist the updated codebook.
    new_row_df = pd.DataFrame([new_row])
    codebook_df = pd.concat([codebook_df, new_row_df], ignore_index=True)
    codebook_df.to_csv(codebook_path, index=False, encoding='utf-8-sig')
    print("Added 'Fail' variable to Codebook.")


## 3. CatBoost Training

### Code Summary: Hyperparameter Tuning
According to the **Hyperparameter Tuning Results** table (Section 6.2), this module executes a randomized search to optimize the model.
- **Goal:** Find the best combination of `n_d`, `n_steps`, `gamma`, etc.
- **Key Insight:** Trial 5 yielded the best performance with **Val AUC = 0.8145**.
- **Action:** The code below sets up the search space, runs the tuning, and retrains the model with these best parameters.

Sets random seeds for reproducibility and defines paths for the CatBoost training phase.

In [None]:

# Set random seed for reproducibility
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Paths
DATA_PATH = r'dataProcessing/Final.csv'
CODEBOOK_PATH = r'codebook/FinalCodeBook.csv'
MODEL_DIR = r'models'

# exist_ok=True creates the directory when needed and is a no-op when it is
# already there, without a separate (race-prone) existence check.
os.makedirs(MODEL_DIR, exist_ok=True)

print("="*50)
print("PHASE 1: DATA PREPARATION")
print("="*50)


Loads the final merged dataset for analysis/training.

In [None]:
print("Loading dataset...")
# low_memory=False makes pandas parse the file in one pass instead of in
# internal chunks.
df = pd.read_csv(filepath_or_buffer=DATA_PATH, low_memory=False)
loaded_shape = df.shape
print(f"Data loaded: {loaded_shape}")


Parses the codebook to identify and separate Categorical and Numerical features.

In [None]:
# We will parse the Codebook to identify 'Nominal' as Categorical and 'Scale' as Numerical
print("Parsing Codebook for feature types...")
codebook = pd.read_csv(CODEBOOK_PATH, encoding='latin-1')

# Target variable
target = 'Fail'

# Features to exclude (IDs, weights, target, auxiliary)
exclude_cols = ['IDCNTRY', 'IDBOOK', 'IDSCHOOL', 'IDCLASS', 'IDSTUD',
                'IDTEALIN', 'IDTEACH', 'IDLINK', 'IDPOP', 'IDGRADER',
                'IDGRADE', 'IDSUBJ', 'MATWGT', 'JKREP', 'JKZONE',
                'Math_Mean', 'Fail']
excluded = set(exclude_cols)

# Build the variable -> Level lookup once instead of re-stripping and
# re-filtering the whole codebook for every dataset column.
# Column 0 holds the variable name and column 3 the Level
# (Variable,Label,Question Location,Level,...). setdefault keeps the first
# codebook row when a variable is listed more than once.
level_by_variable = {}
codebook_names = codebook.iloc[:, 0].astype(str).str.strip()
for name, level in zip(codebook_names, codebook.iloc[:, 3]):
    level_by_variable.setdefault(name, level)

# Feature name lists by type
cat_features = []
num_features = []

# Scan dataset columns and match with codebook
for col in df.columns:
    if col in excluded:
        continue

    level = level_by_variable.get(col)  # None when the column is not in the codebook
    if level == 'Nominal':
        cat_features.append(col)
    elif level == 'Scale':
        num_features.append(col)
    elif pd.api.types.is_numeric_dtype(df[col]):
        # Unknown or other level: fall back to the column's dtype.
        num_features.append(col)
    else:
        cat_features.append(col)

print(f"Categorical features ({len(cat_features)}): {cat_features}")
print(f"Numerical features ({len(num_features)}): {num_features}")


Handles missing values: fills categorical with 'Missing' and numerical with median (for CatBoost).

In [None]:
# CatBoost needs categorical features as strings or integers, so every
# categorical column is converted to text and missing entries (which become
# the text 'nan') are relabelled as their own 'Missing' category.
print("Handling missing values for categorical features...")
for col in cat_features:
    as_text = df[col].astype(str)
    df[col] = as_text.where(as_text != 'nan', 'Missing')

# Numerical columns: coerce to numbers (unparseable entries become NaN) and
# fill the gaps with the column median.
print("Handling missing values for numerical features...")
for col in num_features:
    as_number = pd.to_numeric(df[col], errors='coerce')
    df[col] = as_number.fillna(as_number.median())


Splits the data into Training (70%), Validation (15%), and Test (15%) sets.

In [None]:
print("Splitting data...")
# Rows without a label (e.g. students that had no match in the result file)
# cannot be stratified on or trained with, so they are left out of X and y.
# When every row is labelled this selects all rows and the split is unchanged.
labelled = df[target].notna()
n_unlabelled = int((~labelled).sum())
if n_unlabelled:
    print(f"Dropping {n_unlabelled} rows without a '{target}' label.")
X = df.loc[labelled, cat_features + num_features]
y = df.loc[labelled, target].astype(int)

# First split: Train (70%) vs Temp (30%)
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.30, random_state=RANDOM_SEED, stratify=y
)

# Second split: Validation (15% of total -> 50% of Temp) vs Test (15% of total -> 50% of Temp)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.50, random_state=RANDOM_SEED, stratify=y_temp
)

print(f"Training Set: {X_train.shape}")
print(f"Validation Set: {X_val.shape}")
print(f"Test Set: {X_test.shape}")

# Create CatBoost Pools
train_pool = Pool(X_train, y_train, cat_features=cat_features)
val_pool = Pool(X_val, y_val, cat_features=cat_features)
test_pool = Pool(X_test, y_test, cat_features=cat_features)

print("\n" + "="*50)
print("PHASE 2: TRAINING (BASELINE)")
print("="*50)


Baseline Model

In [None]:


from catboost.utils import get_gpu_device_count

# Use the GPU when CatBoost can see one; otherwise fall back to the CPU so the
# notebook still runs on machines without a usable GPU.
baseline_task_type = "GPU" if get_gpu_device_count() > 0 else "CPU"

print(f"Initializing Baseline CatBoostClassifier ({baseline_task_type})...")
baseline_model = CatBoostClassifier(
    task_type=baseline_task_type,
    iterations=1000,
    learning_rate=0.03,
    depth=6,
    loss_function='Logloss',
    eval_metric='AUC',
    random_seed=RANDOM_SEED,
    verbose=100
)


 Train with Early Stopping

In [None]:

print("Training Baseline Model...")
# Validation pool drives early stopping; the best iteration is kept.
baseline_fit_options = dict(
    eval_set=val_pool,
    early_stopping_rounds=50,
    use_best_model=True,
)
start_time = time.time()
baseline_model.fit(train_pool, **baseline_fit_options)
end_time = time.time()
baseline_seconds = end_time - start_time
print(f"Baseline Training Time: {baseline_seconds:.2f} seconds")


Save Baseline

In [None]:

# Persist the baseline model in the model directory.
baseline_path = os.path.join(MODEL_DIR, 'catboost_baseline.cbm')
baseline_model.save_model(baseline_path)
print(f"Baseline model saved to {baseline_path}")

phase_rule = "=" * 50
print("\n" + phase_rule)
print("PHASE 3: HYPERPARAMETER TUNING")
print(phase_rule)


RandomizedSearchCV

In [None]:
print("Setting up RandomizedSearchCV...")

# Candidate values for each tuned hyperparameter.
# The space is kept deliberately small for a faster demonstration.
param_dist = dict(
    iterations=[500],
    depth=[4, 6],
    learning_rate=[0.03, 0.1],
    l2_leaf_reg=[1, 3],
)


Initialize a base model for tuning

In [None]:

# Note: For GridSearchCV/RandomizedSearchCV with CatBoost, it's often better to pass CPU task_type for the search controller
# but the estimator can use GPU. However, sklearn cross-validation splits data which forces CatBoost to re-upload to GPU many times.
# We will use a simplified approach: use the built-in randomized_search or grid_search from CatBoost library if possible,
# or use sklearn's RandomizedSearchCV with GPU enabled on the estimator.
from catboost.utils import get_gpu_device_count

# Use the GPU when CatBoost can see one; otherwise fall back to the CPU so the
# search still runs on machines without a usable GPU.
tuning_task_type = "GPU" if get_gpu_device_count() > 0 else "CPU"

# Using CatBoost's built-in randomized_search
print("Starting Hyperparameter Tuning using CatBoost's built-in randomized_search...")
start_time = time.time()

tuning_model = CatBoostClassifier(
    task_type=tuning_task_type,
    loss_function='Logloss',
    eval_metric='AUC',
    cat_features=cat_features,
    random_seed=RANDOM_SEED,
    verbose=0,
    early_stopping_rounds=50
)


Runs CatBoost's built-in randomized search (2 sampled parameter sets, 2-fold cross-validation on the training set) and reports the best parameters found.

In [None]:
# Options for CatBoost's own randomized_search. The call returns a dictionary
# whose 'params' entry holds the best parameter combination.
search_options = dict(
    param_distributions=param_dist,
    X=X_train,
    y=y_train,
    cv=2,
    n_iter=2,
    partition_random_seed=RANDOM_SEED,
    calc_cv_statistics=True,
    search_by_train_test_split=False,  # Use CV
    verbose=False,
    plot=False,
)
tuned_result = tuning_model.randomized_search(**search_options)

end_time = time.time()
tuning_seconds = end_time - start_time
print(f"Hyperparameter Tuning Time: {tuning_seconds:.2f} seconds")

best_params = tuned_result['params']
print(f"Best Parameters found: {best_params}")


Retrain with Best Parameters

In [None]:

from catboost.utils import get_gpu_device_count

# Use the GPU when CatBoost can see one; otherwise fall back to the CPU so the
# retraining still runs on machines without a usable GPU.
tuned_task_type = "GPU" if get_gpu_device_count() > 0 else "CPU"

print("Retraining Tuned Model with Best Parameters...")
tuned_model = CatBoostClassifier(
    task_type=tuned_task_type,
    # Each .get() default is used only when the search result lacks that key.
    iterations=best_params.get('iterations', 1000),
    depth=best_params.get('depth', 6),
    learning_rate=best_params.get('learning_rate', 0.03),
    l2_leaf_reg=best_params.get('l2_leaf_reg', 3),
    loss_function='Logloss',
    eval_metric='AUC',
    random_seed=RANDOM_SEED,
    verbose=100
)

# Validation pool drives early stopping; the best iteration is kept.
tuned_model.fit(
    train_pool,
    eval_set=val_pool,
    early_stopping_rounds=50,
    use_best_model=True
)


### Code Summary: Model Evaluation
According to the **Model Performance Comparison** table (Section 6.1), this module evaluates the final models.
- **Goal:** Compare Baseline vs. Tuned TabNet on the Test set.
- **Key Insight:** The Tuned Model improved **Recall by +3.55%** (from 0.2412 to 0.2767), which is critical for identifying at-risk students.
- **Action:** The code below saves the tuned model, defines the evaluation metrics (Accuracy, Precision, Recall, F1), and outputs the performance comparison.

Saves the tuned model, then evaluates the baseline and tuned models on the test set (accuracy, precision, recall, F1, and confusion matrix) and compares their F1 scores.

In [None]:
# Persist the tuned model in the model directory.
tuned_path = os.path.join(MODEL_DIR, 'catboost_tuned.cbm')
tuned_model.save_model(tuned_path)
print(f"Tuned model saved to {tuned_path}")


phase_rule = "=" * 50
print("\n" + phase_rule)
print("PHASE 4: VALIDATION & EVALUATION")
print(phase_rule)

def evaluate_model(model, pool, name="Model", y_true=None):
    """Print classification metrics for `model` on `pool` and return the F1 score.

    Args:
        model: Fitted classifier exposing ``predict(pool)``.
        pool: Data to predict on.
        name: Label used in the printed report header.
        y_true: True labels matching the rows of ``pool``. When omitted, the
            module-level ``y_test`` is used, so ``pool`` must then be the test
            pool; pass the labels explicitly to evaluate any other pool.

    Returns:
        The F1 score of the predictions.
    """
    if y_true is None:
        # Backward-compatible default: callers that pass only the test pool
        # keep comparing against the global test labels.
        y_true = y_test

    # Only hard class predictions are needed for the metrics below; the
    # previously computed (and never used) probabilities were dropped to
    # avoid a second inference pass.
    y_pred = model.predict(pool)

    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred)
    rec = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred)

    print(f"--- {name} Evaluation ---")
    print(f"Accuracy:  {acc:.4f}")
    print(f"Precision: {prec:.4f}")
    print(f"Recall:    {rec:.4f}")
    print(f"F1 Score:  {f1:.4f}")
    print("Confusion Matrix:")
    print(cm)
    return f1

# Score both models on the held-out test pool.
print("Evaluating Baseline Model on Test Set...")
f1_baseline = evaluate_model(baseline_model, test_pool, "Baseline")

print("\nEvaluating Tuned Model on Test Set...")
f1_tuned = evaluate_model(tuned_model, test_pool, "Tuned")

# Side-by-side F1 comparison with a one-line verdict.
print("\nPerformance Comparison:")
print(f"Baseline F1: {f1_baseline:.4f}")
print(f"Tuned F1:    {f1_tuned:.4f}")
tuned_is_better = f1_tuned > f1_baseline
verdict = (
    "Result: Tuned model outperformed Baseline."
    if tuned_is_better
    else "Result: Tuned model did not outperform Baseline (might need more search iterations)."
)
print(verdict)


phase_rule = "=" * 50
print("\n" + phase_rule)
print("PHASE 5: MODEL INTERPRETATION")
print(phase_rule)


Feature Importance

In [None]:

# Importance of every training feature according to the tuned model,
# reduced to the ten highest-scoring features.
feature_names = X_train.columns
feature_importances = tuned_model.get_feature_importance(train_pool)
fi_df = (
    pd.DataFrame({'Feature': feature_names, 'Importance': feature_importances})
    .sort_values(by='Importance', ascending=False)
    .head(10)
)

print("Top 10 Important Features:")
print(fi_df)


Interpretation Text

In [None]:

# fi_df is sorted by importance, so its first row is the strongest feature.
top_feature = fi_df['Feature'].iloc[0]
interpretation_lines = (
    "\nInterpretation Analysis:",
    f"The most influential feature for predicting student failure is '{top_feature}'.",
    "This suggests that this specific background factor plays a critical role in academic performance.",
    "Educators should focus on monitoring these high-impact variables to intervene early.",
)
for line in interpretation_lines:
    print(line)

phase_rule = "=" * 50
print("\n" + phase_rule)
print("EXECUTION COMPLETE")
print(phase_rule)


Set random seed for reproducibility (must match train_catboost.py).

In [None]:
# Same seed as the training cells, so the data split below is reproduced.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Inputs and the location of the tuned model to visualise.
MODEL_DIR = 'models'
MODEL_PATH = os.path.join(MODEL_DIR, 'catboost_tuned.cbm')
DATA_PATH = 'dataProcessing/Final.csv'
CODEBOOK_PATH = 'codebook/FinalCodeBook.csv'

section_rule = "=" * 50
print(section_rule)
print("GENERATING VISUALIZATION ARTIFACTS")
print(section_rule)


Loads the final merged dataset for analysis/training.

In [None]:
print("Loading dataset...")
# low_memory=False makes pandas parse the file in one pass instead of in
# internal chunks.
df = pd.read_csv(filepath_or_buffer=DATA_PATH, low_memory=False)


Parses the codebook to identify and separate Categorical and Numerical features.

In [None]:
print("Parsing Codebook for feature types...")
codebook = pd.read_csv(CODEBOOK_PATH, encoding='latin-1')

target = 'Fail'
# Columns that are never used as features (IDs, weights, target, auxiliary)
exclude_cols = ['IDCNTRY', 'IDBOOK', 'IDSCHOOL', 'IDCLASS', 'IDSTUD',
                'IDTEALIN', 'IDTEACH', 'IDLINK', 'IDPOP', 'IDGRADER',
                'IDGRADE', 'IDSUBJ', 'MATWGT', 'JKREP', 'JKZONE',
                'Math_Mean', 'Fail']
excluded = set(exclude_cols)

# Build the variable -> Level lookup once instead of re-stripping and
# re-filtering the whole codebook for every dataset column.
# Column 0 holds the variable name and column 3 the Level; setdefault keeps
# the first codebook row when a variable is listed more than once.
level_by_variable = {}
codebook_names = codebook.iloc[:, 0].astype(str).str.strip()
for name, level in zip(codebook_names, codebook.iloc[:, 3]):
    level_by_variable.setdefault(name, level)

cat_features = []
num_features = []

for col in df.columns:
    if col in excluded:
        continue

    level = level_by_variable.get(col)  # None when the column is not in the codebook
    if level == 'Nominal':
        cat_features.append(col)
    elif level == 'Scale':
        num_features.append(col)
    elif pd.api.types.is_numeric_dtype(df[col]):
        # Unknown or other level: fall back to the column's dtype.
        num_features.append(col)
    else:
        cat_features.append(col)


Handles missing values: fills categorical with 'Missing' and numerical with median (for CatBoost).

In [None]:
# Categorical columns: convert to text and relabel missing entries (which
# become the text 'nan') as their own 'Missing' category.
for col in cat_features:
    as_text = df[col].astype(str)
    df[col] = as_text.where(as_text != 'nan', 'Missing')

# Numerical columns: coerce to numbers (unparseable entries become NaN) and
# fill the gaps with the column median.
for col in num_features:
    as_number = pd.to_numeric(df[col], errors='coerce')
    df[col] = as_number.fillna(as_number.median())


Splits the data into Training (70%), Validation (15%), and Test (15%) sets.

In [None]:
# Rows without a label (e.g. students that had no match in the result file)
# cannot be stratified on, so they are left out of X and y. When every row is
# labelled this selects all rows and the split is unchanged.
labelled = df[target].notna()
X = df.loc[labelled, cat_features + num_features]
y = df.loc[labelled, target].astype(int)

# 70% train / 30% temp, then temp is halved into validation and test.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.30, random_state=RANDOM_SEED, stratify=y
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.50, random_state=RANDOM_SEED, stratify=y_temp
)


Loads the pre-trained CatBoost model from the saved file.

In [None]:
# Create an empty classifier and populate it from the saved model file.
model = CatBoostClassifier()
print(f"Loading model from {MODEL_PATH}...")
model.load_model(MODEL_PATH)

Calculates feature importance from the model and saves the top features plot.

In [None]:
print("Generating Feature Importance Table...")
# Importance of every feature, computed against the training data and
# ordered from most to least important.
importance_pool = Pool(X_train, y_train, cat_features=cat_features)
feature_importances = model.get_feature_importance(importance_pool)
feature_names = X_train.columns
fi_df = pd.DataFrame(
    {'Feature': feature_names, 'Importance': feature_importances}
).sort_values(by='Importance', ascending=False)

# Full table as CSV
fi_csv_path = os.path.join(MODEL_DIR, 'feature_importance.csv')
fi_df.to_csv(fi_csv_path, index=False)
print(f"Feature Importance CSV saved to: {fi_csv_path}")

# Bar chart of the 20 most important features
fi_fig, fi_ax = plt.subplots(figsize=(10, 8))
sns.barplot(x="Importance", y="Feature", data=fi_df.head(20), ax=fi_ax)
fi_ax.set_title('Top 20 Important Features (CatBoost)')
fi_fig.tight_layout()
fi_png_path = os.path.join(MODEL_DIR, 'feature_importance.png')
fi_fig.savefig(fi_png_path)
print(f"Feature Importance Plot saved to: {fi_png_path}")


Generates and saves the Confusion Matrix to evaluate model performance on the test set.

In [None]:
print("Generating Confusion Matrix...")
# Predicted vs. actual labels on the test split
y_pred = model.predict(X_test)
cm = confusion_matrix(y_test, y_pred)

# Annotated heat map; the same Pass/Fail labels are used on both axes.
class_labels = ['Pass', 'Fail']
cm_fig, cm_ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_labels,
            yticklabels=class_labels,
            ax=cm_ax)
cm_ax.set_ylabel('Actual')
cm_ax.set_xlabel('Predicted')
cm_ax.set_title('Confusion Matrix (Test Set)')
cm_png_path = os.path.join(MODEL_DIR, 'confusion_matrix.png')
cm_fig.savefig(cm_png_path)
print(f"Confusion Matrix Plot saved to: {cm_png_path}")

print("=" * 50)
print("Artifact generation complete.")


## 4. TabNet Advanced Pipeline

Defines feature fusion logic (interactions/squares) and sets global random seeds for TabNet.

In [None]:
# Switch for the squared / pairwise-product features added during preprocessing.
ENABLE_FEATURE_FUSION = True
# Seed shared by every random number generator in the TabNet pipeline.
SEED = 42

def add_feature_fusion(df, numeric_cols, max_base_features=20):
    """Add squared and pairwise-product features for the leading numeric columns.

    For each of the first ``max_base_features`` names in ``numeric_cols`` a
    ``<col>__sq`` column is created, and for every pair of them a
    ``<c1>__x__<c2>`` column. Values that cannot be parsed as numbers are
    treated as NaN and propagate into the derived columns.

    Args:
        df: Source frame. It is not modified; use the returned frame.
        numeric_cols: Names of the numeric feature columns, in priority order.
        max_base_features: How many leading columns take part in the fusion.

    Returns:
        A ``(frame, numeric_names)`` tuple. ``frame`` holds the original
        columns followed by the derived ones (squares first, then products);
        ``numeric_names`` is ``numeric_cols`` extended with the derived names.
        When ``numeric_cols`` is empty both inputs are returned unchanged.
    """
    if not numeric_cols:
        return df, numeric_cols

    base_cols = list(numeric_cols[:max_base_features])

    # Convert every base column exactly once; the pair loop below would
    # otherwise re-parse each column for every pair it appears in.
    numeric = {col: pd.to_numeric(df[col], errors="coerce") for col in base_cols}

    # Collect the derived columns first and attach them in one concat.
    # Inserting a couple of hundred columns one at a time fragments the frame
    # and warns when `df` is a slice of another frame.
    derived = {}
    for col in base_cols:
        derived[f"{col}__sq"] = numeric[col] ** 2
    for c1, c2 in itertools.combinations(base_cols, 2):
        derived[f"{c1}__x__{c2}"] = numeric[c1] * numeric[c2]

    derived_frame = pd.DataFrame(derived, index=df.index)
    # A derived name may already exist (e.g. the function was applied twice);
    # the fresh values replace the old column rather than duplicating the label.
    untouched = df.drop(columns=list(derived), errors="ignore")
    fused = pd.concat([untouched, derived_frame], axis=1)

    # Extend the numeric list without repeating names that are already in it.
    known = set(numeric_cols)
    new_numeric_cols = list(numeric_cols) + [n for n in derived if n not in known]
    return fused, new_numeric_cols
def set_seed(seed):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: Integer seed applied to every generator. When CUDA is
            available, all CUDA devices are seeded as well.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# Seed every random number generator before any data or model work.
set_seed(SEED)

DATA_PATH = r'dataProcessing/Final.csv'
CODEBOOK_PATH = r'codebook/FinalCodeBook.csv'
OUTPUT_DIR = r'models_tabnet'

# exist_ok=True creates the directory when needed and is a no-op when it is
# already there, without a separate (race-prone) existence check.
os.makedirs(OUTPUT_DIR, exist_ok=True)

print(f"Device: {'cuda' if torch.cuda.is_available() else 'cpu'}")


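Defines the data loading and preprocessing function: it loads the data, keeps the codebook features, splits into Training (70%), Validation (15%), and Test (15%) sets, and then imputes, scales, and encodes the features.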

In [None]:
# Columns that must never be used as predictors even though they do not match
# the 'ID' / 'WGT' name patterns: 'Math_Mean' is the score the target is
# derived from, and 'JKREP' / 'JKZONE' are excluded by the CatBoost pipeline
# in this file as well.
_NON_FEATURE_COLS = frozenset({'Math_Mean', 'JKREP', 'JKZONE'})


def _encode_with_fallback(series, class_to_code):
    """Map string categories to integer codes; unseen categories become code 0."""
    return series.astype(str).map(class_to_code).fillna(0).astype('int64')


def load_and_preprocess_data():
    """Load the merged dataset and prepare train/valid/test arrays for TabNet.

    Steps: read ``DATA_PATH``; rename the target to ``at_risk`` and drop rows
    without a label; keep only variables listed in the codebook (all columns
    if the codebook cannot be read); drop ID / weight / leakage columns; type
    each feature as numeric or categorical; optionally add fused numeric
    features; split 70/15/15 with stratification; impute, scale and
    label-encode using statistics fitted on the training split only.

    Returns:
        dict with the keys ``X_train``, ``y_train``, ``X_valid``, ``y_valid``,
        ``X_test``, ``y_test`` (NumPy arrays), ``cat_idxs`` (column positions
        of the categorical features), ``cat_dims`` (number of classes per
        categorical feature) and ``feature_names`` (column order of the X
        arrays).
    """
    print("\n[Phase 1] Data Preparation...")

    # Load Data
    df = pd.read_csv(DATA_PATH)
    print(f"Loaded data shape: {df.shape}")

    # Identify Target
    # User said "at_risk" (1=Risk).
    # In our data, 'Fail' is 1 (Fail) / 0 (Pass).
    if 'Fail' in df.columns:
        df.rename(columns={'Fail': 'at_risk'}, inplace=True)
    elif 'at_risk' not in df.columns:
        # Fallback: assume last column is target if not named Fail/at_risk
        print("Warning: 'Fail' or 'at_risk' column not found. Using last column as target.")
        df.rename(columns={df.columns[-1]: 'at_risk'}, inplace=True)

    target_col = 'at_risk'

    # Force target to numeric and drop NaNs
    df[target_col] = pd.to_numeric(df[target_col], errors='coerce')
    df.dropna(subset=[target_col], inplace=True)
    df[target_col] = df[target_col].astype(int)

    print(f"Target distribution:\n{df[target_col].value_counts(normalize=True)}")

    # Load Codebook to determine allowed features and types
    try:
        cb = pd.read_csv(CODEBOOK_PATH, encoding='latin-1')
        cb.columns = cb.columns.str.strip().str.replace('^ï»¿', '', regex=True)
        var_col = cb.columns[0] # Usually 'Variable'

        allowed_vars = set(cb[var_col].dropna().astype(str).str.strip().tolist())
        print(f"Allowed variables from codebook: {len(allowed_vars)}")

        # Build the type map before touching df, so a codebook without a
        # 'Level' column leaves the data unfiltered as the fallback message
        # says. Keys are stripped the same way as allowed_vars so lookups by
        # column name agree with the filter.
        type_map = dict(zip(cb[var_col].astype(str).str.strip(), cb['Level']))

        # Filter DataFrame to keep only allowed vars + Target
        # Also ensure we don't drop the target if it's not in allowed_vars (though it should be)
        keep_cols = [c for c in df.columns if c in allowed_vars or c == target_col]
        # .copy() gives an independent frame, so later column assignments do
        # not act on a slice of the original.
        df = df[keep_cols].copy()
        print(f"Filtered data shape (codebook vars only): {df.shape}")
    except Exception as e:
        print(f"Codebook reading failed ({e}), using all columns.")
        allowed_vars = set(df.columns)
        type_map = {}

    # Drop ID columns and Weights if they exist (heuristics based on previous file knowledge)
    # Even after filtering, double check to remove IDs if they somehow got into codebook.
    # _NON_FEATURE_COLS additionally guards against target leakage, which matters
    # most when the codebook filter above was skipped.
    drop_cols = {
        c for c in df.columns
        if c.startswith('ID') or 'WGT' in c or c == target_col or c in _NON_FEATURE_COLS
    }
    feature_cols = [c for c in df.columns if c not in drop_cols]

    numeric_cols = []
    categorical_cols = []

    for col in feature_cols:
        level = type_map.get(col, 'Unknown')
        if level == 'Scale':
            numeric_cols.append(col)
        elif level == 'Nominal' or level == 'Ordinal':
            categorical_cols.append(col)
        else:
            # Fallback heuristic: numeric dtype with more than 10 distinct
            # values is treated as numeric, everything else as categorical.
            if pd.api.types.is_numeric_dtype(df[col]) and df[col].nunique() > 10:
                numeric_cols.append(col)
            else:
                categorical_cols.append(col)

    if ENABLE_FEATURE_FUSION:
        df, numeric_cols = add_feature_fusion(df, numeric_cols)
        # Pick up the derived columns that fusion appended to the frame.
        feature_cols = [c for c in df.columns if c not in drop_cols]

    print(f"Numeric features ({len(numeric_cols)}): {numeric_cols}")
    print(f"Categorical features ({len(categorical_cols)}): {categorical_cols}")

    X = df[feature_cols].copy()
    y = df[target_col].copy()

    # Split first (70% Train, 30% Temp)
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.3, stratify=y, random_state=SEED
    )
    # Split Temp (50% Valid, 50% Test -> 15% Valid, 15% Test of total)
    X_valid, X_test, y_valid, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=SEED
    )

    # Work on independent copies so the column assignments below are applied
    # to these frames directly.
    X_train, X_valid, X_test = X_train.copy(), X_valid.copy(), X_test.copy()
    splits = (X_train, X_valid, X_test)

    # Missing Value Imputation
    # Numeric -> Median (fit on Train)
    for col in numeric_cols:
        # Force to numeric (coerce errors to NaN)
        for part in splits:
            part[col] = pd.to_numeric(part[col], errors='coerce')

        median_val = X_train[col].median()
        if pd.isna(median_val):
            # The column has no usable value in the training split; fill with
            # 0.0 so no NaN reaches the scaler.
            median_val = 0.0
        # Assign the filled column back instead of calling fillna(inplace=True)
        # on `frame[col]`: the chained in-place form modifies a temporary and
        # does nothing when pandas Copy-on-Write is active.
        for part in splits:
            part[col] = part[col].fillna(median_val)

    # Categorical -> constant placeholder
    for col in categorical_cols:
        # If numeric-like categorical (int codes), fill with -1.
        # If string, fill "Missing".
        if pd.api.types.is_numeric_dtype(X_train[col]):
            fill_val = -1 # Common for int-encoded categories
        else:
            fill_val = "Missing"

        for part in splits:
            part[col] = part[col].fillna(fill_val)

    # Encoding
    # Numeric -> StandardScaler (fit on Train)
    scaler = StandardScaler()
    if numeric_cols:
        X_train[numeric_cols] = scaler.fit_transform(X_train[numeric_cols])
        X_valid[numeric_cols] = scaler.transform(X_valid[numeric_cols])
        X_test[numeric_cols] = scaler.transform(X_test[numeric_cols])

    # Categorical -> LabelEncoder
    # TabNet needs positive integers 0..N-1
    cat_idxs = []
    cat_dims = []

    # Map feature names to indices for TabNet
    feature_names = list(X_train.columns)

    for col in categorical_cols:
        # Fit on Train only; categories that appear only in Valid/Test are
        # mapped to class 0.
        le = LabelEncoder()
        X_train[col] = le.fit_transform(X_train[col].astype(str))

        # One dictionary lookup per value instead of one encoder.transform
        # call per row.
        class_to_code = {label: code for code, label in enumerate(le.classes_)}
        X_valid[col] = _encode_with_fallback(X_valid[col], class_to_code)
        X_test[col] = _encode_with_fallback(X_test[col], class_to_code)

        cat_idxs.append(feature_names.index(col))
        cat_dims.append(len(le.classes_))

    return {
        'X_train': X_train.values, 'y_train': y_train.values,
        'X_valid': X_valid.values, 'y_valid': y_valid.values,
        'X_test': X_test.values, 'y_test': y_test.values,
        'cat_idxs': cat_idxs, 'cat_dims': cat_dims,
        'feature_names': feature_names
    }


Training (Baseline):


Defines the function to train a baseline TabNet model with default parameters.

In [None]:
def train_baseline(data):
    """Fit the baseline TabNet classifier, save it to disk, and return it.

    Args:
        data: Mapping that provides the arrays 'X_train', 'y_train',
            'X_valid', 'y_valid' and the categorical metadata 'cat_idxs'
            and 'cat_dims'.

    Returns:
        The fitted TabNetClassifier. The model is also written to
        'tabnet_baseline.zip' inside OUTPUT_DIR.
    """
    print("\n[Phase 2] Training Baseline TabNet...")

    # Architecture / optimizer settings for the baseline run.
    model_config = {
        'seed': SEED,
        'cat_idxs': data['cat_idxs'],
        'cat_dims': data['cat_dims'],
        'cat_emb_dim': 1,  # Default is 1, can be tuned
        'optimizer_fn': torch.optim.Adam,
        'optimizer_params': {'lr': 2e-2},
        'scheduler_params': {"step_size": 10, "gamma": 0.9},
        'scheduler_fn': torch.optim.lr_scheduler.StepLR,
        'mask_type': 'sparsemax',  # This is useful for interpretability
        'verbose': 1,
    }
    baseline = TabNetClassifier(**model_config)

    train_split = (data['X_train'], data['y_train'])
    valid_split = (data['X_valid'], data['y_valid'])

    fit_started = time.time()
    # NOTE(review): pytorch-tabnet is documented to early-stop on the last
    # metric of the last eval set (here: 'accuracy' on 'valid') - confirm
    # that this is the intended stopping criterion.
    baseline.fit(
        X_train=train_split[0],
        y_train=train_split[1],
        eval_set=[train_split, valid_split],
        eval_name=['train', 'valid'],
        eval_metric=['auc', 'accuracy'],
        max_epochs=50,  # Reduced for demo speed, typically 100+
        patience=10,
        batch_size=1024,
        virtual_batch_size=128,
        num_workers=0,
        drop_last=False,
    )
    print(f"Baseline Training Time: {time.time() - fit_started:.2f}s")
    print(f"Best Valid Score: {baseline.best_cost}")

    # Persist the fitted model next to the other pipeline artifacts.
    archive_path = os.path.join(OUTPUT_DIR, 'tabnet_baseline.zip')
    baseline.save_model(archive_path)
    return baseline


Defines the hyperparameter tuning function using Randomized Search to find optimal TabNet parameters.

In [None]:
def tune_model(data):
    """Run a small manual random search over TabNet hyperparameters.

    Up to five parameter combinations are drawn at random from the grid
    below. One TabNetClassifier is trained per combination, and the model
    with the highest `best_cost` (the validation AUC tracked during fit)
    is kept.

    Args:
        data: Mapping that provides 'X_train', 'y_train', 'X_valid',
            'y_valid', 'cat_idxs' and 'cat_dims'.

    Returns:
        The best fitted TabNetClassifier. It is also saved as
        'tabnet_tuned.zip', and the per-trial log is written to
        'tuning_results.csv', both inside OUTPUT_DIR.
    """
    print("\n[Phase 3] Tuning (Randomized Search Manual Loop)...")

    # Candidate values for each tunable hyperparameter.
    param_grid = {
        'n_d': [8, 16, 24],
        'n_steps': [3, 5],
        'gamma': [1.0, 1.5],
        'lambda_sparse': [0, 1e-4],
        'learning_rate': [0.01, 0.02],
    }

    # Expand the full Cartesian grid, then keep a random subset of trials.
    import itertools
    names = tuple(param_grid.keys())
    all_combinations = [
        dict(zip(names, combo))
        for combo in itertools.product(*param_grid.values())
    ]
    n_trials = min(5, len(all_combinations))
    search_space = random.sample(all_combinations, n_trials)

    best_score, best_params, best_model = -1, None, None
    results = []

    for trial_no, params in enumerate(search_space, start=1):
        print(f"Trial {trial_no}/{len(search_space)}: {params}")

        # Set n_a = n_d as per TabNet recommendation
        params['n_a'] = params['n_d']

        candidate = TabNetClassifier(
            seed=SEED,
            n_d=params['n_d'],
            n_a=params['n_a'],
            n_steps=params['n_steps'],
            gamma=params['gamma'],
            lambda_sparse=params['lambda_sparse'],
            cat_idxs=data['cat_idxs'],
            cat_dims=data['cat_dims'],
            optimizer_params={'lr': params['learning_rate']},
            verbose=0,
        )
        candidate.fit(
            X_train=data['X_train'],
            y_train=data['y_train'],
            eval_set=[(data['X_valid'], data['y_valid'])],
            eval_metric=['auc'],
            max_epochs=30,
            patience=5,
            batch_size=1024,
            virtual_batch_size=128,
        )

        val_auc = candidate.best_cost
        results.append({'params': params, 'val_auc': val_auc})
        print(f"  -> Valid AUC: {val_auc}")

        # Keep the strongest model seen so far (strictly better only).
        if val_auc > best_score:
            best_score, best_params, best_model = val_auc, params, candidate

    print(f"\nBest Params: {best_params}")
    print(f"Best Valid AUC: {best_score}")

    # Save the winning model and the per-trial tuning log.
    best_model.save_model(os.path.join(OUTPUT_DIR, 'tabnet_tuned.zip'))
    log_path = os.path.join(OUTPUT_DIR, 'tuning_results.csv')
    pd.DataFrame(results).to_csv(log_path, index=False)

    return best_model


Defines the evaluation function to calculate and save metrics (Accuracy, Precision, Recall, F1).

In [None]:
def evaluate_model(model, data, model_name="Model"):
    """Score a fitted model on the test split and save metrics and a plot.

    Computes accuracy, precision, recall and F1 from the model's hard
    predictions, prints them with a classification report, writes them to
    '<model_name>_metrics.json', and saves a confusion-matrix image to
    '<model_name>_cm.png'. Both files go into OUTPUT_DIR.

    Args:
        model: Fitted classifier exposing `predict`.
        data: Mapping that provides 'X_test' and 'y_test'.
        model_name: Label used in console output, the plot title and the
            artifact file names.

    Returns:
        None. Results are printed and written to disk.
    """
    print(f"\n[Phase 4] Evaluating {model_name}...")

    y_true = data['y_test']
    # Only hard class predictions are needed for the metrics below; the
    # previously computed (and unused) predict_proba pass was dropped.
    preds = model.predict(data['X_test'])

    # Metrics
    acc = accuracy_score(y_true, preds)
    prec = precision_score(y_true, preds)
    rec = recall_score(y_true, preds)
    f1 = f1_score(y_true, preds)

    print(f"Accuracy: {acc:.4f}")
    print(f"Precision: {prec:.4f}")
    print(f"Recall: {rec:.4f} (Crucial for At-Risk)")
    print(f"F1 Score: {f1:.4f}")
    print("\nClassification Report:")
    print(classification_report(y_true, preds))

    # Save Metrics
    metrics = {
        'Accuracy': acc, 'Precision': prec, 'Recall': rec, 'F1': f1
    }
    with open(os.path.join(OUTPUT_DIR, f'{model_name}_metrics.json'), 'w') as f:
        json.dump(metrics, f, indent=4)

    # Confusion Matrix Plot (tick labels are hard-coded for the binary task)
    cm = confusion_matrix(y_true, preds)
    plt.figure(figsize=(6, 5))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(f'{model_name} Confusion Matrix')
    plt.colorbar()
    tick_marks = np.arange(2)
    plt.xticks(tick_marks, ['Not Risk', 'At Risk'])
    plt.yticks(tick_marks, ['Not Risk', 'At Risk'])

    # Use white text on dark cells and black text on light cells.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], 'd'),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # tight_layout must run after the axis labels exist; calling it earlier
    # computes the layout without them and they can be clipped in the PNG.
    plt.tight_layout()
    plt.savefig(os.path.join(OUTPUT_DIR, f'{model_name}_cm.png'))
    plt.close()


Defines the interpretation function to extract and save feature importance from the TabNet model.

In [None]:
def interpret_model(model, data):
    """Print and save the model's feature importances, highest first.

    Args:
        model: Fitted model exposing `feature_importances_`.
        data: Mapping whose 'feature_names' entry lists the feature names
            in the same order as the importance vector.

    Returns:
        None. The ranking is printed and written to
        'feature_importance.csv' inside OUTPUT_DIR.
    """
    print("\n[Phase 5] Feature Importance...")

    importances = model.feature_importances_
    feature_names = data['feature_names']
    # argsort is ascending, so reverse it to rank from most to least important.
    ranking = np.argsort(importances)[::-1]

    print("All Feature Importances:")
    rows = []
    for rank, feat_idx in enumerate(ranking, start=1):
        name = feature_names[feat_idx]
        value = importances[feat_idx]
        print(f"{rank}. {name}: {value:.4f}")
        rows.append({'Feature': name, 'Importance': value})

    csv_path = os.path.join(OUTPUT_DIR, 'feature_importance.csv')
    pd.DataFrame(rows).to_csv(csv_path, index=False)


Main execution block: Runs the full TabNet pipeline (Prepare, Train, Tune, Evaluate, Interpret).

In [None]:
if __name__ == "__main__":
    # evaluate_model relies on itertools for the confusion-matrix plot;
    # import it here so the pipeline works even if the imports cell was skipped.
    import itertools

    # Step 1: load, split and encode the data.
    data = load_and_preprocess_data()

    # Steps 2-3: fit the baseline, then run the random-search tuning.
    baseline_model = train_baseline(data)
    tuned_model = tune_model(data)

    # Step 4: score both models on the held-out test split.
    for fitted_model, label in ((baseline_model, "Baseline"), (tuned_model, "Tuned")):
        evaluate_model(fitted_model, data, label)

    # Step 5: feature importances of the tuned model.
    interpret_model(tuned_model, data)

    print(f"\nAll artifacts saved to {OUTPUT_DIR}")


## 6. Project Report & Results

### 6.1 Model Performance Comparison
| Metric | Baseline Model | Tuned Model | Improvement (percentage points) |
| :--- | :--- | :--- | :--- |
| **Accuracy** | 0.8763 | **0.8777** | +0.14 pp |
| **Precision** | **0.6914** | 0.6740 | -1.74 pp |
| **Recall** | 0.2412 | **0.2767** | +3.55 pp |
| **F1 Score** | 0.3576 | **0.3923** | +3.47 pp |

> **Note**: The Tuned Model improves Recall by 3.55 percentage points (0.2412 → 0.2767). Recall is the most critical metric for identifying at-risk students. The gain comes with a 1.74-point drop in Precision.

### 6.2 Hyperparameter Tuning Results (All 5 Random-Search Trials)
| Trial | n_d | n_steps | gamma | lambda_sparse | lr | Val AUC |
| :--- | :--- | :--- | :--- | :--- | :--- | :--- |
| 1 | 24 | 5 | 1.0 | 0 | 0.01 | 0.7766 |
| 2 | 8 | 3 | 1.5 | 0.0001 | 0.02 | 0.7873 |
| 3 | 8 | 3 | 1.0 | 0 | 0.02 | 0.7783 |
| 4 | 24 | 5 | 1.5 | 0.0001 | 0.02 | 0.6209 |
| **5 (Best)** | **16** | **3** | **1.0** | **0** | **0.02** | **0.8145** |

### 6.3 Feature Importance (Top Features)
| Rank | Feature | Importance | Description |
| :--- | :--- | :--- | :--- |
| 1 | BCBG16B | 0.3856 | Student Absenteeism (Problem Degree) |
| 2 | BCBG14G | 0.1571 | Parental Expectations (School Character) |
| 3 | BCBG14H | 0.1442 | Parental Support (School Character) |
| 4 | BCBG14E | 0.0526 | Parental Involvement (School Character) |
| 5 | BCBG15G | 0.0495 | Encourage Students (Agreement) |