In [None]:
import os
os.chdir(r"D:\PythonApps\ufc_complete_dataset")

In [None]:
import mlflow

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

import pandas as pd
import numpy as np

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, mean_absolute_error

from dataclasses import dataclass
from tqdm import tqdm
from IPython.core.display import HTML
from IPython.display import display

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
mlflow.set_tracking_uri("file:///tmp/mlflow_siamese")

# Prepare dataset

In [None]:
df = pd.read_csv("resources/df_features.csv", index_col=0)
df.sample(5)

In [None]:
df.columns

In [None]:
df["is_red_winner"] = df["winner"].apply(lambda x: 1 if x == "Red" else 0)

In [None]:
feature_cols = [
'fighter_hist_wins',
'fighter_hist_looses', 
'fighter_hist_total',
'fighter_title_fights', 
'wins_ratio', 
'tf_ratio',
'wins_streak',
'lost_streak',
'SLpM_norm', 
'sig_str_acc_norm', 
'SApM_norm',
'str_def_norm', 
'td_avg_norm', 
'td_acc_norm',
'significant_strikes', 
'damage_defense', 
'offensive_grappling',
'defensive_grappling', 
'submissions'
]
r_features = [f"r_{x}" for x in feature_cols]
b_features = [f"b_{x}" for x in feature_cols]

x_cols = r_features + b_features
target = "is_red_winner"

## Delete records with no data

In [None]:
is_empty_data = []
for i in range(len(df)):
    row = df.iloc[i][x_cols].to_list()
    is_empty = True
    for x in row:
        if bool(x):
            is_empty = False
            break
    is_empty_data.append(is_empty)

df["is_empty"] = is_empty_data
diff_df = df[df["is_empty"] == False].reset_index(drop=True).drop("is_empty", axis=1)

## Normalization

In [None]:
scaler = MinMaxScaler()
df[x_cols] = scaler.fit_transform(df[x_cols])

In [None]:
df.sample(5)

## Train / test split

In [None]:
class SNNDataset(torch.utils.data.Dataset):

    def __init__(self, x1, x2, y):
        self.x1 = x1
        self.x2 = x2
        self.y = y
        assert x1.shape == x2.shape
    
    def __len__(self):
        return len(self.x1)
    
    def __getitem__(self, idx):
        return self.x1[idx], self.x2[idx], self.y[idx]

In [None]:
train_df = df[:-200]
test_df = df[-200:]


x1_train = torch.tensor(train_df[r_features].astype(float).to_numpy(), dtype=torch.float32)
x2_train = torch.tensor(train_df[b_features].astype(float).to_numpy(), dtype=torch.float32)
y_train = torch.tensor(train_df[target].astype(float).to_numpy(), dtype=torch.float32)

x1_test = torch.tensor(test_df[r_features].astype(float).to_numpy(), dtype=torch.float32)
x2_test = torch.tensor(test_df[b_features].astype(float).to_numpy(), dtype=torch.float32)
y_test = torch.tensor(test_df[target].astype(float).to_numpy(), dtype=torch.float32)


print("x1_train", x1_train.shape)
print("x2_train", x2_train.shape)
print("x1_test", x1_test.shape)
print("x2_test", x2_test.shape)
print("y_train", y_train.shape)
print("y_test", y_test.shape)

In [None]:
train_dataset = SNNDataset(x1_train, x2_train, y_train)
test_dataset = SNNDataset(x1_test, x2_test, y_test)

# Siamese Model
- 2 linear layers to process fighters' vectors
- dot product to compare these vectors
- linear layer as an output

In [None]:
@dataclass(frozen=True)
class SNNParams:
    input_size: int = 19
    hidden_size: int = 19**2
    output_size: int = 1
    diff_function: str = "diff"  # diff

In [None]:
class SNN(nn.Module):

    def __init__(self, params = SNNParams()):
        super(SNN, self).__init__()
        self.params = params
        self.fc = nn.Sequential(
            nn.Linear(self.params.input_size, self.params.hidden_size),
            nn.ReLU(),
            nn.Linear(self.params.hidden_size, self.params.hidden_size),
            nn.ReLU(),
            nn.Linear(self.params.hidden_size, self.params.hidden_size),
            nn.ReLU()
        )
        self.output = nn.Sequential(
            nn.Linear(self.params.hidden_size, self.params.hidden_size),
            nn.Dropout(0.1),
            nn.ReLU(),
            nn.Linear(self.params.hidden_size, self.params.output_size)
        )
    
    def forward(self, x1: torch.tensor, x2: torch.tensor):
        x1 = self.fc(x1)
        x2 = self.fc(x2)
        diff = self._diff(x1, x2)
        return torch.sigmoid(self.output(diff))
    
    def _diff(self, x1, x2):
        match self.params.diff_function:
            case "dot":
                result = x1 * x2
            case "diff":
                result = x1 - x2
            case _:
                raise ValueError(f"Unknown diff function: {self.params.diff_function}")
        return result


# Training utils

In [None]:
def train_epoch(model, dataloader, optimizer, criterion):
    model.train()
    total_loss = 0
    for x1, x2, y in dataloader:
        optimizer.zero_grad()
        output = model(x1, x2)[0]
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        
    avg_loss = total_loss / len(dataloader)
    
    return model, avg_loss


def val_epoch(model, dataloader, optimizer, criterion):
    model.eval()
    total_loss = 0
    for x1, x2, y in dataloader:
        optimizer.zero_grad()
        output = model(x1, x2)[0]
        loss = criterion(output, y)
        total_loss += loss.item()
        
    avg_loss = total_loss / len(dataloader)
    
    return avg_loss


def early_stoppage(loss_history: list[tuple[float, float]], min_incr: float, last_epochs: int = 3) -> bool:
    stop = False
    val_loss_data = [x[1] for x in loss_history]
    if len(loss_history) > last_epochs:
        last_results = np.mean(val_loss_data[-last_epochs-1:])
        diff = last_results - val_loss_data[-1]
        if diff < min_incr:
            stop = True
            print("Early stoppage!")
    return stop
            

def train_and_validate(model, train_dataloader, val_dataloader, optimizer, criterion, training_params):
    loss_history = []
    for epoch in range(training_params.epochs):
        model, train_loss = train_epoch(model, train_dataloader, optimizer, criterion,)
        val_loss = val_epoch(model, val_dataloader, optimizer, criterion)
        loss_history.append((train_loss, val_loss))
        print(f"Epoch {epoch+1}/{training_params.epochs}: train loss = {round(train_loss, 3)}, val loss = {round(val_loss, 3)}")
        if early_stoppage(loss_history, training_params.early_stoppage_min_incr, training_params.early_stoppage_last_epochs):
            break
    return model, loss_history

# Training

In [None]:
@dataclass(frozen=True)
class TrainingParams:
    lr: float = 0.001
    batch_size: int = 1
    epochs: int = 50
    early_stoppage_last_epochs: int = 5
    early_stoppage_min_incr: float = 0.001


training_params = TrainingParams()

In [None]:
model = SNN()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=training_params.lr)


train_dataloader = DataLoader(train_dataset, batch_size=training_params.batch_size, shuffle=False)
val_dataloader = DataLoader(test_dataset, batch_size=training_params.batch_size, shuffle=False)


model, loss_history = train_and_validate(model, train_dataloader, val_dataloader, optimizer, criterion, training_params)

In [None]:
fig = go.Figure()

x = [i+1 for i in range(training_params.epochs)]
fig.add_trace(
    go.Scatter(
        name="Train MSE",
        x=x,
        y=[x[0] for x in loss_history],
        # mode="lines"
    )
)
fig.add_trace(
    go.Scatter(
        name="Val MSE",
        x=x,
        y=[x[1] for x in loss_history],
        # mode="lines"
    )
)

fig.update_layout(
    title="Train history",
    xaxis_title="Epoch",
    yaxis_title="Loss",
    width=800,
    height=700,
)

fig.show()

In [None]:
y_pred_train = model(x1_train, x2_train).ravel().detach().numpy()
y_pred_test = model(x1_test, x2_test).ravel().detach().numpy()

train_acc = accuracy_score(y_train, [1 if x > 0.5 else 0 for x in y_pred_train])
test_acc = accuracy_score(y_test, [1 if x > 0.5 else 0 for x in y_pred_test])
print(f"train acc = {round(train_acc, 2)}\ttest acc = {round(test_acc, 2)}")

train_mae = mean_absolute_error(y_train, y_pred_train)
test_mae = mean_absolute_error(y_test, y_pred_test)
print(f"train mae = {round(train_mae, 2)}\ttest mae = {round(test_mae, 2)}")