In [None]:
import os
import numpy as np
import pandas as pd

from sklearn.preprocessing import OneHotEncoder, LabelEncoder, OrdinalEncoder, LabelBinarizer
from category_encoders.binary import BinaryEncoder
from category_encoders.target_encoder import TargetEncoder
from category_encoders.woe import WOEEncoder


class CategoricalFeatureSystem:
    """Apply categorical encoders to selected columns of a CSV-backed frame.

    The working frame ``df_final`` is persisted to ``df_final_path`` after
    every successful call to :meth:`method`.

    Errors are reported in-band: :meth:`method` returns a plain ``str``
    message (instead of a DataFrame) when the tool name is unknown or when
    an encoder cannot run (missing target column, missing ``period`` ...).
    In that case neither ``df_final`` nor the CSV file is modified.
    """

    def __init__(self, df_initial: pd.DataFrame, df_final_path: str, X: list, y: str = None):
        """Set up the working frame.

        Args:
            df_initial: Source data. Only used when no file exists yet at
                ``df_final_path``.
            df_final_path: CSV file holding the working frame. If it already
                exists it is loaded and ``df_initial`` is ignored; otherwise
                a copy of ``df_initial`` is written there.
            X: Names of the columns to encode.
            y: Name of the target column; needed by the target-based
                encoders ("TargetEncoder", "WoE").
        """
        self.df_initial = df_initial
        self.df_final_path = df_final_path
        self.X = X
        self.y = y

        if os.path.exists(self.df_final_path):
            self.df_final = pd.read_csv(self.df_final_path)
        else:
            self.df_final = df_initial.copy()
            self.df_final.to_csv(self.df_final_path, index=False)

    def method(self, tool_name: str, config: dict):
        """Encode the ``X`` columns with the encoder named ``tool_name``.

        Args:
            tool_name: One of "OneHotEncoder", "LabelEncoder",
                "BinaryEncoder", "FrequencyEncoder", "TargetEncoder", "WoE",
                "OrdinalEncoder", "LabelBinarizer", "CyclicalEncoder".
            config: Optional keys:
                ``encoder_params`` - kwargs forwarded to the encoder class;
                ``per_column`` - ``{column: {option: value}}`` for the
                per-column encoders;
                ``drop_original`` - drop the ``X`` columns before appending
                the encoded ones (default ``True``).

        Returns:
            The updated ``df_final`` on success, otherwise a ``str`` error
            message ("Invalid tool_name" or the message produced by the
            selected encoder).

        Note:
            With ``drop_original=False``, encoders that reuse the source
            column names (label, frequency, ordinal, ...) produce duplicate
            column names in ``df_final``.
        """
        config = config or {}
        encoder_config = config.get("encoder_params") or {}
        per_column_config = config.get("per_column") or {}
        drop_original = config.get("drop_original", True)

        # tool name -> (encoder helper, configuration section it consumes)
        dispatch = {
            "OneHotEncoder": (self._one_hot_encoder, encoder_config),
            "LabelEncoder": (self._label_encoder, per_column_config),
            "BinaryEncoder": (self._binary_encoder, encoder_config),
            "FrequencyEncoder": (self._frequency_encoder, per_column_config),
            "TargetEncoder": (self._target_encoder, encoder_config),
            "WoE": (self._woe_encoder, encoder_config),
            "OrdinalEncoder": (self._ordinal_encoder, encoder_config),
            "LabelBinarizer": (self._label_binarizer, per_column_config),
            "CyclicalEncoder": (self._cyclical_encoder, per_column_config),
        }

        entry = dispatch.get(tool_name)
        if entry is None:
            return "Invalid tool_name"

        encode, params = entry
        encoded_df = encode(params)

        # Helpers signal failure by returning a message string. Hand it back
        # before touching df_final so a failed call cannot drop columns or
        # crash inside pd.concat.
        if isinstance(encoded_df, str):
            return encoded_df

        # Non in-place drop: df_final is only rebound once concat succeeded.
        base = self.df_final.drop(columns=self.X) if drop_original else self.df_final

        self.df_final = pd.concat([base, encoded_df], axis=1)
        self.df_final.to_csv(self.df_final_path, index=False)

        return self.df_final

    def _one_hot_encoder(self, encoder_config: dict):
        """One-hot encode ``X``; ``encoder_config`` goes to ``OneHotEncoder``."""
        encoder = OneHotEncoder(**encoder_config)
        encoded = encoder.fit_transform(self.df_final[self.X])

        # The encoder may hand back a sparse matrix; densify it for pandas.
        if hasattr(encoded, "toarray"):
            encoded = encoded.toarray()

        columns = encoder.get_feature_names_out(self.X)
        # Reuse df_final's index so the axis=1 concat lines rows up correctly.
        return pd.DataFrame(encoded, columns=columns, index=self.df_final.index)

    def _label_encoder(self, per_column_config: dict):
        """Integer-encode each column of ``X``.

        A column whose config holds a non-empty ``mapping`` is encoded with
        ``Series.map(mapping)``; every other column uses ``LabelEncoder``.
        """
        # Pre-seeding the index keeps array- and Series-valued columns
        # aligned with df_final regardless of the order they are added in.
        encoded_df = pd.DataFrame(index=self.df_final.index)

        for col in self.X:
            col_config = per_column_config.get(col, {})
            mapping = col_config.get("mapping")

            if mapping:
                encoded_df[col] = self.df_final[col].map(mapping)
            else:
                encoder = LabelEncoder()
                encoded_df[col] = encoder.fit_transform(self.df_final[col])

        return encoded_df

    def _binary_encoder(self, encoder_config: dict):
        """Binary-encode ``X`` with ``category_encoders.BinaryEncoder``."""
        encoder = BinaryEncoder(cols=self.X, **encoder_config)
        return encoder.fit_transform(self.df_final[self.X])

    def _frequency_encoder(self, per_column_config: dict):
        """Replace each category by how often it occurs in its column.

        Per-column options:
            ``normalize`` (default ``True``): passed to ``value_counts``.
            ``smoothing``: when truthy, frequencies become
            ``(freq + smoothing) / (1 + smoothing)``.
        """
        encoded_df = pd.DataFrame(index=self.df_final.index)

        for col in self.X:
            col_config = per_column_config.get(col, {})
            normalize = col_config.get("normalize", True)
            smoothing = col_config.get("smoothing")

            freq = self.df_final[col].value_counts(normalize=normalize)

            if smoothing:
                freq = (freq + smoothing) / (1 + smoothing)

            encoded_df[col] = self.df_final[col].map(freq)

        return encoded_df

    def _target_encoder(self, encoder_config: dict):
        """Target-encode ``X`` against ``y``.

        Returns the message "Target column required" when ``y`` is unset.
        """
        if self.y is None:
            return "Target column required"

        encoder = TargetEncoder(cols=self.X, **encoder_config)
        return encoder.fit_transform(self.df_final[self.X], self.df_final[self.y])

    def _woe_encoder(self, encoder_config: dict):
        """Weight-of-evidence encode ``X`` against ``y``.

        Returns the message "Target column required" when ``y`` is unset.
        """
        if self.y is None:
            return "Target column required"

        encoder = WOEEncoder(cols=self.X, **encoder_config)
        return encoder.fit_transform(self.df_final[self.X], self.df_final[self.y])

    def _ordinal_encoder(self, encoder_config: dict):
        """Ordinal-encode ``X``; ``encoder_config`` goes to ``OrdinalEncoder``."""
        encoder = OrdinalEncoder(**encoder_config)
        encoded = encoder.fit_transform(self.df_final[self.X])
        # Reuse df_final's index so the axis=1 concat lines rows up correctly.
        return pd.DataFrame(encoded, columns=self.X, index=self.df_final.index)

    def _label_binarizer(self, per_column_config: dict):
        """Binarize each column of ``X`` with ``LabelBinarizer``.

        Per-column options ``neg_label`` (0), ``pos_label`` (1) and
        ``sparse_output`` (False) are forwarded to the encoder. When the
        result has one column per class, the columns are named
        ``"{col}_{class}"``; otherwise the single indicator column keeps
        the name ``col``.
        """
        encoded_df = pd.DataFrame(index=self.df_final.index)

        for col in self.X:
            col_config = per_column_config.get(col, {})
            neg_label = col_config.get("neg_label", 0)
            pos_label = col_config.get("pos_label", 1)
            sparse_output = col_config.get("sparse_output", False)

            encoder = LabelBinarizer(neg_label=neg_label, pos_label=pos_label, sparse_output=sparse_output)
            transformed = encoder.fit_transform(self.df_final[col])

            # sparse_output=True yields a sparse matrix, which cannot be
            # assigned column by column; densify it first.
            if hasattr(transformed, "toarray"):
                transformed = transformed.toarray()
            transformed = np.asarray(transformed)

            if transformed.ndim == 1:
                encoded_df[col] = transformed
            elif transformed.shape[1] == len(encoder.classes_):
                for idx, class_ in enumerate(encoder.classes_):
                    encoded_df[f"{col}_{class_}"] = transformed[:, idx]
            else:
                # Two-class input comes back as a single (n, 1) column, so
                # indexing one column per class would run out of bounds.
                encoded_df[col] = transformed[:, 0]

        return encoded_df

    def _cyclical_encoder(self, per_column_config: dict):
        """Encode each column of ``X`` as ``{col}_sin`` / ``{col}_cos``.

        Every column needs a ``period`` entry in its config; the message
        "period required per column" is returned as soon as one is missing.
        """
        encoded_df = pd.DataFrame(index=self.df_final.index)

        for col in self.X:
            col_config = per_column_config.get(col, {})
            period = col_config.get("period")

            if period is None:
                return "period required per column"

            angle = 2 * np.pi * self.df_final[col] / period
            encoded_df[f"{col}_sin"] = np.sin(angle)
            encoded_df[f"{col}_cos"] = np.cos(angle)

        return encoded_df