In [None]:
import optuna
import yaml
import json
from sklearn.metrics import f1_score
from typing import Any, Dict, Tuple
import pandas as pd
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import (
    OneHotEncoder, StandardScaler, OrdinalEncoder, PowerTransformer, RobustScaler, MinMaxScaler,
    FunctionTransformer)
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.metrics import  accuracy_score, precision_score, recall_score, roc_auc_score, classification_report
import numpy as np
from utils_machine_learning import rename_columns_to_snake_case

from custom_transformers import (
    DropRedundantColumns,
    CreateNewFeature,

    LogTransformer,
    PowerTransformerWrapper,
    OutlierTransformer,
    OutlierDetector,
    ReplaceValueTransformer,
    OutlierHandler,

)
from optuna.samplers import TPESampler
from sklearn.decomposition import PCA

import joblib
from typing import Union

from imblearn.over_sampling import (
    RandomOverSampler,
    ADASYN,
    
)
from imblearn.under_sampling import (
    RandomUnderSampler,
    NearMiss,
)
from imblearn.combine import (
    SMOTEENN,
    SMOTETomek
)
# Opt in to pandas' future behaviour in which operations such as replace/fillna
# no longer silently downcast object-dtype results.
# NOTE(review): presumably needed by the "?" -> NaN replacement step — confirm.
pd.set_option('future.no_silent_downcasting', True)

In [None]:
# load the dataset
def load_dataset(
    data_path: str = 'https://github.com/donadviser/datasets/raw/master/data-don/auto_insurance_claim_fraud.csv',
) -> pd.DataFrame:
    """
    Load the insurance-claim dataset from a CSV source and normalise its column names.

    Args:
        data_path (str): Path or URL of the comma-separated file to read.
            Defaults to the project's hosted copy of the dataset, so calling
            `load_dataset()` with no arguments behaves as before.

    Returns:
        pd.DataFrame: The dataset after `rename_columns_to_snake_case` has been
        applied. Rows with missing values are kept.
    """
    data = pd.read_csv(data_path, sep=",")
    return data.pipe(rename_columns_to_snake_case)

In [None]:
# Load the raw dataset once; later cells derive the features and target from `data_raw`.
data_raw = load_dataset()
# Preview the first rows as the cell output.
data_raw.head()

In [None]:
# Columns routed to the imputer + OneHotEncoder branch of the column transformer.
# NOTE(review): 'incident_period_of_day' is presumably created by CreateNewFeature
# from `bins_hour` / `names_period` — confirm against custom_transformers.
onehot_features = [
    'policy_state',
    'collision_type',
    'property_damage',
    'police_report_available',
    'insured_sex',
    'insured_education_level',
    'insured_relationship',
    'incident_type',
    'incident_severity',
    'authorities_contacted',
    'incident_state',
    'incident_city',
    'policy_deductable',
    'number_of_vehicles_involved',
    'bodily_injuries',
    'witnesses',
    'incident_period_of_day',
]

# Columns routed to the numerical imputation branch.
# NOTE(review): 'vehicle_age' looks like an engineered feature — confirm.
numerical_features = [
    'months_as_customer',
    'age',
    'policy_annual_premium',
    'injury_claim',
    'property_claim',
    'vehicle_claim',
    'vehicle_age',
    'total_claim_amount',
]

# Columns routed to the imputer + OrdinalEncoder branch.
ordinal_features = [
    'insured_occupation',
    'insured_hobbies',
    'auto_make',
]

# Columns routed to the imputer + outlier-handling branch.
transform_features = [
    'umbrella_limit',
    'capital_gains',
    'capital_loss',
]

# Columns handed to DropRedundantColumns.
drop_columns = [
    'policy_number',
    'policy_bind_date',
    'policy_csl',
    'insured_zip',
    'incident_date',
    'incident_location',
    'auto_model',
    'auto_year',
    'incident_hour_of_the_day',
]

# Hour-of-day bin edges and the label of each bin, both passed to CreateNewFeature.
bins_hour = [0, 6, 11, 16, 21, 24]
names_period = [
    "early_morning",
    "morning",
    "afternoon",
    "evening",
    "night",
]

# Name of the label column.
target_col = 'fraud_reported'

In [None]:
# Raw (pre-feature-engineering) columns treated as categorical; includes the target.
categorical_columns = [
    'policy_state',
    'insured_sex',
    'insured_education_level',
    'insured_occupation',
    'insured_hobbies',
    'insured_relationship',
    'incident_type',
    'collision_type',
    'incident_severity',
    'authorities_contacted',
    'incident_state',
    'incident_city',
    'property_damage',
    'police_report_available',
    'auto_make',
    'policy_deductable',
    'number_of_vehicles_involved',
    'bodily_injuries',
    'witnesses',
    'fraud_reported',
    'incident_hour_of_the_day',
]

# Raw columns treated as numerical.
numerical_columns = [
    'months_as_customer',
    'age',
    'policy_annual_premium',
    'umbrella_limit',
    'capital_gains',
    'capital_loss',
    'total_claim_amount',
    'injury_claim',
    'property_claim',
    'vehicle_claim',
]

In [None]:
# Print the observed minimum and maximum of every raw numerical column.
for column_name in numerical_columns:
    column_values = data_raw[column_name]
    lowest, highest = column_values.min(), column_values.max()
    print(f'Min - Max values for numerical Column: {column_name}')
    print(f"{lowest} - {highest}")
    print('----------------------------------------\n')

In [None]:
# Separate the predictors from the label column of the loaded dataframe.
X = data_raw.drop(columns=[target_col])

# Encode the target labels as integers: 'Y' -> 1, 'N' -> 0.
y = data_raw[target_col].map({'Y': 1, 'N': 0})

# Hold out 20% of the rows, stratified on the target, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

In [None]:
# Load YAML configuration file
def load_yaml_config(config_path: str) -> Dict[str, Any]:
    """
    Load the YAML configuration file containing model and hyperparameter definitions.

    Args:
        config_path (str): Path to the YAML configuration file.

    Returns:
        Dict[str, Any]: The loaded configuration. An empty file yields an empty
        dictionary rather than None, so callers can always index the result.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's default
    # encoding, which differs between operating systems.
    with open(config_path, "r", encoding="utf-8") as file:
        config = yaml.safe_load(file)

    # safe_load returns None for an empty document; honour the declared return type.
    return {} if config is None else config

In [None]:
class HyperparameterTuner:
    """
    Supplies the Optuna search space for each supported classifier.

    Every value that is tuned is drawn from the trial passed to `get_params`;
    fixed settings are returned alongside the sampled ones.
    """

    # Penalties each LogisticRegression solver may be paired with. A sampled
    # pair outside this table causes the trial to be pruned.
    _SOLVER_PENALTIES = {
        'lbfgs': ('l2', None),
        'liblinear': ('l1', 'l2'),
        'sag': ('l2', None),
        'newton-cholesky': ('l2', None),
        'saga': ('elasticnet', 'l1', 'l2', None),
    }

    def get_params(self, trial: optuna.Trial, classifier_name: str):
        """
        Sample the hyperparameters for `classifier_name` from `trial`.

        Args:
            trial (optuna.Trial): Trial used to suggest each tunable value.
            classifier_name (str): One of "RandomForest", "DecisionTree", "LGBM",
                "XGBoost", "CatBoost", "LogisticRegression", "GradientBoosting"
                or "KNeighbors".

        Returns:
            dict: Keyword arguments for the classifier.

        Raises:
            ValueError: If `classifier_name` is not one of the names above.
            optuna.TrialPruned: If an incompatible LogisticRegression
                solver/penalty pair was sampled.
        """
        builders = {
            "RandomForest": self._random_forest_space,
            "DecisionTree": self._decision_tree_space,
            "LGBM": self._lgbm_space,
            "XGBoost": self._xgboost_space,
            "CatBoost": self._catboost_space,
            "LogisticRegression": self._logistic_regression_space,
            "GradientBoosting": self._gradient_boosting_space,
            "KNeighbors": self._kneighbors_space,
        }
        builder = builders.get(classifier_name)
        if builder is None:
            raise ValueError(f"Invalid classifier name: {classifier_name}")
        return builder(trial)

    @staticmethod
    def _tree_growth_limits(trial):
        """Depth and split/leaf size limits shared by RandomForest and DecisionTree."""
        limits = {}
        limits["max_depth"] = trial.suggest_int("max_depth", 2, 30)
        limits["min_samples_split"] = trial.suggest_int("min_samples_split", 2, 20)
        limits["min_samples_leaf"] = trial.suggest_int("min_samples_leaf", 1, 20)
        return limits

    def _random_forest_space(self, trial):
        """Search space for RandomForest: number of trees plus the tree growth limits."""
        space = {"n_estimators": trial.suggest_int("n_estimators", 50, 300)}
        space.update(self._tree_growth_limits(trial))
        return space

    def _decision_tree_space(self, trial):
        """Search space for DecisionTree: the tree growth limits only."""
        return self._tree_growth_limits(trial)

    def _lgbm_space(self, trial):
        """Search space for LGBM: fixed binary-objective settings plus sampled regularisation and bagging."""
        space = {
            "objective": "binary",
            "metric": "binary_logloss",
            "verbosity": -1,
            "boosting_type": "gbdt",
        }
        for penalty_name in ("lambda_l1", "lambda_l2"):
            space[penalty_name] = trial.suggest_float(penalty_name, 1e-8, 10.0, log=True)
        space["num_leaves"] = trial.suggest_int("num_leaves", 2, 256)
        for fraction_name in ("feature_fraction", "bagging_fraction"):
            space[fraction_name] = trial.suggest_float(fraction_name, 0.4, 1.0)
        space["bagging_freq"] = trial.suggest_int("bagging_freq", 1, 7)
        space["min_child_samples"] = trial.suggest_int("min_child_samples", 5, 100)
        return space

    def _xgboost_space(self, trial):
        """Search space for XGBoost: fixed logistic objective plus sampled booster, penalties and subsampling."""
        space = {
            "verbosity": 0,
            "objective": "binary:logistic",
            "eval_metric": "auc",
        }
        space["booster"] = trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"])
        for penalty_name in ("lambda", "alpha"):
            space[penalty_name] = trial.suggest_float(penalty_name, 1e-8, 1.0, log=True)
        for fraction_name in ("subsample", "colsample_bytree"):
            space[fraction_name] = trial.suggest_float(fraction_name, 0.2, 1.0)
        return space

    def _catboost_space(self, trial):
        """Search space for CatBoost."""
        space = {}
        space["objective"] = trial.suggest_categorical("objective", ["Logloss", "CrossEntropy"])
        space["colsample_bylevel"] = trial.suggest_float("colsample_bylevel", 0.01, 0.1)
        space["depth"] = trial.suggest_int("depth", 1, 12)
        space["boosting_type"] = trial.suggest_categorical("boosting_type", ["Ordered", "Plain"])
        space["bootstrap_type"] = trial.suggest_categorical("bootstrap_type", ["Bayesian", "Bernoulli", "MVS"])
        return space

    def _logistic_regression_space(self, trial):
        """Search space for LogisticRegression; prunes solver/penalty pairs that are not allowed."""
        solver = trial.suggest_categorical('solver', ['newton-cholesky', 'lbfgs', 'liblinear', 'sag', 'saga'])
        # Large iteration budget to give the solvers room to converge.
        max_iter = trial.suggest_int('max_iter', 10000, 50000)
        space = {"solver": solver, "max_iter": max_iter}

        # The penalty is drawn from one unified list for all solvers.
        penalty = trial.suggest_categorical('penalty', ['l1', 'l2', 'elasticnet', None])
        space['penalty'] = penalty

        # C is only sampled when some penalty is applied.
        if penalty is not None:
            space["C"] = trial.suggest_float('C', 1e-10, 1000, log=True)

        # l1_ratio is only sampled for the elastic-net penalty.
        if penalty == 'elasticnet':
            space['l1_ratio'] = trial.suggest_float('l1_ratio', 0, 1)

        # Reject pairs the table does not allow; this happens after all values were sampled.
        allowed_penalties = self._SOLVER_PENALTIES.get(solver)
        if allowed_penalties is not None and penalty not in allowed_penalties:
            raise optuna.TrialPruned()  # Invalid combination of solver and penalty

        return space

    def _gradient_boosting_space(self, trial):
        """Search space for GradientBoosting."""
        space = {}
        space["learning_rate"] = trial.suggest_float('learning_rate', 0.001, 0.3, log=True)
        space["n_estimators"] = trial.suggest_int('n_estimators', 100, 1000)
        space["max_depth"] = trial.suggest_int('max_depth', 3, 10)
        space["min_samples_split"] = trial.suggest_int('min_samples_split', 2, 20)
        space["min_samples_leaf"] = trial.suggest_int('min_samples_leaf', 1, 20)
        space["max_features"] = trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
        return space

    def _kneighbors_space(self, trial):
        """Search space for KNeighbors."""
        space = {}
        space["n_neighbors"] = trial.suggest_int('n_neighbors', 1, 50)
        space["weights"] = trial.suggest_categorical('weights', ['uniform', 'distance'])
        space["p"] = trial.suggest_int('p', 1, 2)  # 1: Manhattan, 2: Euclidean
        space["leaf_size"] = trial.suggest_int('leaf_size', 10, 100)
        space["metric"] = trial.suggest_categorical('metric', ['euclidean', 'manhattan', 'minkowski', 'chebyshev'])
        return space

In [None]:
class ModelFactory:
    """
    A class to create model instances with additional parameters for specific classifiers.

    Attributes:
        model_name (str): The name of the model to be instantiated.
        best_params (dict): The best hyperparameters for the model.
    """

    def __init__(self, model_name: str, best_params: dict):
        """
        Initialize the ModelFactory with a model name and parameters.

        Args:
            model_name (str): The name of the model.
            best_params (dict): Hyperparameters for the model. None is treated
                as "no hyperparameters".
        """
        self.model_name = model_name
        self.best_params = best_params

    def get_model_instance(self):
        """
        Creates a model instance based on the model name with additional classifier-specific parameters.

        The factory adds its own defaults (a fixed random_state, plus verbosity /
        parallelism / probability settings for some models). Values present in
        `best_params` take precedence over those defaults, so a tuned parameter
        with the same name no longer triggers a duplicate-keyword TypeError.

        Returns:
            A model instance with the appropriate parameters.

        Raises:
            ValueError: If `model_name` is not a supported model.
        """
        # Dictionary of model classes
        model_dict = {
            "LGBM": LGBMClassifier,
            "XGBoost": XGBClassifier,
            "CatBoost": CatBoostClassifier,
            "RandomForest": RandomForestClassifier,
            "DecisionTree": DecisionTreeClassifier,
            "LogisticRegression": LogisticRegression,
            "SVC": SVC,
            "GradientBoosting": GradientBoostingClassifier,
            "KNeighbors": KNeighborsClassifier
        }

        # Check if the model exists in the model_dict
        if self.model_name not in model_dict:
            raise ValueError(f"Model {self.model_name} is not supported.")

        # Factory-side defaults per model; anything not listed only gets the seed.
        factory_defaults = {
            "LGBM": {"random_state": 42, "verbose": -1},            # silence LightGBM logging
            "RandomForest": {"random_state": 42, "n_jobs": -1},     # use all cores
            "SVC": {"random_state": 42, "probability": True},       # enable predict_proba
            "CatBoost": {"random_state": 42, "verbose": 0},         # suppress CatBoost verbosity
            "KNeighbors": {},                                       # no random_state is passed
        }.get(self.model_name, {"random_state": 42})

        # Caller-supplied hyperparameters override the factory defaults.
        model_kwargs = {**factory_defaults, **(self.best_params or {})}
        return model_dict[self.model_name](**model_kwargs)

In [None]:
class PipelineManager:
    """
    Builds a pipeline step by step and allows its steps to be edited afterwards.

    The managed object is either a scikit-learn Pipeline or an imbalanced-learn
    Pipeline, created empty; steps are then appended, inserted, removed or
    replaced by editing its `steps` list.
    """

    def __init__(self, pipeline_type='ImbPipeline'):
        """
        Create the empty pipeline that this manager will edit.

        Args:
            pipeline_type (str): 'ImbPipeline' (default) or 'Pipeline'.

        Raises:
            ValueError: If `pipeline_type` is neither of the two supported names.
        """
        pipeline_classes = {'ImbPipeline': ImbPipeline, 'Pipeline': Pipeline}
        if pipeline_type not in pipeline_classes:
            raise ValueError("Unsupported pipeline type. Choose 'ImbPipeline' or 'Pipeline'.")
        self.pipeline = pipeline_classes[pipeline_type](steps=[])

    def add_step(self, step_name, step_object, position=None):
        """
        Put a new (name, object) step into the pipeline.

        Args:
            step_name (str): Name under which the step is registered.
            step_object (object): The transformer or estimator for the step.
            position (int or None): Index to insert at; None appends the step
                after all existing ones.
        """
        entry = (step_name, step_object)
        steps = self.pipeline.steps
        if position is not None:
            steps.insert(position, entry)
            return
        steps.append(entry)

    def remove_step(self, step_name):
        """
        Drop every step registered under `step_name`; other steps keep their order.

        Args:
            step_name (str): Name of the step(s) to drop.
        """
        remaining = []
        for name, step in self.pipeline.steps:
            if name == step_name:
                continue
            remaining.append((name, step))
        self.pipeline.steps = remaining

    def replace_step(self, step_name, new_step_object):
        """
        Swap the object of the first step registered under `step_name`.

        Nothing happens when no step has that name.

        Args:
            step_name (str): Name of the step to replace.
            new_step_object (object): The transformer or estimator to use instead.
        """
        steps = self.pipeline.steps
        names = [name for name, _ in steps]
        if step_name in names:
            steps[names.index(step_name)] = (step_name, new_step_object)

    def get_pipeline(self):
        """
        Return the pipeline being managed.

        Returns:
            Pipeline: The pipeline object in its current state.
        """
        return self.pipeline

In [None]:
class PreprocessingPipeline:
    """
    A class that encapsulates the preprocessing steps for feature engineering,
    imputation, encoding, and outlier transformations. Each step is produced by
    `build(step_name=...)` so it can be inserted into the overall pipeline
    before the model fitting step.
    """
    def __init__(self, bins_hour, names_period, drop_columns, numerical_features,
                 onehot_features, ordinal_features, transform_features, trial: optuna.Trial=None):
        """
        Initialize the PreprocessingPipeline with necessary parameters.

        Args:
            bins_hour: Hour bin edges handed to CreateNewFeature.
            names_period: Period names handed to CreateNewFeature.
            drop_columns: Columns handed to DropRedundantColumns.
            numerical_features: List of numerical features for imputation.
            onehot_features: List of categorical features for OneHot encoding.
            ordinal_features: List of features for Ordinal encoding.
            transform_features: Features routed to the outlier-handling branch.
            trial: Accepted for signature compatibility only; it is not stored.
                Pass the trial to `build` / the `instantiate_*` methods instead.
        """
        self.bins_hour = bins_hour
        self.names_period = names_period
        self.drop_columns = drop_columns
        self.numerical_features = numerical_features
        self.onehot_features = onehot_features
        self.ordinal_features = ordinal_features
        self.transform_features = transform_features

    def instantiate_numerical_simple_imputer(self, trial: optuna.Trial=None, strategy: str=None, fill_value: int=-1) -> SimpleImputer:
        """
        Create the imputer for numerical columns.

        An explicit `strategy` wins; otherwise it is suggested by `trial`
        (parameter name 'numerical_strategy'); with neither, 'mean' is used.

        Args:
            trial (optuna.Trial, optional): Trial used to suggest the strategy.
            strategy (str, optional): Imputation strategy to use as-is.
            fill_value (int): Passed through to SimpleImputer.

        Returns:
            SimpleImputer: The configured imputer.
        """
        if strategy is None:
            if trial is not None:
                strategy = trial.suggest_categorical(
                    'numerical_strategy', ['mean', 'median', 'most_frequent']
                )
            else:
                # Avoid building SimpleImputer(strategy=None), which cannot be fitted.
                strategy = 'mean'
        return SimpleImputer(strategy=strategy, fill_value=fill_value)

    def instantiate_categorical_simple_imputer(self, trial: optuna.Trial=None, strategy: str=None, fill_value: str='missing') -> SimpleImputer:
        """
        Create the imputer for categorical columns.

        An explicit `strategy` wins; otherwise it is suggested by `trial`
        (parameter name 'categorical_strategy'); with neither, 'most_frequent'
        is used.

        Args:
            trial (optuna.Trial, optional): Trial used to suggest the strategy.
            strategy (str, optional): Imputation strategy to use as-is.
            fill_value (str): Passed through to SimpleImputer.

        Returns:
            SimpleImputer: The configured imputer.
        """
        if strategy is None:
            if trial is not None:
                strategy = trial.suggest_categorical('categorical_strategy', ['most_frequent', 'constant'])
            else:
                # Avoid building SimpleImputer(strategy=None), which cannot be fitted.
                strategy = 'most_frequent'
        return SimpleImputer(strategy=strategy, fill_value=fill_value)

    def instantiate_outliers(self, trial: optuna.Trial=None, strategy=None) -> Union[PowerTransformer, LogTransformer, OutlierHandler]:
        """
        Instantiate the outlier handling method: PowerTransformer, LogTransformer, or OutlierHandler.

        An explicit `strategy` wins (consistent with the imputer methods);
        otherwise it is suggested by `trial` (parameter name 'outlier_strategy');
        with neither, the first option ('power_transform') is used.

        Args:
            trial (optuna.Trial, optional): The trial object for hyperparameter optimization.
            strategy (str, optional): One of 'power_transform', 'log_transform',
                'iqr_clip', 'iqr_median', 'iqr_mean'.

        Returns:
            Union[PowerTransformer, LogTransformer, OutlierHandler]: The selected outlier handling method.

        Raises:
            ValueError: If the resulting strategy is not one of the options above.
        """
        options = ['power_transform', 'log_transform', 'iqr_clip', 'iqr_median', 'iqr_mean']
        if strategy is None:
            if trial is not None:
                strategy = trial.suggest_categorical('outlier_strategy', options)
            else:
                strategy = options[0]  # Default to first option if no trial is provided

        if strategy == 'power_transform':
            return PowerTransformer(method='yeo-johnson')
        elif strategy == 'log_transform':
            return LogTransformer()
        elif strategy in ['iqr_clip', 'iqr_median', 'iqr_mean']:
            return OutlierHandler(strategy=strategy)
        else:
            raise ValueError(f"Unknown strategy for outlier handling: {strategy}")

    def build(self, step_name=None, trial: optuna.Trial=None, **column_transformer_strategy):
        """
        Build one preprocessing step.

        Args:
            step_name (str): 'create_new_features', 'replace_class', 'drop_cols'
                or 'column_transformer'.
            trial (optuna.Trial, optional): Used by 'column_transformer' to suggest
                any strategy that is not given explicitly.
            **column_transformer_strategy: Optional 'numerical_strategy',
                'categorical_strategy' and 'outlier_strategy' for the
                'column_transformer' step; other keys are ignored.

        Returns:
            Transformer: The appropriate transformer for the given step.

        Raises:
            ValueError: If `step_name` is not one of the supported names. A
                mistyped name would otherwise silently yield None as a step.
        """
        if step_name == "create_new_features":
            return CreateNewFeature(bins_hour=self.bins_hour, names_period=self.names_period)

        if step_name == "replace_class":
            return ReplaceValueTransformer(old_value="?", new_value=np.nan)

        if step_name == "drop_cols":
            return DropRedundantColumns(redundant_cols=self.drop_columns)

        if step_name == 'column_transformer':
            numerical_strategy = column_transformer_strategy.get('numerical_strategy', None)
            categorical_strategy = column_transformer_strategy.get('categorical_strategy', None)
            outlier_strategy = column_transformer_strategy.get('outlier_strategy', None)

            return ColumnTransformer(
                transformers=[
                    ('categorical', Pipeline([
                        ('imputer', self.instantiate_categorical_simple_imputer(trial=trial, strategy=categorical_strategy)),
                        ('onehot', OneHotEncoder(handle_unknown='ignore', drop='first', sparse_output=False))
                    ]), self.onehot_features),

                    ('numerical', Pipeline([
                        ('imputer', self.instantiate_numerical_simple_imputer(trial=trial, strategy=numerical_strategy)),
                    ]), self.numerical_features),

                    ('ordinal', Pipeline([
                        ('imputer', self.instantiate_categorical_simple_imputer(trial=trial, strategy=categorical_strategy)),
                        # Categories unseen during fit (e.g. in a CV fold or the test set)
                        # are encoded as -1 instead of raising, mirroring the one-hot branch.
                        ('ordinal', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1))
                    ]), self.ordinal_features),

                    ('outlier_transform', Pipeline([
                        ('imputer', self.instantiate_numerical_simple_imputer(trial=trial, strategy=numerical_strategy)),
                        ('outlier_transformer', self.instantiate_outliers(trial=trial, strategy=outlier_strategy))
                    ]), self.transform_features),
                ],
                remainder='passthrough'
            )

        raise ValueError(f"Unknown preprocessing step: {step_name}")


In [None]:
class ResamplerSelector:
    """
    Picks the resampling step, either by explicit name or from an Optuna trial.

    Attributes:
        trial (optuna.trial, optional): Trial asked for a resampler name when none is given.
        random_state (int): Seed handed to every resampler that takes one.
    """

    def __init__(self, trial=None, random_state=42):
        """
        Store the optional trial and the seed.

        Args:
            trial (optuna.trial, optional): Trial used to suggest a resampling strategy.
            random_state (int): Random seed for reproducibility. Default is 42.
        """
        self.random_state = random_state
        self.trial = trial

    def get_resampler(self, resampler=None):
        """
        Build the resampler identified by `resampler`.

        When `resampler` is None and a trial is available, the name is suggested
        by the trial (parameter 'resampler') from 'RandomOverSampler', 'SMOTEENN'
        and 'SMOTETomek'.

        Args:
            resampler (str, optional): 'RandomOverSampler', 'ADASYN',
                'RandomUnderSampler', 'NearMiss', 'SMOTEENN' or 'SMOTETomek'.

        Returns:
            object: A new instance of the selected resampler.

        Raises:
            ValueError: If the name (given or suggested) is not one of the above,
                including when it is still None.
        """
        seed = self.random_state
        # Constructors are wrapped so only the selected resampler is instantiated.
        factories = {
            'RandomOverSampler': lambda: RandomOverSampler(random_state=seed),
            'ADASYN': lambda: ADASYN(random_state=seed),
            'RandomUnderSampler': lambda: RandomUnderSampler(random_state=seed),
            'NearMiss': lambda: NearMiss(),
            'SMOTEENN': lambda: SMOTEENN(random_state=seed, sampling_strategy='minority'),
            'SMOTETomek': lambda: SMOTETomek(random_state=seed),
        }

        chosen = resampler
        if chosen is None and self.trial:
            chosen = self.trial.suggest_categorical(
                'resampler', ['RandomOverSampler', 'SMOTEENN', 'SMOTETomek']
            )

        if chosen not in factories:
            raise ValueError(f"Unknown resampler: {chosen}")
        return factories[chosen]()

In [None]:
class ScalerSelector:
    """
    Picks the scaling step, either by explicit name or from an Optuna trial.

    Attributes:
        trial (optuna.trial, optional): Trial asked for a scaler name when none is given.
    """

    def __init__(self, trial=None):
        """
        Store the optional trial.

        Args:
            trial (optuna.trial, optional): Trial used to suggest a scaler.
        """
        self.trial = trial

    def get_scaler(self, scaler_name=None):
        """
        Build the scaler identified by `scaler_name`.

        When `scaler_name` is None and a trial is available, the name is
        suggested by the trial (parameter "scaler").

        Args:
            scaler_name (str, optional): 'minmax', 'standard' or 'robust'.

        Returns:
            object: A new MinMaxScaler, StandardScaler or RobustScaler.

        Raises:
            ValueError: If the name (given or suggested) is not one of the above,
                including when it is still None.
        """
        scaler_classes = {
            "minmax": MinMaxScaler,
            "standard": StandardScaler,
            "robust": RobustScaler,
        }

        chosen = scaler_name
        if chosen is None and self.trial:
            chosen = self.trial.suggest_categorical("scaler", ['minmax', 'standard', 'robust'])

        if chosen not in scaler_classes:
            raise ValueError(f"Unknown scaler: {chosen}")
        return scaler_classes[chosen]()

In [None]:
class DimensionalityReductionSelector:
    """
    Picks the dimensionality-reduction step, either from an explicit setting or
    from an Optuna trial.

    Attributes:
        trial (optuna.trial, optional): Trial asked for the method and the PCA size.
    """

    def __init__(self, trial=None):
        """
        Store the optional trial.

        Args:
            trial (optuna.trial, optional): Trial used to suggest the reduction settings.
        """
        self.trial = trial

    def get_dimensionality_reduction(self, dim_red=None, pca_n_components=5):
        """
        Build the dimensionality-reduction step.

        When `dim_red` is None and a trial is available, the method is suggested
        by the trial (parameter "dim_red": "PCA" or None). For "PCA", the number
        of components is suggested by the trial (parameter "pca_n_components",
        2 to 30) whenever a trial is available, and `pca_n_components` is used
        only when there is no trial.

        Args:
            dim_red (str, optional): "PCA" to reduce; anything else means no reduction.
            pca_n_components (int): Component count used for PCA when no trial is set.

        Returns:
            object or str: A PCA instance, or the string 'passthrough'.
        """
        method = dim_red
        if method is None and self.trial:
            method = self.trial.suggest_categorical("dim_red", ["PCA", None])

        # Anything other than "PCA" leaves the data untouched.
        if not (method == "PCA"):
            return 'passthrough'

        n_components = (
            self.trial.suggest_int("pca_n_components", 2, 30)
            if self.trial
            else pca_n_components
        )
        return PCA(n_components=n_components)


In [None]:
def get_pipeline_model_and_params(classifier_name, trial, model_mode='tune', input_params=None):
    """
    Assemble the full pipeline: preprocessing, resampling, scaling,
    dimensionality reduction and the classifier.

    Steps are added in this order: 'create_new_features', 'replace_class',
    'drop_cols', 'column_transformer', 'resampler', 'scaler', 'dim_reduction',
    'model'.

    Args:
        classifier_name (str): Model name understood by ModelFactory.
        trial (optuna.Trial or None): Trial used to suggest every pipeline choice
            that `input_params` does not specify.
        model_mode (str): 'tune' lets the trial choose the column-transformer
            strategies; any other value reads them from `input_params`
            (defaults: 'mean', 'most_frequent', 'power_transform').
        input_params (dict, optional): Classifier hyperparameters, optionally mixed
            with the pipeline-level keys 'numerical_strategy',
            'categorical_strategy', 'outlier_strategy', 'resampler', 'scaler',
            'dim_red' and 'pca_n_components'. The dict is not modified.

    Returns:
        The assembled (imbalanced-learn) pipeline, not yet fitted.
    """
    # Work on a copy: pipeline-level keys are stripped below and the caller's
    # dict (e.g. a study's best params) must stay intact. None means "no params".
    params = dict(input_params) if input_params else {}

    preprocessing_pipeline = PreprocessingPipeline(
        bins_hour=bins_hour,
        names_period=names_period,
        drop_columns=drop_columns,
        numerical_features=numerical_features,
        onehot_features=onehot_features,
        ordinal_features=ordinal_features,
        transform_features=transform_features
    )

    # Initialize the manager with the preferred pipeline type ('ImbPipeline' or 'Pipeline')
    pipeline_manager = PipelineManager(pipeline_type='ImbPipeline')

    # Data-cleaning steps that have nothing to tune.
    for position, step_name in enumerate(('create_new_features', 'replace_class', 'drop_cols')):
        pipeline_manager.add_step(
            step_name,
            preprocessing_pipeline.build(step_name=step_name, trial=None),
            position=position,
        )

    if model_mode == 'tune':
        # Imputation / outlier strategies are suggested by the trial.
        column_transformer = preprocessing_pipeline.build(step_name='column_transformer', trial=trial)
    else:
        # Strategies come from the supplied parameters, with fixed fallbacks.
        numerical_strategy = params.pop('numerical_strategy', 'mean')
        categorical_strategy = params.pop('categorical_strategy', 'most_frequent')
        outlier_strategy = params.pop('outlier_strategy', 'power_transform')

        print(f"numerical_strategy: {numerical_strategy}")
        print(f"categorical_strategy: {categorical_strategy}")
        print(f"outlier_strategy: {outlier_strategy}")

        column_transformer = preprocessing_pipeline.build(
            step_name='column_transformer',
            trial=None,
            numerical_strategy=numerical_strategy,
            categorical_strategy=categorical_strategy,
            outlier_strategy=outlier_strategy,
        )
    pipeline_manager.add_step('column_transformer', column_transformer, position=3)

    # Resampler: explicit name if provided, otherwise suggested by the trial.
    resampler_obj = ResamplerSelector(trial=trial).get_resampler(resampler=params.pop('resampler', None))
    pipeline_manager.add_step('resampler', resampler_obj, position=4)

    # Scaler: explicit name if provided, otherwise suggested by the trial.
    scaler_obj = ScalerSelector(trial=trial).get_scaler(scaler_name=params.pop('scaler', None))
    pipeline_manager.add_step('scaler', scaler_obj, position=5)

    # Dimensionality reduction. 'pca_n_components' is popped (not merely read) so
    # it is not forwarded to the classifier constructor as an unknown keyword.
    dim_red = params.pop('dim_red', None)
    pca_n_components = params.pop('pca_n_components', 5)
    dim_red_obj = DimensionalityReductionSelector(trial=trial).get_dimensionality_reduction(
        dim_red=dim_red, pca_n_components=pca_n_components
    )
    pipeline_manager.add_step('dim_reduction', dim_red_obj, position=6)

    # Whatever is left in `params` are the classifier's own hyperparameters.
    model_obj = ModelFactory(classifier_name, params).get_model_instance()
    pipeline_manager.add_step('model', model_obj, position=7)

    return pipeline_manager.get_pipeline()



In [None]:
# Define objective function for Optuna
def objective(classifier_name: str, trial: optuna.Trial=None, scoring='f1') -> float:
    """
    Optuna objective: score one sampled pipeline configuration by cross-validation.

    Hyperparameters are obtained from ``HyperparameterTuner.get_params`` and
    passed to ``get_pipeline_model_and_params`` (``model_mode='tune'``) to
    build the pipeline. The pipeline is then scored with 10-fold stratified
    cross-validation on the notebook-level ``X_train`` / ``y_train``.

    The held-out ``X_test`` / ``y_test`` split is deliberately not used here.
    Returning a test-set metric as the trial value would tune the
    hyperparameters against the data that is later used to report final
    performance, and it would also restrict ``scoring`` to a fixed set of
    metric names. Test-set evaluation is done by ``detailed_objective``.

    Args:
        classifier_name (str): Classifier to optimize.
        trial (optuna.Trial): Optuna trial object for suggesting hyperparameters.
        scoring (str): Scorer name forwarded to ``cross_val_score``.

    Returns:
        float: The mean score from cross-validation.

    Raises:
        Exception: Any error raised while fitting or scoring a fold is
            propagated, because ``error_score='raise'`` is used.
    """
    # Sample the hyperparameters for this trial
    hyperparameter_tuner = HyperparameterTuner()
    params = hyperparameter_tuner.get_params(trial, classifier_name)

    # Build the full pipeline for the sampled configuration
    pipeline = get_pipeline_model_and_params(classifier_name, trial, model_mode='tune', input_params=params)

    # Cross-validation on the training split only
    kfold = StratifiedKFold(n_splits=10)
    fold_scores = cross_val_score(pipeline, X_train, y_train, scoring=scoring, n_jobs=-1, cv=kfold, verbose=0, error_score='raise')

    return float(fold_scores.mean())

In [None]:
# Define another objective function that shares most of the `objective` function to reproduce the model with the best hyperparameters.

# Define objective function for Optuna
def detailed_objective(classifier_name: str, trial: optuna.Trial=None, scoring='f1') -> Dict[str, Any]:
    """
    Rebuild the pipeline for a given trial, evaluate it and serialise the result.

    The pipeline is built the same way as in ``objective``. It is scored with
    10-fold stratified cross-validation on ``X_train`` / ``y_train``, then
    fitted on the full training split and evaluated on ``X_test`` / ``y_test``.
    The fitted pipeline and the metrics dictionary are written together to
    ``<classifier_name>_pipeline.pkl`` with ``joblib.dump``.

    Args:
        classifier_name (str): Classifier whose pipeline is rebuilt.
        trial (optuna.Trial): Trial to reproduce; ``run_optimization`` passes
            the study's best trial.
        scoring (str): Scorer name forwarded to ``cross_val_score``.

    Returns:
        Dict[str, Any]: Test-set metrics (``f1``, ``accuracy``, ``precision``,
        ``recall``, ``roc_auc``), the mean cross-validation score under
        ``score_training_<scoring>``, the trial's parameters under
        ``best_params`` and the ``classification_report`` as a dictionary.
    """
    # Get hyperparameters for the classifier from HyperparameterTuner
    hyperparameter_tuner = HyperparameterTuner()
    params = hyperparameter_tuner.get_params(trial, classifier_name)

    # Build the full pipeline for this configuration
    pipeline = get_pipeline_model_and_params(classifier_name, trial, model_mode='tune', input_params=params)

    # Cross-validation on the training split
    kfold = StratifiedKFold(n_splits=10)
    cv_scores = cross_val_score(pipeline, X_train, y_train, scoring=scoring, n_jobs=-1, cv=kfold, verbose=0, error_score='raise')
    score_training = cv_scores.mean()

    # Fit on the whole training split and evaluate on the held-out split
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)
    y_pred_proba = pipeline.predict_proba(X_test)[:, 1]

    metrics_test = {
        'classifier_name': classifier_name,
        'f1': f1_score(y_test, y_pred),
        'accuracy': accuracy_score(y_test, y_pred),
        'precision': precision_score(y_test, y_pred),
        'recall': recall_score(y_test, y_pred),
        'roc_auc': roc_auc_score(y_test, y_pred_proba),
        f'score_training_{scoring}': score_training,
        # Store the plain parameter dict rather than the trial object, so the
        # entry matches its key and the pickle holds no Optuna trial instance.
        'best_params': trial.params if trial is not None else None,
        "classification_report": classification_report(y_test, y_pred, output_dict=True)  # Detailed report
        }

    # Serialise the trained pipeline together with its metrics
    joblib.dump((pipeline, metrics_test), f'{classifier_name}_pipeline.pkl')
    print(f'Serialized {classifier_name} pipeline and test metrics to {classifier_name}_pipeline.pkl')

    return metrics_test
    

In [None]:
# Run the Optuna study
def run_optimization(
    config_path: str, n_trials: int = 100, scoring: str = 'f1'
) -> Tuple[Union[str, None], Union[Dict[str, Any], None], float]:
    """
    Run one Optuna study per candidate classifier and keep the best one.

    For every classifier a study (direction ``maximize``, ``TPESampler`` with
    a fixed seed) is optimised with ``objective``. The best trial of each
    study is recorded, and ``detailed_objective`` is called with it, which
    evaluates and serialises that classifier's pipeline. The overall best
    model name and its parameters are written to ``best_model_and_params.pkl``.

    Args:
        config_path (str): Path to the YAML configuration file. It is not read
            by this function; the parameter is kept so existing calls work.
        n_trials (int): Number of trials for optimization. Defaults to 100.
        scoring (str): Scoring metric for optimization. Defaults to 'f1'.

    Returns:
        tuple: ``(best_model, best_params, best_score)`` where ``best_model``
        is the name of the classifier with the highest best-trial value,
        ``best_params`` the parameters of that trial and ``best_score`` its
        value. ``best_score`` is ``-inf`` if no model was selected.
    """
    # Start below every possible score so metrics that are zero or negative
    # can still be selected.
    best_model_score = float("-inf")
    best_model = None
    best_params = None
    best_of_models = []

    all_models = ["RandomForest", "DecisionTree",
                  "XGBoost", "LGBM", "GradientBoosting",
                  "LogisticRegression", "KNeighbors", "CatBoost"]

    for classifier_name in all_models:
        print(f"Optimizing model: {classifier_name} | Scoring: {scoring}")
        study = optuna.create_study(direction="maximize", sampler=TPESampler(seed=11))
        # The lambda is invoked only inside this optimize() call, so it always
        # sees the classifier_name of the current iteration.
        study.optimize(lambda trial: objective(classifier_name, trial, scoring), n_trials=n_trials)

        print(f"classifier_name: {classifier_name} | study.best_trial.value: {study.best_trial.value}")

        best_trial = study.best_trial
        best_of_models.append({
            "model": classifier_name,
            "model_score_params": best_trial.params,
            "model_score_trial_number": best_trial.number,
            "model_score_datetime": best_trial.datetime_start,
            "model_score_duration": best_trial.duration,
            "model_score_status": best_trial.state,
            "model_score_key": scoring,
            "model_score_value": best_trial.value

        })

        current_score = best_trial.value

        # Compare against None explicitly: a score of exactly 0 is a valid value
        if current_score is not None and current_score > best_model_score:
            best_model_score = current_score
            best_model = classifier_name
            best_params = best_trial.params

        print(f"Current Model: {classifier_name}, Current Score: {current_score} | Best Model: {best_model}, Best Score: {best_model_score}")

        # Evaluate and serialise the pipeline of this classifier's best trial
        best_parameters_results = detailed_objective(classifier_name=classifier_name, trial=study.best_trial,  scoring=scoring)
        print(f"Best Parameters: {best_parameters_results}")

    # Display all results and the best model
    for result in best_of_models:
        print(result)

    print("Best model:", best_model)
    print("Best parameters:", best_params)

    # Save the variables to a file
    with open("best_model_and_params.pkl", "wb") as f:
        joblib.dump((best_model, best_params), f)

    # Return the score that belongs to best_model, not the score of whichever
    # classifier happened to be optimised last.
    return best_model, best_params, best_model_score

In [20]:
if __name__ == "__main__":
    # Path handed to run_optimization as its first argument.
    # NOTE(review): the visible body of run_optimization never reads config_path.
    config_path = "model_config.yaml"
    # Run 30 trials per candidate classifier, scored with 'f1'. The unpacked
    # names become notebook globals; later cells read best_model and best_params.
    best_model, best_params, current_score = run_optimization(config_path, n_trials=30, scoring='f1')
    

[W 2024-11-05 09:31:58,483] Trial 11 failed with parameters: {'objective': 'CrossEntropy', 'colsample_bylevel': 0.07878989475519685, 'depth': 12, 'boosting_type': 'Ordered', 'bootstrap_type': 'Bayesian', 'categorical_strategy': 'constant', 'numerical_strategy': 'median', 'outlier_strategy': 'iqr_mean', 'resampler': 'SMOTEENN', 'scaler': 'robust', 'dim_red': None} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/Users/donadviser/.pyenv/versions/3.11.10/envs/venv311_insure/lib/python3.11/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "/var/folders/xh/sd37kh3d117gtcym3lygy3000000gn/T/ipykernel_10123/3688297276.py", line 26, in <lambda>
    study.optimize(lambda trial: objective(classifier_name, trial, scoring), n_trials=n_trials)
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/xh/sd37kh3d117gtcym3lygy

KeyboardInterrupt: 

In [None]:
# Function to deserialise a pipeline for inference
def load_pipeline(model_name):
    """
    Deserialise the object stored in ``<model_name>_pipeline.pkl``.

    Args:
        model_name: Prefix of the pickle file name; ``_pipeline.pkl`` is appended.

    Returns:
        Whatever ``joblib.load`` reads from that file.
    """
    pickle_path = f'{model_name}_pipeline.pkl'
    return joblib.load(pickle_path)

In [None]:
# Load the serialized model and metrics test results
pipeline, metrics_test = load_pipeline("LogisticRegression")
# Perform predictions
#predictions = pipeline.predict(new_data)
pipeline

In [None]:
# Display the metrics dictionary loaded together with the LogisticRegression pipeline
metrics_test

In [None]:
# Load the variables from the file
with open("fitted_best_model_and_params.pkl", "rb") as f:
    model_name, pipeline_fitted = joblib.load(f)

print(f"model_name: {model_name}")
print(f"pipeline_fitted: {pipeline_fitted}")

In [None]:
# Display the pipeline loaded from fitted_best_model_and_params.pkl
pipeline_fitted

In [None]:
# Fit the loaded pipeline on the training split. This trains it again here,
# regardless of the state it was loaded in.
pipeline_fitted.fit(X_train, y_train)
# Score the predictions on the held-out split
y_pred = pipeline_fitted.predict(X_test)
f1 = f1_score(y_test, y_pred)
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
# Column 1 of predict_proba is used as the score for ROC AUC
y_pred_proba = pipeline_fitted.predict_proba(X_test)[:, 1]
roc_auc = roc_auc_score(y_test, y_pred_proba) # ROC AUC on the test split

# NOTE(review): best_params is not assigned in this cell. It must already
# exist as a notebook global (from the run_optimization cell, or from the
# cell further down that reloads best_model_and_params.pkl), otherwise this
# raises NameError on a fresh kernel.
test_results = {
    'f1_score': f1,
    'accuracy': accuracy,
    'precision': precision,
    'recall': recall,
    'roc_auc': roc_auc,
    'model_name': model_name,
    'best_params': best_params,
}

In [None]:
# Display the metrics collected for pipeline_fitted
test_results

In [None]:
# NOTE(review): best_model and best_params are read before the cell that
# reloads them from best_model_and_params.pkl; this cell only works if the
# run_optimization cell completed in the current kernel.
print(f"best_model: {best_model}")
print(f"best_params: {best_params}")

# Create an instance of the ModelFactory class with best_model and best_params
# (no code follows in this cell; the pipeline is built in a later cell via
# get_pipeline_model_and_params)

In [None]:
# Load the variables from the file
with open("best_model_and_params.pkl", "rb") as f:
    best_model, best_params = joblib.load(f)

print(f"best_model: {best_model}")
print(f"best_params: {best_params}")

In [None]:
# Build the pipeline for the selected model in 'train' mode: no Optuna trial
# is passed and the stored best_params are supplied as input_params.
pipeline = get_pipeline_model_and_params(best_model, trial=None, model_mode='train', input_params=best_params)

In [None]:
# Display the pipeline built for the best model
pipeline

In [None]:
# Fit the best-model pipeline on the training split
pipeline.fit(X_train, y_train)
# Score the predictions on the held-out split
y_pred = pipeline.predict(X_test)
f1 = f1_score(y_test, y_pred)
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
# Column 1 of predict_proba is used as the score for ROC AUC
y_pred_proba = pipeline.predict_proba(X_test)[:, 1]
roc_auc = roc_auc_score(y_test, y_pred_proba) # ROC AUC on the test split

# Collect the metrics with the model name and parameters that produced them.
# This rebinds test_results, f1, accuracy, etc. from the earlier evaluation cell.
test_results = {
    'f1_score': f1,
    'accuracy': accuracy,
    'precision': precision,
    'recall': recall,
    'roc_auc': roc_auc,
    'best_model': best_model,
    'best_params': best_params,
}

In [None]:
# Display the metrics of the best-model pipeline
test_results

In [None]:
# Assume `pipeline` is your final fitted pipeline
# Also, assume `X_train` is your training DataFrame

# Fit the pipeline (if not already fitted)
#pipeline.fit(X_train)

# Locate the first ColumnTransformer step of the pipeline (stays None when absent)
column_transformer = None
for name, step in pipeline.named_steps.items():
    if not isinstance(step, ColumnTransformer):
        continue
    column_transformer = step
    break

if column_transformer is None:
    # No ColumnTransformer, fallback to input features
    feature_names = X_train.columns.tolist()
else:
    # Collect the names produced by every fitted sub-transformer
    feature_names = []
    for name, transformer, columns in column_transformer.transformers_:
        if transformer == 'drop':
            # Dropped columns contribute no output features
            continue
        if hasattr(transformer, 'get_feature_names_out'):
            feature_names.extend(transformer.get_feature_names_out(columns))
        else:
            # Without get_feature_names_out, reuse the input column names
            feature_names.extend(columns)

print("Extracted Feature Names: ", feature_names)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

def display_feature_importance(model, X, y, feature_names, n_top=10):
    """
    Plot the strongest feature scores of an already-fitted model as a bar chart.

    Scores are read from ``model.feature_importances_`` when that attribute
    exists, otherwise from the first row of ``model.coef_``. Features are
    ranked by the absolute value of their score; bars are green for positive
    scores and red otherwise. The model is not fitted by this function.

    The function is best-effort: any error while extracting or plotting the
    scores is reported with ``print`` and the function returns normally.

    Parameters:
    model: Fitted estimator exposing ``feature_importances_`` or ``coef_``.
    X: Unused; kept so existing calls keep working.
    y: Unused; kept so existing calls keep working.
    feature_names (list): One name per score, in the same order as the scores.
    n_top (int): Number of top features to display. Default is 10. A falsy
        value (``None`` or ``0``) plots all features.

    Returns:
    None
    """
    # Label the output with the estimator's class instead of a fixed placeholder
    model_name = type(model).__name__

    print(f"Feature ranking for model: {model_name}")
    try:
        if hasattr(model, 'feature_importances_'):
            importance_scores = np.asarray(model.feature_importances_)
        elif hasattr(model, 'coef_'):
            print(f"{model_name} has no feature_importances_; using coef_ instead.")
            # atleast_2d keeps the whole vector when coef_ is one-dimensional
            importance_scores = np.atleast_2d(model.coef_)[0]
        else:
            raise ValueError(f"{model_name} exposes neither feature_importances_ nor coef_.")

        feature_names = list(feature_names)
        if len(feature_names) != len(importance_scores):
            raise ValueError(
                f"Got {len(feature_names)} feature names for {len(importance_scores)} scores; "
                "both must have the same length."
            )

        # Create a DataFrame from feature names and importances
        df = pd.DataFrame({'Feature': feature_names, 'Score': importance_scores})

        # Rank by magnitude so strong negative coefficients are not pushed to the end
        df['Abs_Score'] = np.abs(df['Score'])
        df_sorted = df.sort_values(by="Abs_Score", ascending=False)
        if n_top:
            df_sorted = df_sorted.head(n_top)

        # One colour per plotted bar (positive = green, otherwise red)
        colors = ["green" if score > 0 else "red" for score in df_sorted["Score"]]
        plt.figure(figsize=(12, 8))
        sns.barplot(x="Feature", y="Score", hue="Feature", legend=False, data=df_sorted, palette=colors)

        # Customize the plot for better visual appeal
        plt.xlabel("Feature")
        plt.ylabel("Feature Importance Score")
        plt.title(f"Feature Importance in {model_name} Classification")
        plt.xticks(rotation=45, ha="right")  # Rotate x-axis labels for better readability
        plt.tight_layout()  # Adjust spacing between elements

        # Display the plot
        plt.show()

    except Exception as e:
        print(f"Error occurred while extracting feature importances: {str(e)}")

In [None]:
# Take the estimator registered under the 'model' step of the pipeline and display it
fitted_model = pipeline.named_steps['model']
fitted_model

In [None]:
# Call the display feature importance function after model evaluation
# X_train and y_train are passed to satisfy the signature; the function body
# does not read them.
display_feature_importance(fitted_model, X_train, y_train, feature_names, n_top=10)  # Specify how many top features to display

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

def plot_feature_importance(model, feature_names=None, top_n=20):
    """
    Plot the feature importance of a fitted model as a horizontal bar chart.

    Values are read from ``model.feature_importances_`` when present,
    otherwise from the first row of ``model.coef_``. Features are ranked by
    the absolute value of their importance, so large negative coefficients
    rank alongside large positive ones; the plotted values keep their sign.

    Parameters:
    -----------
    model : estimator
        A fitted model with a `feature_importances_` or `coef_` attribute.

    feature_names : list or None
        List of feature names. If None, numerical indices are used as feature names.

    top_n : int, default=20
        The number of top features to plot. If `None`, all features will be plotted.

    Raises:
    -------
    ValueError
        If the model exposes neither attribute, or if the number of feature
        names differs from the number of importance values.
    """
    if hasattr(model, 'feature_importances_'):
        importance_values = np.asarray(model.feature_importances_)
    elif hasattr(model, 'coef_'):
        # atleast_2d keeps the whole vector when coef_ is one-dimensional
        importance_values = np.atleast_2d(model.coef_)[0]
    else:
        raise ValueError("The model does not have a `feature_importances_` or `coef_` attribute.")

    if feature_names is None:
        feature_names = [f'Feature {i}' for i in range(len(importance_values))]
    else:
        feature_names = list(feature_names)
        if len(feature_names) != len(importance_values):
            raise ValueError(
                f"Got {len(feature_names)} feature names for "
                f"{len(importance_values)} importance values."
            )

    # Create a DataFrame for better manipulation and sorting
    importance_df = pd.DataFrame({'Feature': feature_names, 'Importance': importance_values})

    # Sort features by magnitude of importance
    importance_df = importance_df.sort_values(by='Importance', key=np.abs, ascending=False)

    # If top_n is specified, select the top_n features
    if top_n is not None and top_n < len(importance_df):
        importance_df = importance_df.head(top_n)

    # Plot the feature importance
    plt.figure(figsize=(12, 8))
    sns.barplot(x='Importance', y='Feature', data=importance_df, hue='Feature', palette='viridis', dodge=False, legend=False)
    plt.title('Top Feature Importance')
    plt.xlabel('Importance')
    plt.ylabel('Feature')
    plt.tight_layout()
    plt.show()


In [None]:
# Same extraction as in the earlier cell: take the 'model' step of the pipeline and display it
fitted_model = pipeline.named_steps['model']
fitted_model

In [None]:
# Use the function to plot the feature importance
# Plot up to 20 features, labelled with the names extracted from the ColumnTransformer
plot_feature_importance(fitted_model, feature_names=feature_names, top_n=20)