### Import Modules


In [None]:
""" All modules for this step of the pipeline are imported here. """

import os
from dotenv import load_dotenv
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.feature_selection import SelectKBest, f_classif
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
import optuna
import matplotlib.pyplot as plt


from _1_DataPrep_and_Cleaning_Part1 import run_data_prep

### Import Paths


In [None]:
""" Loading environment variables from .env file"""
load_dotenv()

# Every setting is read as a string, so every default is a string as well;
# the functions below compare these values against string literals.

# Feature combination: enabled only when combine_choice == "1".
# combine_features holds '|'-separated 'feature1,feature2,new_feature_name' triples.
combine_choice = os.getenv("combine_choice", "0")
combine_features_raw = os.getenv("combine_features", "")

# Feature selection: "0" = disabled, "1" = Random Forest importance,
# "2" = SelectKBest. Defaults to "0" so an unset variable means "no selection"
# instead of an unrecognised value.
feature_selection = os.getenv("feature_selection", "0")
target_column = os.getenv("target_column", "UNKNOWN")
# Empty string means "let Optuna choose the number of features".
top_n_features = os.getenv("top_n_features", '')
# Output directory for plots/CSVs; empty string disables writing them.
feature_figure_path = os.getenv("feature_figure_path", "").strip()

### Feature Combination

In [None]:
""" Function to combine features based on user-defined combinations. """

def combine_features(df):
    """Merge pairs of columns into single string-valued features.

    Behaviour is driven by two module-level settings:

    * ``combine_choice`` -- combination runs only when this equals ``"1"``.
    * ``combine_features_raw`` -- ``'|'``-separated entries, each of the form
      ``'feature1,feature2,new_feature_name'``.

    For every entry the two source columns are cast to ``str`` and joined with
    ``"_"``. The new column takes the place of ``feature1`` and both source
    columns are removed. ``new_feature_name`` may reuse the name of either
    source column.

    Args:
        df: Input DataFrame. It is never modified in place.

    Returns:
        The input DataFrame itself when combination is skipped, otherwise a
        new DataFrame with the combined columns.

    Raises:
        ValueError: If an entry is malformed, a source column is missing, or
            the new name clashes with another existing column.
    """
    if combine_choice != "1" or not combine_features_raw:
        print(f"Combine choice is {combine_choice} or and features are {combine_features_raw}.")
        print("Skipping feature combination as combine_choice is not set to 1 or no features specified.")
        return df

    # Work on a copy: DataFrame.insert mutates in place and would otherwise
    # leak the new column into the caller's DataFrame.
    df = df.copy()

    combinations = [x.strip() for x in combine_features_raw.split('|') if x.strip()]

    for combo in combinations:
        parts = [c.strip() for c in combo.split(',')]
        if len(parts) != 3:
            raise ValueError(f"Invalid format in combination string: '{combo}'. Expected format is 'feature1,feature2,new_feature_name'.")
        f1, f2, new_name = parts

        if f1 not in df.columns or f2 not in df.columns:
            raise ValueError(f"One or both columns '{f1}' or '{f2}' do not exist in the DataFrame.")

        combined_series = df[f1].astype(str) + "_" + df[f2].astype(str)

        # The new column goes where f1 was. Removing f2 shifts that position
        # left by one when f2 sits before f1.
        columns = list(df.columns)
        f1_position = columns.index(f1)
        insert_position = f1_position - (1 if f2 in columns[:f1_position] else 0)

        # Drop the originals first so new_name may reuse f1 or f2.
        df = df.drop(columns=[f1, f2])
        if new_name in df.columns:
            raise ValueError(f"Cannot create combined feature '{new_name}': a column with that name already exists.")
        df.insert(insert_position, new_name, combined_series)

    return df


### Feature Selection

In [None]:
""" Function to perform feature selection based on user-defined method. """

def _search_top_n(score_for_k, n_features, n_trials=3):
    """Run a short Optuna study and return the best-scoring feature count.

    ``score_for_k`` is called with a candidate ``k`` in ``[1, n_features]`` and
    must return the score to maximize.
    """
    def objective(trial):
        return score_for_k(trial.suggest_int("k", 1, n_features))

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials)
    return study.best_params["k"]


def _save_ranking_report(ranking_df, value_column, top_n, output_dir, file_stem, title, label):
    """Write the feature ranking to ``output_dir`` as a bar chart (PNG) and a CSV.

    ``ranking_df`` must already be sorted best-first; its first ``top_n`` bars
    are highlighted in blue and the rest are drawn in gray.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Save plot
    fig = plt.figure(figsize=(12, 6))
    try:
        bars = plt.bar(ranking_df["Feature"], ranking_df[value_column], color="gray")
        # Slicing (rather than indexing 0..top_n-1) cannot run past the last bar.
        for bar in bars[:top_n]:
            bar.set_color("blue")
        plt.xticks(rotation=90)
        plt.xlabel("Feature")
        plt.ylabel(value_column)
        plt.title(title)
        plt.tight_layout()

        plot_file = os.path.join(output_dir, f"{file_stem}_top{top_n}.png")
        plt.savefig(plot_file)
    finally:
        # Release the figure even when drawing or saving fails.
        plt.close(fig)
    print(f"{label} plot saved to: {plot_file}")

    # Save CSV
    csv_file = os.path.join(output_dir, f"{file_stem}_top{top_n}.csv")
    ranking_df.to_csv(csv_file, index=False)
    print(f"{label} CSV saved to: {csv_file}")


def select_features(dataFrame):
    """Keep only the highest-ranked feature columns of ``dataFrame``.

    Behaviour is driven by module-level settings:

    * ``target_column`` -- name of the label column (always kept).
    * ``feature_selection`` -- ``'0'`` no selection, ``'1'`` rank by Random
      Forest feature importance, ``'2'`` rank by SelectKBest (``f_classif``).
    * ``top_n_features`` -- number of features to keep; when blank, a short
      Optuna study picks the count that maximizes 3-fold cross-validated
      Random Forest accuracy.

    The ranking is computed on a cleaned copy of the features (values coerced
    to numeric, rows with any missing/non-numeric value removed). The returned
    frame is a column subset of the *original* ``dataFrame``, so it keeps all
    of its rows.

    If the ``feature_figure_path`` environment variable is set, a bar chart
    and a CSV of the ranking are written to that directory.

    Args:
        dataFrame: DataFrame containing the feature columns and the target.

    Returns:
        ``dataFrame`` unchanged when selection is disabled, otherwise
        ``dataFrame`` restricted to the selected features plus the target.

    Raises:
        ValueError: If the target column is missing, ``feature_selection`` or
            ``top_n_features`` is invalid, or no usable feature data remains
            after cleaning.
    """
    feature_figure_path = os.getenv("feature_figure_path", "").strip()

    if not target_column or target_column not in dataFrame.columns:
        raise ValueError("Target column must be set and exist in the dataframe.")

    if feature_selection == '0':
        print("Feature selection not applied. Using all features.")
        return dataFrame

    # Fail fast on an unknown method instead of after the cleaning work.
    if feature_selection not in ('1', '2'):
        raise ValueError(f"Invalid feature_selection value: {feature_selection}")

    # Clean data: coercion turns empty strings and other non-numeric values
    # into NaN, then every row containing a NaN is discarded.
    X = dataFrame.drop(columns=[target_column]).apply(pd.to_numeric, errors='coerce')
    n_features = X.shape[1]
    if n_features == 0:
        raise ValueError("No feature columns available for feature selection.")

    # A positional boolean mask keeps X and y aligned even when the index
    # contains duplicate labels (label-based .loc would duplicate rows).
    valid_rows = X.notna().all(axis=1).to_numpy()
    X = X.loc[valid_rows]
    y = dataFrame[target_column].loc[valid_rows]
    if X.empty:
        raise ValueError(
            "No rows left after cleaning: every row has a missing or "
            "non-numeric feature value."
        )

    use_optuna = top_n_features.strip() == ''
    top_n = None
    if not use_optuna:
        try:
            top_n = int(top_n_features)
        except ValueError as err:
            raise ValueError(
                f"top_n_features must be an integer, got {top_n_features!r}."
            ) from err
        if top_n < 1:
            raise ValueError(f"top_n_features must be at least 1, got {top_n}.")
        if top_n > n_features:
            # Asking for more features than exist simply means "keep them all".
            print(f"top_n_features={top_n} exceeds the {n_features} available features; using {n_features}.")
            top_n = n_features

    if feature_selection == '1':
        method = "Random Forest"
        value_column = "Importance"
        file_stem = "rf_feature_importance"
        report_label = "Feature importance"
        print("Applying feature selection using Random Forest feature importance...")

        # The forest is deterministic (fixed random_state), so one fit serves
        # both the Optuna search and the final ranking.
        model = RandomForestClassifier(n_estimators=300, random_state=42)
        model.fit(X, y)
        ranking_df = pd.DataFrame({
            "Feature": X.columns,
            "Importance": model.feature_importances_
        }).sort_values(by="Importance", ascending=False)

        if use_optuna:
            print("No top_n_features specified. Running Optuna to find optimal number of features...")
            ranked_features = ranking_df["Feature"].tolist()

            def score_rf(k):
                estimator = RandomForestClassifier(n_estimators=300, random_state=42)
                return cross_val_score(estimator, X[ranked_features[:k]], y, cv=3).mean()

            top_n = _search_top_n(score_rf, n_features)
            print(f"Optuna selected top_n_features for Random Forest: {top_n}")

        title = f"Feature Importances (Top {top_n} highlighted)"

    else:
        method = "SelectKBest"
        value_column = "Score"
        file_stem = "kbest_scores"
        report_label = "SelectKBest"
        print("Applying feature selection using SelectKBest with cross-validation...")

        if use_optuna:
            print("No top_n_features specified. Running Optuna to find optimal number of features...")

            def score_kbest(k):
                selected_X = SelectKBest(score_func=f_classif, k=k).fit_transform(X, y)
                estimator = RandomForestClassifier(n_estimators=300, random_state=42)
                return cross_val_score(estimator, selected_X, y, cv=3).mean()

            top_n = _search_top_n(score_kbest, n_features)
            print(f"Optuna selected top_n_features for SelectKBest: {top_n}")

        selector = SelectKBest(score_func=f_classif, k=top_n)
        selector.fit(X, y)
        ranking_df = pd.DataFrame({
            "Feature": X.columns,
            "Score": selector.scores_
        }).sort_values(by="Score", ascending=False)

        title = f"SelectKBest Feature Scores (Top {top_n} highlighted)"

    selected_columns = ranking_df["Feature"].iloc[:top_n].tolist()
    print(f"Top {top_n} selected features ({method}):")
    for i, feature in enumerate(selected_columns, 1):
        print(f"{i}. {feature}")

    if feature_figure_path:
        _save_ranking_report(
            ranking_df, value_column, top_n, feature_figure_path,
            file_stem, title, report_label,
        )

    return dataFrame[selected_columns + [target_column]]

### Main Function

In [None]:
""" Main function to run feature engineering and selection. """

def run_feature_select():
    """Run data preparation, feature combination and feature selection in order.

    Returns:
        The DataFrame produced by ``select_features`` after the prepared data
        has passed through ``combine_features``.
    """
    # Each stage consumes the previous stage's DataFrame:
    # data prep/cleaning -> feature combination -> feature selection.
    prepared = run_data_prep()
    combined = combine_features(prepared)
    return select_features(combined)
    

In [None]:
# Run the full feature-engineering step only when this file is executed as a
# script; importing it as a module does not trigger the run.
if __name__ == "__main__":
    run_feature_select()