<a href="https://colab.research.google.com/github/SirSirocco/DataScience_2025_1/blob/main/experiment_plan.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DEPENDÊNCIAS GERAIS

In [54]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
import seaborn as sns

from google.colab import drive
from itertools import product
from sklearn.base import clone
from sklearn.decomposition import PCA
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler

# FUNÇÕES

## Utilitário

In [55]:
def flatten_dict(dictionary: dict, fields_to_flatten: list[str]) -> dict:
    """
    Desestrutura os campos selecionados de um dicionário, retornando apenas os atributos desejados.

    :param dictionary: Dicionário original que contém os campos a serem desestruturados.
    :param fields_to_flatten: Lista de nomes dos campos que devem ser desestruturados (flatten).

    :return: Novo dicionário contendo apenas os atributos dos campos selecionados, combinados em um nível.
    """
    dict_flat = dict()

    for field in fields_to_flatten:
        value = dictionary.get(field)
        if isinstance(value, dict):
            dict_flat.update(value)

    return dict_flat

def dict_to_flat_df(dictionary: dict,
                    fields_to_flatten: list[str],
                    key_name: str = "key") -> pd.DataFrame:
    """
    Converte um dicionário com estrutura aninhada em um DataFrame tabular, desestruturando
    apenas os campos especificados.

    :param dictionary: Dicionário onde cada chave representa um identificador único
                       e cada valor é um dicionário com possíveis campos aninhados.
    :param fields_to_flatten: Lista de nomes de campos a serem desestruturados. Cada um deve
                               corresponder a uma chave cujo valor é um dicionário.
    :param key_name: Nome da coluna que armazenará os identificadores do dicionário original.
                     Padrão é "key".

    :return: DataFrame com uma coluna para os identificadores e colunas adicionais contendo os
             atributos resultantes da desestruturação dos campos selecionados.
    """
    rows = list()

    for identifier, nested_dict in dictionary.items():
        flat_fields = flatten_dict(nested_dict, fields_to_flatten)
        flat_fields[key_name] = identifier
        rows.append(flat_fields)

    df = pd.DataFrame(rows)
    df = df[[key_name] + [col for col in df.columns if col != key_name]]  # Coloca a chave primeiro
    return df

def dict_combinations(dictionary):
    # Passo 1: Obtemos as chaves do dicionário (ex: ["key1", "key2"])
    keys = list(dictionary.keys())

    # Passo 2: Obtemos os valores associados a cada chave (listas de possibilidades)
    # Exemplo: [["a", "b", "c"], ["d", "e", "f"]]
    values = list(dictionary.values())

    # Passo 3: Calculamos o produto cartesiano dessas listas de valores
    # Isso gera todas as combinações possíveis, como ("a", "d"), ("a", "e"), ..., ("c", "f")
    combinations = product(*values)

    # Passo 4: Para cada combinação, associamos os valores às suas respectivas chaves
    # Isso é feito com a função zip, e transformamos o resultado em um dicionário
    # Resultado final: uma lista de dicionários, cada um representando uma combinação possível
    return [dict(zip(keys, combo)) for combo in combinations]

## Exibição

In [None]:
def df_show_domain(df: pd.DataFrame, columns: list[str]) -> None:
    """
    Exibe o tipo e os valores possíveis de columns segundo a ordem da lista de
    colunas.

    :param df: DataFrame a ter os domínios exibidos.
    :param columns: Lista dos nomes das colunas a serem exibidas.
    :return: None
    """
    for col in columns:
        print(f"Coluna:   {col}")
        print(f"dtype:    {df[col].dtype}")
        print(f"Domínio:  {df[col].unique()}\n")

def df_show_head(df: pd.DataFrame, n: int = 5) -> None:
    display(df.head(n))
    print(f"Shape: {df.shape}")

def df_show_null(df: pd.DataFrame) -> None:
    display(df.isnull().sum())

## Validação

In [42]:
def is_valid_col_scenario(col_scenario: dict):
    return col_scenario is not None

def is_valid_series(s: pd.Series):
    return s is not None and (not s.empty) and s.notnull().all() and pd.api.types.is_numeric_dtype(s)

## Pré-Processamento

In [None]:
def preprocessing_map(df: pd.DataFrame, features: list[str], maps: list[dict]) -> pd.DataFrame:
    """
    Aplica mapeamentos personalizados a múltiplas colunas de um DataFrame.

    :param df: DataFrame original com os dados a serem transformados.
    :param features: Lista de nomes de colunas que serão mapeadas.
    :param maps: Lista de dicionários contendo os mapeamentos a serem aplicados, um para cada coluna,
                 respeitando a ordem das colunas em features.

    :return: Novo DataFrame com os mapeamentos aplicados nas colunas especificadas.
    """
    df_copy = df.copy()

    for feature, feature_map in zip(features, maps):
        df_copy[feature] = df[feature].map(feature_map)

    return df_copy

def subs_na_mean(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
    df_copy = df.copy()
    for col in columns:
        mean = df[col].mean()
        df_copy[col] = df[col].fillna(mean)
    return df_copy

def subs_outliers(df: pd.DataFrame, columns: list[str], view:bool = False) -> pd.DataFrame:
    """
    Substitui outliers pela média. Se view for True, exibe
    dois gráficos para cada coluna: antes e depois da mudança.
    """

    # Guarda resultado anterior
    df_before = df.copy()

    for column in columns:
        # Obtém quartis 1 e 3
        q1 = df[column].quantile(0.25)
        q3 = df[column].quantile(0.75)

        # Obtém IQR
        iqr = q3 - q1

        """
        Outlier se em (-inf, Q1 - 1.5 * IQR) ou (Q3 + 1.5 * IQR, +inf).
        Subsititui outliers por NA ('not available') segundo método IQR (InterQuartile Range).
        """
        df.loc[(df[column] < q1 - 1.5 * iqr) | (df[column] > q3 + 1.5 * iqr), column] = pd.NA

        # Substitui NAs pela média
        df[column] = df[column].fillna(df[column].mean())

    if view:
        for column in columns:
            # Exibe resultado inicial
            plt.boxplot(df_before[column])
            plt.title(f"ANTES: {column}")
            plt.show()

            # Exibe novo resultado
            plt.boxplot(df[column])
            plt.title(f"DEPOIS: {column}")
            plt.show()

    return df

## Casos de pré-processamento

In [13]:
def apply_column_removal(df, X, y, col_scenario, prefix, radixes):
    key = prefix
    if is_valid_col_scenario(col_scenario):
        columns = col_scenario["columns"]
        df = df.drop(columns=columns)
        key += col_scenario["name"]
    else:
        key += radixes["absent"]
        columns = None
    return df, X, y, key, {"col_rem": columns}

def apply_feature_engineering(df, X, y, feat_eng, prefix, radixes):
    key = prefix
    if is_valid_series(feat_eng):
        df = pd.concat([df, feat_eng], axis=1)
        key += f"_{feat_eng.name}"
        feat_name = feat_eng.name
    else:
        key += radixes["absent"]
        feat_name = None
    return df, X, y, key, {"feat_eng": feat_name}

def apply_outliers(df, X, y, out, prefix, radixes):
    key = prefix
    if out:
        df = subs_outliers(df, df.columns, view=False)
        key += radixes["True"]
    else:
        key += radixes["False"]
    X = df.drop(columns=[y.name])
    y = df[y.name]
    return df, X, y, key, {"outliers": out}

apply_normalization_scaler = StandardScaler()
def apply_normalization(df, X, y, norm, prefix, radixes):
    key = prefix

    if norm:
        X = pd.DataFrame(apply_normalization_scaler.fit_transform(X), columns=X.columns, index=X.index)
        key += radixes["True"]
    else:
        key += radixes["False"]
    return df, X, y, key, {"normalization": norm}

def apply_pca(df, X, y, pca_n, prefix, radixes):
    key = f"{prefix}{pca_n}"
    pca_n = min(len(X.columns), pca_n)
    pca_obj = PCA(n_components=pca_n)
    X_pca = pca_obj.fit_transform(X)
    return df, X_pca, y, key, {"pca": pca_n}

def build_key(key_parts, separator="_"):
    return separator.join(key_parts)

def get_preprocessing_cases(df: pd.DataFrame,
                            target: str,
                            random_seed: int,
                            col_scenario_list: list[dict],
                            feat_eng_list: list[pd.Series],
                            outliers: list[bool],
                            normalization: list[bool],
                            pca: list[int]) -> dict:
    """
    Gera múltiplos cenários de pré-processamento combinando engenharia de atributos,
    tratamento de outliers, normalização e redução de dimensionalidade (PCA).

    Cada cenário gera um conjunto `X`, `y` e um dicionário com parâmetros das transformações aplicadas.
    As chaves do dicionário final descrevem o pipeline aplicado via prefixos e sufixos codificados.

    :param df: DataFrame original contendo os dados brutos.
    :param target: Nome da coluna alvo (variável dependente) no DataFrame.
    :param random_seed: Semente para controle de aleatoriedade (reprodutibilidade).
    :param feat_eng_list: Lista de séries representando atributos derivados para engenharia de características.
                          Pode conter séries válidas ou None para ausência de feature engineering.
    :param outliers: Lista indicando se o tratamento de outliers deve ser aplicado.
    :param normalization: Lista indicando se a normalização (z-score) deve ser aplicada.
    :param pca: Lista com números de componentes principais a serem testados via PCA.

    :return: Dicionário cujas chaves são strings descritivas do pipeline (ex: 'FEna_OUT1_NORM0_PCA2'), e
             os valores são dicionários contendo:
             - "X": DataFrame com os atributos processados.
             - "y": Série da variável alvo.
             - "params": Parâmetros que descrevem as transformações aplicadas no cenário.
    """
    PREFIXES = {"col_rem": "COLRM", "feat_eng": "FE", "outliers": "OUT", "normalization": "NORM", "pca": "PCA"}
    RADIXES = {"absent": "na", "True": "1", "False": "0"}
    dict_cases = dict()
    df_original = df.copy()

    np.random.seed(random_seed) # Fixa semente aleatória

    for col_scn, feat_eng, out, norm, pca_n in product(col_scenario_list, feat_eng_list, outliers, normalization, pca):
        # Etapas de pré-processamento
        param_sequence = [
            (apply_column_removal, col_scn, PREFIXES["col_rem"]),
            (apply_feature_engineering, feat_eng, PREFIXES["feat_eng"]),
            (apply_outliers, out, PREFIXES["outliers"]),
            (apply_normalization, norm, PREFIXES["normalization"]),
            (apply_pca, pca_n, PREFIXES["pca"]),
        ]

        key_parts = list()
        df = df_original.copy()
        X = df.drop(columns=[target])
        y = df[target]
        dict_params = dict()

        # Executa etapas
        for func, param, prefix in param_sequence:
            df, X, y, key, update = func(df, X, y, param, prefix, RADIXES)
            key_parts.append(key)
            dict_params.update(update)

        key = build_key(key_parts)
        dict_cases[key] = {"X": X, "y": y, "params": dict_params}

    return dict_cases

## Validação Cruzada

In [None]:
def cross_val(model, X, y, n_splits=5, metric="r2", shuffle=True, random_state=None):
    """
    Realiza validação cruzada com K-Fold para treinar e avaliar um modelo,
    utilizando uma métrica especificada para avaliar o desempenho em cada fold.
    O modelo é clonado e treinado novamente a cada divisão dos dados.

    Args:
        model (sklearn.base.BaseEstimator): Modelo que implementa os métodos `fit` e `predict`.
        X (pandas.DataFrame): Conjunto de dados de entrada (features).
        y (pandas.Series): Vetor alvo (target) correspondente às amostras em `X`.
        n_splits (int, optional): Número de folds da validação cruzada. Default é 5.
        metric (str, optional): Métrica de avaliação a ser utilizada.
            Opções válidas: "r2", "mse", "mae". Caso a métrica seja inválida, será usada a default: "r2".
        shuffle (bool, optional): Se True, embaralha os dados antes de dividir em folds. Default é True.
        random_state (int or None, optional): Semente usada para o embaralhamento. Default é None (comportamento randômico).

    Returns:
        tuple:
            models (list): Lista de modelos treinados, um para cada fold.
            metrics (list): Lista de valores da métrica de avaliação para cada fold.

    Note:
        Se uma métrica inválida for fornecida, a função utilizará "r2" como padrão, sem gerar erro.
    """
    DICT_METRICS = {
        "r2":   r2_score,
        "mse":  mean_squared_error,
        "mae":  mean_absolute_error
    }
    models = list()
    metrics = list()
    metric_to_use = DICT_METRICS.get(metric, r2_score)

    # Define os índices que delimitam as partições
    kfolds = KFold(n_splits=n_splits, shuffle=shuffle, random_state=random_state)

    for i, (train_index, test_index) in enumerate(kfolds.split(X)):
        X_train, X_test = X.iloc[train_index], X.iloc[test_index]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        copy = clone(model)
        copy.fit(X_train, y_train)

        models.append(copy)
        metrics.append(metric_to_use(y_test, copy.predict(X_test))) # Calcula métrica

    return models, metrics

## Casos de Modelo

In [None]:
def get_model_cases(model,
                    dict_cases_pre: dict,
                    params: dict,
                    fold_num: int,
                    cross_val_ref: str,
                    random_seed: int) -> dict:
    results = dict()

    for base_key, data in dict_cases_pre.items():
        X = data["X"]
        y = data["y"]

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=train_size, random_state=random_seed)

        for metric, k in product(metrics, k_values):
            knn = KNeighborsClassifier(n_neighbors=k, metric=metric)
            knn.fit(X_train, y_train)
            y_pred = knn.predict(X_test)

            metric_dict = get_metrics_classification(y_test, y_pred, classes, view=False)
            new_key = f"{base_key}_KNN_MET{metric}_K{k}"

            results[new_key] = {
                "X": X,
                "y": y,
                "params": {**data["params"], "knn_metric": metric, "knn_k": k},
                "metrics": metric_dict
            }

    return results

# SETUP DO AMBIENTE

In [14]:
drive.mount("/content/drive")

Mounted at /content/drive


# OBTENÇÃO DO DATASET

In [16]:
PATH = "/content/drive/MyDrive/07_per_shared/projCDat_25_1/datasets/cooked/_all/all_merged.csv"
df = pd.read_csv(PATH)
df_show_head(df)

Unnamed: 0,_ano,_estado,_mes,car_c02_emitido,cli_pressao_atm_med,cli_temp_ar_med,cli_temp_orvalho_med,cli_umid_rel_med,cli_umid_rel_min_max,cli_umid_rel_min_med,cli_umid_rel_min_min,cli_veloc_vento_max,cli_veloc_vento_med,que_area_queimada,que_focos_qtd
0,2008,AC,7,26276980.0,986.843612,28.142731,18.914978,59.555066,95.0,54.432558,29.0,5.1,2.152915,4957.0,165.0
1,2008,AC,9,26276980.0,991.705941,24.446194,19.467987,75.811881,97.0,72.4967,25.0,1.0,0.210504,46073.0,2947.0
2,2008,AC,10,26276980.0,990.32836,25.229298,21.617473,81.870968,96.0,78.72043,29.0,1.0,0.204959,30355.0,856.0
3,2008,AC,11,26276980.0,988.610987,25.19541,22.624478,86.905424,96.0,84.048679,42.0,1.0,0.18697,2082.0,63.0
4,2008,AC,12,26276980.0,988.692608,24.89879,22.727554,88.52957,96.0,86.116935,53.0,1.0,0.179442,127.0,4.0


Shape: (1025, 15)


# CODIFICAÇÃO

In [27]:
# Codificação OneHot simplificada
TO_ENCODE = ["_estado"]
df_encoded = pd.get_dummies(df, columns=TO_ENCODE, dtype="Int32")

df_show_head(df_encoded)

Unnamed: 0,_ano,_mes,car_c02_emitido,cli_pressao_atm_med,cli_temp_ar_med,cli_temp_orvalho_med,cli_umid_rel_med,cli_umid_rel_min_max,cli_umid_rel_min_med,cli_umid_rel_min_min,...,cli_veloc_vento_med,que_area_queimada,que_focos_qtd,_estado_AC,_estado_AM,_estado_AP,_estado_PA,_estado_RO,_estado_RR,_estado_TO
0,2008,7,26276980.0,986.843612,28.142731,18.914978,59.555066,95.0,54.432558,29.0,...,2.152915,4957.0,165.0,1,0,0,0,0,0,0
1,2008,9,26276980.0,991.705941,24.446194,19.467987,75.811881,97.0,72.4967,25.0,...,0.210504,46073.0,2947.0,1,0,0,0,0,0,0
2,2008,10,26276980.0,990.32836,25.229298,21.617473,81.870968,96.0,78.72043,29.0,...,0.204959,30355.0,856.0,1,0,0,0,0,0,0
3,2008,11,26276980.0,988.610987,25.19541,22.624478,86.905424,96.0,84.048679,42.0,...,0.18697,2082.0,63.0,1,0,0,0,0,0,0
4,2008,12,26276980.0,988.692608,24.89879,22.727554,88.52957,96.0,86.116935,53.0,...,0.179442,127.0,4.0,1,0,0,0,0,0,0


Shape: (1025, 21)


# ENGENHARIA DE CARACTERÍSTICAS

In [28]:
"""
Para capturar a intensidade dos focos de incêndio, dividimos a área total queimada
pela quantidade de focos. Talvez essa característica derivada revele nuances aos modelos
que não seriam perceptíveis pela consideração individualizada de ambos os atributos.
"""
se_intensity = df_encoded["que_area_queimada"] / (df_encoded["que_focos_qtd"])
se_intensity.name = "que_focos_intensidade"
display(se_intensity)

Unnamed: 0,que_focos_intensidade
0,30.042424
1,15.633865
2,35.461449
3,33.047619
4,31.750000
...,...
1020,281.112392
1021,154.337561
1022,101.463881
1023,396.212500


# SELEÇÃO DE COLUNAS

In [39]:
scn_not_corr = {
    "name": "not_corr",
    "columns": [
        "cli_temp_orvalho_med",
        "cli_umid_rel_med",
        "cli_umid_rel_min_min",
        "cli_veloc_vento_max"
    ]
}

scn_stateless = {
    "name": "stateless",
    "columns": [col for col in df_encoded.columns if col.startswith("_estado")]
}

# SETUP

In [40]:
# Constantes simbólicas
PRE =   "pre"     # Pré-processamento
RF =    "randfor" # Random Forest
XGB =   "xgboost" # XGBoost
MLR =   "mlinreg" # Regressão Linear Múltipla
ALL =   "all"     # Todos os modelos considerados simultaneamente

# Dicionários para armazenar os resultados
dict_cases = dict()
dict_df_cases = dict()

# PARÂMETROS GERAIS
RANDOM_SEED = 42
TRAIN_SIZE = 0.30
TARGET = "que_area_queimada"

# CASOS

## CASOS DE PRÉ-PROCESSAMENTO

In [48]:
# Parâmetros auxiliares (mude se desejar)
SCENARIOS = [None, scn_not_corr , scn_stateless]
FEATURE_ENGINEERING = [None, se_intensity]
OUTLIERS = [False, True]
NORMALIZATION = [False, True]
PCA_LIST = [2, 4, 6, 8, 10, 12, 15, 18, 20]

# Obtém casos de pré-processamento
dict_cases[PRE] = get_preprocessing_cases(df_encoded,
                        TARGET,
                        RANDOM_SEED,
                        SCENARIOS,
                        FEATURE_ENGINEERING,
                        OUTLIERS,
                        NORMALIZATION,
                        PCA_LIST
                        )
dict_df_cases[PRE] = pd.DataFrame(data=dict_cases[PRE]).T

df_show_head(dict_df_cases[PRE])

Unnamed: 0,X,y,params
COLRMna_FEna_OUT0_NORM0_PCA2,"[[-81243294.26560223, -361.8358884636149], [-8...",0 4957.0 1 46073.0 2 303...,"{'col_rem': None, 'feat_eng': None, 'outliers'..."
COLRMna_FEna_OUT0_NORM0_PCA4,"[[-81243294.26560223, -361.835888463615, -23.6...",0 4957.0 1 46073.0 2 303...,"{'col_rem': None, 'feat_eng': None, 'outliers'..."
COLRMna_FEna_OUT0_NORM0_PCA6,"[[-81243294.26560223, -361.835888463615, -23.6...",0 4957.0 1 46073.0 2 303...,"{'col_rem': None, 'feat_eng': None, 'outliers'..."
COLRMna_FEna_OUT0_NORM0_PCA8,"[[-81243294.26560223, -361.835888463615, -23.6...",0 4957.0 1 46073.0 2 303...,"{'col_rem': None, 'feat_eng': None, 'outliers'..."
COLRMna_FEna_OUT0_NORM0_PCA10,"[[-81243294.26560223, -361.835888463615, -23.6...",0 4957.0 1 46073.0 2 303...,"{'col_rem': None, 'feat_eng': None, 'outliers'..."


Shape: (216, 3)


## CASOS DE KNN

In [None]:
# Parâmetros auxiliares (mude se desejar)
METRICS = ["euclidean", "manhattan", "cosine"]
K_VALUES = [k for k in range(1, 22, 2)]

# Obtém casos de KNN
dict_cases[KNN] = get_knn_cases(dict_cases[PRE],
                               METRICS,
                               K_VALUES,
                               CLASSES,
                               TRAIN_SIZE,
                               RANDOM_SEED
                               )
dict_df_cases[KNN] = pd.DataFrame(data=dict_cases[KNN]).T

df_show_head(dict_df_cases[KNN])

# GERAÇÃO DA PLANILHA

In [None]:
FLATTEN = ["params", "metrics"]
KEY_NAME = "case"
dict_df_final = dict()

for key, value in dict_cases.items():
    dict_df_final[key] = dict_to_flat_df(value, FLATTEN, KEY_NAME)
    # df_show_head(dict_df_final[key])

cases_to_cmp = [dict_df_final[key] for key in [KNN, DTREE, LOGIST]]
dict_df_final[ALL] = pd.concat(cases_to_cmp, axis=0).reset_index(drop=True)
dict_df_final[ALL].dropna(axis=1, inplace=True)

df_show_head(dict_df_final[ALL])