In [None]:
#| label: custom-transformers
#| eval: false
#| echo: true
#| code-overflow: scroll
#| code-summary: "Mostrar/esconder código"
#| code-fold: true
# Este código é apenas um exemplo. Os transformadores estão disponíveis no repositório deste projeto.
class DropConstantColumns(BaseEstimator, TransformerMixin):
    """
    Pipeline step (sklearn.pipeline.Pipeline) that drops constant columns from a pandas DataFrame.

    The columns to drop are selected in ``fit`` and removed in ``transform``, so a fitted
    instance removes the same columns from every dataset it transforms.
    """
    def __init__(self, print_cols: bool = False, also: Union[list[str], None] = None) -> None:
        """
        print_cols: default = False. Whether ``fit`` should print how many columns were selected.
        also: extra column names to drop even when they are not constant.
              ``None`` (default) means no extra columns.
        """
        # Parameters are stored untouched; ``None`` is resolved in ``fit`` so that no
        # mutable default object is shared between instances.
        self.print_cols = print_cols
        self.also = also

    def fit(self, X: pd.DataFrame, y: None = None) -> "DropConstantColumns":
        """
        X: dataset whose constant columns should be removed.
        y: ignored. Only accepted so the transformer can be used inside a pipeline.
        Stores the names of the columns to drop in ``self.constant_cols`` and returns ``self``.
        """
        also = [] if self.also is None else self.also
        # A column is selected when it has exactly one distinct value (``nunique`` skips
        # missing values by default) or when the user listed it in ``also``.
        self.constant_cols = [
            col
            for col in X.columns
            if X[col].nunique() == 1 or col in also
        ]
        if self.print_cols:
            print(f"{len(self.constant_cols)} constant columns were found.")
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        X: dataset whose constant columns should be removed.
        Returns a new DataFrame without the columns selected in ``fit``; ``X`` is not modified.
        """
        # ``drop`` already returns a new object, so no explicit copy is needed.
        return X.drop(columns=self.constant_cols)

class DropDuplicateColumns(BaseEstimator, TransformerMixin):
    """
    Pipeline step (sklearn.pipeline.Pipeline) that drops duplicate columns from a pandas DataFrame.

    The duplicate columns are found in ``fit`` and dropped in ``transform``. Within each group
    of identical columns, the one whose name sorts first is kept.
    """
    def __init__(self, print_cols: bool = False, ignore: Union[list[str], None] = None) -> None:
        """
        print_cols: default = False. Whether ``fit`` should print how many duplicate columns were found.
        ignore: column names left out of the duplicate search: they are never dropped and
                never used as the reference for dropping another column.
                ``None`` (default) means no column is ignored.
        """
        self.print_cols = print_cols
        self.ignore = ignore

    def fit(self, X: pd.DataFrame, y: None = None) -> "DropDuplicateColumns":
        """
        X: dataset whose duplicate columns should be removed.
        y: ignored. Only accepted so the transformer can be used inside a pipeline.
        Stores the names of the columns to drop in ``self.duplicate_cols`` and returns ``self``.
        """
        ignored = set() if self.ignore is None else set(self.ignore)
        candidates = [col for col in sorted(X.columns) if col not in ignored]

        duplicate_columns = []
        flagged = set()
        for position, reference in enumerate(candidates):
            # A column already known to be a duplicate cannot be the kept reference.
            if reference in flagged:
                continue
            # Only later columns need checking: each pair is compared once, and every
            # duplicate is recorded once so the reported count is accurate.
            for other in candidates[position + 1:]:
                if other not in flagged and X[reference].equals(X[other]):
                    flagged.add(other)
                    duplicate_columns.append(other)

        self.duplicate_cols = duplicate_columns
        if self.print_cols:
            print(f"{len(duplicate_columns)} duplicate columns were found.")
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        X: dataset whose duplicate columns should be removed.
        Returns a new DataFrame without the duplicate columns found in ``fit``; ``X`` is not modified.
        """
        # ``drop`` already returns a new object, so no explicit copy is needed.
        return X.drop(columns=self.duplicate_cols)

class AddNonZeroCount(BaseEstimator, TransformerMixin):
    """
    Pipeline step (sklearn.pipeline.Pipeline) that adds a column with the per-row count of
    non-zero values among the columns whose name starts with a given prefix.
    """
    def __init__(self, prefix: str = "", ignore: Union[list[str], None] = None) -> None:
        """
        prefix: prefix of the columns to be counted. The default "" matches every column.
        ignore: column names to leave out of the count. ``None`` (default) means none.
        """
        self.prefix = prefix
        self.ignore = ignore

    def fit(self, X: pd.DataFrame, y: None = None) -> "AddNonZeroCount":
        """
        X: dataset whose "prefix" variables different than 0 should be counted.
        y: ignored. Only accepted so the transformer can be used inside a pipeline.
        Stores the names of the columns to inspect in ``self.prefix_cols`` and returns ``self``.
        """
        ignored = [] if self.ignore is None else self.ignore
        self.prefix_cols = [
            col
            for col in X.columns
            if col.startswith(self.prefix) and col not in ignored
        ]
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        X: dataset whose "prefix" variables' non-zero values should be counted.
        Returns a copy of ``X`` with a new ``non_zero_count_<prefix>`` column holding, for each
        row, how many of the selected columns are neither 0 nor missing.
        """
        X_ = X.copy()
        selected = X_[self.prefix_cols]
        # Vectorised mask instead of an element-wise lambda. ``notna`` also excludes NaN,
        # which an ``x == None`` comparison never matches, so missing numeric values are
        # no longer counted as non-zero.
        counted = selected.ne(0) & selected.notna()
        X_[f"non_zero_count_{self.prefix}"] = counted.sum(axis=1)
        return X_

class CustomSum(BaseEstimator, TransformerMixin):
    """
    Pipeline step (sklearn.pipeline.Pipeline) that adds a column with the row-wise sum of
    the columns whose name starts with a given prefix.
    """
    def __init__(self, prefix: str = "", ignore: Union[list[str], None] = None) -> None:
        """
        prefix: prefix of the columns to be summed. The default "" matches every column.
        ignore: column names to leave out of the sum. ``None`` (default) means none.
        """
        self.prefix = prefix
        self.ignore = ignore

    def fit(self, X: pd.DataFrame, y: None = None) -> "CustomSum":
        """
        X: dataset whose columns with "prefix" should be summed.
        y: ignored. Only accepted so the transformer can be used inside a pipeline.
        Stores the names of the columns to sum in ``self.prefix_cols`` and returns ``self``.
        """
        ignored = [] if self.ignore is None else self.ignore
        self.prefix_cols = [
            col
            for col in X.columns
            if col.startswith(self.prefix) and col not in ignored
        ]
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        X: dataset whose "prefix" variables should be summed.
        Returns a copy of ``X`` with a new ``sum_of_<prefix>`` column holding the row-wise
        sum of the columns selected in ``fit``.
        """
        X_ = X.copy()
        X_[f"sum_of_{self.prefix}"] = X_[self.prefix_cols].sum(axis=1)
        return X_

class CustomImputer(BaseEstimator, TransformerMixin):
    """
    Pipeline step (sklearn.pipeline.Pipeline) that replaces one value with another in the
    columns whose name starts with a given prefix.
    """
    def __init__(self, prefix: str, to_replace: Union[int, float, str],
                 replace_with: Union[int, float, str] = np.nan,
                 ignore: Union[list[str], None] = None) -> None:
        """
        prefix: prefix of the columns to be imputed.
        to_replace: value to be replaced.
        replace_with: value to replace "to_replace" with. Defaults to NaN.
        ignore: column names to leave untouched. ``None`` (default) means none.
        """
        self.prefix = prefix
        self.to_replace = to_replace
        self.replace_with = replace_with
        self.ignore = ignore

    def fit(self, X: pd.DataFrame, y: None = None) -> "CustomImputer":
        """
        X: dataset whose columns with "prefix" should be imputed. It must expose ``.columns``,
           so a DataFrame is required.
        y: ignored. Only accepted so the transformer can be used inside a pipeline.
        Stores the names of the columns to impute in ``self.prefix_cols`` and returns ``self``.
        """
        ignored = [] if self.ignore is None else self.ignore
        self.prefix_cols = [
            col
            for col in X.columns
            if col.startswith(self.prefix) and col not in ignored
        ]
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        X: dataset whose columns with "prefix" should be imputed.
        Returns a copy of ``X`` in which every occurrence of ``to_replace`` in the columns
        selected in ``fit`` has been replaced by ``replace_with``.
        """
        X_ = X.copy()
        imputed = X_[self.prefix_cols].replace(self.to_replace, self.replace_with)
        X_[self.prefix_cols] = imputed
        return X_
 
class AddNoneCount(BaseEstimator, TransformerMixin):
    """
    Pipeline step (sklearn.pipeline.Pipeline) that adds a column with the per-row count of
    missing values among the columns whose name starts with a given prefix.
    """
    def __init__(self, prefix: str = "", ignore: Union[list[str], None] = None) -> None:
        """
        prefix: only columns whose name starts with this string are inspected.
                The default "" matches every column.
        ignore: column names with the prefix to leave out of the count.
                ``None`` (default) means none.
        """
        self.prefix = prefix
        self.ignore = ignore

    def fit(self, X: pd.DataFrame, y: None = None) -> "AddNoneCount":
        """
        X: dataset whose "prefix" variables' null values should be counted.
        y: ignored. Only accepted so the transformer can be used inside a pipeline.
        Stores the names of the columns to inspect in ``self.prefix_cols`` and returns ``self``.
        """
        ignored = [] if self.ignore is None else self.ignore
        self.prefix_cols = [
            col
            for col in X.columns
            if col.startswith(self.prefix) and col not in ignored
        ]
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        X: dataset to apply the transformation on.
        Returns a copy of ``X`` with a new ``none_count_<prefix>`` column holding, for each
        row, the number of null values among the columns selected in ``fit``.
        """
        X_ = X.copy()
        X_[f"none_count_{self.prefix}"] = X_[self.prefix_cols].isnull().sum(axis=1)
        return X_
    
class CustomEncoder(BaseEstimator, TransformerMixin):
    """
    Pipeline step (sklearn.pipeline.Pipeline) that encodes one categorical column as integers.

    Categories are ranked by the mean of the target observed for them during ``fit``
    (0 for the lowest mean). Values not seen during ``fit`` receive the code of the most
    frequent category of the training data.
    """
    def __init__(self, colname: str) -> None:
        """
        colname: name of the column to be encoded.
        """
        self.colname = colname

    def fit(self, X: pd.DataFrame, y: Union[pd.DataFrame, pd.Series]) -> "CustomEncoder":
        """
        X: dataset whose column should be encoded.
        y: target values; required, because the encoding is the rank of each category's
           target mean.
        Stores the category -> code dictionary in ``self.labels`` and the most frequent
        category in ``self.most_frequent``; returns ``self``.
        """
        # Only the encoded column is needed next to the target, so avoid copying all of X.
        X_ = X[[self.colname]].assign(TARGET=y)

        # A stable sort keeps categories with equal target means in the order groupby
        # produced them, so the codes are reproducible.
        ordered_groups = (
            X_
            .groupby(self.colname)
            .agg({"TARGET": "mean"})
            .sort_values("TARGET", ascending=True, kind="mergesort")
            .index
        )

        self.labels = {
            group: code
            for code, group in enumerate(ordered_groups)
        }

        self.most_frequent = X_[self.colname].mode()[0]
        return self

    def _apply_map(self, x: Union[int, str]) -> int:
        """
        x: value to be replaced.
        Returns the code of "x", or the code of the most frequent training category when
        "x" was not seen during ``fit``.
        """
        return self.labels.get(x, self.labels[self.most_frequent])

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        X: dataset whose column should be encoded.
        Returns a copy of ``X`` with the column replaced by its integer codes.
        """
        X_ = X.copy()
        X_[self.colname] = X_[self.colname].apply(self._apply_map)
        return X_
    
class PrefixScaler(BaseEstimator, TransformerMixin):
    """
    Pipeline step (sklearn.pipeline.Pipeline) that applies a scaler only to the columns
    whose name starts with one of the given prefixes.
    """
    def __init__(self, prefixes, scaler, zero_heavy=False, ignore=None):
        """
        prefixes: prefix, or iterable of prefixes, selecting the columns to be scaled.
        scaler: object exposing ``fit`` and ``transform`` (e.g. a scikit-learn scaler).
        zero_heavy: when True, zeros are replaced with NaN while FITTING the scaler.
                    ``transform`` still scales zeros like any other value.
        ignore: column names to leave unscaled. ``None`` (default) means none.
        """
        self.prefixes = prefixes
        self.scaler = scaler
        self.ignore = ignore
        self.zero_heavy = zero_heavy

    def fit(self, X, y=None):
        """
        X: dataset whose columns with one of the prefixes should be scaled.
        y: ignored. Only accepted so the transformer can be used inside a pipeline.
        Stores the selected column names in ``self.prefix_cols``, fits the scaler on them
        and returns ``self``.
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(self.prefixes, str):
            prefixes = (self.prefixes,)
        else:
            prefixes = tuple(self.prefixes)
        ignored = [] if self.ignore is None else self.ignore

        # ``str.startswith`` accepts a tuple and matches when any prefix matches.
        self.prefix_cols = [
            col
            for col in X.columns
            if col.startswith(prefixes) and col not in ignored
        ]

        # Only the selected columns are needed; avoid copying/replacing the whole frame.
        training = X[self.prefix_cols]
        if self.zero_heavy:
            training = training.replace(0, np.nan)
        # NOTE: the scaler object passed to ``__init__`` is itself fitted here.
        self.scaler = self.scaler.fit(training)
        return self

    def transform(self, X):
        """
        X: dataset whose columns with one of the prefixes should be scaled.
        Returns a copy of ``X`` with the columns selected in ``fit`` replaced by their
        scaled values.
        """
        X_ = X.copy()
        X_[self.prefix_cols] = self.scaler.transform(X_[self.prefix_cols])
        return X_

In [None]:
#| label: train-evaluate
#| eval: false
#| echo: true
#| code-overflow: scroll
#| code-summary: "Mostrar/esconder código"
#| code-fold: true

class TrainEvaluate:
    """
    This class can be used to train, validate and test sklearn Pipeline objects.

    Workflow: ``fit`` runs a grid search, tunes the classification threshold on a held-out
    validation split so that the profit metric is maximal, then refits the best pipeline
    on all the data. ``predict``/``evaluate``/``rank_customers`` use that threshold.
    """
    # Profit metric weights: amount credited per true positive and amount charged per
    # false positive. Kept as class attributes so every method uses the same values.
    PROFIT_PER_TP = 90
    COST_PER_FP = 10

    def __init__(self, model: "Pipeline", param_grid: dict, target: str,
                 njobs: int = 8, verbose: bool = True) -> None:
        """
        model: sklearn Pipeline with the model.
        param_grid: Dictionary of parameters to search over.
        target: Name of the column to predict.
        njobs: Number of jobs to run in parallel.
        verbose: Whether to print the progress or not.
        Initialize the class with the model, param_grid, and target variable.
        """
        self.model = model
        self.param_grid = param_grid
        self.target = target
        self.njobs = njobs
        self.verbose = verbose

    def _validation_split(self, df: pd.DataFrame) -> tuple:
        """
        df: Pandas DataFrame with the data (features and target column).
        Split the data into train (75%) and validation (25%) sets with a fixed seed.
        Returns (X_train, X_val, y_train, y_val).
        """
        y = df[self.target]
        X = df.drop(self.target, axis=1)
        X_train, X_val, y_train, y_val = train_test_split(
            X,
            y,
            test_size=0.25,
            random_state=42
        )
        return (X_train, X_val, y_train, y_val)

    def _grid_search(self, X_train: pd.DataFrame,
                     y_train: Union[pd.DataFrame, pd.Series]) -> "GridSearchCV":
        """
        X_train: Pandas DataFrame with the training data.
        y_train: Pandas Series with the training target.
        Runs a 5-fold stratified grid search scored by ROC AUC and returns the fitted
        GridSearchCV object.
        """
        skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
        grid_search = GridSearchCV(
            estimator=self.model,
            param_grid=self.param_grid,
            scoring="roc_auc",
            n_jobs=self.njobs,
            cv=skf
        )
        grid_search = grid_search.fit(X_train, y_train)
        return grid_search

    def _profit(self, y_true: Union[np.ndarray, pd.DataFrame, pd.Series],
                y_pred: Union[np.ndarray, pd.DataFrame, pd.Series]) -> float:
        """
        y_true: true target (0/1 labels).
        y_pred: predicted target (0/1 labels), in the same row order as y_true.
        Calculate the profit metric of the model:
        PROFIT_PER_TP * true positives - COST_PER_FP * false positives.
        """
        # Compare by position: pandas inputs carrying different indexes would otherwise
        # be aligned by label and silently produce wrong counts.
        y_true = np.asarray(y_true).ravel()
        y_pred = np.asarray(y_pred).ravel()
        tp = np.sum((y_pred == 1) & (y_true == 1))
        fp = np.sum((y_pred == 1) & (y_true == 0))
        return self.PROFIT_PER_TP * tp - self.COST_PER_FP * fp

    def _threshold_tuning(self, X_val: pd.DataFrame,
                          y_val: Union[pd.DataFrame, pd.Series]) -> float:
        """
        X_val: Pandas DataFrame with the validation data.
        y_val: Pandas Series with the validation target (0/1 labels).
        Find the threshold that maximizes the profit metric, store it in ``self.threshold``
        and return it.

        Profit as a function of the threshold is a step function that only changes at the
        predicted probabilities, so every distinct probability is evaluated exactly
        (one sort plus cumulative sums) instead of using a continuous scalar minimizer.
        Raises ValueError if the validation set is empty.
        """
        y_proba = np.asarray(self.best_model_.predict_proba(X_val))[:, 1]
        y_true = np.asarray(y_val).ravel()
        if y_proba.size == 0:
            raise ValueError("Cannot tune the threshold on an empty validation set.")

        # Rank rows from most to least likely positive.
        order = np.argsort(-y_proba, kind="mergesort")
        ranked_proba = y_proba[order]
        ranked_true = y_true[order]

        # profit[i] = profit when rows 0..i are predicted positive.
        profit = (
            self.PROFIT_PER_TP * np.cumsum(ranked_true == 1)
            - self.COST_PER_FP * np.cumsum(ranked_true == 0)
        )

        # With ``proba >= threshold`` tied probabilities are accepted together, so only
        # the last row of each tied group is a reachable cut point.
        is_group_end = np.append(ranked_proba[1:] != ranked_proba[:-1], True)
        cut_points = np.flatnonzero(is_group_end)
        # argmax keeps the first maximum, i.e. the highest threshold among ties.
        best_cut = cut_points[np.argmax(profit[cut_points])]

        if profit[best_cut] < 0:
            # Every cut loses money: put the threshold just above the highest
            # probability so that nothing is predicted positive (profit 0).
            threshold = float(np.nextafter(ranked_proba[0], np.inf))
        else:
            threshold = float(ranked_proba[best_cut])

        self.threshold = threshold
        return threshold

    def fit(self, df: pd.DataFrame) -> "TrainEvaluate":
        """
        df: Pandas DataFrame with the data (features and target column).
        Splits data between train and validation, performs GridSearchCV,
        adjusts the threshold based on profit metric on the validation set,
        and fits the model on the original data. Returns ``self``.
        """
        if self.verbose:
            print("Splitting data into train and validation sets...")
        X_train, X_val, y_train, y_val = self._validation_split(df)
        if self.verbose:
            print("Done!")
            print("Performing GridSearchCV...")
        self.best_model_ = self._grid_search(X_train, y_train).best_estimator_
        if self.verbose:
            print("Done!")
            print("Adjusting threshold based on validation set...")
        self.threshold = self._threshold_tuning(X_val, y_val)
        if self.verbose:
            print("Done!")
            print("Fitting model on the whole dataset...")
        self.best_model_ = self.best_model_.fit(df.drop(self.target, axis=1), df[self.target])
        if self.verbose:
            print("Done!")

        return self

    def predict_proba(self, df: pd.DataFrame) -> np.ndarray:
        """
        df: Pandas DataFrame with the features.
        Returns the best model's probability for the positive class (second column of
        its ``predict_proba`` output).
        """
        return self.best_model_.predict_proba(df)[:, 1]

    def predict(self, df: pd.DataFrame) -> np.ndarray:
        """
        df: Pandas DataFrame with the features.
        Predicts the target variable using the best model and the tuned threshold
        (1 when the probability is >= threshold, else 0).
        """
        y_proba = self.predict_proba(df)
        y_pred = (y_proba >= self.threshold).astype(int)
        return y_pred

    def evaluate(self, df: pd.DataFrame) -> "TrainEvaluate":
        """
        df: Pandas DataFrame with the test data (features and target column).
        Evaluates the model on the data. The results are stored in
        ``self.business_metrics`` and ``self.classification_metrics``; returns ``self``.
        """
        X_test = df.drop(self.target, axis=1)
        y_true = df[self.target]
        # Score once and derive the labels from the same probabilities.
        y_proba = self.predict_proba(X_test)
        y_pred = (y_proba >= self.threshold).astype(int)

        # Fixing the labels guarantees a 2x2 matrix even if a class is absent.
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

        gain = self.PROFIT_PER_TP
        cost = self.COST_PER_FP
        n_customers = len(y_true)

        self.business_metrics = {
            "Profit (Total)": tp * gain - fp * cost,
            "Profit (per Customer)": (tp * gain - fp * cost) / n_customers,
            "True Positive Profit (Total)": tp * gain,
            "True Positive Profit (per Customer)": tp * gain / n_customers,
            "False Positive Loss (Total)": fp * cost,
            "False Positive Loss (per Customer)": fp * cost / n_customers,
            "False Negative Potential Profit Loss (Total)": fn * gain,
            "False Negative Potential Profit Loss (per Customer)": fn * gain / n_customers,
            "True Negative Loss Prevention (Total)": tn * cost,
            "True Negative Loss Prevention (per Customer)": tn * cost / n_customers
        }

        self.classification_metrics = {
            "Classification Threshold": self.threshold,
            "ROC AUC": roc_auc_score(y_true, y_proba),
            "Precision": precision_score(y_true, y_pred),
            "Recall": recall_score(y_true, y_pred),
            "F1": f1_score(y_true, y_pred),
            "Accuracy": accuracy_score(y_true, y_pred)
        }

        return self

    def _predict_profit(self, model, X: pd.DataFrame, y: pd.Series) -> float:
        """
        model: fitted estimator exposing ``predict_proba``.
        X: Pandas DataFrame with the data.
        y: Pandas Series with the target.
        Scorer with the (estimator, X, y) signature: returns the profit metric obtained
        with the tuned threshold (not the estimator's own default decision rule).
        """
        y_pred = (model.predict_proba(X)[:, 1] >= self.threshold).astype(int)
        return self._profit(y, y_pred)

    def get_feature_importances(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        df: Pandas DataFrame with the data (features and target column).
        Implements permutation feature importances (30 repeats, fixed seed) on the data
        using the best model and custom threshold, scored by the profit metric.

        Every step but the last is applied as preprocessing and the last step is the
        estimator being inspected. The preprocessing output must be a DataFrame, since
        its column names label the features.
        Stores the result in ``self.feature_importances`` and returns it sorted by
        decreasing importance.
        """
        X = df.drop(self.target, axis=1)
        # Slicing a Pipeline yields a sub-pipeline sharing the fitted steps.
        X = self.best_model_[:-1].transform(X)
        y = df[self.target]
        result = permutation_importance(
            self.best_model_[-1],
            X,
            y,
            scoring=self._predict_profit,
            n_repeats=30,
            random_state=42,
            n_jobs=self.njobs
        )
        feature_importances = pd.DataFrame({
            "Feature": X.columns,
            "Importance": result.importances_mean
        })
        self.feature_importances = feature_importances.sort_values("Importance", ascending=False)
        return self.feature_importances

    def rank_customers(self, df: pd.DataFrame) -> pd.Series:
        """
        df: Pandas DataFrame with the data; the target column is dropped if present.
        Ranks the customers by their probability of dissatisfaction.

        Returns a Series named "rank" (same index as ``df``) with values from 1 to 5:
        1 when the probability is >= threshold, 2 when >= 3/4 of it, 3 when >= 1/2,
        4 when >= 1/4, and 5 otherwise.
        """
        # New customers may come without the target column, so do not require it.
        X = df.drop(columns=self.target, errors="ignore")
        proba = np.asarray(self.predict_proba(X))

        # Cut-offs are computed once, from the full threshold down to a quarter of it.
        cutoffs = [c * self.threshold / 4 for c in (4, 3, 2, 1)]
        # Each cut-off reached improves the rank by one, starting from the worst (5).
        reached = np.sum([proba >= cutoff for cutoff in cutoffs], axis=0)
        return pd.Series(5 - reached, index=df.index, name="rank")
    
def build_model(path: Union[str, None] = None, train_df: Union[pd.DataFrame, None] = None,
                model: Pipeline = None,
                param_grid: Union[dict, None] = None, target: Union[str, None] = None,
                njobs: int = 8, verbose: bool = True) -> TrainEvaluate:
    """
    path: File path where the fitted TrainEvaluate object is pickled.
          When None (default) the object is not saved.
    train_df: Pandas DataFrame with the training data (features and target column).
    model: sklearn Pipeline with the model.
    param_grid: Dictionary of parameters to search over.
    target: Name of the column to predict.
    njobs: Number of jobs to run in parallel.
    verbose: Whether to print the progress or not.
    Builds and fits a TrainEvaluate object, optionally saves it, and returns it.
    Raises ValueError when train_df, model, param_grid or target is missing.
    """
    # Fail before the expensive training run instead of in the middle of it.
    required = (
        ("train_df", train_df),
        ("model", model),
        ("param_grid", param_grid),
        ("target", target),
    )
    missing = [name for name, value in required if value is None]
    if missing:
        raise ValueError(f"build_model is missing required arguments: {', '.join(missing)}")

    train_evaluate = TrainEvaluate(model, param_grid, target, njobs, verbose)
    train_evaluate = train_evaluate.fit(train_df)

    # Saving is optional: with the default path=None there is nowhere to write to.
    if path is not None:
        with open(path, "wb") as f:
            pickle.dump(train_evaluate, f)
    return train_evaluate