# Final use-case : School Failure Prediction

This notebook builds a **predictive AI solution** that estimates the risk of school failure for a given student.

The system uses a "*Chain of Responsibility*" pattern to pipeline the process. Each element of the chain is responsible for one processing step, then hands its result over to the next element.

This architecture makes it easy to change the orchestration or to add new processing steps.

In [None]:
%load_ext autoreload
%autoreload 2
import pandas as pd
from loguru import logger

from pipeline_core.pipeline_core import DataHandler, PipelineContext, PipelineOrchestrator

In [2]:
from pipeline_core.pipeline_core import PipelineContext


class DataLoader(DataHandler):
    """
    Handle data loading from csv

    Params:
        files_to_load: mapping of source name -> file path; each path is
            handed to ``FileManager.load_data``.
    See:
        DataHandler abstract class
    """
    def __init__(self, files_to_load):
        self.files_to_load = files_to_load

        super().__init__()

    def process(self, context: PipelineContext) -> PipelineContext:
        """
        Load every configured file and register it in ``context.data_map``.

        A source is stored under its name only when ``load_data`` returns a
        non-empty result. A source that yields ``None`` or zero rows is
        skipped, and a warning is logged so that a missing file cannot go
        unnoticed further down the chain.

        Params:
            context: PipelineContext - shared context, its ``data_map`` is filled
        Returns:
            The same context instance
        """
        # Local import kept from the original implementation
        from file_handling_core.file_manager import FileManager

        file_manager = FileManager()

        for name, file in self.files_to_load.items():
            df = file_manager.load_data(file)
            if df is not None and len(df) > 0:
                context.data_map[name] = df
            else:
                # Make the skipped source explicit: downstream handlers
                # would otherwise silently work on fewer sources
                logger.warning(f"Source '{name}' skipped: no data loaded from {file}")

        return context
    

In [3]:
from pipeline_core.pipeline_core import PipelineContext


class SensitiveDataHandler(DataHandler):
    """
    Drop the configured sensitive columns from every source of the context.

    Params:
        sensitive_columns: list of column names that must not stay in the sources
    See:
        DataHandler abstract class
    """
    def __init__(self, sensitive_columns: list):
        super().__init__()
        self.sensitive_columns = sensitive_columns

    def process(self, context: PipelineContext) -> PipelineContext:
        """
        Replace each entry of ``context.data_map`` by a version without the
        sensitive columns it contains; sources without any are left as is.
        """
        sources = context.data_map
        logger.info(f"üîé Check sensitives columns in: {len(sources)} sources")

        for source_name, frame in sources.items():
            # Only the sensitive columns actually present can be dropped
            present = [column for column in self.sensitive_columns if column in frame.columns]

            if not present:
                logger.debug(f"üå± No sensitive datas in source: {source_name}")
                continue

            # Existing key is reassigned, so the dict size does not change
            # while it is being iterated
            sources[source_name] = frame.drop(columns=present)
            logger.debug(f"‚ùå Remove {present} from source: {source_name}")

        return context

In [None]:
class MergerHandler(DataHandler):
    """
    Concatenate every source of the context into ``context.final_df``.

    Each row is tagged with the name of the source it comes from, duplicated
    rows are removed and a short report is stored in ``context.metadata``.
    """
    def process(self, context: PipelineContext) -> PipelineContext:
        """
        Merge ``context.data_map`` into a single de-duplicated dataframe.

        Raises:
            ValueError: when ``context.data_map`` is empty
        """
        sources = context.data_map
        if not sources:
            raise ValueError("‚ùå MergerHandler : data_map is empty. Nothing to merge")

        source_names = list(sources.keys())
        logger.info(f"üîÑ Merge sources : {source_names}")

        # 1. Column consistency: every source is compared to the first one.
        #    A mismatch is only reported, the merge still goes on.
        reference_name, *other_names = source_names
        reference_cols = set(sources[reference_name].columns)
        for other_name in other_names:
            candidate_cols = set(sources[other_name].columns)
            if candidate_cols == reference_cols:
                continue
            mismatch = reference_cols.symmetric_difference(candidate_cols)
            logger.warning(f"‚ö†Ô∏è Diffrence between columns was detected {other_name}: {mismatch}")

        # 2. Tag each row with its provenance (on a copy, sources stay untouched)
        #    then stack all the frames
        tagged_frames = [
            frame.assign(source_origin=origin)
            for origin, frame in sources.items()
        ]
        stacked = pd.concat(tagged_frames, ignore_index=True)
        rows_before = len(stacked)

        # 3. Duplicates handling: 'source_origin' is left out of the comparison
        #    so the same record coming from two sources counts as a duplicate
        business_cols = [column for column in stacked.columns if column != 'source_origin']
        deduplicated = stacked.drop_duplicates(subset=business_cols)
        context.final_df = deduplicated.reset_index(drop=True)

        rows_after = len(context.final_df)
        dropped = rows_before - rows_after

        # 4. Keep a trace of the merge in the context metadata
        report = {
            'initial_rows': rows_before,
            'final_rows': rows_after,
            'duplicates_removed': dropped,
            'sources': source_names
        }
        context.metadata['merger_report'] = report

        logger.success(f"‚úÖ Merge complete: {rows_after} rows kept ({dropped} duplicates remove).")

        return context

## Outlier handler

**Sujet** : Identifier les valeurs "aberrantes" et utiliser la stratégie de suppression de la ligne entière.

In [None]:
from strategy_core.outliers_strategies import OutlierStrategy

class OutlierHandler(DataHandler):
    """
    Remove outlier rows from ``context.final_df`` using an OutlierStrategy.

    Params:
        strategy: OutlierStrategy - strategy exposing ``detect_and_clean``
        target_columns: list - columns handed to the strategy
    """
    def __init__(self, strategy: OutlierStrategy, target_columns: list):
        super().__init__()
        self.strategy = strategy
        self.target_columns = target_columns

    def process(self, context: PipelineContext) -> PipelineContext:
        """
        Run the strategy on the merged dataframe and store what it returns.

        The number of removed rows is recorded in
        ``context.metadata["outlier_reports"]`` under the strategy class name.
        Nothing is done when ``context.final_df`` is None.
        """
        logger.info("üõ†Ô∏è Outliers detection running...")

        current_df = context.final_df
        if current_df is None:
            return context

        rows_before = len(current_df)
        cleaned_df = self.strategy.detect_and_clean(df=current_df, columns=self.target_columns)
        dropped = rows_before - len(cleaned_df)

        context.final_df = cleaned_df

        # One counter per concrete strategy; the report dict is created on first use
        strategy_name = type(self.strategy).__name__
        context.metadata.setdefault("outlier_reports", {})[strategy_name] = dropped

        logger.debug(f"üóëÔ∏è {dropped} outliers removed using {strategy_name}.")

        return context

## NaN imputation

**Sujet** : Identifier les valeurs manquantes et utiliser une stratégie pour les remplacer.

On va utiliser une détection intelligente "régressive" pour isoler les données manquantes et les remplacer.

In [None]:
from pipeline_core.pipeline_core import PipelineContext
from strategy_core.imputation_strategies import ImputationStrategy

class ImputationHandler(DataHandler):
    """
    Detect the numeric columns holding missing values and let an
    ImputationStrategy fill them.

    Params:
        strategy: ImputationStrategy - strategy exposing ``apply``
    """
    def __init__(self, strategy: ImputationStrategy):
        self.strategy = strategy
        super().__init__()

    def process(self, context: PipelineContext) -> PipelineContext:
        """
        Impute the missing numeric values of ``context.final_df``.

        The context is returned untouched when there is no merged dataframe
        or when no numeric column contains a missing value. Otherwise the
        result of the strategy replaces ``context.final_df`` and a report is
        stored in ``context.logs["imputation_report"]``.
        """
        merged_df = context.final_df
        if merged_df is None:
            logger.error("‚ùå SmartImputationHandler : final_df is empty. This handler must be place AFTER MergerHandler.")
            return context

        # 1. Columns holding at least one missing value
        missing_per_column = merged_df.isna().sum()
        incomplete_columns = missing_per_column[missing_per_column > 0].index.tolist()

        # Non-numeric columns are not handed to the strategy
        numeric_targets = [
            column
            for column in incomplete_columns
            if pd.api.types.is_numeric_dtype(merged_df[column])
        ]

        if not numeric_targets:
            logger.info("‚úÖ No missing datas detected in the dataframe")
            return context

        logger.info("üõ†Ô∏è Smart NaN imputation running...")

        # 2. Count the gaps BEFORE the strategy runs, then apply it
        filled_total = merged_df[numeric_targets].isna().sum().sum()
        context.final_df = self.strategy.apply(merged_df, numeric_targets)

        # 3. Keep a trace of what was processed
        report = {
            "fixed_columns": numeric_targets,
            "total_values_filled": int(filled_total)
        }
        context.logs["imputation_report"] = report

        logger.success(f"‚ú® {filled_total} successfuly missing datas processed")

        return context

In [None]:
import os
from datetime import datetime
from file_handling_core.file_manager import FileManager

class DataExportHandler(DataHandler):
    """
    Write the processed dataframe to disk through FileManager.
    Meant to run once cleaning and merging are done.

    Params:
        output_dir: str - folder receiving the exported file
    """
    def __init__(self, output_dir: str = "outputs/data_processed"):
        super().__init__()
        self.output_dir = output_dir
        self.file_manager = FileManager()

        # The output folder is created at construction time when missing
        folder_missing = not os.path.exists(self.output_dir)
        if folder_missing:
            os.makedirs(self.output_dir)
            logger.info(f"üìÅ Folder created: {self.output_dir}")

    def process(self, context: PipelineContext) -> PipelineContext:
        """
        Save ``context.final_df`` and record the path in
        ``context.metadata['export_path']``.

        Nothing is written when the dataframe is None or empty. Any exception
        raised while saving is logged and re-raised.
        """
        frame = context.final_df
        if frame is None or frame.empty:
            logger.warning("‚ö†Ô∏è DataExportHandler: No data to save (df is empty).")
            return context

        # File name pattern: student_DDMMYYYY_HHMMSS_processed.csv
        stamp = datetime.now().strftime("%d%m%Y_%H%M%S")
        full_path = os.path.join(self.output_dir, f"student_{stamp}_processed.csv")

        try:
            logger.info(f"üíæ Try to save to {full_path}")
            self.file_manager.save_processed_data(frame, full_path)

            # Expose the location of the export to the following handlers
            context.metadata['export_path'] = full_path

        except Exception as e:
            logger.error(f"‚ùå Data export failed: {e}")
            raise  # The chain must not go on after a failed export

        return context

## Learning handler
Using strategies, the handler runs a Logistic Regression (LR) or a Random Forest Classifier (RF) on 4 hypotheses.
During training, MLflow stores the metrics and artifacts, and finally stores the most powerful model.

In [None]:
from strategy_core.training_strategies import TrainingStrategy

class ModelHandler(DataHandler):
    """
    Run one training strategy on the processed dataframe of the context.
    """
    def __init__(self, strategy: TrainingStrategy, scenario_label:str):
        """
        Initiate Training Model
        Params:
            strategy: TrainingStrategy one of the strategy to use
            scenario_label: str - Scenario to store into MLFlow tracking
        """
        # Initialise the DataHandler base state, as every other handler of
        # this file does; it was the only handler skipping this call
        super().__init__()
        self.strategy = strategy
        self.scenario_label = scenario_label

    def process(self, context: PipelineContext) -> PipelineContext:
        """
        Execute the strategy on ``context.final_df`` for the configured scenario.

        Params:
            context: PipelineContext - must carry a ``final_df``
        Returns:
            The same context instance
        Raises:
            Exception: when ``context.final_df`` is None
        """
        logger.info(f"üöÄ Training launching: {self.scenario_label}")

        if context.final_df is not None:
            self.strategy.execute(context.final_df, self.scenario_label)
            return context
        else:
            logger.error("‚ùå dataframe is none. Process interrupted!")
            raise Exception("Dataframe is none or empty. Training was interrupted")

## Orchestrator settings
- Set the data sources,
- Set the sensitive columns to remove,
- Build the orchestrator and register its handlers

In [None]:
from strategy_core.outliers_strategies import IsolationForestStrategy
from strategy_core.imputation_strategies import AIImputationStrategy

# Sources to load: source name -> csv path (the name is the key used in context.data_map)
files_to_load = {
    "maths": "datas/student-mat.csv",
    "por": "datas/student-por.csv"
}

# Columns removed from every source by SensitiveDataHandler
sensitives = [
    "romantic", # No correlation
    "Dalc", # Discriminating data, cannot be used
    "Walc", # Discriminating data, cannot be used
]

# Make chain instances :
# 1. Data processing chain
loader = DataLoader(files_to_load=files_to_load)
cleaner = SensitiveDataHandler(sensitive_columns=sensitives)
merger = MergerHandler()

# Sets one of the Outliers detection strategy (Isolation Forest)
outlier_strategy = IsolationForestStrategy(contamination=0.01)
outlier = OutlierHandler(strategy=outlier_strategy, target_columns=["studytime", "absences", "age"])

# Sets one of the Imputation Strategy
imputer_strategy = AIImputationStrategy()
imputer = ImputationHandler(imputer_strategy)

# Export goes to the default folder of DataExportHandler ("outputs/data_processed")
exporter = DataExportHandler()

# Instantiate the pipeline; handlers are registered in this order:
# load -> drop sensitive columns -> merge -> outliers -> imputation -> export
pipeline = (PipelineOrchestrator()
    .add_handler(loader)
    .add_handler(cleaner)
    .add_handler(merger)
    .add_handler(outlier)
    .add_handler(imputer)
    .add_handler(exporter)
)

# 2. Learning processing
# Each scenario is (id, label, columns excluded from training)
# NOTE(review): the cleaner above already drops `sensitives` from every source
# before the merge, so as far as this notebook shows these columns are absent
# from final_df whatever a scenario excludes - confirm "Full_Features" is
# really meant to train without them.
scenarii = [
    (1, "Full_Features", []),
    (2, "No_Sensitive", ["romantic", "Dalc", "Walc"]),
    (3, "No_Sensitive_No_G2", ["romantic", "Dalc", "Walc", "G2"]),
    (4, "No_Sensitive_No_G1_G2", ["romantic", "Dalc", "Walc", "G1", "G2"])
]
from strategy_core.training_strategies import LogisticRegressionStrategy
from strategy_core.training_strategies import RandomForestStrategy
# 2.1 From definitions add strategies needed:
# one ModelHandler per (scenario, model type) pair, appended after the exporter
# NOTE(review): s_id is unpacked but never used; scenario_id receives s_name -
# confirm this is intended.
for s_id, s_name, s_exclusions in scenarii:
    for strategy_class in [LogisticRegressionStrategy, RandomForestStrategy]:
        strategy = strategy_class(scenario_id=s_name, exclusions=s_exclusions)
        model_handler = ModelHandler(strategy=strategy, scenario_label=s_name)
        pipeline.add_handler(model_handler)


## Run orchestrator

The orchestrator is a Chain of Responsibility. Once the end of the chain is reached, all processing steps are done.



In [None]:
# Initialize context (empty, the handlers fill it along the chain)
context = PipelineContext()

# Run the pipeline
# Any exception raised by a handler is caught here and only logged,
# so the cell itself never fails.
try:
    pipeline.configure_pipeline() # Configure the pipeline
    final_context = pipeline.run(context)
    logger.success("‚òë Pipeline fully executed")

    # Final report
    print("\n--- Merged datas overview ---")
    # `display` is not imported in this notebook - presumably the IPython built-in
    display(final_context.final_df.head())

    # One line per step, duration printed with 4 decimals and an "s" suffix
    print("\n--- Execution stats ---")
    for step, duration in final_context.execution_time.items():
        print(f"{step:25} : {duration:.4f}s")
except Exception as e:
    logger.error(f"‚ùå Pipeline failed: {e}")



[32m2025-12-17 15:55:58.211[0m | [1mINFO    [0m | [36mpipeline_core.pipeline_core[0m:[36mhandle[0m:[36m20[0m - [1mStep DataLoader started...[0m
[32m2025-12-17 15:55:58.216[0m | [1mINFO    [0m | [36mfile_handling_core.file_manager[0m:[36mload_data[0m:[36m29[0m - [1müìÑ Successfuly loaded data from: datas/student-mat.csv[0m
[32m2025-12-17 15:55:58.217[0m | [31m[1mERROR   [0m | [36mfile_handling_core.file_manager[0m:[36mload_data[0m:[36m38[0m - [31m[1m‚ùå Data loading error; [Errno 2] No such file or directory: 'datas/student-port.csv'[0m
[32m2025-12-17 15:55:58.218[0m | [1mINFO    [0m | [36mpipeline_core.pipeline_core[0m:[36mhandle[0m:[36m20[0m - [1mStep SensitiveDataHandler started...[0m
[32m2025-12-17 15:55:58.218[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess[0m:[36m15[0m - [1müîé Check sensitives columns in: 1 sources[0m
[32m2025-12-17 15:55:58.219[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mprocess[0m:[36


Data shape: (395, 1)

Columns: Index(['school;sex;age;address;famsize;Pstatus;Medu;Fedu;Mjob;Fjob;reason;guardian;traveltime;studytime;failures;schoolsup;famsup;paid;activities;nursery;higher;internet;romantic;famrel;freetime;goout;Dalc;Walc;health;absences;G1;G2;G3'], dtype='object')

CRows: 395

Types:
 school;sex;age;address;famsize;Pstatus;Medu;Fedu;Mjob;Fjob;reason;guardian;traveltime;studytime;failures;schoolsup;famsup;paid;activities;nursery;higher;internet;romantic;famrel;freetime;goout;Dalc;Walc;health;absences;G1;G2;G3    object
dtype: object

--- Merged datas overview ---
