In [None]:
from loguru import logger
import pandas as pd
from pathlib import Path
import numpy as np
import sklearn

# Load initial dataset
Explore bostonhousing dataset and generate intermediate processing dataset

In [None]:
def load_data():
    """
    Load the raw Boston housing CSV from the ``./datas`` folder.

    Returns:
        pd.DataFrame | None: The loaded dataset, or None when the file is
        missing or cannot be read (the failure is logged, not raised).
    """
    file_name_without_extension = "bostonhousing-693729dedb019653836667"
    original_file_path = Path("./datas") / f"{file_name_without_extension}.csv"

    try:
        full_pd_dataset = pd.read_csv(original_file_path)
    except FileNotFoundError:
        logger.error(f"{original_file_path.absolute()} was not found")
        return None
    except Exception as e:
        # Best-effort loader: any other read/parse failure is reported and
        # signalled to the caller through the None return value.
        logger.error(f"Something went wrong loading data: {e}")
        return None

    logger.info(f"Successfully loaded data from {original_file_path}")
    logger.info(f"Data shape: {full_pd_dataset.shape}")
    return full_pd_dataset

## iqr outlier detection
IQR (interquartile range): a value is flagged as an outlier when it falls outside the interval `[Q1 - f * IQR, Q3 + f * IQR]`, where `IQR = Q3 - Q1` and `f` is a multiplying factor (standard value: 1.5).

In [None]:
def iqr_detection(df, column, factor=1.5):
    """
    Flag the outliers of one column with the interquartile-range rule.

    A value is an outlier when it lies strictly below ``Q1 - factor * IQR``
    or strictly above ``Q3 + factor * IQR``.

    Args:
        df (pd.DataFrame): Input dataframe
        column (str): Name of the column to analyze
        factor (float): Multiplier applied to the IQR (standard: 1.5)
    Returns:
        pd.Series: boolean mask (True = outlier, False = non outlier)
    """
    values = df[column]
    first_quartile, third_quartile = values.quantile(0.25), values.quantile(0.75)
    spread = third_quartile - first_quartile

    # Fences beyond which a value is considered abnormal
    low_fence = first_quartile - factor * spread
    high_fence = third_quartile + factor * spread

    return values.lt(low_fence) | values.gt(high_fence)

## z-score outlier detection
The z-score measures how many standard deviations an observation lies from the mean. The standard threshold is ±3 standard deviations.

In [None]:
def zscore_detection(df, column, threshold=3):
    """
    Flag the outliers of one column using the absolute z-score.

    Args:
        df (pd.DataFrame): Input dataframe
        column (str): Column to analyze
        threshold (float): Absolute z-score above which a value is an
            outlier (standard = 3.0).

    Returns:
        pd.Series: boolean mask (True = outlier, False = non-outlier).
    """
    values = df[column]

    # Distance to the mean, expressed in standard deviations
    deviations = values - values.mean()
    absolute_z = (deviations / values.std()).abs()

    return absolute_z > threshold

## Isolation forest outlier detection
Isolation Forest is an unsupervised learning algorithm that isolates abnormal observations. It requires scikit-learn.

In [None]:
from sklearn.ensemble import IsolationForest

def isolation_forest_detection(df, column, contamination='auto', random_state=42):
    """
    Flag the outliers of one column with an Isolation Forest.

    NOTE: Isolation Forest is a multi-dimensional algorithm. It is applied
          here to a single column only to stay consistent with the other
          detection functions; it performs better when given several
          relevant columns.

    Args:
        df (pd.DataFrame): Input dataframe
        column (str): Column to test
        contamination (str/float): Expected proportion of outliers in the
            data, forwarded as-is to IsolationForest
        random_state (int): Seed, for reproducibility

    Returns:
        pd.Series: boolean mask (True = outlier, False = non-outlier).
    """
    # The estimator expects a 2D input of shape (N, 1)
    features = df[[column]].values

    forest = IsolationForest(
        n_jobs=-1,  # Use all cores
        random_state=random_state,
        contamination=contamination,
    )

    # predict() labels inliers with 1 and outliers with -1
    labels = forest.fit(features).predict(features)

    return pd.Series(labels == -1, index=df.index)

In [None]:
def outlier_detector(df, column, algorithm, **kwargs):
    """
    Apply one of the outlier detection functions and return the cleaned dataset.

    Args:
        df (pd.DataFrame): Input dataframe.
        column (str): Column name to analyze.
        algorithm (callable): Detection function (e.g., iqr_detection). It is
            called as ``algorithm(df, column, **kwargs)`` and must return a
            boolean mask where True marks an outlier.
        **kwargs: Specific arguments forwarded to the detection function
            (e.g., factor, threshold).

    Returns:
        pd.DataFrame | None: Copy of ``df`` without the rows flagged as
        outliers; the original ``df`` if the detection raised; None if
        ``df`` is None.
    """
    if df is None:
        logger.error("No dataframe. Detection aborted")
        return None

    # functools.partial objects and callable instances have no __name__;
    # fall back to the type name instead of crashing before the try block.
    raw_name = getattr(algorithm, "__name__", type(algorithm).__name__)
    algo_name = raw_name.replace('_detection', '').upper()
    logger.info(f"Running detection using : {algo_name}")

    try:
        # 1. Get outliers mask
        outlier_mask = algorithm(df, column, **kwargs)

        # 2. Count outliers (guard the ratio against an empty dataframe)
        num_outliers = int(outlier_mask.sum())
        total_rows = len(df)
        outlier_ratio = num_outliers / total_rows if total_rows else 0.0
        logger.info(f"{num_outliers} outliers was detected in '{column}' ({outlier_ratio:.2%})")

        # 3. Build cleaned dataframe (mask False = non-outlier).
        # .copy() avoids SettingWithCopyWarning on later writes.
        cleaned_df = df[~outlier_mask].copy()

        logger.success(
            f"Outliers cleaning done. DataFrame reduced from {total_rows} to {len(cleaned_df)} rows."
        )

        return cleaned_df

    except Exception as e:
        logger.error(f"Error using {algo_name}: {e}")
        return df  # Return the original dataframe so the pipeline keeps going

## Normalization step
Starting from a cleaned dataframe, apply MinMax scaling to all feature columns (the target column and the binary columns are left untouched).

$$X_{norm} = \frac{X - X_{min}}{X_{max} - X_{min}}$$

With this method, all features end up in the same range ([0, 1]) and the shape of each distribution is preserved. Outliers are not a concern here because they were already removed in the previous step.

On the Boston data, MinMax is a better fit than MaxAbs because the values are already positive. MaxAbs is better suited to sparse data or to data centered on 0.

In [None]:
from sklearn.preprocessing import MinMaxScaler


def minmax_normalize(df, target_column='medv', binary_columns=['chas']):
    """
    Normalize feature columns using MinMax scaling (0 to 1).

    The input dataframe is left untouched: scaling is applied to a copy, so
    calling this function again on the same input fits the scaler on the
    same, unscaled values.

    Args:
        df (pd.DataFrame): Input dataframe.
        target_column (str): Target column (not normalized).
        binary_columns (list): Binary columns (not normalized). Only read,
            never modified.

    Returns:
        pd.DataFrame: Normalized copy of the dataframe (None if df is None).
        MinMaxScaler: Fitted scaler, useful for later denormalization
            (None if df is None).
    """
    if df is None:
        logger.error("No dataframe. Normalization aborted.")
        return None, None

    # 1. Identify all columns to normalize (everything except target and binaries)
    columns_to_scale = [
        col for col in df.columns
        if col not in [target_column] and col not in binary_columns
    ]

    logger.info(f"Columns to normalize ({len(columns_to_scale)}): {columns_to_scale}")

    # 2. Create the scaler and fit it on the selected feature columns only.
    # Work on a copy so the caller's dataframe keeps its original values.
    normalized_df = df.copy()
    scaler = MinMaxScaler()
    normalized_df[columns_to_scale] = scaler.fit_transform(normalized_df[columns_to_scale])

    logger.success("MinMax normalization successfully applyed on features.")

    return normalized_df, scaler

## Standardization using z-score

Depending on the regression model being evaluated, this standardization can be applied instead of MinMax normalization.

In [None]:
# StandardScaler's public home is sklearn.preprocessing; the previous import
# from sklearn.discriminant_analysis only worked because that module happens
# to import the class internally.
from sklearn.preprocessing import StandardScaler


def zscore_standardize(df, target_column='medv', binary_columns=['chas']):
    """
    Z-score standardization of the feature columns
    (mean = 0, standard deviation = 1).

    The input dataframe is left untouched: scaling is applied to a copy.

    Args:
        df (pd.DataFrame): Input dataframe.
        target_column (str): Target column (not standardized).
        binary_columns (list): Binary columns (not standardized). Only read,
            never modified.

    Returns:
        pd.DataFrame: Standardized copy of the dataframe (None if df is None).
        StandardScaler: Fitted scaler, in case the data must be
            de-standardized later (None if df is None).
    """
    if df is None:
        logger.error("No dataframe. Standardization aborted.")
        return None, None

    # 1. Identify columns to process (everything except target and binaries)
    columns_to_scale = [
        col for col in df.columns
        if col not in [target_column] and col not in binary_columns
    ]

    logger.info(f"Columns to process ({len(columns_to_scale)}): {columns_to_scale}")

    # 2. Create the scaler and fit it on the selected feature columns only.
    # Work on a copy so the caller's dataframe keeps its original values.
    standardized_df = df.copy()
    scaler = StandardScaler()
    standardized_df[columns_to_scale] = scaler.fit_transform(standardized_df[columns_to_scale])

    logger.success("Z-Score successfully applied on features")

    return standardized_df, scaler

## NaN detection and imputation using one of the methods: median, mean or constant

NaN values can bias the results, so we impute them.

In [None]:
from typing import Literal


def impute_missing_values(
    df: pd.DataFrame, 
    strategy: Literal['median', 'mean', 'constant'], 
    constant_value=None
) -> pd.DataFrame:
    """
    Detect and impute missing values (NaN) in the numeric columns.

    Args:
        df (pd.DataFrame): Input dataframe (not modified).
        strategy (Literal['median', 'mean', 'constant']): Imputation method to use.
        constant_value (float, optional): Value to use with the 'constant'
            strategy. When it is None, columns with NaN are skipped with a
            warning.

    Returns:
        pd.DataFrame: Copy of the dataframe after imputation (None if df is None).

    Raises:
        ValueError: If a column needs imputation and ``strategy`` is not one
            of 'median', 'mean' or 'constant'.
    """
    if df is None:
        logger.error("No dataframe. Imputation aborted.")
        return None

    df_imputed = df.copy()

    # Statistical imputation only makes sense for numeric columns;
    # 'object' (string) columns are left as they are.
    numeric_cols = df_imputed.select_dtypes(include=['number']).columns

    logger.info(f"Start imputation using : {strategy.upper()} strategy")

    for col in numeric_cols:
        # 1. NaN detection
        nan_count = df_imputed[col].isnull().sum()

        if nan_count == 0:
            continue

        # 2. Pick the imputation value according to the strategy
        if strategy == 'median':
            impute_val = df_imputed[col].median()
        elif strategy == 'mean':
            impute_val = df_imputed[col].mean()
        elif strategy == 'constant':
            impute_val = constant_value
            if impute_val is None:
                logger.warning(f"'constant' strategy used for '{col}' but constant_value is None. Ignored.")
                continue
        else:
            raise ValueError(
                f"Unknown imputation strategy {strategy!r}; expected 'median', 'mean' or 'constant'."
            )

        # 3. Imputation. Assign the result back instead of calling
        # fillna(inplace=True) on df_imputed[col]: that chained form is
        # deprecated and does not update the dataframe under Copy-on-Write.
        df_imputed[col] = df_imputed[col].fillna(impute_val)
        logger.debug(f"Column '{col}': {nan_count} NaN values imputed with {impute_val:.4f} ({strategy}).")

    # Final check
    if df_imputed.isnull().sum().sum() == 0:
        logger.success("Imputation done : No NaN detected.")
    else:
        logger.warning("Warning : Some NaN detected (probably non numerical columns).")

    return df_imputed

## Artifacts : graphs

- residual,
- predictions

In [None]:
import os
import matplotlib.pyplot as plt
import mlflow

def save_and_log_regression_plots(model, X_train, y_train, run_id):
    """
    Generate the residual and predicted-vs-real plots and log them to MLflow.

    The PNG files are written to a temporary directory and then attached to
    the currently active MLflow run under the ``plots`` artifact folder.
    Nothing is written by hand inside ``mlruns/``: a manually created
    ``mlruns/<run_id>`` folder is not something the tracking store created
    itself, and ``log_artifact`` already copies the files to the right place.

    Args:
        model: Fitted estimator exposing ``predict``.
        X_train: Features used for the predictions shown in the plots.
        y_train: Real target values matching ``X_train``.
        run_id: Kept for backward compatibility with existing callers; the
            artifacts are logged to the active MLflow run.
    """
    import tempfile  # local import: keeps this cell self-contained

    # Predict on training dataset for plots
    y_pred = model.predict(X_train)
    residues = y_train - y_pred

    with tempfile.TemporaryDirectory() as artifact_path:
        # --- Plot #1 : residual ---
        plt.figure(figsize=(10, 6))
        plt.scatter(y_pred, residues, alpha=0.5)
        plt.hlines(y=0, xmin=y_pred.min(), xmax=y_pred.max(), color='red', linestyle='--')
        plt.title('Residual graph')
        plt.xlabel('Predicted values')
        plt.ylabel('Residual (Real - Predicted)')
        resid_plot_path = os.path.join(artifact_path, "residuals_plot.png")
        plt.savefig(resid_plot_path)
        plt.close()  # Free memory
        mlflow.log_artifact(resid_plot_path, "plots")
        logger.debug("Residual graph stored")

        # --- Plot #2 : Predicted vs Real ---
        plt.figure(figsize=(8, 8))
        plt.scatter(y_train, y_pred, alpha=0.5)
        # Diagonal = perfect prediction
        plt.plot([y_train.min(), y_train.max()], [y_train.min(), y_train.max()], 'r--')
        plt.title('Predicted vs. Real values')
        plt.xlabel('Real values')
        plt.ylabel('Predicted values')
        pred_plot_path = os.path.join(artifact_path, "predictions_vs_actual.png")
        plt.savefig(pred_plot_path)
        plt.close()
        mlflow.log_artifact(pred_plot_path, "plots")
        logger.debug("Predicted plot stored")

## Training and logging model using MLFlow
Training will be based on :

- CV (Cross validation),
- RMSE (Root Mean Squared Error) as scoring metric
- R2 scoring metric

In [None]:
import mlflow
import mlflow.sklearn
from sklearn.model_selection import KFold, cross_validate

def train_and_log_model(
    model, 
    X_train, 
    y_train, 
    model_name, 
    n_splits=5
):
    """
    Cross-validate a model, train it on the whole training set and log
    everything to MLflow.

    Inside a single MLflow run named ``model_name`` this logs: the mean and
    standard deviation of the cross-validated RMSE and R2, the regression
    plots, the model hyperparameters and the fitted model itself.

    Args:
        model: Unfitted scikit-learn compatible regressor.
        X_train: Training features.
        y_train: Training target.
        model_name (str): Name given to the MLflow run.
        n_splits (int): Number of folds for the K-fold cross validation.

    Returns:
        The same ``model`` object, fitted on the whole training set.
    """
    logger.info(f"MLFlow training experience for: {model_name}")

    # Scorers evaluated on every fold (name -> scikit-learn scorer id)
    cv_scoring_metrics = {
        'neg_mean_squared_error': 'neg_mean_squared_error',
        'r2': 'r2'
    }

    # Start MLFlow run
    with mlflow.start_run(run_name=model_name) as run:

        run_id = run.info.run_id

        # 1. Cross Validation (CV)
        # Shuffled K-fold with a fixed seed so the folds are reproducible
        cv = KFold(n_splits=n_splits, shuffle=True, random_state=42)

        # cross_validate reports the MSE as a negative score
        # (neg_mean_squared_error); it is converted to RMSE below.
        cv_results = cross_validate(
            model, 
            X_train, 
            y_train, 
            scoring=cv_scoring_metrics, 
            cv=cv, 
            n_jobs=-1
        )

        # RMSE : flip the sign, then take the square root
        rmse_scores = np.sqrt(-cv_results['test_neg_mean_squared_error'])
        mlflow.log_metric("cv_mean_rmse", rmse_scores.mean())
        mlflow.log_metric("cv_std_rmse", rmse_scores.std())
        logger.success(f"CV Mean RMSE for {model_name}: {rmse_scores.mean():.3f}")

        # R2
        r2_scores = cv_results['test_r2']
        mlflow.log_metric("cv_mean_r2", r2_scores.mean())
        mlflow.log_metric("cv_std_r2", r2_scores.std())
        logger.info(f"CV mean R2 for {model_name}: {r2_scores.mean():.3f}")

        # 2. Final training (on the whole training dataset)
        model.fit(X_train, y_train)

        # Plots and store results
        save_and_log_regression_plots(model, X_train, y_train, run_id)

        # 3. MLFlow logging

        # Hyperparameters storage (logged once; the CV metrics were already
        # logged above with log_metric)
        mlflow.log_params(model.get_params())

        logger.success(f"Mean CV RMSE for {model_name}: {rmse_scores.mean():.3f}")

        # Save final trained model
        mlflow.sklearn.log_model(model, "model")

    return model

## main : Handle all processes

In [None]:
# Load dataset
df = load_data()

if df is None:
    # Every following cell needs `df`. Stop here with a clear message
    # instead of failing later with an AttributeError on None.
    logger.error("No data was loaded")
    raise RuntimeError("Dataset could not be loaded; see the log output above for the cause.")

logger.info("Datas was sucessfully loaded and stored in 'df'")

In [None]:
# Preview the first rows of the raw dataset
df.head(5)

## Outlier detection
Run outlier detection with each of the algorithms: IQR, z-score and Isolation Forest (the IQR result is the one used in the rest of the pipeline).

In [None]:
# Apply outliers detection

TARGET_COLUMN = "medv"
# NOTE(review): ALGORITHM is not read anywhere in the visible notebook; the
# detection function is passed directly to outlier_detector below.
ALGORITHM = "iqr_detection" # Can be : isolation_forest_detection | zscore_detection

# A cell only renders its last expression, so display() is used to show a
# preview after each detection.
df_iqr_cleaned = outlier_detector(
    df,
    column=TARGET_COLUMN,
    algorithm=iqr_detection,
    factor=1.5
)
display(df_iqr_cleaned.head())

df_zscore_cleaned = outlier_detector(
    df,
    column=TARGET_COLUMN,
    algorithm=zscore_detection,
    threshold=3.0
)
display(df_zscore_cleaned.head())

df_if_cleaned = outlier_detector(
    df=df,
    column=TARGET_COLUMN,
    algorithm=isolation_forest_detection,
    contamination=0.05 # Considering about 5% of outliers
)
display(df_if_cleaned.head())

## Apply normalization using MinMax algorithm

In [None]:
# Use the dataframe reduced by IQR in the pipeline.
# A copy is passed so that df_iqr_cleaned keeps its unscaled values: re-running
# this cell then fits the scaler on the same data every time.
df_normalized, scaler = minmax_normalize(
    df_iqr_cleaned.copy(),
    target_column=TARGET_COLUMN,
    binary_columns=["chas"]
)
df_normalized.head()

## Apply imputation using the "mean" strategy
The mean is best suited to data that is normalized, close to a normal distribution and cleaned of outliers.

In [None]:
# Fill the remaining NaN of the normalized features with each column's mean
df_imputed_mean = impute_missing_values(df_normalized, "mean")
df_imputed_mean.head()

In [None]:
from sklearn.model_selection import train_test_split

# Separate target and features from df_imputed_mean (last clean dataset)
y = df_imputed_mean[TARGET_COLUMN]
X = df_imputed_mean.drop(columns=[TARGET_COLUMN])

# Hold out 20% of the rows as a test set (fixed seed for reproducibility).
# NOTE(review): the scaler was fitted on the whole dataset before this split,
# so the test rows influenced the scaling - confirm this is acceptable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    random_state=42,
    test_size=0.2,
)

## Concrete training using multiple strategies

1. LinearRegression : good start to establish base performance

In [None]:
from sklearn.linear_model import LinearRegression


# Baseline: linear regression with default settings
lr_model = LinearRegression()
trained_lr = train_and_log_model(
    lr_model,
    X_train,
    y_train,
    "Linear_Regression_Baseline",
)

## Concrete training using multiple strategies

2. Random Forest: robust to feature-scaling issues

In [None]:
from sklearn.ensemble import RandomForestRegressor


# 100 trees, fixed seed, all cores
rf_model = RandomForestRegressor(n_jobs=-1, random_state=42, n_estimators=100)
trained_rf = train_and_log_model(
    rf_model,
    X_train,
    y_train,
    "Random_Forest_Default",
)

## Concrete training using multiple strategies

3. LightGBM (`lgbm_model`): accurate algorithm, especially for tabular data, often faster than XGBoost

In [None]:
import lightgbm as lgb

# LightGBM regressor with default hyperparameters, fixed seed, all cores
lgbm_model = lgb.LGBMRegressor(n_jobs=-1, random_state=42)
trained_lgbm = train_and_log_model(
    lgbm_model,
    X_train,
    y_train,
    "LightGBM_Default",
)

## Concrete training using multiple strategies

4. K-Nearest Neighbors : Distance models can be accurate on normalized and standardized data

In [None]:
from sklearn.neighbors import KNeighborsRegressor

# 5 neighbors is a good default value
knn_model = KNeighborsRegressor(n_neighbors=5)
trained_knn = train_and_log_model(
    knn_model,
    X_train,
    y_train,
    "KNN_Distance_Based",
)

## Final step : save the best model

To use it in the future API

In [None]:
import joblib
import os
from sklearn.preprocessing import MinMaxScaler # or StandardScaler, depending on the scaler chosen

# Model retained as the best one: lgbm
# (the original note says "according m2" - presumably the R2 metric; TODO confirm)
BEST_MODEL = trained_lgbm
# Make sure this is the scaler that was used during data preparation
SCALER = scaler # Object returned by minmax_normalize or zscore_standardize

# Output directory for the persisted artefacts
MODEL_DIR = "model_artefacts"
os.makedirs(MODEL_DIR, exist_ok=True)

# 1. Persist the trained model
joblib.dump(value=BEST_MODEL, filename=os.path.join(MODEL_DIR, "best_regressor.joblib"))

# 2. Persist the fitted scaler
joblib.dump(value=SCALER, filename=os.path.join(MODEL_DIR, "feature_scaler.joblib"))

logger.success(f"Model and Scaler successfuly saved in : {MODEL_DIR}")