# Initialization

## Load

In [1]:
import os
import sys

# Make the parent directory importable so the project-local `utils` package
# resolves. The membership check keeps the cell idempotent: re-running it no
# longer appends a duplicate entry to sys.path each time.
if "../" not in sys.path:
    sys.path.append("../")  # go to parent dir

# Root folder of the input data, relative to the notebook's working directory.
DATA_SRC = os.path.join("..", "Data")

# NOTE(review): not referenced by any cell visible here — confirm it is still needed.
data_analysis = 23


In [None]:
from utils.common import CustomDataFrame

# Location of the training CSV under the data root.
path = os.path.join(DATA_SRC, "CSV", "train.csv")

# Wrap the CSV in the project's CustomDataFrame helper.
# NOTE(review): `no_null_cols=["sii"]` presumably discards rows whose target is
# missing — confirm against utils.common.CustomDataFrame.
data_df = CustomDataFrame(
    [path],
    index_col="id",
    target_cols=["sii"],
    no_null_cols=["sii"],
)


# Compute the percentage of null values per column
nulls_info_load_ = data_df.null_percent()


# Clean Data

### Remove Nulls (Columns/Rows)

In [None]:
from utils.common import Show

# Automatic null clean-up; note that `data_df` is rebound to the cleaned result,
# so re-running this cell operates on already-cleaned data.
# NOTE(review): 0.5 is presumably the null-fraction threshold above which a
# column/row is dropped — confirm against CustomDataFrame.clean_nulls.
data_df = data_df.clean_nulls(0.5, show=Show.PLOT)
# Keep a named reference to the frame at this stage and show it.
after_auto_clean_ = data_df.dataframe
display(after_auto_clean_)


### Manually Remove Columns

In [None]:
from pprint import pprint

# Columns that leak the target: the PCIAT total plus its items 01..20.
drop_columns = [
    "PCIAT-PCIAT_Total",
    *(f"PCIAT-PCIAT_{item:02d}" for item in range(1, 21)),
]

frame_before_drop = data_df.dataframe
print(f"Columns Count BEFORE DROP: {len(frame_before_drop.columns)}")

# Only drop the leak columns that survived the earlier automatic clean-up.
leaks_present = [name for name in drop_columns if name in frame_before_drop.columns]
data_df.dataframe = frame_before_drop.drop(leaks_present)

print(f"Columns Count AFTER DROP: {len(data_df.dataframe.columns)}")
print()

print(f"Data Shape: {data_df.dataframe.shape}")
print("Data Columns:")
pprint(data_df.dataframe.columns)

# Re-run the null report on the reduced frame (return value intentionally unused).
data_df.null_percent()

# Keep a named reference to the frame at this stage and show it.
data_null_cols_clean_ = data_df.dataframe
display(data_null_cols_clean_)


### Convert Columns

In [None]:
import polars as pl
import polars.selectors as cs

from utils.transform import date_to_cyclic

# Inspect the target: its distribution and the sorted set of distinct values.
data_df.column_distribution("sii")
display(data_df["sii"].unique().sort())

# Date columns to re-encode cyclically. The list is currently empty, so the
# loop below is a no-op kept as a template.
ciclyc_date_columns: list[str] = []
for col in ciclyc_date_columns:
    if col not in data_df.dataframe.columns:
        continue
    # String-typed dates are parsed to datetimes before the cyclic transform.
    if data_df.dataframe[col].dtype == pl.String:
        data_df.dataframe = data_df.dataframe.with_columns(pl.col(col).str.to_datetime())
    # NOTE(review): the exact output columns of date_to_cyclic are defined in
    # utils.transform — confirm there.
    data_df.dataframe = date_to_cyclic(data_df.dataframe, col, fill_nan=False)

print("String Columns:", data_df.get_columns_types().get(pl.String))


# Non-string columns that should be treated as categories (currently none).
non_str_categories_columns: list[str] = []
for col in non_str_categories_columns:
    # NOTE(review): the return value is discarded, so this only has an effect if
    # CustomDataFrame.with_columns mutates the wrapper in place — confirm.
    data_df.with_columns(pl.col(col).cast(pl.String))

# Cast every string column to Categorical (same in-place assumption as above).
data_df.with_columns(cs.string().cast(pl.Categorical))
new_types = data_df.get_columns_types()
print("New Types: ")
pprint(new_types)
# Always-true guard: acts as a manual on/off switch for the one-hot encoding step.
if True:
    # Collect categorical columns under both Categorical orderings, then leave
    # the target columns out so they are not one-hot encoded.
    dummy_vars: list[str] = []
    dummy_vars += new_types.get(pl.Categorical("physical"), [])
    dummy_vars += new_types.get(pl.Categorical("lexical"), [])
    dummy_vars = [col for col in dummy_vars if col not in data_df.target_cols]

    # One-hot encode, dropping the first category of each variable.
    data_df.dataframe = data_df.dataframe.to_dummies(dummy_vars, drop_first=True)

    # Keep a named reference to the frame at this stage and show it.
    after_dummies_ = data_df.dataframe
    display(after_dummies_)


# Correlations

In [None]:
# Correlation analysis of the prepared frame; the result is kept for inspection.
# NOTE(review): the meaning of the positional `False` and of `dpi=15` is defined
# by CustomDataFrame.corr — confirm there.
corr_wo_nulls_ = data_df.corr(False, dpi=15)


# MACHINE LEARNING

## Init

In [None]:
import polars.selectors as cs
from sklearn.ensemble import RandomForestClassifier as RandomForestClassifier

# Name of the label column to predict.
target = "sii"

# Modelling frame: every column except the row identifier.
ai_df = data_df.dataframe.select(pl.exclude(data_df.index_col))

# Label column on its own ...
target_data = ai_df.select(target)
display(target_data.head())

# ... and everything else as model input.
input_data = ai_df.select(pl.exclude(target))
display(input_data.head())


## Feature importance

In [None]:
# Standardize the data to improve the regression's performance
# (TODO: not implemented — no scaling is applied anywhere in this cell)

# Define the model
import time
from collections.abc import Callable
from functools import wraps


def calc_time[**Args, R](func: Callable[Args, R]) -> Callable[Args, R]:
    @wraps(func)
    def wrapper(*args: Args.args, **kwargs: Args.kwargs) -> R:
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"Execution time: {end_time - start_time} seconds")
        return result

    return wrapper


def feature_importances_calc(X, y, random_state: int | None = 42) -> None:
    """Fit a random forest on ``(X, y)`` and display its feature-importance ranking.

    Args:
        X: Feature table; its ``columns`` attribute supplies the feature names.
        y: Target container exposing ``to_numpy()``; it is flattened to 1-D
            before fitting.
        random_state: Seed for the forest. Defaults to 42, matching the other
            estimators in this notebook, so the ranking is reproducible across
            runs. Pass ``None`` to restore unseeded behaviour.

    Returns:
        None. The ranking is shown with ``display`` and the fit time is printed.
    """
    model = RandomForestClassifier(n_jobs=-1, random_state=random_state)

    # Fit the model (timed); scikit-learn expects a 1-D target array.
    calc_time(model.fit)(X, y.to_numpy().ravel())

    # One row per feature, most important first.
    feature_importances = pl.DataFrame({
        "Feature": X.columns,
        "Importance": model.feature_importances_,
    }).sort(
        by="Importance",
        descending=True,
    )

    display(feature_importances)

# Fit the helper's random forest on every available feature and show the
# resulting importance ranking.
print("All Features:")
feature_importances_calc(input_data, target_data)


## RandomForest

In [None]:
from numpy import ndarray
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows for testing; the fixed seed keeps the split stable.
print("Train test split")

# Columns are put in alphabetical order so the feature layout is deterministic.
features_sorted = input_data.select(sorted(input_data.columns))
labels_flat = target_data.to_numpy().ravel()

data_split: tuple[pl.DataFrame, pl.DataFrame, ndarray, ndarray] = train_test_split(  # type: ignore
    features_sorted,
    labels_flat,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)
input_train, input_test, target_train, target_test = data_split
print(f"X_train shape: {input_train.shape}")
print(f"X_test  shape: {input_test.shape}")

# Class balance of the training labels, rarest class first.
train_class_counts = pl.Series(name="cats", values=target_train).value_counts().sort(by="count")
display(train_class_counts.plot.bar(x="cats", y="count"))


## Predictions analysis

* Confusion matrix and testing metrics
* Output probabilities distribution
* Feature importances using different criteria

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from pretty_confusion_matrix import pp_matrix_from_data
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    RocCurveDisplay,
    accuracy_score,
    fbeta_score,
    precision_score,
    recall_score,
)

# Debug inspection of the held-out labels and of their container type.
# NOTE(review): this displays the whole array — consider trimming it before
# sharing the notebook.
display(target_test)
display(type(target_test))


def train_rnd_forest(X_train, y_train) -> RandomForestClassifier:
    """Fit and return a random forest on the given training data.

    The forest uses 25 trees, ``balanced_subsample`` class weights, seed 42
    and all available cores.
    """
    print("Start training the model")
    forest_params = {
        "n_estimators": 25,
        "class_weight": "balanced_subsample",
        "random_state": 42,
        "n_jobs": -1,
    }
    # `fit` returns the estimator itself, so the fitted model can be returned directly.
    return RandomForestClassifier(**forest_params).fit(X_train, y_train)


def print_metrics(
    model: RandomForestClassifier,
    input_test: pl.DataFrame,
    target_test: ndarray,
) -> None:
    """Print test-set metrics for ``model`` and draw its confusion matrix.

    Accuracy plus weighted F-beta (beta=0.5), recall and precision are
    printed. For two-class models a ROC curve is also drawn, using the first
    entry of ``model.classes_`` as the positive label.
    """
    print("Model Predictions")
    predicted = model.predict(input_test)
    weighted = {"average": "weighted"}

    # Each metric is computed and printed in turn, in the original order.
    accuracy = accuracy_score(target_test, predicted)
    print("Accuracy: ", accuracy)
    fbeta = fbeta_score(target_test, predicted, beta=0.5, **weighted)
    print("Fbeta score: ", fbeta)
    recall = recall_score(target_test, predicted, **weighted)
    print("Recall score: ", recall)
    precision = precision_score(target_test, predicted, **weighted)
    print("Precision score: ", precision)

    # Grow the square confusion-matrix figure with the number of classes.
    class_labels = model.classes_.tolist()
    side = len(class_labels) + 2
    pp_matrix_from_data(
        target_test,
        predicted,
        columns=class_labels,
        cmap="PuRd",
        figsize=(side, side),
    )

    # The ROC curve is only drawn for binary problems.
    if len(class_labels) != 2:
        return

    RocCurveDisplay.from_estimator(
        model,
        input_test,
        target_test,
        pos_label=model.classes_[0],
    )
    plt.show()
    print()


def model_analisys(
    input_train: pl.DataFrame,
    target_train: ndarray,
    input_test: pl.DataFrame,
    target_test: ndarray,
    labels: list | None = None,
    model: RandomForestClassifier | None = None,
) -> tuple[RandomForestClassifier, tuple[ndarray, ndarray]]:
    """Train (or reuse) a random forest and report its test metrics.

    Args:
        input_train: Training features.
        target_train: Training labels.
        input_test: Test features.
        target_test: Test labels.
        labels: Optional subset of class values to group together. When given,
            the problem is collapsed to "these labels" versus "everything
            else": values in ``labels`` are replaced by their comma-joined
            name, all other values by the comma-joined name of the remaining
            classes (or ``"Other"`` when that name would be more than 1.5x
            longer than the grouped name).
        model: Already-fitted classifier to evaluate. When ``None`` a new one
            is trained (and timed) on the training data.

    Returns:
        ``(model, (data_train, data_test))`` where the two arrays are the
        label arrays actually used, i.e. after any grouping.
    """
    data_train = target_train
    data_test = target_test

    if labels:
        known_labels = np.unique(target_train).tolist()

        correct_label = ",".join(map(str, labels))
        # Class values are frequently numeric; str.join only accepts strings,
        # so each remaining label is converted first (as for `correct_label`).
        other_label = ",".join(str(label) for label in known_labels if label not in labels)
        other_label = "Other" if len(other_label) > len(correct_label) * 1.5 else other_label

        # Element-wise relabelling of both splits into the two group names.
        conversor = np.vectorize(lambda x: correct_label if x in labels else other_label)
        data_train = conversor(data_train)
        data_test = conversor(data_test)

    # Class names as they will be seen by the model, for the log line below.
    class_names = [str(label) for label in np.unique(data_train).tolist()]

    print(f"Model to classify [ {' | '.join(class_names)} ]")
    if model is None:
        model = calc_time(train_rnd_forest)(input_train, data_train)
    print_metrics(model, input_test, data_test)
    return model, (data_train, data_test)


print("FULL Data    ", np.unique(target_train))
print()

# Baseline: one model over every original class, with no label grouping.
model, (data_train, data_test) = model_analisys(
    input_train=input_train,
    target_train=target_train,
    input_test=input_test,
    target_test=target_test,
)


In [None]:
# Importances reported by the fitted model, paired with the training column names.
importances = model.feature_importances_
features = input_train.columns

# One row per feature, most important first.
feature_importances = pl.DataFrame({"Feature": features, "Importance": importances}).sort(
    by="Importance",
    descending=True,
)

# Show the table
display(feature_importances)

# Horizontal bar chart of the same ranking.
importance_fig, importance_ax = plt.subplots(figsize=(10, 15))
importance_ax.barh(
    feature_importances["Feature"],
    feature_importances["Importance"],
    color="skyblue",
)
importance_ax.set_xlabel("Importance")
importance_ax.set_title("Feature Importances from Random Forest")
importance_ax.invert_yaxis()  # most important features at the top
plt.show()


## Cross Validation

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold, cross_val_score

# Random forest to evaluate (default number of trees, balanced class weights).
rf_model = RandomForestClassifier(class_weight="balanced_subsample", random_state=42, n_jobs=-1)

# 5 shuffled folds with a fixed seed so the fold assignment is reproducible.
kfold = KFold(n_splits=5, shuffle=True, random_state=42)

# Cross-validate with accuracy as the metric.
print("Iniciando validación cruzada")
print("---------------------------")

# Score against `data_train`, the same labels used by the final fit below, so
# the CV mean and the test-set score describe the same classification problem
# even if the labels were grouped by `model_analisys`.
cv_scores = cross_val_score(
    rf_model,
    input_train,
    data_train,
    cv=kfold,
    scoring="accuracy",
    n_jobs=-1,
)
# Show the per-fold results and their summary statistics.
print(f"Resultados de cada fold: {cv_scores}")
print(f"Media de accuracy: {np.mean(cv_scores):.2f}")
print(f"Desviación estándar de accuracy: {np.std(cv_scores):.2f}")

print("---------------------------")
print()

# Compare the CV mean with the accuracy on the held-out test set.
print("Comparando con el conjunto de prueba")
print("---------------------------")

rf_model.fit(input_train, data_train)
test_score = rf_model.score(input_test.select(sorted(input_test.columns)), data_test)
print(f"Accuracy en el conjunto de prueba: {test_score:.2f}")

print("---------------------------")
