In [1]:
import os
from kedro.framework.startup import bootstrap_project
from kedro.framework.session import KedroSession
import warnings

warnings.filterwarnings("ignore")

# Encontrar o caminho absoluto do diretório atual
notebook_cwd = os.getcwd()

# Definir o caminho correto para a raiz do projeto Kedro
project_path = r"c:\Users\gufer\OneDrive\Documentos\FIAP\Fase_03\mtg-project"

# Verificar o diretório atual e o caminho do projeto
print(f"Notebook current working directory: {notebook_cwd}")
print(f"Project path: {project_path}")

# Alterar para o diretório raiz do projeto Kedro
os.chdir(project_path)

# Bootstrap o projeto Kedro
bootstrap_project(project_path)

# Inicialize o contexto do Kedro
with KedroSession.create() as session:
    context = session.load_context()

# Recarregar o catálogo
catalog = context.catalog

# Acessar os parâmetros
params = context.params

# Listar o catálogo
catalog.list()

Notebook current working directory: c:\Users\gufer\OneDrive\Documentos\FIAP\Fase_03\mtg-project\notebooks\gmferratti\pipeline
Project path: c:\Users\gufer\OneDrive\Documentos\FIAP\Fase_03\mtg-project


In [2]:
from typing import List, Dict
from classes.deck import Deck
from classes.player import Player
from classes.player_tracker import PlayerTracker
from src.mtg_project.pipelines.utils import setup_logger
import pandas as pd
import numpy as np
from faker import Faker
import random

In [3]:

def get_last_file(partitioned_data):
    """
    Obtém o last file da partição mais recente de um PartitionedDataset.

    Args:
        partitioned_data (dict): Dicionário com as partições de dados.

    Returns:
        Any: O ultimo arquivo da partição mais recente.
    """
    # Obter a chave da partição mais recente
    latest_partition = max(partitioned_data.keys())
    
    # Obter os dados da partição mais recente
    data = partitioned_data[latest_partition]
    
    return data

In [6]:
with KedroSession.create(env="local", project_path=project_path) as session:
    session.run(pipeline_name="modeling")

In [10]:
catalog.load("sampled_decks")


[1m[[0m
    Player:
  Name: William Clark
  Deck: [3;35mNone[0m
  Turn: [1;36m0[0m
  Hand: [1;36m0[0m cards
  Library: [1;36m0[0m cards
  Lands played: [1;36m0[0m
  Mana pool: [1;36m0[0m
  Spells played: [1;36m0[0m
,
    Player:
  Name: Tara White
  Deck: [3;35mNone[0m
  Turn: [1;36m0[0m
  Hand: [1;36m0[0m cards
  Library: [1;36m0[0m cards
  Lands played: [1;36m0[0m
  Mana pool: [1;36m0[0m
  Spells played: [1;36m0[0m
,
    Player:
  Name: Christine Parrish
  Deck: [3;35mNone[0m
  Turn: [1;36m0[0m
  Hand: [1;36m0[0m cards
  Library: [1;36m0[0m cards
  Lands played: [1;36m0[0m
  Mana pool: [1;36m0[0m
  Spells played: [1;36m0[0m
,
    Player:
  Name: Katie Meyer
  Deck: [3;35mNone[0m
  Turn: [1;36m0[0m
  Hand: [1;36m0[0m cards
  Library: [1;36m0[0m cards
  Lands played: [1;36m0[0m
  Mana pool: [1;36m0[0m
  Spells played: [1;36m0[0m

[1m][0m

In [4]:
def create_players(n_players: int):
    """
    Cria uma lista de objetos Player com nomes aleatórios.

    Args:
        n_players (int): Número de jogadores a serem criados.

    Returns:
        List[Player]: Lista de objetos Player com nomes gerados aleatoriamente.
    """
    # Inicializando o gerador de dados falsos Faker
    fake = Faker()
    
    # Gerando uma lista de nomes aleatórios usando o Faker
    player_names = [fake.first_name() + " " + fake.last_name() for _ in range(n_players)]
    
    # Criando uma lista de objetos Player a partir dos nomes gerados
    players = [Player(name) for name in player_names]

    # Retornando a lista de objetos Player
    return players

n_players = catalog.load("params:simulation.n_players")
players = create_players(n_players)
catalog.save("players", players)

In [5]:
import random
import os
from typing import List, Dict

def assign_decks_to_players(
        players: List[Player], 
        sampled_decks: Dict[str, str],
        log_folder: str) -> List[Player]:
    """
    Função para atribuir decks aleatórios a cada player na lista de players.

    A função tentará atribuir um deck a cada player chamando o método assign_deck().
    Caso ocorra algum erro na atribuição, tentará com outro deck disponível.

    Args:
        players (list): Lista de objetos Player.
        sampled_decks (dict): Dicionário com os nomes e caminhos dos decks.
        log_folder (str): Caminho da pasta para salvar o log.

    Returns:
        List[Player]: Lista de objetos Player com decks atribuídos.
    """
    # Caminho do arquivo de log
    log_filepath = os.path.join(log_folder, 'decks_assignment.txt')

    # Cria a pasta de log se ela não existir
    os.makedirs(log_folder, exist_ok=True)

    # Configura o logger geral
    logger = setup_logger("validate_decks", log_filepath)
    
    # Log de início da validação
    logger.info("Validating decks...")

    # Convertemos as chaves do dicionário para uma lista de nomes de decks disponíveis
    available_decks = list(sampled_decks.keys())
    
    for player in players:
        assigned = False
        while not assigned and available_decks:
            try:
                # Seleciona um deck aleatório da lista de decks disponíveis
                deck_name = random.choice(available_decks)

                # Obter o caminho completo do deck a partir do dicionário sampled_decks
                deck_path = sampled_decks[deck_name]

                # Cria um novo objeto Deck
                deck = Deck()

                # Carrega o deck a partir do arquivo .txt no caminho obtido
                deck.load_deck_from_txt(deck_path)

                # Atribui o deck ao player
                player.assign_deck(deck)
                logger.info(f"Deck '{deck_name}' assigned to player '{player.name}'")
                
                # Remove o deck da lista de decks disponíveis para evitar reutilização
                available_decks.remove(deck_name)

                assigned = True  # Deck atribuído com sucesso
            except Exception as e:
                # Em caso de erro, tenta outro deck
                logger.error(f"Failed to assign deck '{deck_name}' to player '{player.name}': {e}")
                continue
        
        
        # Se não houver mais decks disponíveis e não conseguir atribuir, lança um erro
        if not assigned:
            raise ValueError(f"No available decks left to assign to player '{player.name}'.")

    logger.info("Deck assignment process completed.")

    return players

players = catalog.load("players")
sampled_decks = catalog.load("sampled_decks")
log_folder = catalog.load("params:simulation.log_folder")
# players_with_decks = assign_decks_to_players(players, sampled_decks, log_folder)
# catalog.save("players_with_decks", players_with_decks)

In [6]:
def simulate_player_matches(params: dict, players_with_decks: list) -> pd.DataFrame:
    """
    Simulates Magic: The Gathering matches for a list of players based on the provided simulation parameters.

    Parameters:
    -----------
    params : dict
        A dictionary containing the simulation parameters, including:
        - 'max_mulligans': Maximum number of mulligans allowed per player.
        - 'mulligan_prob': Probability of a player choosing to mulligan.
        - 'hand_size_stop': Minimum hand size at which the simulation will stop.
        - 'max_turns': Maximum number of turns per match.
        - 'extra_land_prob': Probability of playing an extra land during a turn.
        - 'matches_per_player': Number of matches to simulate per player.
        - 'log_folder': Folder path for logging the simulation process.
    
    players_with_decks : list
        A list of Player objects, each with an assigned deck to be used in the simulation.

    Returns:
    --------
    pd.DataFrame
        A DataFrame containing the match data for all players across all matches and turns, including:
        - Player attributes at each turn.
        - Match number for each simulation.
    """
    
    # Atribuir os parâmetros
    max_mulligans = params["max_mulligans"]
    mulligan_prob = params["mulligan_prob"]
    hand_size_stop = params["hand_size_stop"]
    max_turns = params["max_turns"]
    extra_land_prob = params["extra_land_prob"]
    matches_per_player = params["matches_per_player"]
    log_folder = params["log_folder"]

    # Caminho do arquivo de log
    log_filepath = os.path.join(log_folder, 'player_matches.txt')

    # Cria a pasta de log se ela não existir
    os.makedirs(log_folder, exist_ok=True)

    # Configura o logger geral
    logger = setup_logger("player_matches", log_filepath)

    # Log de início da validação
    logger.info("Initiating simulations...")

    # Inicializa o tracker para armazenar os dados
    tracker = PlayerTracker()

    # Loop através dos jogadores e realizar as simulações de partidas
    for player in players_with_decks:
        for match in range(matches_per_player):
            # Simular várias partidas para o jogador
            player.play_a_match(tracker, 
                                max_mulligans, 
                                mulligan_prob, 
                                max_turns, 
                                hand_size_stop, 
                                extra_land_prob)

    # Obter os dados de todas as partidas e turnos
    matches_df = tracker.get_data()
    
    return matches_df

# Chamada da função
params = catalog.load("params:simulation")
players_with_decks = catalog.load("players_with_decks")
selected_features_df = simulate_player_matches(params, players_with_decks)
catalog.save("matches_df",selected_features_df)

In [7]:
import pandas as pd

def feature_engineering(matches_df: pd.DataFrame) -> pd.DataFrame:
    """
    Realiza engenharia de features nos dados das partidas de Magic: The Gathering, com lag features 
    e rolling features aplicadas separadamente por jogador e partida, iniciando a contagem a partir
    do último turno 0 em caso de mulligan.

    Args:
        matches_df (pd.DataFrame): DataFrame contendo os dados das partidas.

    Retorna:
        pd.DataFrame: DataFrame com novas features calculadas.
    """

    # Configura o logger geral
    logger = setup_logger("feature_engineering")

    logger.info("Criando variáveis cumulativas por jogador e partida...")

    # Garantir que 'spent_mana' esteja no formato correto
    matches_df["spent_mana"] = matches_df["spent_mana"].astype(int)

    # Criação de variáveis cumulativas por 'name' e 'match'
    matches_df['cum_mana_pool'] = matches_df.groupby(['name', 'match'])['mana_pool'].cumsum()
    matches_df["cum_spent_mana"] = matches_df.groupby(['name', 'match'])["spent_mana"].cumsum()

    logger.info("Criando variáveis de razão...")

    # Criação de variáveis baseadas em razões: feitiços por turno e terrenos por turno
    matches_df['spell_ratio'] = (matches_df['spells_played'] / (matches_df['turn'] + 1)).round(2)
    matches_df['land_ratio'] = (matches_df['lands_played'] / (matches_df['turn'] + 1)).round(2)

    logger.info("Criando variável de eficiência da curva de mana...")

    # Criação da variável de eficiência da curva de mana (razão entre mana gasto e mana acumulado)
    matches_df['mana_curve_efficiency'] = matches_df['cum_spent_mana'] / matches_df['cum_mana_pool']

    # Tratamento de valores infinitos e valores ausentes
    matches_df['mana_curve_efficiency'].replace([float('inf'), -float('inf')], 0, inplace=True)
    matches_df['mana_curve_efficiency'].fillna(0, inplace=True)
    matches_df['mana_curve_efficiency'] = matches_df['mana_curve_efficiency'].round(2)

    logger.info("Identificando o último turno 0 por jogador e partida...")

    # Encontrar o último turno 0 em cada partida
    matches_df['is_last_turn_0'] = matches_df.groupby(['name', 'match'])['turn'].transform(lambda x: (x == 0).cumsum())

    # Criar uma máscara para selecionar apenas os turnos após o último turno 0
    valid_turns_mask = matches_df.groupby(['name', 'match'])['is_last_turn_0'].transform(max) == matches_df['is_last_turn_0']

    logger.info("Criando lag features a partir do último turno 0...")

    # Aplicar as lag features apenas nos turnos válidos (após o último turno 0)
    matches_df.loc[valid_turns_mask, 'mana_curve_efficiency_lag_1'] = matches_df.groupby(['name', 'match'])['mana_curve_efficiency'].shift(1)
    matches_df.loc[valid_turns_mask, 'mana_curve_efficiency_lag_2'] = matches_df.groupby(['name', 'match'])['mana_curve_efficiency'].shift(2)
    matches_df.loc[valid_turns_mask, 'spell_ratio_lag_1'] = matches_df.groupby(['name', 'match'])['spell_ratio'].shift(1)
    matches_df.loc[valid_turns_mask, 'land_ratio_lag_1'] = matches_df.groupby(['name', 'match'])['land_ratio'].shift(1)

    logger.info("Criando rolling features a partir do último turno 0...")

    # Aplicar rolling features a partir do último turno 0
    matches_df.loc[valid_turns_mask, 'rolling_mean_mana_curve_efficiency_3'] = matches_df.groupby(['name', 'match'])['mana_curve_efficiency'].rolling(window=3).mean().reset_index(level=[0,1], drop=True)
    matches_df.loc[valid_turns_mask, 'rolling_mean_spell_ratio_3'] = matches_df.groupby(['name', 'match'])['spell_ratio'].rolling(window=3).mean().reset_index(level=[0,1], drop=True)
    matches_df.loc[valid_turns_mask, 'rolling_mean_land_ratio_3'] = matches_df.groupby(['name', 'match'])['land_ratio'].rolling(window=3).mean().reset_index(level=[0,1], drop=True)

    # Tratamento de valores nulos gerados pelos shifts e rolling
    matches_df.fillna(0, inplace=True)

    # Remover a coluna auxiliar 'is_last_turn_0'
    matches_df.drop(columns=['is_last_turn_0'], inplace=True)

    logger.info("Engenharia de features concluída.")

    return matches_df


pd.set_option('display.max_columns', None)
    
# Chamada da função
matches_df = catalog.load("matches_df")
features_df = feature_engineering(matches_df)
catalog.save("features_df", features_df)

In [8]:
def feature_selection(
        features_df: pd.DataFrame,
        target: str,
        threshold_features: float,
        derived_features: list = None,
        key_columns: list = None,
        cols_to_keep: list= None) -> pd.DataFrame:
    """
    Auxilia na seleção de features, removendo aquelas diretamente derivadas do target, 
    colunas-chave, e features altamente correlacionadas entre si.

    Args:
        features_df (pd.DataFrame): DataFrame contendo as features.
        target (str): Nome da variável alvo.
        threshold_features (float): Limiar para remover features com alta correlação.
        derived_features (list, optional): Lista de features derivadas do target para serem removidas.
        key_columns (list, optional): Lista de colunas-chave para serem removidas (ex: 'match', 'turn').

    Returns:
        pd.DataFrame: DataFrame com as features selecionadas.
    """
    # Configura o logger geral
    logger = setup_logger("feature_selection")
    logger.info("Iniciando o processo de seleção de features...")

    # Separar apenas colunas numéricas para o cálculo da correlação
    numeric_features_df = features_df.select_dtypes(include=[np.number])

    logger.info(f"Número inicial de features numéricas: {numeric_features_df.shape[1]}")

    # Remover features derivadas do target, se fornecidas
    if derived_features:
        numeric_features_df = numeric_features_df.drop(columns=derived_features, errors='ignore')
        logger.info(f"Features derivadas do target removidas: {derived_features}")

    # Remover colunas-chave nao categoricas ou string
    if key_columns:
        numeric_features_df = numeric_features_df.drop(columns=key_columns, errors='ignore')
        logger.info(f"Colunas-chave removidas: {key_columns}")

    # Remover a variável alvo do conjunto de features
    features_without_target = numeric_features_df.drop(columns=[target], errors='ignore')

    # Calcular a matriz de correlação entre as features (excluindo o target)
    corr_matrix = features_without_target.corr().abs()

    # Criar uma máscara para identificar as correlações acima do limiar entre as features, excluindo a diagonal
    upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))

    # Identificar colunas com alta correlação entre si, usando o limiar definido
    to_drop_features = [column for column in upper.columns if any(upper[column] > threshold_features)]

    # Manter compulsoriamente algumas colunas
    if cols_to_keep:
        to_drop_features = [col for col in to_drop_features if col not in cols_to_keep]

    # Remover as colunas altamente correlacionadas entre si
    features_cleaned = features_without_target.drop(columns=to_drop_features)

    logger.info(f"Número de features após a remoção de correlação maior que {threshold_features}: {features_cleaned.shape[1]}")    

    # Selecionar as colunas finais de features
    selected_features_cols = features_cleaned.columns.tolist()

    # Reconstruir o DataFrame final, reinserindo key_columns e target
    features_cleaned = pd.concat([features_cleaned, features_df[key_columns], features_df[[target]]], axis=1)

    logger.info("Processo de seleção de features concluído.")

    return features_cleaned, selected_features_cols


derived_features = [
    "cum_spent_mana", 
    "cum_mana_pool", 
    "spent_mana", 
    "mana_pool"
]

key_cols = [
    "name",
    "deck_name",
    "match",
    "turn"
]

selected_features_df = catalog.load("features_df")
threshold_features = catalog.load("params:modeling.feature_engineering.feat_corr_threshold")

selected_features_df, selected_features_cols = feature_selection(
    features_df=selected_features_df,         
    target="mana_curve_efficiency", 
    threshold_features=threshold_features,        
    derived_features=derived_features, 
    key_columns=key_cols,
    cols_to_keep=["W","U","B","R","G"]     
)

# Salvando o DataFrame de features selecionadas
catalog.save("selected_features_df", selected_features_df)
catalog.save("selected_features_cols", selected_features_cols)

In [9]:
import random
import pandas as pd
from typing import List

def train_test_split(
        selected_features_df: pd.DataFrame,
        target_column: str,
        final_features_list: List[str] = selected_features_cols, 
        hide_players: bool = False, 
        n_test_players: int = None, 
        hide_advanced_turns: bool = False,
        turn_threshold: int = None,) -> None:
    """
    Segrega as partidas em treino e teste, permitindo que o modelo nunca veja um determinado grupo de jogadores ou
    escondendo os turnos mais avançados de cada jogador durante o treino.

    Filtra o DataFrame pelas features selecionadas e produz os DataFrames de treino e teste para features e targets.

    Args:
        features_df (pd.DataFrame): O DataFrame contendo as features selecionadas do DF das partidas.
        final_features_list (List[str]): Lista das features selecionadas para o modelo.
        target_column (str): Nome da coluna target.
        n_test_players (int, opcional): Número de jogadores a serem amostrados aleatoriamente para o conjunto de teste.
        hide_advanced_turns (bool, opcional): Se True, usa a estratégia de esconder os turnos mais avançados no conjunto de teste.
        turn_threshold (int, opcional): Limite de turnos para segregar treino e teste. Os turnos maiores que esse valor serão usados como teste.
        hide_players (bool, opcional): Se True, usa a estratégia de esconder jogadores do conjunto de teste.

    Retorna:
        Tuple: DataFrames de treino e teste para features e targets.
    """
    # Configura o logger geral
    logger = setup_logger("train_test_split")
    
    if hide_advanced_turns and turn_threshold is None:
        raise ValueError("Se `hide_advanced_turns` for True, `turn_threshold` deve ser fornecido.")
    
    if hide_players and n_test_players is None:
        raise ValueError("Se `hide_players` for True, `n_test_players` deve ser fornecido.")
    
    if hide_advanced_turns and hide_players:
        raise ValueError("Apenas uma estratégia pode ser usada de cada vez: `hide_advanced_turns` ou `hide_players`.")
    
    if hide_advanced_turns:
        # Estratégia de esconder turnos mais avançados
        logger.info(f"Usando a estratégia de esconder turnos mais avançados (turnos > {turn_threshold}).")
        
        # Dividir os dados entre treino e teste com base no turn_threshold
        train_df = selected_features_df[selected_features_df['turn'] <= turn_threshold]
        test_df = selected_features_df[selected_features_df['turn'] > turn_threshold]
    
    elif hide_players:
        # Estratégia de esconder jogadores
        logger.info(f"Usando a estratégia de esconder {n_test_players} jogadores.")
        
        # Verifica se a quantidade de jogadores para o teste é válida
        unique_players = selected_features_df['name'].unique()
        if n_test_players > len(unique_players):
            raise ValueError(f"O número de jogadores de teste ({n_test_players}) excede o número de jogadores únicos ({len(unique_players)}).")
        
        # Amostrando jogadores aleatoriamente
        test_players = random.sample(list(unique_players), n_test_players)
        logger.info(f"Jogadores selecionados para o conjunto de teste: {test_players}")
        
        # Segregar os dados entre treino e teste com base nos jogadores amostrados
        test_df = selected_features_df[selected_features_df['name'].isin(test_players)]
        train_df = selected_features_df[~selected_features_df['name'].isin(test_players)]
    
    else:
        raise ValueError("Nenhuma estratégia foi selecionada. Use `hide_advanced_turns` ou `hide_players`.")
    
    # Filtrando apenas as features selecionadas
    train_features = train_df[final_features_list]
    test_features = test_df[final_features_list]

    # Extraindo os targets
    train_target = train_df[[target_column]]
    test_target = test_df[[target_column]]

    return train_features, test_features, train_target, test_target

selected_features_df = catalog.load("selected_features_df")
n_test_players = catalog.load("params:modeling.feature_selection.n_test_players")
turn_threshold = catalog.load("params:modeling.feature_selection.turn_threshold")
hide_advanced_turns = catalog.load("params:modeling.feature_selection.hide_advanced_turns")
hide_players = catalog.load("params:modeling.feature_selection.hide_players")
selected_features_cols = catalog.load("selected_features_cols")

(train_features, test_features, train_target, test_target) = train_test_split(
    selected_features_df=selected_features_df,
    target_column="mana_curve_efficiency",
    hide_advanced_turns=hide_advanced_turns,
    turn_threshold=turn_threshold,
    hide_players=hide_players,
    n_test_players=n_test_players
)

# Salvando as features e targets no catálogo
catalog.save("train_features", train_features)
catalog.save("test_features", test_features)
catalog.save("train_target", train_target)
catalog.save("test_target", test_target)

In [10]:
train_target.describe()

Unnamed: 0,mana_curve_efficiency
count,244.0
mean,0.566803
std,0.399955
min,0.0
25%,0.0
50%,0.71
75%,0.9
max,1.0


In [11]:
import logging
import pickle
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import GridSearchCV
from datetime import datetime

# Configuração do logger
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


# Obter a data e hora atuais
now = datetime.now()

# Definir as constantes para data e hora de execução
RUN_DATE = now.strftime('%Y-%m-%d')  # Formato: 'YYYY-MM-DD'
RUN_TIME = now.strftime('%H:%M')  # Formato: 'HH:MM'

def fit_model(
    train_features: pd.DataFrame,
    train_target: pd.DataFrame,
    param_grid: dict,
) -> str:
    """
    Ajusta um modelo de árvore de decisão com suporte a features temporais e realiza tuning de hiperparâmetros com GridSearchCV.
    O modelo ajustado é salvo em um arquivo .pkl.

    Args:
        train_features (pd.DataFrame): DataFrame com as features de treino.
        train_target (pd.DataFrame): DataFrame com a variável target de treino.
        param_grid (dict): Dicionário com os hiperparâmetros a serem ajustados.
        model_path (str): Caminho para salvar o modelo ajustado (.pkl).

    Returns:
        model_path (str): Caminho do arquivo do modelo ajustado salvo.
    """
    logger.info("Iniciando o ajuste do modelo e o tuning de hiperparâmetros.")

    # Criando o modelo base
    model = DecisionTreeRegressor(random_state=42)
    
    logger.info("Configurando o GridSearchCV com a seguinte grade de hiperparâmetros:")
    logger.info(param_grid)

    # Configurando o GridSearchCV para tuning de hiperparâmetros
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        scoring='neg_mean_squared_error',
        cv=5,
        verbose=1
    )
    
    # Ajustando o modelo com os dados de treino e realizando tuning
    grid_search.fit(train_features, train_target)
    
    # Extraindo o melhor modelo e seus parâmetros
    best_model = {run_key:grid_search.best_estimator_}
    best_hiper_params = {run_key:grid_search.best_params_}

    logger.info("O melhor modelo foi encontrado com os seguintes hiperparâmetros:")
    logger.info(best_hiper_params)

    return best_model, best_hiper_params

train_features = catalog.load("train_features")
train_target = catalog.load("train_target")
param_grid = catalog.load("params:modeling.model_selection.params_grid")
best_model, best_hiper_params = fit_model(train_features, train_target, param_grid)
catalog.save("best_model", best_model)
catalog.save("best_hiper_params", best_hiper_params)

Fitting 5 folds for each of 1620 candidates, totalling 8100 fits


In [13]:
best_hiper_params


[1m{[0m
    [32m'max_depth'[0m: [1;36m10[0m,
    [32m'max_features'[0m: [32m'sqrt'[0m,
    [32m'max_leaf_nodes'[0m: [1;36m20[0m,
    [32m'min_samples_leaf'[0m: [1;36m1[0m,
    [32m'min_samples_split'[0m: [1;36m20[0m,
    [32m'min_weight_fraction_leaf'[0m: [1;36m0.0[0m
[1m}[0m

In [None]:
def predict_and_evaluate_model(
        model: pickle, 
        test_features: pd.DataFrame) -> pd.Series:
    """
    Carrega o modelo salvo e faz previsões nos dados de teste.

    Args:
        model (pickle): Arquivo do modelo ajustado.
        test_features (pd.DataFrame): DataFrame com as features de teste.

    Returns:
        y_pred (pd.Series): Previsões do modelo.
    """
    logger.info(f"Carregando o modelo...")
    logger.info("Fazendo previsões nos dados de teste.")
    
    # Fazendo previsões
    y_pred = model.predict(test_features)
    
    # Calculando todas as metricas de erro

    # Calculando os shap_value

    return y_pred, shap_values, error_metrics


#model = catalog.load("model_pkl")