In [None]:
import polars as pl
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
import xgboost as xgb
import joblib
import json
from pathlib import Path

In [None]:
# Load processed data set
# Reads the feature table from a Parquet file; the path is relative to the
# directory the notebook kernel is running in.
processed_df = pl.read_parquet("processed_data/comprehensive_eeg_features.parquet")

# Sanity check of what was loaded: (rows, columns) tuple and the column names.
print(f"Shape: {processed_df.shape}")
print(f"Columns: {processed_df.columns}")

In [None]:
def prepare_classification_data(df, target_column, feature_columns=None):
    """
    Split a feature table into labeled and unlabeled NumPy arrays.

    A row counts as *labeled* when its ``target_column`` value is non-null and
    not the empty string; every other row is treated as *unlabeled*.

    Parameters
    ----------
    df : polars.DataFrame
        Table holding the feature columns and ``target_column``.
    target_column : str
        Name of the column holding the class label.
    feature_columns : list[str] | None
        Columns to use as model inputs. When ``None``, every column of ``df``
        that is not in the default exclusion list is used.

    Returns
    -------
    tuple
        ``(X_labeled, y_labeled, X_unlabeled, feature_columns, exclude_cols)``
        where the ``X_*`` values are 2-D arrays, ``y_labeled`` is a flat array
        of labels, and ``exclude_cols`` is the default exclusion list (returned
        even when ``feature_columns`` was supplied explicitly).
    """
    # Defined unconditionally: it is part of the return value, so it must exist
    # even when the caller passes an explicit feature_columns list.
    exclude_cols = ['seizure_type', 'localization', 'lateralization', 'patient_id', 'seizure_id']

    if feature_columns is None:
        feature_columns = [col for col in df.columns if col not in exclude_cols]

    # A null target compares as null to both `!= ""` and `== ""`, which would
    # drop the row from both partitions. Build one boolean mask that is never
    # null so every row lands in exactly one partition.
    is_labeled = pl.col(target_column).is_not_null() & (pl.col(target_column) != "")
    labeled_df = df.filter(is_labeled)
    unlabeled_df = df.filter(~is_labeled)

    X_labeled = labeled_df.select(feature_columns).to_numpy()
    y_labeled = labeled_df.select(target_column).to_numpy().ravel()

    X_unlabeled = unlabeled_df.select(feature_columns).to_numpy()

    return X_labeled, y_labeled, X_unlabeled, feature_columns, exclude_cols

In [None]:
def train_xgboost_classifier(X_train, y_train, X_test, y_test):
    """
    Fit an XGBoost classifier on the training split and predict the test split.

    Labels are integer-encoded with a ``LabelEncoder`` fitted on the union of
    the train and test labels, so transforming the test labels cannot fail on
    an unseen class.

    Returns
    -------
    tuple
        ``(model, y_pred, y_test_encoded, label_encoder)`` where ``y_pred`` and
        ``y_test_encoded`` are both in the encoder's integer label space.
    """
    # The encoder sees every label from both splits.
    label_encoder = LabelEncoder().fit(np.concatenate([y_train, y_test]))

    y_train_encoded = label_encoder.transform(y_train)
    y_test_encoded = label_encoder.transform(y_test)

    n_classes = len(label_encoder.classes_)
    print(f"Number of classes detected: {n_classes}")
    print(f"Classes: {label_encoder.classes_}")

    # Hyper-parameters shared by the binary and the multi-class setup.
    model_params = {
        'n_estimators': 1000,
        'max_depth': 8,
        'learning_rate': 0.01,
        'random_state': 42,
    }
    if n_classes == 2:
        model_params['objective'] = 'binary:logistic'
    else:
        # Multi-class: the number of classes is passed explicitly.
        model_params['objective'] = 'multi:softprob'
        model_params['num_class'] = n_classes
    model = xgb.XGBClassifier(**model_params)

    print(f'X train shape: {X_train.shape}')
    print(f'Y train encoded shape: {y_train_encoded.shape}')

    model.fit(X_train, y_train_encoded)
    y_pred = model.predict(X_test)

    return model, y_pred, y_test_encoded, label_encoder

In [None]:
def evaluate_model(y_true, y_pred, label_encoder=None):
    """
    Print a classification report and a confusion matrix.

    When ``label_encoder`` is given, both label arrays are first mapped back
    through ``inverse_transform`` so the report shows the original class names;
    otherwise the arrays are reported as passed in.
    """
    shown_true, shown_pred = y_true, y_pred
    if label_encoder:
        shown_true = label_encoder.inverse_transform(y_true)
        shown_pred = label_encoder.inverse_transform(y_pred)

    print("Classification Report:")
    print(classification_report(shown_true, shown_pred))
    print("\nConfusion Matrix:")
    print(confusion_matrix(shown_true, shown_pred))

In [None]:
def cross_validate_model(X, y, cv_folds=4):
    """
    Cross-validate an XGBoost classifier and print per-fold accuracy.

    Parameters
    ----------
    X : array-like
        Feature matrix, one row per sample.
    y : array-like
        Class labels (strings or numbers), one per row of ``X``.
    cv_folds : int
        Value forwarded to ``cross_val_score`` as ``cv``.

    Returns
    -------
    numpy.ndarray
        Accuracy score of each fold.

    Raises
    ------
    ValueError
        If ``y`` is empty.
    """
    if len(y) == 0:
        raise ValueError("cross_validate_model requires at least one labeled sample")

    # Always fit the encoder. The class count below is read from
    # `label_encoder.classes_`, which only exists after fitting, so skipping
    # the fit for non-string labels made the function fail. Encoding labels
    # that are already 0..n-1 integers leaves them unchanged.
    label_encoder = LabelEncoder()
    y_encoded = label_encoder.fit_transform(y)
    n_classes = len(label_encoder.classes_)

    # Hyper-parameters shared by the binary and the multi-class setup.
    model_params = {
        'n_estimators': 1000,
        'max_depth': 8,
        'learning_rate': 0.01,
        'random_state': 42,
    }
    if n_classes == 2:
        model_params['objective'] = 'binary:logistic'
    else:
        model_params['objective'] = 'multi:softprob'
        model_params['num_class'] = n_classes
    model = xgb.XGBClassifier(**model_params)

    scores = cross_val_score(model, X, y_encoded, cv=cv_folds, scoring='accuracy')
    print(f"Cross-validation scores: {scores}")
    # Reported spread is two standard deviations of the fold scores.
    print(f"Mean accuracy: {scores.mean():.3f} (+/- {scores.std() * 2:.3f})")

    return scores

In [None]:
def predict_unlabeled_seizures(model, X_unlabeled, label_encoder=None):
    """
    Run a fitted model over unlabeled rows.

    Returns ``(predictions, probabilities)``: the output of ``model.predict``
    (mapped through ``label_encoder.inverse_transform`` when an encoder is
    supplied) and the output of ``model.predict_proba``.
    """
    raw_predictions = model.predict(X_unlabeled)
    probabilities = model.predict_proba(X_unlabeled)

    if not label_encoder:
        return raw_predictions, probabilities

    return label_encoder.inverse_transform(raw_predictions), probabilities

In [None]:
def get_feature_importance(model, feature_names):
    """
    Rank features by the model's ``feature_importances_`` and print the top 10.

    Returns a list of ``(feature_name, importance)`` pairs sorted from most to
    least important.
    """
    ranked = sorted(
        zip(feature_names, model.feature_importances_),
        key=lambda pair: pair[1],
        reverse=True,
    )

    print("Top 10 Most Important Features:")
    for name, score in ranked[:10]:
        print(f"{name}: {score:.4f}")

    return ranked

In [None]:
def build_seizure_classifier(df, target_column):
    """
    End-to-end pipeline for one target column.

    Steps: split ``df`` into labeled/unlabeled arrays, hold out 20% of the
    labeled rows (stratified by label), train and evaluate an XGBoost model,
    print feature importances, run cross-validation on all labeled rows, and
    finally predict the unlabeled rows if there are any.

    Returns
    -------
    tuple
        ``(model, label_encoder, predictions, probabilities, feature_importance)``;
        ``predictions`` and ``probabilities`` are ``None`` when there are no
        unlabeled rows.
    """
    X_labeled, y_labeled, X_unlabeled, feature_columns, _excluded = prepare_classification_data(
        df, target_column
    )

    print(f'y labeled shape: {y_labeled.shape}')
    print(f'x labeled shape: {X_labeled.shape}')
    print(f'x unlabeled shape: {X_unlabeled.shape}')

    # Stratified 80/20 hold-out split of the labeled rows.
    split = train_test_split(
        X_labeled, y_labeled, test_size=0.2, random_state=42, stratify=y_labeled
    )
    X_train, X_test, y_train, y_test = split

    model, y_pred, y_test_encoded, label_encoder = train_xgboost_classifier(
        X_train, y_train, X_test, y_test
    )
    print(f'y pred shape: {y_pred.shape}')

    evaluate_model(y_test_encoded, y_pred, label_encoder)
    feature_importance = get_feature_importance(model, feature_columns)

    # Called for its printed summary; the returned scores are not used here.
    cross_validate_model(X_labeled, y_labeled)

    predictions = probabilities = None
    if len(X_unlabeled) > 0:
        predictions, probabilities = predict_unlabeled_seizures(model, X_unlabeled, label_encoder)
        print(f"\nPredicted {len(predictions)} unlabeled seizures")

    return model, label_encoder, predictions, probabilities, feature_importance

In [None]:
# Save and load functions
def save_model(model, label_encoder, filepath_prefix):
    """
    Persist a model and its label encoder with joblib.

    Two files are written: ``<filepath_prefix>_model.pkl`` and
    ``<filepath_prefix>_encoder.pkl``.
    """
    model_path = f"{filepath_prefix}_model.pkl"
    joblib.dump(model, model_path)

    encoder_path = f"{filepath_prefix}_encoder.pkl"
    joblib.dump(label_encoder, encoder_path)

In [None]:
def load_model(filepath_prefix):
    """
    Read back a model / label-encoder pair from joblib files.

    Expects ``<filepath_prefix>_model.pkl`` and ``<filepath_prefix>_encoder.pkl``
    and returns ``(model, label_encoder)``.
    """
    model_path = f"{filepath_prefix}_model.pkl"
    encoder_path = f"{filepath_prefix}_encoder.pkl"
    return joblib.load(model_path), joblib.load(encoder_path)

In [None]:
def train_all_classifiers(df):
    """
    Build, evaluate and save one classifier per target column.

    The targets are ``seizure_type``, ``localization`` and ``lateralization``.
    Each trained model and encoder is saved under the prefix
    ``<target>_classifier``.

    Returns a dict keyed by target name; each value is a dict with the keys
    ``model``, ``encoder``, ``predictions``, ``probabilities`` and
    ``feature_importance``.
    """
    target_names = ('seizure_type', 'localization', 'lateralization')
    results = {}

    for target in target_names:
        print(f"\n{'='*50}")
        print(f"Training classifier for: {target}")
        print('='*50)

        (
            fitted_model,
            fitted_encoder,
            unlabeled_predictions,
            unlabeled_probabilities,
            ranked_features,
        ) = build_seizure_classifier(df, target)

        results[target] = dict(
            model=fitted_model,
            encoder=fitted_encoder,
            predictions=unlabeled_predictions,
            probabilities=unlabeled_probabilities,
            feature_importance=ranked_features,
        )

        # Persist the pair next to the notebook under a per-target prefix.
        save_model(fitted_model, fitted_encoder, f"{target}_classifier")

    return results

## Data Encoding

In [None]:
# Names of the label columns that classifiers are trained for.
targets =[
    'seizure_type', 
    'localization', 
    'lateralization'
]
# Cast the label columns to Categorical. encode_categoricals (defined in a
# later cell) only rewrites Utf8/Object columns, so after this cast the label
# columns are not integer-encoded by it.
# NOTE(review): this rebinds `processed_df`, so cells above that displayed it
# now show stale dtypes.
processed_df = processed_df.with_columns(
    pl.col(targets).cast(pl.Categorical)
)

In [None]:
# Number of distinct values in the seizure_type column (displayed as cell output).
processed_df['seizure_type'].n_unique()

In [None]:
def encode_categoricals(df):
    """
    Integer-encode every string (Utf8) or Object column of a polars DataFrame.

    For each such column the sorted, non-null unique values are numbered
    starting at 1, and the column is replaced by those integer codes (Int64).
    Nulls stay null. Columns of any other dtype, including ``pl.Categorical``,
    are left untouched.

    Parameters
    ----------
    df : polars.DataFrame
        Input table; it is not modified.

    Returns
    -------
    tuple
        ``(encoded_df, encoding_mappings)`` where ``encoding_mappings`` maps
        each encoded column name to its ``{original_value: integer_code}`` dict.
    """
    encoding_mappings = {}
    encoded_columns = []

    for col in df.columns:
        dtype = df[col].dtype

        # Only string/object columns are encoded.
        if dtype == pl.Utf8 or dtype == pl.Object:
            unique_vals = df[col].unique().drop_nulls().sort()
            mapping = {val: code for code, val in enumerate(unique_vals, start=1)}
            encoding_mappings[col] = mapping

            # `replace` keeps the column's original (string) dtype, so the
            # integer codes would come back as strings such as "1". Cast to
            # Int64 so the result is numeric. With an empty mapping (all-null
            # column) there is nothing to replace, so only the cast is applied.
            source = pl.col(col).replace(mapping) if mapping else pl.col(col)
            encoded_columns.append(source.cast(pl.Int64).alias(col))

    # Work on a copy and apply all column rewrites in a single pass.
    encoded_df = df.clone()
    if encoded_columns:
        encoded_df = encoded_df.with_columns(encoded_columns)

    return encoded_df, encoding_mappings

In [None]:
# Encode the string/object columns of the feature table; `encoding_mappings`
# keeps the value -> code dictionaries so the codes can be traced back later.
encoded_df, encoding_mappings = encode_categoricals(processed_df)

In [None]:
#encoded_df.columns

In [None]:
# Columns kept for modelling. The last four entries (patient_id and the three
# label columns) are not model inputs: prepare_classification_data excludes
# them from the feature matrix by default, and the label columns serve as
# targets.
features = [
    'time_zero_crossings_std',
    'de_delta_asymmetry_mean',
    'ictal_wt_level5_entropy_std',
    'pac_theta_high_gamma',
    'de_theta_asymmetry_mean',
    'ictal_wt_level3_entropy_std',
    'permutation_entropy_std',
    'ictal_time_hjorth_complexity_max',
    'ictal_de_high_beta_asymmetry_mean',
    'ictal_de_low_beta_asymmetry_mean',
    'de_alpha_median',
    'duration_seconds',
    'time_mean_min',
    'de_delta_mean',
    'time_mean_mean',
    'wt_level0_entropy_std',
    'patient_id',
    'seizure_type',
    'localization',
    'lateralization'
]

In [None]:
# Restrict the encoded table to the columns listed in `features`.
# NOTE(review): this rebinds `encoded_df`; the full encoded table is only
# recoverable by re-running the encode_categoricals cell.
encoded_df = encoded_df.select(features)

In [None]:
# Train all classifiers
# Train all classifiers
# Returns a dict keyed by target name; as a side effect each model/encoder
# pair is written to "<target>_classifier_model.pkl" / "_encoder.pkl".
results = train_all_classifiers(encoded_df)

In [None]:
# Access individual results
# Fitted classifier for the seizure_type target; the bare name on the last
# line displays it as the cell output.
seizure_type_model = results['seizure_type']['model']
seizure_type_model

In [None]:
# Predicted seizure_type labels for the unlabeled rows (None when there were
# no unlabeled rows), displayed as the cell output.
seizure_type_predictions = results['seizure_type']['predictions']
seizure_type_predictions

In [None]:
# (feature_name, importance) pairs for the seizure_type model, sorted from
# most to least important, displayed as the cell output.
seizure_type_feature_importances = results['seizure_type']['feature_importance']
seizure_type_feature_importances

In [None]:
def convert_feature_importances_to_df(feature_importances):
    """
    Turn ``(feature_name, importance_value)`` pairs into a Polars DataFrame.

    The result has two columns, ``feature`` and ``importance``; each importance
    value is converted to a Python ``float`` first.
    """
    names = []
    scores = []
    for entry in feature_importances:
        names.append(entry[0])
        scores.append(float(entry[1]))

    return pl.DataFrame({'feature': names, 'importance': scores})
    
# Tabular (feature, importance) view of the seizure_type importances.
seizure_type_feature_df = convert_feature_importances_to_df(seizure_type_feature_importances)