<a href="https://colab.research.google.com/github/aymenchibouti/doctorat/blob/main/model3_claude_xai.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, LSTM, Dense, Dropout, GlobalMaxPooling1D, concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import optuna
import shap
import lime
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict
import warnings
warnings.filterwarnings('ignore')

class StudentDropoutPredictor:
    """CNN+LSTM dropout classifier over weekly per-event activity counts.

    Each student (enrollment) is represented as a ``(n_weeks, n_events)``
    matrix of event counts. The class bundles data loading, feature
    construction, Optuna tuning, training, evaluation and SHAP-based
    explanation/plotting helpers.
    """

    class _FixedTrial:
        """Replay already-chosen hyperparameters through the trial interface.

        ``create_cnn_lstm_model`` asks a trial object for values via
        ``suggest_int`` / ``suggest_float``. This stand-in ignores the search
        range (including keyword arguments such as ``log=True``) and simply
        returns the stored value for the requested name.
        """

        def __init__(self, params):
            self.params = params

        def suggest_int(self, name, *args, **kwargs):
            return self.params[name]

        def suggest_float(self, name, *args, **kwargs):
            return self.params[name]

    def __init__(self):
        self.model = None
        # NOTE(review): the scaler is created but never applied anywhere in
        # this class; the model is fed raw counts.
        self.scaler = StandardScaler()
        self.event_types = ['access', 'problem', 'wiki', 'discussion', 'navigate', 'page_close', 'video']
        self.n_weeks = 4
        self.n_events = len(self.event_types)

    def load_data(self):
        """Load the truth, enrollment and log CSV files from the working directory.

        The truth file is read without a header row and its two columns are
        named ``enrollment_id`` and ``dropout``. The ``time`` column of the
        log file is parsed to datetimes.

        Returns:
            Tuple ``(truth_df, enrollment_df, log_df)`` of DataFrames.
        """
        print("Loading data files...")

        # Load truth data (enrollment_id, dropout_label)
        truth_df = pd.read_csv('truth_train.csv', header=None, names=['enrollment_id', 'dropout'])

        # Load enrollment data
        enrollment_df = pd.read_csv('enrollment_train.csv')

        # Load log data
        log_df = pd.read_csv('log_train spliting.csv')
        log_df['time'] = pd.to_datetime(log_df['time'])

        print(f"Truth data shape: {truth_df.shape}")
        print(f"Enrollment data shape: {enrollment_df.shape}")
        print(f"Log data shape: {log_df.shape}")

        return truth_df, enrollment_df, log_df

    def create_temporal_features(self, log_df, truth_df):
        """Build the per-student ``(n_weeks, n_events)`` count tensors.

        Weeks are measured from each student's own first log entry; anything
        later than the last week is accumulated into the last week. Events
        whose name is not in ``self.event_types`` and logs whose enrollment
        is not in ``truth_df`` are ignored. Students without any log rows
        keep an all-zero matrix.

        Args:
            log_df: DataFrame with ``enrollment_id``, ``time`` and ``event``
                columns.
            truth_df: DataFrame with ``enrollment_id`` and ``dropout`` columns.

        Returns:
            Tuple ``(X, y, student_ids)`` where ``X`` has shape
            ``(n_students, n_weeks, n_events)``, ``y`` holds the dropout
            labels as floats and ``student_ids`` lists the enrollment ids in
            the same row order (order of first appearance in ``truth_df``).
        """
        print("Creating temporal features...")

        # One row per distinct enrollment, in order of first appearance.
        enrollment_ids = truth_df['enrollment_id'].unique()
        n_students = len(enrollment_ids)
        student_ids = list(enrollment_ids)

        # If an enrollment is listed more than once, the last label wins.
        truth_map = dict(zip(truth_df['enrollment_id'], truth_df['dropout']))
        y = np.array([truth_map[eid] for eid in enrollment_ids], dtype=float)
        X = np.zeros((n_students, self.n_weeks, self.n_events))

        if n_students and len(log_df):
            # Positional index so the derived Series below align row by row.
            logs = log_df[['enrollment_id', 'time', 'event']].reset_index(drop=True)
            times = pd.to_datetime(logs['time'])

            # Reference point: each enrollment's earliest log entry
            # (over all of its events, known or not).
            first_seen = times.groupby(logs['enrollment_id']).transform('min')
            elapsed_days = (times - first_seen).dt.days
            week_idx = (elapsed_days // 7).clip(upper=self.n_weeks - 1)

            row_lookup = {eid: pos for pos, eid in enumerate(enrollment_ids)}
            event_lookup = {name: pos for pos, name in enumerate(self.event_types)}
            row_idx = logs['enrollment_id'].map(row_lookup)
            event_idx = logs['event'].map(event_lookup)

            # Keep only rows that resolve to a known student, a known event
            # and a valid timestamp.
            usable = row_idx.notna() & event_idx.notna() & week_idx.notna()
            if usable.any():
                # np.add.at accumulates correctly when the same cell is hit
                # several times (plain fancy-index += would count it once).
                np.add.at(
                    X,
                    (
                        row_idx[usable].to_numpy(dtype=int),
                        week_idx[usable].to_numpy(dtype=int),
                        event_idx[usable].to_numpy(dtype=int),
                    ),
                    1,
                )

        print(f"Created features for {len(student_ids)} students")
        print(f"Feature matrix shape: {X.shape}")
        print(f"Dropout distribution: {np.bincount(y.astype(int))}")

        return X, y, student_ids

    def create_cnn_lstm_model(self, trial=None):
        """Build and compile the CNN+LSTM binary classifier.

        Args:
            trial: Optional object exposing ``suggest_int`` and
                ``suggest_float`` (for example an Optuna trial). When given,
                the hyperparameters are requested from it; otherwise fixed
                defaults are used.

        Returns:
            A compiled Keras ``Model`` taking ``(n_weeks, n_events)`` inputs
            and producing a single sigmoid output.
        """
        if trial is not None:
            # Hyperparameter tuning mode
            lstm_units = trial.suggest_int('lstm_units', 32, 128)
            conv_filters = trial.suggest_int('conv_filters', 16, 64)
            conv_kernel = trial.suggest_int('conv_kernel', 2, 4)
            dropout_rate = trial.suggest_float('dropout_rate', 0.2, 0.5)
            learning_rate = trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True)
        else:
            # Default parameters
            lstm_units = 64
            conv_filters = 32
            conv_kernel = 3
            dropout_rate = 0.3
            learning_rate = 0.001

        # Input layer: (batch_size, n_weeks, n_events)
        input_layer = Input(shape=(self.n_weeks, self.n_events), name='student_activity')

        # Two stacked convolutions along the week axis; 'same' padding keeps
        # the sequence length so the LSTM still sees every week.
        conv1 = Conv1D(filters=conv_filters, kernel_size=conv_kernel,
                       activation='relu', padding='same')(input_layer)
        conv2 = Conv1D(filters=conv_filters * 2, kernel_size=conv_kernel,
                       activation='relu', padding='same')(conv1)

        # LSTM branch on top of the convolutional features
        lstm1 = LSTM(lstm_units, return_sequences=True, dropout=dropout_rate)(conv2)
        lstm2 = LSTM(lstm_units // 2, return_sequences=False, dropout=dropout_rate)(lstm1)

        # Pooling branch taken directly from the convolutional features
        global_pool = GlobalMaxPooling1D()(conv2)

        # Combine LSTM and CNN features
        combined = concatenate([lstm2, global_pool])

        # Dense layers for classification
        dense1 = Dense(64, activation='relu')(combined)
        dropout1 = Dropout(dropout_rate)(dense1)
        dense2 = Dense(32, activation='relu')(dropout1)
        dropout2 = Dropout(dropout_rate)(dense2)

        # Output layer
        output = Dense(1, activation='sigmoid', name='dropout_prediction')(dropout2)

        model = Model(inputs=input_layer, outputs=output)
        model.compile(
            optimizer=Adam(learning_rate=learning_rate),
            loss='binary_crossentropy',
            metrics=['accuracy', 'precision', 'recall']
        )

        return model

    def hyperparameter_tuning(self, X_train, y_train, X_val, y_val, n_trials=50):
        """Search hyperparameters with Optuna, maximizing validation ROC AUC.

        Args:
            X_train, y_train: Training features and labels.
            X_val, y_val: Validation features and labels, used both for early
                stopping and for the objective value.
            n_trials: Number of Optuna trials to run.

        Returns:
            Dict of the best trial's parameters.
        """
        print(f"Starting hyperparameter tuning with {n_trials} trials...")

        def objective(trial):
            # Create model with trial parameters
            model = self.create_cnn_lstm_model(trial)

            early_stopping = EarlyStopping(
                monitor='val_loss',
                patience=10,
                restore_best_weights=True
            )

            model.fit(
                X_train, y_train,
                validation_data=(X_val, y_val),
                epochs=50,
                batch_size=32,
                callbacks=[early_stopping],
                verbose=0
            )

            # Validation AUC is the value being maximized
            val_pred = model.predict(X_val, verbose=0)
            return roc_auc_score(y_val, val_pred)

        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials=n_trials)

        print(f"Best trial: {study.best_trial.number}")
        print(f"Best AUC: {study.best_value:.4f}")
        print(f"Best parameters: {study.best_params}")

        return study.best_params

    def train_model(self, X_train, y_train, X_val, y_val, best_params=None):
        """Train the CNN+LSTM model and store it on ``self.model``.

        Args:
            X_train, y_train: Training features and labels.
            X_val, y_val: Validation data for early stopping and learning-rate
                reduction.
            best_params: Optional dict of hyperparameters (the names used in
                ``create_cnn_lstm_model``). When falsy, defaults are used.

        Returns:
            The Keras ``History`` object returned by ``fit``.
        """
        print("Training the model...")

        if best_params:
            # Feed the chosen values back through the trial interface.
            self.model = self.create_cnn_lstm_model(self._FixedTrial(best_params))
        else:
            self.model = self.create_cnn_lstm_model()

        callbacks = [
            EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True),
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10, min_lr=1e-6)
        ]

        # Balanced class weights: n_samples / (n_classes * class_count).
        # A class that is absent from y_train gets a neutral weight instead
        # of a division by zero.
        n_total = len(y_train)
        class_weights = {}
        for label in (0, 1):
            count = int(np.sum(y_train == label))
            class_weights[label] = n_total / (2 * count) if count else 1.0

        history = self.model.fit(
            X_train, y_train,
            validation_data=(X_val, y_val),
            epochs=100,
            batch_size=32,
            callbacks=callbacks,
            class_weight=class_weights,
            verbose=1
        )

        return history

    def evaluate_model(self, X_test, y_test):
        """Print and return test metrics for the trained model.

        Predictions are thresholded at 0.5.

        Returns:
            Dict with keys ``'auc'``, ``'predictions'``, ``'probabilities'``
            and ``'confusion_matrix'``.
        """
        print("Evaluating model...")

        y_pred_prob = self.model.predict(X_test)
        y_pred = (y_pred_prob > 0.5).astype(int)

        auc_score = roc_auc_score(y_test, y_pred_prob)
        print(f"AUC Score: {auc_score:.4f}")

        print("\nClassification Report:")
        print(classification_report(y_test, y_pred))

        print("\nConfusion Matrix:")
        cm = confusion_matrix(y_test, y_pred)
        print(cm)

        return {
            'auc': auc_score,
            'predictions': y_pred,
            'probabilities': y_pred_prob,
            'confusion_matrix': cm
        }

    def explain_predictions_shap(self, X_sample, sample_size=100,
                                 n_background=50, n_explain=20):
        """Compute SHAP values for a subset of ``X_sample``.

        Args:
            X_sample: Feature tensor to draw from.
            sample_size: If ``X_sample`` is larger than this, a random subset
                of this size is drawn (without replacement) first.
            n_background: Number of leading rows of the subset used as the
                DeepExplainer background.
            n_explain: Number of leading rows of the subset that are explained.

        Returns:
            Tuple ``(explainer, shap_values, explained_rows)``.
        """
        print("Generating SHAP explanations...")

        # Sample data for SHAP (to reduce computation time)
        if len(X_sample) > sample_size:
            indices = np.random.choice(len(X_sample), sample_size, replace=False)
            X_shap = X_sample[indices]
        else:
            X_shap = X_sample

        # NOTE: the explained rows are a prefix of the background rows.
        explained = X_shap[:n_explain]
        explainer = shap.DeepExplainer(self.model, X_shap[:n_background])
        shap_values = explainer.shap_values(explained)

        return explainer, shap_values, explained

    def plot_feature_importance(self, shap_values, X_sample):
        """Draw a SHAP summary plot with one feature per (week, event) pair.

        Args:
            shap_values: SHAP values for ``X_sample``. Either a list with one
                array per model output (the first entry is used) or a single
                array; which one is produced presumably depends on the
                installed SHAP version - verify against the release notes.
            X_sample: The rows that were explained, shape
                ``(n, n_weeks, n_events)``.

        Returns:
            List of feature names in the flattened column order.
        """
        print("Plotting feature importance...")

        values = shap_values[0] if isinstance(shap_values, (list, tuple)) else shap_values

        # Flatten everything after the sample axis; this also absorbs a
        # trailing singleton output axis if one is present.
        n_rows = len(X_sample)
        shap_vals_reshaped = np.asarray(values).reshape(n_rows, -1)
        X_reshaped = np.asarray(X_sample).reshape(n_rows, self.n_weeks * self.n_events)

        # Same week-major order as the flattened (n_weeks, n_events) matrix
        feature_names = [
            f"Week{week+1}_{event}"
            for week in range(self.n_weeks)
            for event in self.event_types
        ]

        plt.figure(figsize=(12, 8))
        shap.summary_plot(shap_vals_reshaped, X_reshaped,
                          feature_names=feature_names, show=False)
        plt.title("SHAP Feature Importance for Dropout Prediction")
        plt.tight_layout()
        plt.show()

        return feature_names

    def create_temporal_heatmap(self, X_sample, predictions, n_samples=5):
        """Plot one event-by-week heatmap per student, titled with its score.

        Args:
            X_sample: Feature tensor, shape ``(n, n_weeks, n_events)``.
            predictions: Predicted dropout probabilities aligned with
                ``X_sample``; each entry may be a scalar or a length-1 array.
            n_samples: Maximum number of students to draw.
        """
        print("Creating temporal activity heatmaps...")

        # Only create as many panels as there are students to show.
        n_rows = min(n_samples, len(X_sample))
        if n_rows <= 0:
            return

        # squeeze=False always yields a 2-D axes array, so a single row
        # needs no special case.
        fig, axes = plt.subplots(n_rows, 1, figsize=(12, 3 * n_rows), squeeze=False)
        axes = axes[:, 0]

        week_labels = [f'Week {j+1}' for j in range(self.n_weeks)]

        for i in range(n_rows):
            student_data = X_sample[i]
            dropout_prob = float(np.ravel(predictions[i])[0])

            # Transposed so rows are event types and columns are weeks
            sns.heatmap(student_data.T,
                        xticklabels=week_labels,
                        yticklabels=self.event_types,
                        cmap='YlOrRd',
                        annot=True,
                        fmt='.0f',
                        ax=axes[i])

            axes[i].set_title(f'Student {i+1} - Dropout Probability: {dropout_prob:.3f}')
            axes[i].set_xlabel('Time Period')
            axes[i].set_ylabel('Event Type')

        plt.tight_layout()
        plt.show()

def main():
    """Run the whole pipeline: load, featurize, split, tune, train, evaluate, explain."""
    predictor = StudentDropoutPredictor()

    # Raw inputs -> per-student (weeks x events) tensors
    truth_df, enrollment_df, log_df = predictor.load_data()
    X, y, student_ids = predictor.create_temporal_features(log_df, truth_df)

    # 60% train; the remaining 40% is halved into validation and test.
    # Both splits are stratified on the label.
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        X, y, test_size=0.4, random_state=42, stratify=y
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_holdout, y_holdout, test_size=0.5, random_state=42, stratify=y_holdout
    )

    for split_name, split_X in (
        ("Training", X_train),
        ("Validation", X_val),
        ("Test", X_test),
    ):
        print(f"{split_name} set: {split_X.shape[0]} samples")

    # Tune on train/validation, then fit the final model with the winner
    best_params = predictor.hyperparameter_tuning(
        X_train, y_train, X_val, y_val, n_trials=20
    )
    history = predictor.train_model(X_train, y_train, X_val, y_val, best_params)

    # Held-out evaluation
    results = predictor.evaluate_model(X_test, y_test)

    # Explainability: SHAP values, summary plot, then per-student heatmaps
    explainer, shap_values, X_shap_sample = predictor.explain_predictions_shap(X_test)
    feature_names = predictor.plot_feature_importance(shap_values, X_shap_sample)

    n_shown = 5
    predictor.create_temporal_heatmap(
        X_test[:n_shown], results['probabilities'][:n_shown]
    )

    print("\n=== Model Training and Evaluation Complete ===")
    print(f"Final AUC Score: {results['auc']:.4f}")
    print(f"Best hyperparameters: {best_params}")

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()