In [9]:
%load_ext autoreload 
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [10]:
import os
import numpy as np
import pandas as pd
from datetime import datetime

pd.set_option('display.max_columns', 1000)
# pd.set_option('display.max_rows', 100)

In [11]:
import sqlite3
import pandas as pd
import os
from pathlib import Path


def create_connection(db_path):
    """Cria a conexão com o banco de dados SQLite."""
    connection = None
    try:
        connection = sqlite3.connect(db_path)

        print("Conexão SQLite estabelecida.")
    except sqlite3.Error as e:
        print(f"Erro ao conectar ao SQLite: {e}")
    return connection

BASE_DIR = os.path.dirname(os.path.abspath('__file__'))
DATA_DIR = os.path.join(BASE_DIR, 'database')
FT_DIR = os.path.join(BASE_DIR, 'modelagem', 'feature_eng', 'data')

conn = create_connection(db_path=Path(os.path.join(DATA_DIR, "soccer_data.db")))
country = "Brazil"
query = "SELECT * FROM soccer_data WHERE country = ?"

df_hist_soccer_matches = pd.read_sql_query(query, conn, params=(country,))

conn.close()


Conexão SQLite estabelecida.


In [12]:
conn.close()

In [13]:
df_hist_soccer_matches.head()

Unnamed: 0,id,country,league,season,home_team,away_team,home_score,away_score,result,psch,pscd,psca,maxch,maxcd,maxca,avgch,avgcd,avgca,bfech,bfecd,datetime,hash,last_updated
0,8029,Brazil,Serie A,2012,Palmeiras,Portuguesa,1,1,D,1.75,3.86,5.25,1.76,3.87,5.31,1.69,3.5,4.9,,,2012-05-19 22:30:00,10444097902145517897,2025-04-03 15:44:39
1,8030,Brazil,Serie A,2012,Sport Recife,Flamengo RJ,1,1,D,2.83,3.39,2.68,2.83,3.42,2.7,2.59,3.23,2.58,,,2012-05-19 22:30:00,7876314183501917566,2025-04-03 15:44:39
2,8031,Brazil,Serie A,2012,Figueirense,Nautico,2,1,H,1.6,4.04,6.72,1.67,4.05,7.22,1.59,3.67,5.64,,,2012-05-20 01:00:00,9296066046964045682,2025-04-03 15:44:39
3,8032,Brazil,Serie A,2012,Botafogo RJ,Sao Paulo,4,2,H,2.49,3.35,3.15,2.49,3.39,3.15,2.35,3.26,2.84,,,2012-05-20 20:00:00,3618841616446699339,2025-04-03 15:44:39
4,8033,Brazil,Serie A,2012,Corinthians,Fluminense,0,1,A,1.96,3.53,4.41,1.96,3.53,4.41,1.89,3.33,3.89,,,2012-05-20 20:00:00,11994628649421207242,2025-04-03 15:44:39


## Agrupando dados 

In [14]:
# def group_infos(df_home_team, away_team):
#     df_home_team = df_home_team.copy()
#     away_team = away_team.copy()
#     group_result_home_team = df_home_team.groupby(["home_team"]).agg(
#         gols_marcado_casa = ("home_score", "sum"),
#         gols_sofridos_casa = ("away_score", "sum"),
#         jogos_em_casa = ("away_score", "count"),
#     ).reset_index(names="time")

#     group_result_away_score = away_team.groupby(["away_team"]).agg(
#         gols_marcado_fora = ("away_score", "sum"),
#         gols_sofridos_fora = ("home_score", "sum"),
#         jogos_em_fora = ("away_score", "count"),
#     ).reset_index(names="time")

#     df_merge_data = pd.merge(group_result_home_team, group_result_away_score)
#     return df_merge_data



## Calcumando a força de Ataque e Defesa

In [15]:
import numpy as np
import scipy.stats as stats

# Agrupando dados
def get_group_infos(away_team):
    group_result_home_team = away_team.groupby(["home_team"]).agg(
        gols_marcado_casa = ("home_score", "sum"),
        gols_sofridos_casa = ("away_score", "sum"),
        jogos_em_casa = ("away_score", "count"),
    ).reset_index(names="time")

    group_result_away_score = away_team.groupby(["away_team"]).agg(
        gols_marcado_fora = ("away_score", "sum"),
        gols_sofridos_fora = ("home_score", "sum"),
        jogos_em_fora = ("away_score", "count"),
    ).reset_index(names="time")

    df_merge_data = pd.merge(group_result_home_team, group_result_away_score)
    return df_merge_data

def calc_attack_and_defense_force(df_merge_data):
    df = df_merge_data.copy()
    # Gols marcados pelo time A em casa / Jogos em casa 
    df["attack_force_home_team"] = df["gols_marcado_casa"]/df["jogos_em_casa"].replace(0, 1)
    # Gols sofridos pelo time A em casa / Jogos em casa 
    df["defense_force_home_team"] = df["gols_sofridos_casa"]/df["jogos_em_casa"].replace(0, 1)

    # Gols marcados pelo time B fora de casa / Jogos fora de casa 
    df["attack_force_away_team"] = df["gols_marcado_fora"]/df["jogos_em_fora"].replace(0, 1)
    # Gols sofridos pelo time B fora de casa / Jogos fora de casa 
    df["defense_force_away_team"] = df["gols_sofridos_fora"]/df["jogos_em_fora"].replace(0, 1)


    # Normalizando valores
    # Média geral do campeonato: se um time fez muito mais gols que o outro seus numeros podem enganar, por isso da normalização
    league_avg_goals_home = df["gols_marcado_casa"].sum() / df["jogos_em_casa"].sum()
    league_avg_goals_away = df["gols_marcado_fora"].sum() / df["jogos_em_fora"].sum()

    # Normalizando
    df["attack_force_home_team"] /= league_avg_goals_home
    df["defense_force_home_team"] /= league_avg_goals_away
    df["attack_force_away_team"] /= league_avg_goals_away
    df["defense_force_away_team"] /= league_avg_goals_home


    # Gerando estimativas de gols esperados com a normalização
    df["expected_goals_home_team"] = league_avg_goals_home * df["attack_force_home_team"] * df["defense_force_away_team"]
    df["expected_goals_away_team"] = league_avg_goals_away * df["attack_force_away_team"] * df["defense_force_home_team"]

    return df


def get_goal_distribution(lambda_time: float, k_values: int = 6):
    """
    Calcula a distribuição de Poisson para gols esperados de um time.

    :param lambda_time: Média esperada de gols do time (float)
    :param k_values: Número máximo de gols a considerar (int)
    :return: Número de gols mais provável, probabilidade desse número de gols, e um dicionário com todas as probabilidades
    """
    k_range_values = np.arange(k_values)

    # Calcula a probabilidade para cada número de gols
    poisson_prob = stats.poisson.pmf(k_range_values, lambda_time)

    # Encontra o número de gols mais provável
    num_goals = np.argmax(poisson_prob)

    # Retorna também um dicionário com todas as probabilidades
    prob_dict = {k: round(prob, 6) for k, prob in zip(k_range_values, poisson_prob)}

    return num_goals, poisson_prob[num_goals], prob_dict

def get_probabilities_of_win_draw_loss(row:pd.Series, max_goals:int=5)->pd.Series:
    """
    Calcula a probabilidade de vitória, empate e derrota para um jogo específico.
    
    Parâmetros:
    row: Linha do DataFrame contendo 'prob_dict_home' e 'prob_dict_away'.
    max_goals: Número máximo de gols considerados.
    
    Retorna:
    (prob_home_win, prob_draw, prob_away_win)
    """
    
    prob_A = row["prob_dict_home"]
    prob_B = row["prob_dict_away"]

    prob_home_win = sum(
        prob_A[i] * sum(prob_B[j] for j in range(i))
        for i in range(1, max_goals+1)
    )

    prob_draw = sum(
        prob_A[i] * prob_B[i]
        for i in range(max_goals+1)
    )

    prob_away_win = sum(
        prob_B[j] * sum(prob_A[i] for i in range(j))
        for j in range(1, max_goals+1)
    )

    # print(f"Vitória do Time A: {prob_home_win:.4f} ({prob_home_win * 100:.2f}%)")
    # print(f"Empate: {prob_draw:.4f} ({prob_draw * 100:.2f}%)")
    # print(f"Vitória do Time B: {prob_away_win:.4f} ({prob_away_win * 100:.2f}%)")

    return prob_home_win, prob_draw, prob_away_win

def predict(df, name_home_team, name_away_team):
    df = df.copy()
    # Selecionar o Atlético-MG como time da casa
    home_team = df[df["time"] == name_home_team].iloc[0]

    # Selecionar o Sport Recife como time visitante
    away_team = df[df["time"] == name_away_team].iloc[0]

    # Criar um dicionário para simular a estrutura de um jogo
    game = {
        "prob_dict_home": home_team["prob_dict_home"],  # Atlético-MG como time da casa
        "prob_dict_away": away_team["prob_dict_away"],  # Sport Recife como visitante
    }

    # Calcular as probabilidades para o jogo específico
    prob_home_win, prob_draw, prob_away_win = get_probabilities_of_win_draw_loss(game)

    # Exibir os resultados
    # print(f"Probabilidade de Vitória do Atlético-MG: {prob_home_win:.2%}")
    # print(f"Probabilidade de Empate: {prob_draw:.2%}")
    # print(f"Probabilidade de Vitória do Sport Recife: {prob_away_win:.2%}")

    # print(prob_home_win, prob_draw, prob_away_win)
    result_text = ["H", "D", "A"]
    return result_text[np.argmax([prob_home_win, prob_draw, prob_away_win])]

In [16]:
df_hist_soccer_matches.head()
df_hist_soccer_matches["datetime"] = pd.to_datetime(df_hist_soccer_matches["datetime"])

In [17]:
def create_table_num_games(df:pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df = df.sort_values("datetime")  # Ordenar pelo tempo

    df_games = pd.concat([
        df[["datetime", "home_team"]].rename(columns={"home_team": "team"}),
        df[["datetime", "away_team"]].rename(columns={"away_team": "team"})
    ])

    df_games["games_played"] = 1  # Cada linha representa um jogo para o time
    df_games["total_games"] = df_games.groupby("team")["games_played"].cumsum()

    return df_games

In [18]:

qtd_min_jogos = 5
# Verifica se os time tem a quantidade minima de dias para calcular a probabilidade

df_simulation = df_hist_soccer_matches.iloc[0:500]

for id, row in df_simulation.iterrows():

    if id >= 100:
        break

    df_table_num_games = create_table_num_games(df_simulation)

    df_result = df_table_num_games[df_table_num_games["datetime"] < row["datetime"]]

    if df_result.empty:
        continue
        print("Dados infuficientes para gerar a probabilidade")

    # elif quanty_matchs_home_team < qtd_min_jogos:
    #     continue
    #     print(f"O time da cada tem apenas {quanty_matchs_home_team} jodos e não será possivel gerar a probabilidade")

    # elif quanty_matchs_home_team < qtd_min_jogos:
    #     continue
    #     print(f"O time visitante tem apenas {quanty_matchs_away_team} jodos e não será possivel gerar a probabilidade")
    
    matchs_home_team = df_result[df_result["team"]==row['home_team']]["total_games"]#.values[-1]
    matchs_away_team = df_result[df_result["team"]==row['away_team']]["total_games"]#.values[-1]

    if matchs_home_team.empty or matchs_away_team.empty:
        continue

    quanty_matchs_home_team = matchs_home_team.values[-1]
    quanty_matchs_away_team = matchs_away_team.values[-1]
    # df_times = 
    # df_merge_data = group_infos(df)
    # calc_attack_and_defense_force(df_merge_data)
    
    list_times = [row['home_team'], row['away_team']]
    
    # Parte 2: gerando grupos de informação
    df = get_group_infos(df_simulation)
    df_merge_data = df[df["time"].isin(list_times)]

    # Parte 3: Calculando força do ateque e defesa
    df_attack_and_defense_force = calc_attack_and_defense_force(df_merge_data)
    df_attack_and_defense_force

    # Parte 4: Calculando a distribuição de gols

    df = df_attack_and_defense_force.copy()
    df[['num_goals_expected_home', 'prob_num_goals_home', 'prob_dict_home']] = df['expected_goals_home_team'].apply(lambda x: pd.Series(get_goal_distribution(x)))
    df[['num_goals_expected_away', 'prob_num_goals_away', 'prob_dict_away']] = df['expected_goals_away_team'].apply(lambda x: pd.Series(get_goal_distribution(x)))

    # Parte 5: Prevendo resultados
    pred = predict(df, name_home_team=row['home_team'], name_away_team=row['home_team'])

    df_simulation.loc[id, "pred"] = pred
    # print(quanty_matchs_home_team, quanty_matchs_away_team)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_simulation.loc[id, "pred"] = pred


In [19]:
df_simulation.dropna(subset="pred", inplace=True)

df_simulation[["season","home_team","away_team","home_score","away_score","result","pred"]]

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_simulation.dropna(subset="pred", inplace=True)


Unnamed: 0,season,home_team,away_team,home_score,away_score,result,pred
10,2012,Atletico GO,Ponte Preta,1,1,D,H
11,2012,Flamengo RJ,Internacional,3,3,D,H
12,2012,Portuguesa,Vasco,0,1,A,H
13,2012,Nautico,Cruzeiro,0,0,D,H
14,2012,Atletico-MG,Corinthians,1,0,H,H
...,...,...,...,...,...,...,...
95,2012,Flamengo RJ,Corinthians,0,3,A,H
96,2012,Sao Paulo,Vasco,0,1,A,H
97,2012,Atletico GO,Figueirense,3,2,H,H
98,2012,Coritiba,Palmeiras,1,1,D,H


In [20]:
map_result_text = {'D': 0, 'A': 1, 'H': 2}


df_simulation.loc[df_simulation.index, "map_result"] = df_simulation["result"].map(map_result_text)
df_simulation.loc[df_simulation.index, "map_pred"] = df_simulation["pred"].map(map_result_text)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_simulation.loc[df_simulation.index, "map_result"] = df_simulation["result"].map(map_result_text)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_simulation.loc[df_simulation.index, "map_pred"] = df_simulation["pred"].map(map_result_text)


---

In [37]:
import numpy as np

def custom_multiclass_accuracy(tp, tn, fp, fn, verbose=False):
    metricas = []
    sensibilidade = tp / (tp + fn) if (tp + fn) != 0 else 0
    especificidade = tn / (tn + fp) if (tn + fp) != 0 else 0
    acc_classe = sensibilidade + especificidade
    metricas.append(acc_classe)

    if verbose:
        # print(f"Classe: {classes}")
        print(f"  TP: {tp}, TN: {tn}, FP: {fp}, FN: {fn}")
        print(f"  Sensibilidade: {sensibilidade:.4f}")
        print(f"  Especificidade: {especificidade:.4f}")
        print(f"  Acurácia personalizada: {acc_classe:.4f}")
        print("-" * 40)
    return np.mean(metricas)

def get_metrics_multiclass(y_true, y_pred):
    """

    """
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    classes = np.unique(np.concatenate([y_true, y_pred]))
    
    for classe in classes:
        tp = tn = fp = fn = 0

        for yt, yp in zip(y_true, y_pred):
            if yt == classe and yp == classe:
                tp += 1
            elif yt != classe and yp != classe:
                tn += 1
            elif yt != classe and yp == classe:
                fp += 1
            elif yt == classe and yp != classe:
                fn += 1

    return tp, tn, fp, fn


In [38]:
from modelagem.utils.metrics import metrics_per_class

y_true = np.array(df_simulation["map_result"].tolist())
y_pred = np.array(df_simulation["map_pred"].tolist())

tp, tn, fp, fn = get_metrics_multiclass(y_pred=y_pred, y_true=y_true)

custom_multiclass_accuracy(tp, tn, fp, fn)

np.float64(0.9733201581027668)

In [39]:
y_true

array([0., 0., 1., 0., 2., 1., 0., 2., 0., 2., 1., 2., 0., 2., 2., 2., 0.,
       0., 1., 0., 2., 1., 2., 1., 0., 2., 2., 0., 2., 2., 2., 1., 2., 2.,
       2., 0., 2., 2., 2., 2., 2., 1., 2., 2., 0., 2., 1., 1., 0., 1., 1.,
       1., 2., 0., 1., 0., 2., 1., 2., 2., 2., 1., 0., 2., 2., 2., 2., 2.,
       0., 1., 2., 1., 2., 1., 0., 1., 0., 0., 2., 2., 2., 0., 2., 1., 2.,
       1., 1., 2., 0., 2.])

In [40]:
y_pred

array([2., 2., 2., 2., 2., 2., 2., 2., 1., 2., 2., 2., 2., 2., 1., 2., 2.,
       2., 2., 2., 2., 2., 2., 2., 1., 2., 2., 2., 2., 2., 2., 2., 1., 2.,
       2., 2., 2., 2., 2., 2., 2., 1., 2., 2., 2., 2., 2., 2., 2., 2., 2.,
       2., 2., 2., 2., 2., 2., 2., 2., 2., 2., 1., 2., 1., 2., 2., 2., 2.,
       2., 2., 1., 2., 2., 2., 2., 2., 2., 2., 2., 1., 2., 2., 2., 2., 2.,
       2., 2., 2., 2., 2.])

In [28]:
import numpy as np
from sklearn.metrics import confusion_matrix

def custom_accuracy(y_true, y_pred):
    """
    Calcula a métrica personalizada:
    Acurácia = (VP / (VP + FN) + VN / (VN + FP))
    
    Args:
        y_true (array-like): Valores reais das classes.
        y_pred (array-like): Valores preditos pelo modelo.
    
    Returns:
        float: Valor da métrica calculada.
    """
    # Obtém matriz de confusão
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    
    # Evita divisões por zero
    sensibilidade = tp / (tp + fn) if (tp + fn) != 0 else 0
    especificidade = tn / (tn + fp) if (tn + fp) != 0 else 0
    
    # Calcula a métrica conforme a fórmula
    accuracy = sensibilidade + especificidade
    return accuracy

# Exemplo de uso


y_true = np.array(df_simulation["map_result"].tolist())
y_pred = np.array(df_simulation["map_pred"].tolist())


# y_true = np.array([1, 0, 1, 1, 0, 0, 1, 0])  # Valores reais
# y_pred = np.array([1, 0, 1, 0, 0, 1, 1, 0])  # Valores preditos

print("Acurácia personalizada:", custom_accuracy(y_true, y_pred))


ValueError: too many values to unpack (expected 4)

In [None]:
confusion_matrix(y_true, y_pred).ravel()

array([ 0,  2, 21,  0,  2, 21,  0,  5, 39])

In [None]:
y_true

array([0, 0, 1, 0, 2, 1, 0, 2, 0, 2, 1, 2, 0, 2, 2, 2, 0, 0, 1, 0, 2, 1,
       2, 1, 0, 2, 2, 0, 2, 2, 2, 1, 2, 2, 2, 0, 2, 2, 2, 2, 2, 1, 2, 2,
       0, 2, 1, 1, 0, 1, 1, 1, 2, 0, 1, 0, 2, 1, 2, 2, 2, 1, 0, 2, 2, 2,
       2, 2, 0, 1, 2, 1, 2, 1, 0, 1, 0, 0, 2, 2, 2, 0, 2, 1, 2, 1, 1, 2,
       0, 2])

In [None]:
np.array([1, 0, 1, 1, 0, 0, 1, 0])

array([1, 0, 1, 1, 0, 0, 1, 0])

In [None]:
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

tn, fp, fn, tp

(np.int64(3), np.int64(1), np.int64(1), np.int64(3))