In [1]:
%matplotlib widget
# %matplotlib ipympl

In [2]:
import os
import random
from itertools import islice
from collections.abc import Sequence
from copy import deepcopy
import tqdm.notebook as tq

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import matplotlib.animation as animation
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import torch.nn.functional as F

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score, f1_score
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification

from sklearn.decomposition import PCA

In [3]:
eps = 1e-9

SEED = 81020204
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

rng = np.random.default_rng(seed=SEED)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [4]:
try:
    X_train = pd.read_csv('datasets/train_x.csv', delimiter=',', index_col=0)
    y_train = pd.read_csv('datasets/train_y.csv', delimiter=',', index_col=0)
    X_test  = pd.read_csv('datasets/test_x.csv',  delimiter=',', index_col=0)
except Exception:
    print('No such files')

In [None]:
palette = px.colors.qualitative.Plotly

sns.palplot(palette)

---

## Задание 1

Напишите функцию, которая моделирует один нейрон с сигмоидной активацией и реализует вычисление градиента для обновления весов и смещений нейрона. ``Функция должна принимать список векторов признаков, ассоциированные бинарные метки класса, начальные веса, начальное смещение, скорость обучения и количество эпох. Функция должна обновлять веса и смещение с помощью градиентного спуска (классической версии) на основе функции потерь NLL и возвращать обновленные веса, смещение и список значений NLL для каждой эпохи, округленное до четырех десятичных знаков.`` Проведите обучение на предоставленном наборе данных из задания 4 (для двух разных лет). Опционально сгенерируйте другие подходящие наборы данных. Опишите ваши результаты. Предоставленная функция будет также протестирована во время защиты ДЗ. Можно использовать только чистый torch (без использования autograd и torch.nn)

- {*} Выберите другую функцию потерь, проведите обучение с ее помощью. Сгенерируйте датасеты, на которых будет видна разница между алгоритмами. Покажите, в каких случаях выбор влияет на обучение. (2 балла)
- {*} Реализуйте один из следующих видов градиентного спуска: Stochastic Gradient Descent (SGD), Batch Gradient Descent, Mini-Batch Gradient Descent. Проведите эксперименты, покажите разницу в сходимости, сходимость в зависимости от формы поверхности. (2 балла)

Пример входа:
```{python} 
input: 
    features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]], 
    labels = [1, 0, 0], 
    initial_weights = [0.1, -0.2], 
    initial_bias = 0.0, 
    learning_rate = 0.1, 
    epochs = 2
        
output: 
    updated_weights = [0.0808, -0.1916], updated_bias = -0.0214, mse_values = [0.2386, 0.2348]```

---

### 0.0. Вспомогательные функции визуализации

In [6]:
# График разделающей поверхности (не используется)

def plot_3d_decision_surface_plotly(X, y, weights: dict[str, list[float]] = {}, title=''):
    fig = go.Figure()

    # Scatter plot of data points
    for i, cls in enumerate(np.unique(y)):
        cls_values = X[np.nonzero(y == cls)]
        fig.add_trace(go.Scatter3d(
            x=cls_values[:, 0], y=cls_values[:, 1], z=cls_values[:, 2],
            mode='markers',
            marker=dict(
                color=palette[i], 
                size=8, 
                opacity=0.8,
                line=dict(color='white', width=0)
            )
        ))

    xlim, ylim = ([X[:, i].min(), X[:, i].max()] for i in range(2))
    x_grid, y_grid = np.meshgrid(np.linspace(*xlim, num=2), np.linspace(*ylim, num=2))

    if weights:
        for name, w in weights.items():
            z_grid = -(w[0] * x_grid + w[1] * y_grid + w[-1]) / w[2]
            fig.add_trace(go.Surface(x=x_grid, y=y_grid, z=z_grid, opacity=0.5, name=name))

    fig.update_layout(
        width=1000,
        height=800,
        scene=dict(
            xaxis_title='Feature 1',
            yaxis_title='Feature 2',
            zaxis_title='Feature 3'
        ),
        title=title
    )
    fig.show()

# Example usage
# plot_3d_decision_surface_plotly(X, y, weights={'Model 1': [1, -1, 1, 0.5]})

In [7]:
# График разделяющих поверхностей (не используется)

def plot_3d_decision_surface_matplotlib(X, y, weights: dict[str, list[float]] = {}, title=''):
    fig = plt.figure(figsize=(8, 6))
    ax = fig.add_subplot(111, projection='3d')

    print(len(X))
    ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=y, cmap='Set1', s=50, alpha=0.7)

    xlim, ylim = ([X[:, i].min(), X[:, i].max()] for i in range(2))
    x_grid, y_grid = np.meshgrid(np.linspace(*xlim, num=100), np.linspace(*ylim, num=100))

    if weights:
        for name, w in weights.items():
            z_grid = -(w[0] * x_grid + w[1] * y_grid + w[-1]) / w[2]
            ax.plot_surface(x_grid, y_grid, z_grid, alpha=0.3, rstride=100, cstride=100, label=name)
    
    ax.set_xlabel('Feature 1')
    ax.set_ylabel('Feature 2')
    ax.set_zlabel('Feature 3')
    plt.title(title)
    plt.legend(title='Loss')
    plt.show()

# plot_3d_decision_surface_matplotlib(X, y, weights={'Model 1': [1, -1, 1, 0.5]})

In [8]:
# График разделающих прямых простых моделей с двумя параметрами и одним сдвигом

# weights = {'name': [w_1, w_2, bias]}
def plot_decision_bounds(X, y, weights: dict[str: list[float]] = {}, plot_params = {}, palette = palette, title=''):
    plt.figure(figsize=(10, 6))
    if not plot_params:
        sns.scatterplot(x=X[:, 0], y=X[:, 1], hue=y, palette='Set1', alpha=0.6, edgecolor=None, legend=True if len(weights) < 2 else False)
    else:
        sns.scatterplot(x=X[:, 0], y=X[:, 1], hue=y, legend=True if len(weights) < 2 else False, **plot_params)
    xmin, xmax = min(X[:, 0]), max(X[:, 0])
    ymin, ymax = min(X[:, 1]), max(X[:, 1])

    if weights:
        for i, (name, w) in enumerate(weights.items()):
            xd = np.array([xmin, xmax])
            yd = -(w[0] * xd + w[-1]) / w[1]
            plt.plot(xd, yd, lw=1, ls='--', label=name, color='black' if len(weights) == 1 else palette[i])

    plt.title(title)
    plt.xlabel('Feature 1')
    plt.ylabel('Feature 2')
    plt.legend(title='Class' if not weights else 'Loss')
    plt.xlim(xmin, xmax)
    plt.ylim(ymin, ymax)
    plt.show()

In [9]:
# Графики значений функций потерь

# TODO: make cut of loss values for models with more train epochs than other
def plot_losses(loss: dict[str: list[float]], plot_engine: str = 'matplotlib', palette: list[hex] = None):
    df_tmp = pd.DataFrame(loss)
    df_tmp['Epoch'] = df_tmp.index + 1

    if plot_engine == 'matplotlib':

        df_melted = df_tmp.melt(id_vars=['Epoch'], var_name='Model', value_name='Loss')

        # sns.set(style="whitegrid")

        plt.figure(figsize=(10, 6))
        sns.lineplot(data=df_melted, x='Epoch', y='Loss', hue='Model', palette=palette)

        plt.title('Loss Convergence Across Models')
        plt.xlabel('Epoch')
        plt.ylabel('Loss Value')
        plt.legend(title='Models')
        plt.grid(True, axis='both', linestyle='--', color='gray', linewidth=0.5)

        # sns.reset_defaults()

        plt.show()

    elif plot_engine == 'plotly':
        fig = go.Figure()

        for model_name in loss:
            fig.add_trace(go.Scatter(x=df_tmp['Epoch'], y=df_tmp[model_name], mode='lines',
                                    name=model_name, marker=dict(size=8), line=dict(width=2)))

        fig.update_layout(
            title='Loss Convergence Across Models',
            xaxis_title='Epoch',
            yaxis_title='Loss Value',
            legend_title='Models',
            template='plotly_white',
            font=dict(family="Arial", size=14, color="black"),
            plot_bgcolor='#f0f0f0',
            xaxis=dict(showgrid=True, gridwidth=1, gridcolor='lightgray'),
            yaxis=dict(showgrid=True, gridwidth=1, gridcolor='lightgray')
        )

        fig.show()

    pass

---

### 0.1. Подготовка данных из 4 задания

In [None]:
y_train.year.value_counts()[:5]

In [None]:
subsamp_mask = y_train.year.isin([2006, 2007])
y_train_subsamp = y_train[subsamp_mask]
X_train_subsamp = X_train[subsamp_mask]
X_train_subsamp.shape, y_train_subsamp.shape

In [None]:
X_train_subsamp_train, X_train_subsamp_test, y_train_subsamp_train, y_train_subsamp_test = (
    train_test_split(X_train_subsamp, y_train_subsamp, test_size=0.25, shuffle=True, random_state=SEED)
)
X_train_subsamp_train.shape, X_train_subsamp_test.shape

In [None]:
std_scaler = StandardScaler()
X_train_subsamp_train = std_scaler.fit_transform(X_train_subsamp_train)
X_train_subsamp_test  = std_scaler.transform(X_train_subsamp_test)
X_train_subsamp_train[0, :5]

In [None]:
label_encoder = LabelEncoder()
y_train_subsamp_train = label_encoder.fit_transform(y_train_subsamp_train.values.ravel())
y_train_subsamp_test = label_encoder.transform(y_train_subsamp_test.values.ravel())
y_train_subsamp_train[:5]

---

### 0.2. Генерация другой задачи классификации

In [None]:
X, y = make_classification(n_samples=10000, 
                           n_features=2,
                           n_informative=2, 
                           n_redundant=0, 
                           n_classes=2,
                           flip_y=0.1,
                           class_sep=2,
                           n_clusters_per_class=1,
                           random_state=SEED, 
                           shuffle=True)
X.shape, y.shape

In [None]:
plot_decision_bounds(X, y)

---

---

### 1.1. Numpy class

[BCE == NLL для бинарной классификации](https://en.wikipedia.org/wiki/Loss_functions_for_classification#Logistic_loss)

In [17]:
class LogRegNumpy():

    def __init__(self, 
                 initial_weights: list[list[float]] = [], 
                 initial_bias: list[float] = [],
                 tolerance: float = 1e-4,
                 early_stop: bool = False,
                 n_startup_rounds: int = 50,
                 early_stop_rounds: int = 50,
                 random_seed: int = SEED):
        
        self.weights = np.array(initial_weights).ravel() if not isinstance(initial_weights, np.ndarray) else initial_weights
        self.bias = np.array(initial_bias).ravel() if not isinstance(initial_bias, np.ndarray) else initial_bias
        self.tolerance = tolerance
        self.early_stop = early_stop
        self.n_startup_rounds = n_startup_rounds
        self.early_stop_rounds = early_stop_rounds
        self.random_seed = random_seed
        self.eps = 1e-9

    def fit(self, 
            features: list[list[float]], 
            labels: list[int], 
            learning_rate: float = 1e-3, 
            epochs: int = 100,
            return_weights_history: bool = False) -> None | list[list[float]]:

        X = np.array(features).squeeze() if not isinstance(features, np.ndarray) else deepcopy(features)
        X = self.add_ones(X)
        y = np.array(labels).squeeze() if not isinstance(labels, np.ndarray) else deepcopy(labels)

        self.init_weights(X)
        
        w = np.hstack([self.bias, self.weights])

        loss_values = []
        if return_weights_history:
            weights_values = []
            weights_values.append(w.copy())
            
        no_improvement_counter = 0

        for epoch in range(epochs):
            y_pred = self.forward(X, w)
            loss = self.loss_fn(y, y_pred)

            if self.early_stop:
                if epoch > self.n_startup_rounds and 0 < (loss_values[-1] - loss) < self.tolerance:
                    no_improvement_counter += 1
                    if no_improvement_counter >= self.early_stop_rounds:
                        print(f"Early stopping at epoch {epoch}")
                        break
                else:
                    no_improvement_counter = 0

            loss_values.append(loss)
            if return_weights_history:
                weights_values.append(w.copy())
            
            grad = self.gradient(X, y, y_pred)
            w -= learning_rate * grad

        self.weights = w[1:]
        self.bias = w[0]
        self.loss_values = loss_values

        if return_weights_history:
            return np.array(weights_values)


    def predict(self, features: list[list[float]]):
        X = np.array(features).squeeze() if not isinstance(features, np.ndarray) else deepcopy(features)
        X = self.add_ones(X)

        w = np.hstack([self.bias, self.weights])

        probs = self.forward(X, w)

        return np.where(probs >= 0.5, 1, 0)
    

    def add_ones(self, x):
        return np.hstack([np.ones((x.shape[0], 1)), x])
    
    def init_weights(self, x):
        rng_ = np.random.default_rng(seed=self.random_seed)
        if self.weights.size == 0:
            self.weights = rng_.standard_normal(x.shape[1] - 1, dtype=np.float32)
        if self.bias.size == 0:
            self.bias = rng_.standard_normal(1, dtype=np.float32)
    
    def sigmoid(self, x):
        if x >= 0:
            return 1 / (1 + np.exp(-x))
        else:
            return np.exp(x) / (1 + np.exp(x))
    
    def forward(self, x, w):
        logits = np.dot(x, w)
        return np.array([self.sigmoid(value) for value in logits]) # probabilities
    
    def loss_fn(self, y_true, y_pred):
        y_pos_loss = y_true * np.log(y_pred + self.eps)
        y_neg_loss = (1 - y_true) * np.log(1 - y_pred + self.eps)
        return -np.mean(y_pos_loss + y_neg_loss)
    
    def gradient(self, x, y_true, y_pred):
        return np.dot((y_pred - y_true), x) / x.shape[0]

#### Пример 1

In [None]:
features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]]
labels = [1, 0, 0]
initial_weights = [0.1, -0.2]
initial_bias = 0.0
learning_rate = 0.1
epochs = 2

log_reg_numpy = LogRegNumpy(initial_weights=initial_weights, initial_bias=initial_bias)
log_reg_numpy.fit(features=features,
                  labels=labels,
                  learning_rate=learning_rate,
                  epochs=epochs)

log_reg_numpy.weights, log_reg_numpy.bias

#### Пример 2

In [None]:
log_reg_numpy = LogRegNumpy()

log_reg_numpy.fit(features = X_train_subsamp_train, 
                  labels = y_train_subsamp_train, 
                  learning_rate = 1e-3, 
                  epochs = 5)

log_reg_numpy.weights, log_reg_numpy.bias

In [None]:
accuracy_score(y_train_subsamp_train, log_reg_numpy.predict(X_train_subsamp_train))

#### Пример 3

In [None]:
log_reg_numpy = LogRegNumpy(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6)

log_reg_numpy.fit(features = X, 
                  labels = y, 
                  learning_rate = 1e-3, 
                  epochs = 10000)

log_reg_numpy.weights, log_reg_numpy.bias

In [None]:
weights = {'NLL': np.hstack((log_reg_numpy.weights, np.array([log_reg_numpy.bias])))}

plot_decision_bounds(X, y, weights)
accuracy_score(y, log_reg_numpy.predict(X))

---

### 1.2. Numpy function

In [23]:
def sigmoid_single_input(x: float):
    if x >= 0:
        return 1 / (1 + np.exp(-x))
    else:
        return np.exp(x) / (1 + np.exp(x))
    
def sigmoid(x: list[float]):
    return np.array([sigmoid_single_input(value) for value in x])

def nll_loss(probs, labels):
    y_pos_loss = labels * np.log(probs + eps)
    y_neg_loss = (1 - labels) * np.log(1 - probs + eps)
    return -np.mean(y_pos_loss + y_neg_loss)

# def nll_loss(probs, labels):
#     return -(np.dot(labels, np.log(probs)) + np.dot(1 - labels, np.log(1 - probs))) / probs.shape[0]

In [24]:
def single_neuron(features: list[list[float]], 
                  labels: list[int], 
                  initial_weights: list[float],
                  initial_bias: list[float],
                  learning_rate: float,
                  epochs: int,
                  scorer: callable = None) -> Sequence[list[float], list[float], list[float]]:
    features_np = np.array(features, dtype=np.float32)
    labels_np = np.array(labels, dtype=np.int64)
    weights = np.array(initial_weights, dtype=np.float32)
    bias = np.array(initial_bias, dtype=np.float32)

    loss_values = []

    for epoch in range(epochs):
        logits = np.dot(features_np, weights) + bias
        probs = sigmoid(logits)
        loss = nll_loss(probs, labels_np)

        loss_values.append(loss)
        
        # GD
        diff = np.subtract(probs, labels_np)
        dw = np.dot(diff, features_np) / features_np.shape[0]
        db = diff.mean()
        weights -= learning_rate * dw
        bias -= learning_rate * db

    if scorer is not None:
        preds = [1 if p >= 0.5 else 0 for p in probs]
        score = scorer(labels_np, preds)
        print(f'{scorer.__name__}: {score:.4f}')

    return np.round(weights, 4), np.round(bias, 4), np.round(loss_values, 4)

#### Пример 1

In [None]:
single_neuron(features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]], 
              labels = [1, 0, 0], 
              initial_weights = [0.1, -0.2], 
              initial_bias = 0.0, 
              learning_rate = 0.1, 
              epochs = 2,
              scorer = accuracy_score)

#### Пример 2

In [None]:
init_weights = rng.standard_normal(X_train_subsamp_train.shape[1] + 1, dtype=np.float32)

single_neuron(features = X_train_subsamp_train, 
              labels = y_train_subsamp_train, 
              initial_weights = init_weights[:-1], 
              initial_bias = init_weights[-1], 
              learning_rate = 1e-3, 
              epochs = 5,
              scorer = accuracy_score)

---

---

### 2.1. Pytorch class manual (single gradient formula)

In [27]:
class LogRegTorch():

    def __init__(self, 
                 initial_weights: list[list[float]] = [], 
                 initial_bias: list[float] = [],
                 gen_machine: str = 'torch',
                 tolerance: float = 1e-4,
                 n_startup_rounds: int = 50,
                 early_stop_rounds: int = 50,
                 random_seed: int = SEED):
        
        self.weights = (torch.tensor(initial_weights, dtype=torch.float32).ravel() 
                        if not isinstance(initial_weights, torch.FloatTensor) else initial_weights)
        self.bias = (torch.tensor(initial_bias, dtype=torch.float32).ravel() 
                     if not isinstance(initial_bias, torch.FloatTensor) else initial_bias)
        self.eps = 1e-9
        self.gen_machine = gen_machine
        self.tolerance = tolerance
        self.n_startup_rounds = n_startup_rounds
        self.early_stop_rounds = early_stop_rounds
        self.random_seed = random_seed


    def fit(self, 
            features: list[list[float]], 
            labels: list[int], 
            learning_rate: float = 1e-3, 
            epochs: int = 100):

        X = (torch.tensor(features, dtype=torch.float32).squeeze() 
             if not isinstance(features, torch.FloatTensor) else deepcopy(features))
        X = self.add_ones(X)
        y = (torch.tensor(labels, dtype=torch.float32).squeeze() 
             if not isinstance(labels, torch.FloatTensor) else deepcopy(labels))

        self.init_weights(X)
        
        w = torch.hstack([self.bias, self.weights])

        loss_values = []
        no_improvement_counter = 0
        
        for epoch in range(epochs):
            if (epoch+1) % 100 == 0:
                print(f'Epoch {epoch + 1}')
            y_pred = self.forward(X, w)
            loss = self.loss_fn(y, y_pred)

            # Early stopping
            if epoch > self.n_startup_rounds and 0 < (loss_values[-1] - loss) < self.tolerance:
                no_improvement_counter += 1
                if no_improvement_counter >= self.early_stop_rounds:
                    print(f"Early stopping at epoch {epoch}")
                    break
            else:
                no_improvement_counter = 0

            loss_values.append(loss)
            
            grad = self.gradient(X, y, y_pred)
            w -= learning_rate * grad

        self.weights = w[1:]
        self.bias = w[0]
        self.loss_values = loss_values


    def predict(self, features: list[list[float]]):
        X = (torch.tensor(features, dtype=torch.float32).squeeze() 
             if not isinstance(features, torch.FloatTensor) else deepcopy(features))
        X = self.add_ones(X)

        w = torch.hstack([self.bias, self.weights])

        probs = self.forward(X, w)

        return torch.where(probs >= 0.5, 1, 0)
    

    def add_ones(self, x):
        return torch.hstack([torch.ones(x.shape[0], 1), x])
    
    def init_weights(self, X):
        if self.gen_machine == 'torch':

            generator = torch.Generator()
            generator.manual_seed(self.random_seed)
            if self.weights.nelement() == 0:
                self.weights = torch.normal(0, 1, size=(X.shape[1] - 1, ), generator=generator, dtype=torch.float32)
            if self.bias.nelement() == 0:
                self.bias = torch.normal(0, 1, size=(1,), generator=generator, dtype=torch.float32)

        elif self.gen_machine == 'numpy':

            rng_ = np.random.default_rng(seed=self.random_seed)
            if self.weights.nelement() == 0:
                self.weights = torch.tensor(rng_.standard_normal(X.shape[1] - 1, dtype=np.float32), dtype=torch.float32)
            if self.bias.nelement() == 0:
                self.bias = torch.tensor(rng_.standard_normal(1, dtype=np.float32), dtype=torch.float32)
    
    def sigmoid(self, x):
        if x >= 0:
            return 1 / (1 + torch.exp(-x))
        else:
            return torch.exp(x) / (1 + torch.exp(x))
    
    def forward(self, x, w):
        logits = torch.matmul(x, w)
        return torch.tensor([self.sigmoid(value) for value in logits], dtype=torch.float32) # probabilities
    
    def loss_fn(self, y_true, y_pred):
        y_pos_loss = y_true * torch.log(y_pred + self.eps)
        y_neg_loss = (1 - y_true) * torch.log(1 - y_pred + self.eps)
        return -torch.mean(y_pos_loss + y_neg_loss)
    
    def gradient(self, x, y_true, y_pred):
        return torch.matmul((y_pred - y_true), x) / x.shape[0]

#### Пример 1

In [None]:
features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]]
labels = [1, 0, 0]
initial_weights = [0.1, -0.2]
initial_bias = 0.0
learning_rate = 0.1
epochs = 2

log_reg_torch = LogRegTorch(initial_weights=initial_weights, initial_bias=initial_bias)
log_reg_torch.fit(features=features,
                  labels=labels,
                  learning_rate=learning_rate,
                  epochs=epochs)

log_reg_torch.weights, log_reg_torch.bias

#### Пример 2

In [None]:
log_reg_torch = LogRegTorch(gen_machine='numpy')

log_reg_torch.fit(features = X_train_subsamp_train, 
                  labels = y_train_subsamp_train, 
                  learning_rate = 1e-3, 
                  epochs = 5)

log_reg_torch.weights, log_reg_torch.bias, log_reg_torch.loss_values

In [None]:
accuracy_score(y_train_subsamp_train, log_reg_torch.predict(X_train_subsamp_train))

#### Пример 3

In [None]:
log_reg_torch = LogRegTorch(gen_machine='numpy', random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6)

log_reg_torch.fit(features = X, 
                  labels = y, 
                  learning_rate = 1e-3, 
                  epochs = 200)

log_reg_torch.weights, log_reg_torch.bias

In [None]:
weights = {'NLL': np.hstack((log_reg_torch.weights, np.array([log_reg_torch.bias])))}

plot_decision_bounds(X, y, weights)
accuracy_score(y, log_reg_torch.predict(X).numpy())

---

### 2.2. Pytorch function (single gradient formula)

In [33]:
def sigmoid_single_input_pt(x: float):
    if x >= 0:
        return 1 / (1 + torch.exp(-x))
    else:
        return torch.exp(x) / (1 + torch.exp(x))
    
def sigmoid_pt(x: list[float]):
    return torch.tensor([sigmoid_single_input_pt(value) for value in x])

def nll_loss_pt(probs, labels):
    y_pos_loss = labels * torch.log(probs + eps)
    y_neg_loss = (1 - labels) * torch.log(1 - probs + eps)
    return -torch.mean(y_pos_loss + y_neg_loss)

# def nll_loss_pt(probs, labels):
#     return -(torch.dot(labels, torch.log(probs + eps)) + torch.dot(1 - labels, torch.log(1 - probs + eps))) / probs.shape[0]

In [34]:
def single_neuron_pt(features: list[list[float]], 
                     labels: list[int], 
                     initial_weights: list[float],
                     initial_bias: list[float],
                     learning_rate: float,
                     epochs: int,
                     scorer: callable = None) -> Sequence[list[float], list[float], list[float]]:
    # requires_grad = False по дефолту
    features_pt = torch.tensor(features, dtype=torch.float32)
    labels_pt = torch.tensor(labels, dtype=torch.float32)
    if not torch.is_tensor(initial_weights):
        weights = torch.tensor(initial_weights, dtype=torch.float32)
        bias = torch.tensor(initial_bias, dtype=torch.float32)
    else:
        weights = initial_weights.clone().detach()
        bias = initial_bias.clone().detach()

    loss_values = []

    for epoch in range(epochs):
        logits = torch.matmul(features_pt, weights) + bias
        probs = sigmoid_pt(logits)
        loss = nll_loss_pt(probs, labels_pt)

        loss_values.append(torch.round(loss, decimals=4))
        
        # GD
        diff = probs - labels_pt
        dw = torch.matmul(diff, features_pt) / features_pt.shape[0]
        db = diff.mean()
        weights -= learning_rate * dw
        bias -= learning_rate * db

    if scorer is not None:
        preds = [1 if p >= 0.5 else 0 for p in probs]
        score = scorer(labels_pt.numpy(), preds)
        print(f'{scorer.__name__}: {score:.4f}')

    return torch.round(weights, decimals=4), torch.round(bias, decimals=4), torch.tensor(loss_values)

#### Пример 1

In [None]:
single_neuron_pt(features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]], 
                labels = [1, 0, 0], 
                initial_weights = [0.1, -0.2], 
                initial_bias = 0.0, 
                learning_rate = 0.1, 
                epochs = 2,
                scorer=accuracy_score)

#### Пример 2

In [None]:
# init_weights_tensor = torch.empty(X_train_subsamp_train.shape[1] + 1)
# nn.init.uniform_(init_weights_tensor, -5, 5)
init_weights_tensor = torch.tensor(init_weights, dtype=torch.float32)

single_neuron_pt(features = X_train_subsamp_train, 
                labels = y_train_subsamp_train, 
                initial_weights = init_weights_tensor[:-1], 
                initial_bias = init_weights_tensor[-1], 
                learning_rate = 1e-3, 
                epochs = 5,
                scorer = accuracy_score)

---

### 3.1. Numpy class (chain rule)

In [37]:
class LogRegNumpyChainRule(LogRegNumpy):

    def __init__(self, 
                 initial_weights: list[list[float]] = [], 
                 initial_bias: list[float] = [],
                 tolerance: float = 1e-4,
                 n_startup_rounds: int = 50,
                 early_stop_rounds: int = 50,
                 random_seed: int = SEED):
        
        super(LogRegNumpyChainRule, self).__init__(initial_weights, initial_bias, tolerance, 
                                                   n_startup_rounds, early_stop_rounds, random_seed)
    
    def gradient(self, x, y_true, y_pred):
        # dL_dp = -(labels_np / (probs + eps)) + ((1 - labels_np) / (1 - probs + eps))
        dL_dp = (y_pred - y_true) / (y_pred  * (1 - y_pred) + self.eps)
        dp_dz = y_pred * (1 - y_pred)
        dL_dz = dL_dp * dp_dz
        dL_dw = np.dot(dL_dz, x) / x.shape[0]
        return dL_dw

#### Пример 1

In [None]:
features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]]
labels = [1, 0, 0]
initial_weights = [0.1, -0.2]
initial_bias = 0.0
learning_rate = 0.1
epochs = 2

log_reg_numpy_chain_rule = LogRegNumpyChainRule(initial_weights=initial_weights, initial_bias=initial_bias)
log_reg_numpy_chain_rule.fit(features=features,
                            labels=labels,
                            learning_rate=learning_rate,
                            epochs=epochs)

log_reg_numpy_chain_rule.weights, log_reg_numpy_chain_rule.bias

#### Пример 2

In [None]:
log_reg_numpy_chain_rule = LogRegNumpyChainRule()

log_reg_numpy_chain_rule.fit(features = X_train_subsamp_train, 
                            labels = y_train_subsamp_train, 
                            learning_rate = 1e-3, 
                            epochs = 5)

log_reg_numpy_chain_rule.weights, log_reg_numpy_chain_rule.bias

In [None]:
accuracy_score(y_train_subsamp_train, log_reg_numpy_chain_rule.predict(X_train_subsamp_train))

#### Пример 3

In [None]:
log_reg_numpy_chain_rule = LogRegNumpyChainRule(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6)

log_reg_numpy_chain_rule.fit(features = X, 
                             labels = y, 
                             learning_rate = 1e-3, 
                             epochs = 10000)

log_reg_numpy_chain_rule.weights, log_reg_numpy_chain_rule.bias

In [None]:
weights = {'NLL': np.hstack((log_reg_numpy_chain_rule.weights, np.array([log_reg_numpy_chain_rule.bias])))}

plot_decision_bounds(X, y, weights)
accuracy_score(y, log_reg_numpy_chain_rule.predict(X))

---

### 3.2. Numpy function (chain rule)

In [43]:
def single_neuron_sequential_grad(features: list[list[float]], 
                                  labels: list[int], 
                                  initial_weights: list[float],
                                  initial_bias: list[float],
                                  learning_rate: float,
                                  epochs: int,
                                  scorer: callable = None) -> Sequence[list[float], list[float], list[float]]:
    features_np = np.array(features, dtype=np.float32)
    labels_np = np.array(labels, dtype=np.int64)
    weights = np.array(initial_weights, dtype=np.float32)
    bias = np.array(initial_bias, dtype=np.float32)

    loss_values = []

    for epoch in range(epochs):
        logits = np.dot(features_np, weights) + bias
        probs = sigmoid(logits)
        loss = nll_loss(probs, labels_np)

        loss_values.append(loss)
        
        # GD
        # dL_dp = -(labels_np / (probs + eps)) + ((1 - labels_np) / (1 - probs + eps))
        dL_dp = (probs - labels_np) / (probs  * (1 - probs) + eps)
        dp_dz = probs * (1 - probs)
        dL_dz = dL_dp * dp_dz
        dL_dw = np.dot(dL_dz, features_np) / features_np.shape[0]
        dL_db = np.mean(dL_dz)

        weights -= learning_rate * dL_dw
        bias -= learning_rate * dL_db

    if scorer is not None:
        preds = np.where(probs >= 0.5, 1, 0)
        score = scorer(labels, preds)
        print(f'{scorer.__name__}: {score:.4f}')

    return np.round(weights, 4), np.round(bias, 4), np.round(loss_values, 4)

#### Пример 1

In [None]:
single_neuron_sequential_grad(features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]], 
                              labels = [1, 0, 0], 
                              initial_weights = [0.1, -0.2], 
                              initial_bias = 0.0, 
                              learning_rate = 0.1, 
                              epochs = 2,
                              scorer = accuracy_score)

#### Пример 2

In [None]:
single_neuron_sequential_grad(features = X_train_subsamp_train, 
              labels = y_train_subsamp_train, 
              initial_weights = init_weights[:-1], 
              initial_bias = init_weights[-1], 
              learning_rate = 1e-3, 
              epochs = 5,
              scorer = accuracy_score)

---

### 4.1. Pytorch class (autograd exploitation)

In [46]:
class LogRegTorchAutograd():

    def __init__(self, 
                 initial_weights: list[list[float]] = [], 
                 initial_bias: list[float] = [],
                 gen_machine: str = 'torch',
                 tolerance: float = 1e-4,
                 n_startup_rounds: int = 50,
                 early_stop_rounds: int = 50,
                 random_seed: int = SEED):
        
        self.weights = (torch.tensor(initial_weights, dtype=torch.float32).ravel() 
                        if not isinstance(initial_weights, torch.FloatTensor) else initial_weights)
        self.bias = (torch.tensor(initial_bias, dtype=torch.float32).ravel() 
                     if not isinstance(initial_bias, torch.FloatTensor) else initial_bias)
        self.gen_machine = gen_machine
        self.tolerance = tolerance
        self.n_startup_rounds = n_startup_rounds
        self.early_stop_rounds = early_stop_rounds
        self.random_seed = random_seed


    def fit(self, 
            features: list[list[float]], 
            labels: list[int], 
            learning_rate: float = 1e-3, 
            epochs: int = 100):

        X = (torch.tensor(features, dtype=torch.float32).squeeze() 
             if not isinstance(features, torch.FloatTensor) else deepcopy(features))
        y = (torch.tensor(labels, dtype=torch.float32).squeeze() 
             if not isinstance(labels, torch.FloatTensor) else deepcopy(labels))

        self.init_weights(X)
        
        self.linear = nn.Linear(X.shape[1], 1)
        # Можно вместо такого костыля воспользоваться nn.Parameter
        self.linear.weight.data = self.weights.view(1, -1)
        self.linear.bias.data = self.bias.view(1)
        loss_function = nn.BCEWithLogitsLoss(reduction='mean')

        loss_values = []
        
        for epoch in range(epochs):
            y_pred = self.linear(X).squeeze()
            loss = loss_function(y_pred, y)

            no_improvement_counter = 0

            # Early stopping
            if epoch > self.n_startup_rounds and 0 < (loss_values[-1] - loss) < self.tolerance:
                no_improvement_counter += 1
                if no_improvement_counter >= self.early_stop_rounds:
                    print(f"Early stopping at epoch {epoch}")
                    break
            else:
                no_improvement_counter = 0

            loss_values.append(loss.item())

            loss.backward()
            with torch.no_grad():
                self.linear.weight -= learning_rate * self.linear.weight.grad
                self.linear.bias -= learning_rate * self.linear.bias.grad

            # Интересно, что я изначально забыл перенести в класс зануление градиентов.
            # При этом модель с накопленными градиентами смогла сойтись в примере 3.
            # А с занулением она никак не может этого сделать.
            self.linear.weight.grad.zero_()
            self.linear.bias.grad.zero_()

        self.weights = self.linear.weight.data.squeeze()
        self.bias = self.linear.bias.data.squeeze()
        self.loss_values = loss_values


    def predict(self, features: list[list[float]]):
        X = (torch.tensor(features, dtype=torch.float32).squeeze() 
             if not isinstance(features, torch.FloatTensor) else deepcopy(features))

        probs = self.linear(X).squeeze()

        return torch.where(probs >= 0.5, 1, 0)
    
    
    def init_weights(self, X):
        if self.gen_machine == 'torch':

            generator = torch.Generator()
            generator.manual_seed(self.random_seed)
            if self.weights.nelement() == 0:
                self.weights = torch.normal(0, 1, size=(X.shape[1], ), generator=generator, dtype=torch.float32)
            if self.bias.nelement() == 0:
                self.bias = torch.normal(0, 1, size=(1,), generator=generator, dtype=torch.float32)

        elif self.gen_machine == 'numpy':

            rng_ = np.random.default_rng(seed=self.random_seed)
            if self.weights.nelement() == 0:
                self.weights = torch.tensor(rng_.standard_normal(X.shape[1], dtype=np.float32), dtype=torch.float32)
            if self.bias.nelement() == 0:
                self.bias = torch.tensor(rng_.standard_normal(1, dtype=np.float32), dtype=torch.float32)
    

#### Пример 1

In [None]:
features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]]
labels = [1, 0, 0]
initial_weights = [0.1, -0.2]
initial_bias = 0.0
learning_rate = 0.1
epochs = 2

log_reg_torch_autograd = LogRegTorchAutograd(initial_weights=initial_weights, initial_bias=initial_bias)
log_reg_torch_autograd.fit(features=features,
                          labels=labels,
                          learning_rate=learning_rate,
                          epochs=epochs)

log_reg_torch_autograd.weights, log_reg_torch_autograd.bias

#### Пример 2

In [None]:
log_reg_torch_autograd = LogRegTorchAutograd(gen_machine='numpy')

log_reg_torch_autograd.fit(features = X_train_subsamp_train, 
                           labels = y_train_subsamp_train, 
                           learning_rate = 1e-3, 
                           epochs = 5)

log_reg_torch_autograd.weights, log_reg_torch_autograd.bias, log_reg_torch_autograd.loss_values

In [None]:
accuracy_score(y_train_subsamp_train, log_reg_torch_autograd.predict(X_train_subsamp_train))

#### Пример 3

In [None]:
log_reg_torch_autograd = LogRegTorchAutograd(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6)

log_reg_torch_autograd.fit(features = X, 
                           labels = y, 
                           learning_rate = 1e-3, 
                           epochs = 20000)

log_reg_torch_autograd.weights, log_reg_torch_autograd.bias

In [None]:
weights = {'NLL': np.hstack((log_reg_torch_autograd.weights, np.array([log_reg_torch_autograd.bias])))}

plot_decision_bounds(X, y, weights)
accuracy_score(y, log_reg_torch_autograd.predict(X))

---

### 4.2. Pytorch class (autograd exploitation 2)

In [52]:
class LogRegTorchAutograd2(LogRegTorchAutograd):

    def __init__(self, 
                 initial_weights: list[list[float]] = [], 
                 initial_bias: list[float] = [],
                 gen_machine: str = 'torch',
                 tolerance: float = 1e-4,
                 n_startup_rounds: int = 50,
                 early_stop_rounds: int = 50,
                 random_seed: int = SEED):
        
        super(LogRegTorchAutograd2, self).__init__(initial_weights, initial_bias, gen_machine, tolerance,
                                                   n_startup_rounds, early_stop_rounds, random_seed)


    def fit(self, 
            features: list[list[float]], 
            labels: list[int], 
            learning_rate: float = 1e-3, 
            epochs: int = 100):

        X = (torch.tensor(features, dtype=torch.float32).squeeze() 
             if not isinstance(features, torch.FloatTensor) else deepcopy(features))
        y = (torch.tensor(labels, dtype=torch.float32).squeeze() 
             if not isinstance(labels, torch.FloatTensor) else deepcopy(labels))

        self.init_weights(X)
        
        self.linear = nn.Linear(X.shape[1], 1)
        self.linear.weight.data = self.weights.view(1, -1)
        self.linear.bias.data = self.bias.view(1)
        loss_function = nn.BCEWithLogitsLoss(reduction='mean')

        loss_values = []
        
        for epoch in range(epochs):
            y_pred = self.linear(X).squeeze()
            loss = loss_function(y_pred, y)

            no_improvement_counter = 0

            # Early stopping
            if epoch > self.n_startup_rounds and 0 < (loss_values[-1] - loss) < self.tolerance:
                no_improvement_counter += 1
                if no_improvement_counter >= self.early_stop_rounds:
                    print(f"Early stopping at epoch {epoch}")
                    break
            else:
                no_improvement_counter = 0

            loss_values.append(loss.item())

            # changes here
            weights_grad = torch.autograd.grad(loss, self.linear.weight, retain_graph=True)[0]
            bias_grad = torch.autograd.grad(loss, self.linear.bias)[0]
            with torch.no_grad():
                self.linear.weight -= learning_rate * weights_grad
                self.linear.bias -= learning_rate * bias_grad

        self.weights = self.linear.weight.data.squeeze()
        self.bias = self.linear.bias.data.squeeze()
        self.loss_values = loss_values

#### Пример 3

In [None]:
log_reg_torch_autograd_2 = LogRegTorchAutograd2(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6)

log_reg_torch_autograd_2.fit(features = X, 
                             labels = y, 
                             learning_rate = 1e-4, 
                             epochs = 20000)

log_reg_torch_autograd_2.weights, log_reg_torch_autograd_2.bias

In [None]:
weights = {'NLL': np.hstack((log_reg_torch_autograd_2.weights, np.array([log_reg_torch_autograd_2.bias])))}

plot_decision_bounds(X, y, weights)
accuracy_score(y, log_reg_torch_autograd_2.predict(X))

---

### 4.3. Pytorch function (autograd exploitation)

In [55]:
def single_neuron_autograd(features: list[list[float]], 
                            labels: list[int], 
                            initial_weights: list[float],
                            initial_bias: list[float],
                            learning_rate: float,
                            epochs: int,
                            scorer: callable = None) -> Sequence[list[float], list[float], list[float]]:
    features_tensor = torch.tensor(features, dtype=torch.float32)
    labels_tensor = torch.tensor(labels, dtype=torch.float32)
    if not torch.is_tensor(initial_weights):
        weights = torch.tensor(initial_weights, dtype=torch.float32)
        bias = torch.tensor(initial_bias, dtype=torch.float32)
    else:
        weights = initial_weights.clone().detach()
        bias = initial_bias.clone().detach()

    linear = nn.Linear(features_tensor.shape[1], 1)
    linear.weight.data = weights.view(1, -1)
    linear.bias.data = bias.view(1)
    loss_fn = nn.BCEWithLogitsLoss(reduction='mean')

    loss_values = []
    for epoch in range(epochs):
        logits = linear(features_tensor).squeeze(1)
        loss = loss_fn(logits, labels_tensor)
        loss_values.append(loss.item())

        # GD
        loss.backward()
        with torch.no_grad():
            linear.weight -= learning_rate * linear.weight.grad
            linear.bias -= learning_rate * linear.bias.grad

        linear.weight.grad.zero_()
        linear.bias.grad.zero_()

    if scorer is not None:
        probs = torch.sigmoid(logits)
        preds = torch.where(probs >= 0.5, 1, 0).numpy()
        print(f'{scorer.__name__}: {scorer(labels, preds)}')

    return (np.round(linear.weight.data.squeeze(0).tolist(), 4), 
            np.round(linear.bias.data.tolist(), 4), 
            np.round(loss_values, 4))

#### Пример 1

In [None]:
single_neuron_autograd(features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]], 
                        labels = [1, 0, 0], 
                        initial_weights = [0.1, -0.2], 
                        initial_bias = 0.0, 
                        learning_rate = 0.1, 
                        epochs = 2,
                        scorer = accuracy_score)

#### Пример 2

In [None]:
single_neuron_autograd(features = X_train_subsamp_train, 
                       labels = y_train_subsamp_train, 
                       initial_weights = init_weights_tensor[:-1], 
                       initial_bias = init_weights_tensor[-1], 
                       learning_rate = 1e-3, 
                       epochs = 5,
                       scorer = accuracy_score)

---

### 5. Pytorch NN

In [58]:
class LogisticRegressionNet(nn.Module):
    def __init__(self, input_dim, output_dim, init_weights = None, init_bias = None):
        super(LogisticRegressionNet, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)
        if init_weights is not None:
            self.linear.weight.data = init_weights.reshape(output_dim, input_dim)
        if init_bias is not None:
            self.linear.bias.data = init_bias.reshape(output_dim)

    def forward(self, x):
        out = self.linear(x)
        out = F.sigmoid(out) 
        # действительно, выглядит так, что использование F.sigmoid + BCELoss является
        # менее вычислительно устойчивым/стабильным подходом, чем использование
        # BCEWithLogitsLoss. Использование второго подхода дает схожие со всеми 
        # остальными методами значения потерь, тогда как явное использование сигмоиды
        # приводит к завышенным значениям функции потерь
        return out

#### Пример 1

In [59]:
features = torch.tensor([[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]])
labels = torch.tensor([1, 0, 0])
initial_weights = torch.tensor([0.1, -0.2])
initial_bias = torch.tensor([0.0])
learning_rate = 0.1
epochs = 2

logistic_regression_net = LogisticRegressionNet(features.shape[1], 1, initial_weights, initial_bias)
loss_fn = nn.BCELoss()
optimizer = torch.optim.SGD(logistic_regression_net.parameters(), lr=learning_rate, momentum=0.0)

In [60]:
def log_reg_net_train(model, X, y, loss_fn, optimizer):

    loss_values = []

    for epoch in range(epochs):
        optimizer.zero_grad()

        outputs = model(X)
        loss = loss_fn(outputs.squeeze(), y.float())

        loss_values.append(loss.item())

        loss.backward()
        optimizer.step()

    return (model.state_dict()['linear.weight'].squeeze(), 
            model.state_dict()['linear.bias'].squeeze(), 
            np.round(loss_values, 4))

In [None]:
log_reg_net_train(logistic_regression_net, features, labels, loss_fn, optimizer)

#### Пример 2

In [62]:
features = torch.tensor(X_train_subsamp_train, dtype=torch.float32)
labels = torch.tensor(y_train_subsamp_train, dtype=torch.float32)
initial_weights = init_weights_tensor[:-1]
initial_bias = init_weights_tensor[-1]
learning_rate = 1e-3
epochs = 5

logistic_regression_net = LogisticRegressionNet(features.shape[1], 1, initial_weights, initial_bias)
loss_fn = nn.BCELoss()
optimizer = torch.optim.SGD(logistic_regression_net.parameters(), lr=learning_rate, momentum=0.0)

In [None]:
log_reg_net_train(logistic_regression_net, features, labels, loss_fn, optimizer)

1) sigmoid + BCE: [-1.0791,  1.2970, -0.4337,  0.7704, -1.3520], Loss:  array([5.7214, 5.7211, 5.7208, 5.7204, 5.7202])

2) BCEWithLogitsLoss: [-1.0776,  1.2973, -0.4331,  0.7684, -1.3524], Loss: array([3.6517, 3.6512, 3.6508, 3.6504, 3.6499])

In [None]:
with torch.no_grad():
    probs = logistic_regression_net(features).squeeze().numpy()
    preds = np.where(probs >= 0.5, 1, 0)
    print(f'Accuracy score: {accuracy_score(labels, preds)}')

#### Пример 3

In [65]:
features = torch.tensor(X, dtype=torch.float32)
labels = torch.tensor(y, dtype=torch.float32)
learning_rate = 1e-3
epochs = 20000

logistic_regression_net = LogisticRegressionNet(features.shape[1], 1)
loss_fn = nn.BCELoss()
optimizer = torch.optim.SGD(logistic_regression_net.parameters(), lr=learning_rate, momentum=0.0)

In [66]:
_ = log_reg_net_train(logistic_regression_net, features, labels, loss_fn, optimizer)

In [None]:
log_reg_net_weights = logistic_regression_net.state_dict()['linear.weight'].squeeze().numpy()
log_reg_net_bias = logistic_regression_net.state_dict()['linear.bias'].squeeze().numpy()

log_reg_net_weights, log_reg_net_bias

In [None]:
weights = {'NLL': np.hstack((log_reg_net_weights, np.array([log_reg_net_bias])))}

plot_decision_bounds(X, y, weights)

probs = logistic_regression_net(features).detach().numpy().squeeze()
preds = np.where(probs >= 0.5, 1, 0)
accuracy_score(y, preds)

---

### 6. SKlearn logistic regression

In [None]:
sklearn_logistic_regression = LogisticRegression(penalty=None, 
                                                 random_state=SEED,
                                                 solver='lbfgs',
                                                 tol=1e-5, 
                                                 max_iter=100, 
                                                 n_jobs=1)

sklearn_logistic_regression.fit(X_train_subsamp_train, y_train_subsamp_train)

#### Пример 2

In [None]:
preds = sklearn_logistic_regression.predict(X_train_subsamp_train)
score = accuracy_score(y_train_subsamp_train, preds)
print(f'Accuracy score: {score:.4f}')

In [None]:
sklearn_logistic_regression.coef_, sklearn_logistic_regression.intercept_

#### Пример 3

In [None]:
sklearn_logistic_regression.fit(X, y)

In [None]:
preds = sklearn_logistic_regression.predict(X)
score = accuracy_score(y, preds)
print(f'Accuracy score: {score:.4f}')

In [None]:
weights = {'sklearn logreg': np.hstack((sklearn_logistic_regression.coef_.squeeze(), sklearn_logistic_regression.intercept_))}

plot_decision_bounds(X, y, weights)

---

### *1

In [75]:
class LogRegNumpyMultiloss(LogRegNumpy):

    def __init__(self, 
                 initial_weights: list[list[float]] = [], 
                 initial_bias: list[float] = [],
                 loss_fn_name: str = 'nll',
                 alpha: float = 0.25,
                 gamma: float = 2.0,
                 tolerance: float = 1e-4,
                 early_stop: bool = False,
                 n_startup_rounds: int = 50,
                 early_stop_rounds: int = 50,
                 random_seed: int = SEED):
        
        super(LogRegNumpyMultiloss, self).__init__(initial_weights, initial_bias, tolerance, early_stop,
                                                  n_startup_rounds, early_stop_rounds, random_seed)
        self.loss_fn_name = loss_fn_name
        self.alpha = alpha
        self.gamma = gamma
    
    def loss_fn(self, y_true, y_pred):
        if self.loss_fn_name == 'nll':
            y_pos_loss = y_true * np.log(y_pred + self.eps)
            y_neg_loss = (1 - y_true) * np.log(1 - y_pred + self.eps)
            return -np.mean(y_pos_loss + y_neg_loss)
        
        elif self.loss_fn_name == 'hinge':
            y_true_adjusted = np.where(y_true == 1, 1, -1)  # Convert 0 to -1 for hinge loss
            hinge_loss = np.maximum(0, 1 - y_true_adjusted * (2 * y_pred - 1))  # Hinge formula # maximum ~ np.where
            return np.mean(hinge_loss)
        
        elif self.loss_fn_name == 'focal':
            p_t = np.where(y_true == 1, y_pred, 1 - y_pred)
            focal_loss = -self.alpha * ((1 - p_t) ** self.gamma) * np.log(p_t + self.eps)
            return np.mean(focal_loss)
    
    def gradient(self, x, y_true, y_pred):
        if self.loss_fn_name == 'nll':
            return np.dot((y_pred - y_true), x) / x.shape[0]
        
        elif self.loss_fn_name == 'hinge':
            y_true_adjusted = np.where(y_true == 1, 1, -1)
            margin_mask = y_true_adjusted * (2 * y_pred - 1) < 1  # Only count misclassified points
            grad = -np.dot(margin_mask * y_true_adjusted, x) / x.shape[0]
            return grad
        
        elif self.loss_fn_name == 'focal':
            term_0 = (-self.gamma * (1 - y_pred) ** (self.gamma - 1) * np.log(y_pred + self.eps) 
                      + (1 - y_pred) ** self.gamma / (y_pred + self.eps))
            term_0 *= y_true

            term_1 = (self.gamma * y_pred ** (self.gamma - 1) * np.log(1 - y_pred + self.eps) 
                      - y_pred ** self.gamma / (1 - y_pred + self.eps))
            term_1 *= (1 - y_true)

            focal_grad = -self.alpha * (term_0 + term_1) * y_pred * (1 - y_pred)
            return np.dot(focal_grad, x) / x.shape[0]

#### Пример 1

In [None]:
features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]]
labels = [1, 0, 0]
initial_weights = [0.1, -0.2]
initial_bias = 0.0
learning_rate = 0.1
epochs = 2

log_reg_numpy_mult_loss = LogRegNumpyMultiloss(initial_weights=initial_weights, initial_bias=initial_bias,
                                              loss_fn_name='hinge')
log_reg_numpy_mult_loss.fit(features=features,
                            labels=labels,
                            learning_rate=learning_rate,
                            epochs=epochs)

log_reg_numpy_mult_loss.weights, log_reg_numpy_mult_loss.bias

#### Пример 2

In [None]:
log_reg_numpy_mult_loss = LogRegNumpyMultiloss(loss_fn_name='hinge')

log_reg_numpy_mult_loss.fit(features = X_train_subsamp_train, 
                            labels = y_train_subsamp_train, 
                            learning_rate = 1e-3, 
                            epochs = 5)

log_reg_numpy_mult_loss.weights, log_reg_numpy_mult_loss.bias

In [None]:
accuracy_score(y_train_subsamp_train, log_reg_numpy_mult_loss.predict(X_train_subsamp_train))

#### Пример 3

In [None]:
# Dataset 1: Favoring NLL Loss (overlapping classes, noisy decision boundaries)
X_nll, y_nll = make_classification(n_samples=1000, 
                                   n_features=2,
                                   n_informative=2, 
                                   n_redundant=0,
                                   n_repeated=0,
                                   n_clusters_per_class=2,
                                   flip_y=0.12, # Add some label noise
                                   class_sep=0.8, # Reduce class separability
                                   random_state=SEED)

# Dataset 2: Favoring Hinge Loss (clear, separable classes)
X_hinge, y_hinge = make_classification(n_samples=1000, 
                                       n_features=2,
                                       n_informative=2, 
                                       n_redundant=0, 
                                       n_repeated=0,
                                       n_clusters_per_class=1,
                                       flip_y=0.0, # No noise
                                       class_sep=2, # High class separability
                                       random_state=SEED)

# Dataset 3: Favoring Focal Loss (clear, separable classes, imbalanced)
X_focal, y_focal = make_classification(n_samples=1000, 
                                       n_features=2, 
                                       n_informative=2,
                                       n_redundant=0, 
                                       n_clusters_per_class=2, 
                                       flip_y=0.01,
                                       weights=[0.1, 0.9],
                                       class_sep=1.0,
                                       random_state=SEED)


# Plotting the datasets
fig, axs = plt.subplots(1, 3, figsize=(18, 7))

# Plot NLL-favoring dataset
sns.scatterplot(x=X_nll[:, 0], y=X_nll[:, 1], hue=y_nll, palette='coolwarm', ax=axs[0])
axs[0].set_title("NLL Loss Favoring Dataset")

# Plot Hinge-favoring dataset
sns.scatterplot(x=X_hinge[:, 0], y=X_hinge[:, 1], hue=y_hinge, palette='coolwarm', ax=axs[1])
axs[1].set_title("Hinge Loss Favoring Dataset")

# Plot Focal-favoring dataset
sns.scatterplot(x=X_focal[:, 0], y=X_focal[:, 1], hue=y_focal, palette='coolwarm', ax=axs[2])
axs[2].set_title("Focal Loss Favoring Dataset")

plt.show()

##### NLL

In [None]:
log_reg_numpy = LogRegNumpy(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6)

log_reg_numpy.fit(features = X_nll, 
                  labels = y_nll, 
                  learning_rate = 1e-3, 
                  epochs = 10000)

log_reg_numpy.weights, log_reg_numpy.bias

In [None]:
log_reg_numpy_hinge = LogRegNumpyMultiloss(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6, loss_fn_name='hinge')

log_reg_numpy_hinge.fit(features = X_nll, 
                        labels = y_nll, 
                        learning_rate = 1e-3, 
                        epochs = 10000)

log_reg_numpy_hinge.weights, log_reg_numpy_hinge.bias

In [None]:
log_reg_numpy_focal = LogRegNumpyMultiloss(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-8, loss_fn_name='focal')

log_reg_numpy_focal.fit(features = X_nll, 
                        labels = y_nll, 
                        learning_rate = 5e-3, 
                        epochs = 10000) # при большем числе эпох результат ухудшается

log_reg_numpy_focal.weights, log_reg_numpy_focal.bias

In [None]:
weights = {'NLL': np.hstack((log_reg_numpy.weights, np.array([log_reg_numpy.bias]))),
           'Hinge': np.hstack((log_reg_numpy_hinge.weights, np.array([log_reg_numpy_hinge.bias]))),
           'Focal': np.hstack((log_reg_numpy_focal.weights, np.array([log_reg_numpy_focal.bias])))}

plot_decision_bounds(X_nll, 
                     y_nll, 
                     weights,
                     dict(palette='coolwarm'),
                     title='NLL loss favoring dataset')

print(f'NLL loss accuracy score: {accuracy_score(y_nll, log_reg_numpy.predict(X_nll)):.4f}')
print(f'Hinge loss accuracy score: {accuracy_score(y_nll, log_reg_numpy_hinge.predict(X_nll)):.4f}')
print(f'Focal loss accuracy score: {accuracy_score(y_nll, log_reg_numpy_focal.predict(X_nll)):.4f}')

In [None]:
plot_losses

##### Hinge

In [None]:
log_reg_numpy = LogRegNumpy(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6)

log_reg_numpy.fit(features = X_hinge, 
                  labels = y_hinge, 
                  learning_rate = 1e-3, 
                  epochs = 10000)

log_reg_numpy.weights, log_reg_numpy.bias

In [None]:
log_reg_numpy_hinge = LogRegNumpyMultiloss(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6, loss_fn_name='hinge')

log_reg_numpy_hinge.fit(features = X_hinge, 
                        labels = y_hinge, 
                        learning_rate = 1e-3, 
                        epochs = 10000)

log_reg_numpy_hinge.weights, log_reg_numpy_hinge.bias

In [None]:
log_reg_numpy_focal = LogRegNumpyMultiloss(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-8, loss_fn_name='focal')

log_reg_numpy_focal.fit(features = X_hinge, 
                        labels = y_hinge, 
                        learning_rate = 1e-2, 
                        epochs = 10000)

log_reg_numpy_focal.weights, log_reg_numpy_focal.bias

In [None]:
weights = {'NLL': np.hstack((log_reg_numpy.weights, np.array([log_reg_numpy.bias]))),
           'Hinge': np.hstack((log_reg_numpy_hinge.weights, np.array([log_reg_numpy_hinge.bias]))),
           'Focal': np.hstack((log_reg_numpy_focal.weights, np.array([log_reg_numpy_focal.bias])))}

plot_decision_bounds(X_hinge, 
                     y_hinge, 
                     weights,
                     dict(palette='coolwarm'),
                     title='Hinge loss favoring dataset')

print(f'NLL loss accuracy score: {accuracy_score(y_hinge, log_reg_numpy.predict(X_hinge)):.4f}')
print(f'Hinge loss accuracy score: {accuracy_score(y_hinge, log_reg_numpy_hinge.predict(X_hinge)):.4f}')
print(f'Focal loss accuracy score: {accuracy_score(y_hinge, log_reg_numpy_focal.predict(X_hinge)):.4f}')

##### Focal

In [None]:
log_reg_numpy = LogRegNumpy(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6)

log_reg_numpy.fit(features = X_focal, 
                  labels = y_focal, 
                  learning_rate = 1e-3, 
                  epochs = 15000)

log_reg_numpy.weights, log_reg_numpy.bias

In [None]:
log_reg_numpy_hinge = LogRegNumpyMultiloss(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6, loss_fn_name='hinge')

log_reg_numpy_hinge.fit(features = X_focal, 
                        labels = y_focal, 
                        learning_rate = 1e-3, 
                        epochs = 15000)

log_reg_numpy_hinge.weights, log_reg_numpy_hinge.bias

In [None]:
log_reg_numpy_focal = LogRegNumpyMultiloss(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-8, loss_fn_name='focal')

log_reg_numpy_focal.fit(features = X_focal, 
                        labels = y_focal, 
                        learning_rate = 1e-2, 
                        epochs = 15000)

log_reg_numpy_focal.weights, log_reg_numpy_focal.bias

In [None]:
weights = {'NLL': np.hstack((log_reg_numpy.weights, np.array([log_reg_numpy.bias]))),
           'Hinge': np.hstack((log_reg_numpy_hinge.weights, np.array([log_reg_numpy_hinge.bias]))),
           'Focal': np.hstack((log_reg_numpy_focal.weights, np.array([log_reg_numpy_focal.bias])))}

plot_decision_bounds(X_focal, 
                     y_focal, 
                     weights,
                     dict(palette='coolwarm'),
                     title='Focal loss favoring dataset')

for scorer in (accuracy_score, f1_score):
    print(f'NLL loss {scorer.__name__}: {scorer(y_focal, log_reg_numpy.predict(X_focal)):.4f}')
    print(f'Hinge loss {scorer.__name__}: {scorer(y_focal, log_reg_numpy_hinge.predict(X_focal)):.4f}')
    print(f'Focal loss {scorer.__name__}: {scorer(y_focal, log_reg_numpy_focal.predict(X_focal)):.4f}')

---

### *2

In [93]:
class DimensionalityReduction:
    def __init__(self, params_history, n_components=2, seed=None):

        self.params_history = np.vstack([p.flatten() for p in params_history])
        self.n_components = n_components
        self.seed = seed

    def reduce_params(self):
        pca = PCA(n_components=self.n_components, random_state=self.seed)
        path_2d = pca.fit_transform(self.params_history)
        reduced_dirs = pca.components_  # The PCA directions

        return path_2d, reduced_dirs

In [94]:
RES = 50
MARGIN = 0.5

class LossGrid:
    """The loss grid class that holds the values of 2D slice from the loss landscape."""

    def __init__(
        self,
        optim_path,
        model,
        data,
        path_2d,
        directions,
        res=RES,
        tqdm_disable=False,
    ):
        """Init a loss grid object.

        Args:
            optim_path: The full-dimensional flattened parameters during training.
            model: The model for loss evaluation.
            data: The data module for loss evaluation.
            path_2d: The list of 2D coordinates.
            directions: The 2D directions/axes.
            res (optional): Resolution of the grid. Defaults to RES.
            tqdm_disable (optional): Whether to disable progress bar. Defaults to False.
            save_grid (optional): Whether to save the grid to file. Defaults to True.
            load_grid (optional): Whether to load from file. Defaults to False.
            filepath (optional): Defaults to "./checkpoints/lossgrid.p".
        """
        self.dir0, self.dir1 = directions
        self.path_2d = path_2d
        self.path_nd = optim_path
        self.optim_point = optim_path[-1]
        self.optim_point_2d = path_2d[-1]

        alpha = self._compute_stepsize(res)
        self.params_grid = self.build_params_grid(res, alpha)

        self.loss_values_2d, self.argmin, self.loss_min = self.compute_loss_2d(
            model, data, tqdm_disable=tqdm_disable
        )
        self.loss_values_optimizer = self.compute_optimizer_loss(model, data, tqdm_disable)

        self.loss_values_log_2d = np.log(self.loss_values_2d)
        self.loss_values_optimizer_log = np.log(self.loss_values_optimizer)
        self.coords = self._convert_coords(res, alpha)
        # True optim in loss grid
        self.true_optim_point = self.indices_to_coords(self.argmin, res, alpha)

    def build_params_grid(self, res, alpha):
        """
        Produce the grid for the contour plot.

        Start from the optimal point, span directions of the pca result with
        stepsize alpha, resolution res.
        """
        grid = []
        for i in range(-res, res):
            row = []
            for j in range(-res, res):
                w_new = (
                    self.optim_point
                    + i * alpha * self.dir0
                    + j * alpha * self.dir1
                )
                row.append(w_new)
            grid.append(row)
        assert (grid[res][res] == self.optim_point).all()
        return grid

    def compute_optimizer_loss(self, model, data, tqdm_disable):
        X, y = data
        X = np.c_[np.ones((X.shape[0], 1)), X]
        loss = []

        for i in tq.tqdm(range(self.path_nd.shape[0]), disable=tqdm_disable):
            y_pred = model.forward(X, self.path_nd[i])
            loss_val = model.loss_fn(y, y_pred)
            loss.append(loss_val)
        return np.array(loss)

    def compute_loss_2d(self, model, data, tqdm_disable=False):
        """Compute loss values for each weight vector in grid for the model and data."""
        X, y = data
        X = np.c_[np.ones((X.shape[0], 1)), X]
        loss_2d = []
        n = len(self.params_grid)
        m = len(self.params_grid[0])
        loss_min = float("inf")
        argmin = ()
        print("Generating loss values for the contour plot...")
        with tq.tqdm(total=n * m, disable=tqdm_disable) as pbar:
            for i in range(n):
                loss_row = []
                for j in range(m):
                    w_ij = self.params_grid[i][j]
                    # Load flattened weight vector into model
                    y_pred = model.forward(X, w_ij)
                    loss_val = model.loss_fn(y, y_pred)
                    if loss_val < loss_min:
                        loss_min = loss_val
                        argmin = (i, j)
                    loss_row.append(loss_val)
                    pbar.update(1)
                loss_2d.append(loss_row)
        # This transpose below is very important for a correct contour plot because
        # originally in loss_2d, dir1 (y) is row-direction, dir0 (x) is column
        loss_2darray = np.array(loss_2d).T
        print("\nLoss values generated.")
        return loss_2darray, argmin, loss_min

    def _convert_coord(self, i, ref_point_coord, alpha):
        """
        Convert from integer index to the coordinate value.

        Given a reference point coordinate (1D), find the value i steps away with
        step size alpha.
        """
        return i * alpha + ref_point_coord

    def _convert_coords(self, res, alpha):
        """
        Convert the coordinates from (i, j) indices to (x, y) values.

        Remember that for PCA, the coordinates have unit vectors as the top 2 PCs.

        Original path_2d has PCA output, i.e. the 2D projections of each W step
        onto the 2D space spanned by the top 2 PCs.
        We need these steps in (i, j) terms with unit vectors
        reduced_w1 = (1, 0) and reduced_w2 = (0, 1) in the 2D space.

        We center the plot on optim_point_2d, i.e.
        let center_2d = optim_point_2d

        ```
        i = (x - optim_point_2d[0]) / alpha
        j = (y - optim_point_2d[1]) / alpha

        i.e.

        x = i * alpha + optim_point_2d[0]
        y = j * alpha + optim_point_2d[1]
        ```

        where (x, y) is the 2D points in path_2d from PCA. Again, the unit
        vectors are reduced_w1 and reduced_w2.
        Return the grid coordinates in terms of (x, y) for the loss values
        """
        converted_coord_xs = []
        converted_coord_ys = []
        for i in range(-res, res):
            x = self._convert_coord(i, self.optim_point_2d[0], alpha)
            y = self._convert_coord(i, self.optim_point_2d[1], alpha)
            converted_coord_xs.append(x)
            converted_coord_ys.append(y)
        return np.array(converted_coord_xs), np.array(converted_coord_ys)

    def indices_to_coords(self, indices, res, alpha):
        """Convert the (i, j) indices to (x, y) coordinates.

        Args:
            indices: (i, j) indices to convert.
            res: Resolution.
            alpha: Step size.

        Returns:
            The (x, y) coordinates in the projected 2D space.
        """
        grid_i, grid_j = indices
        i, j = grid_i - res, grid_j - res
        x = i * alpha + self.optim_point_2d[0]
        y = j * alpha + self.optim_point_2d[1]
        return x, y

    def _compute_stepsize(self, res):
        dist_2d = self.path_2d[-1] - self.path_2d[0]
        dist = (dist_2d[0] ** 2 + dist_2d[1] ** 2) ** 0.5
        return dist * (1 + MARGIN) / res

In [95]:
class LogRegNumpyMiniBatch(LogRegNumpyMultiloss):

    def __init__(self, 
                 initial_weights: list[list[float]] = [], 
                 initial_bias: list[float] = [],
                 loss_fn_name: str = 'nll',
                 alpha: float = 0.25,
                 gamma: float = 2.0,
                 tolerance: float = 1e-4,
                 early_stop: bool = False,
                 n_startup_rounds: int = 50,
                 early_stop_rounds: int = 50,
                 random_seed: int = SEED):
        
        super(LogRegNumpyMiniBatch, self).__init__(initial_weights, initial_bias, loss_fn_name, alpha, gamma,
                                                   tolerance, early_stop, n_startup_rounds, early_stop_rounds, random_seed)
    
    def fit(self, 
            features: list[list[float]], 
            labels: list[int],
            batch_size: int = 1,
            learning_rate: float = 1e-3, 
            epochs: int = 100,
            return_weights_history: bool = False) -> None | list[list[float]]:

        X = np.array(features).squeeze() if not isinstance(features, np.ndarray) else deepcopy(features)
        X = self.add_ones(X)
        y = np.array(labels).squeeze() if not isinstance(labels, np.ndarray) else deepcopy(labels)

        self.init_weights(X)
        
        w = np.hstack([self.bias, self.weights])

        loss_values = []
        if return_weights_history:
            weights_values = []
        rng_train = np.random.default_rng(seed=self.random_seed)
        num_batches = np.ceil(X.shape[0] / batch_size)
        add_to_full_batch = int(batch_size * num_batches - X.shape[0])

        for epoch in range(epochs):

            train_loss = 0

            idx_set = np.arange(0, X.shape[0], 1)
            idx_set = np.hstack([idx_set, rng_train.choice(idx_set, add_to_full_batch, replace=False)])
            rng_train.shuffle(idx_set)

            for idx in range(0, X.shape[0] - batch_size, batch_size):
                X_batch = X[idx_set[idx : idx + batch_size]]
                y_batch = y[idx_set[idx : idx + batch_size]]
                y_pred = self.forward(X_batch, w)
                loss = self.loss_fn(y_batch, y_pred)

                train_loss += loss.item()
                
                grad = self.gradient(X_batch, y_batch, y_pred)
                w -= learning_rate * grad
            
            train_loss /= num_batches
            loss_values.append(train_loss)
            if return_weights_history:
                weights_values.append(w.copy())

        self.weights = w[1:]
        self.bias = w[0]
        self.loss_values = loss_values

        if return_weights_history:
            return np.array(weights_values)

    def predict(self, features: list[list[float]], batch_size: int = 32):
        X = np.array(features).squeeze() if not isinstance(features, np.ndarray) else deepcopy(features)
        X = self.add_ones(X)

        w = np.hstack([self.bias, self.weights])
        
        idx_set = np.arange(0, X.shape[0], 1)
        probs = np.empty(0)
        for idx in range(0, X.shape[0] - batch_size, batch_size):
            X_batch = X[idx_set[idx : idx + batch_size]]
            probs_batch = self.forward(X_batch, w)
            probs = np.append(probs, probs_batch)
        
        probs = np.append(probs, self.forward(X[idx + batch_size : X.shape[0]], w))

        return np.where(probs >= 0.5, 1, 0)

In [96]:
def train_multiple_models(X: list[list[float]], y: list[float], model_classes: list[LogRegNumpy], scorers: list[callable],
                          model_params: list[dict], fit_params: list[dict], names: list[str], return_models: bool = False):

    weights, loss, scores, weights_history, models = dict(), dict(), dict(), dict(), dict()

    for model_class, m_params, f_params, name in zip(model_classes, model_params, fit_params, names):
        print('Training model', name)
        model = model_class(**m_params)
        if 'return_weights_history' in f_params:
            if f_params['return_weights_history']:
                weights_history[name] = model.fit(features=X, labels=y, **f_params)
        else:
            model.fit(features=X, labels=y, **f_params)

        weights[name] = np.hstack((model.weights, np.array([model.bias])))
        loss[name] = model.loss_values

        scores[name] = dict()
        for scorer in scorers:
            scores[name][scorer.__name__] = scorer(y, model.predict(X))

        if return_models:
            models[name] = model

    if return_models:
        if weights_history:
            return weights, loss, scores, weights_history, models
        return weights, loss, scores, models
    elif weights_history:
        return weights, loss, scores, weights
    else:
        return weights, loss, scores

In [97]:
# # Костыль section
# NLL_tmp = LogRegNumpyMultiloss(random_seed=2147483647)
# NLL_tmp.fit(X_nll, y_nll, epochs=0)

# init_weights, init_bias = NLL_tmp.weights, NLL_tmp.bias

In [98]:
model_classes = [
    LogRegNumpyMultiloss,
    LogRegNumpyMiniBatch,
    LogRegNumpyMiniBatch,
] * 3

scorers = [
    accuracy_score,
    f1_score,
]

# !!! Даже с одним сидом я не могу сделать веса воспроизводимыми (почему?)
model_params = [
    dict(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6, loss_fn_name='nll'),
] * 3 + [
    dict(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-6, loss_fn_name='hinge'),   
] * 3 + [
    dict(random_seed=2147483647, n_startup_rounds=100, tolerance=1e-8, loss_fn_name='focal'),
] * 3

fit_params = [
    dict(learning_rate=1e-3, epochs=10000, return_weights_history=True),
    dict(batch_size=8, learning_rate=1e-3, epochs=10000, return_weights_history=True),
    dict(batch_size=32, learning_rate=1e-3, epochs=10000, return_weights_history=True),

    dict(learning_rate=1e-3, epochs=10000, return_weights_history=True),
    dict(batch_size=8, learning_rate=1e-3, epochs=10000, return_weights_history=True),
    dict(batch_size=32, learning_rate=1e-3, epochs=10000, return_weights_history=True),

    dict(learning_rate=5e-3, epochs=10000, return_weights_history=True),
    dict(batch_size=8, learning_rate=5e-3, epochs=10000, return_weights_history=True),
    dict(batch_size=32, learning_rate=5e-3, epochs=10000, return_weights_history=True),
]

model_names = (['NLL', 'NLL (batch 8)', 'NLL (batch 32)'] + 
               ['Hinge', 'Hinge (batch 8)', 'Hinge (batch 32)'] + 
               ['Focal', 'Focal (batch 8)', 'Focal (batch 32)'])

##### NLL

In [None]:
weights, loss, scores, weights_history, models = train_multiple_models(X_nll, y_nll, 
                                                                       model_classes, 
                                                                       scorers,
                                                                       model_params, 
                                                                       fit_params, 
                                                                       model_names, 
                                                                       True)

In [None]:
# for name, s in scores.items():
#     for score_name, score_value in s.items():
#         print(f'{name} {score_name}: {score_value:.4f}')

df_comp = pd.DataFrame(scores)
df_comp = df_comp.transpose()
display(df_comp.style.background_gradient(cmap='coolwarm'))

In [None]:
plot_decision_bounds(X_nll, y_nll, dict(islice(weights.items(), 3)), dict(palette='coolwarm'), title='NLL loss favoring dataset')
plot_decision_bounds(X_nll, y_nll, dict(islice(weights.items(), 3, 6)), dict(palette='coolwarm'), title='NLL loss favoring dataset')
plot_decision_bounds(X_nll, y_nll, dict(islice(weights.items(), 6, 9)), dict(palette='coolwarm'), title='NLL loss favoring dataset')

In [None]:
plot_losses(dict(islice(loss.items(), 3)), 'matplotlib')
plot_losses(dict(islice(loss.items(), 3, 6)), 'matplotlib', palette=palette[:3])
plot_losses(dict(islice(loss.items(), 6, 9)), 'matplotlib', palette=palette[3:6])

In [103]:
def get_reduced_params(weights_history: dict[str: list[list[float]]]):
    result = dict()
    for name, history in weights_history.items():
        dim_reducer = DimensionalityReduction(history, 2, SEED)
        result[name] = dim_reducer.reduce_params()

    return result

In [104]:
reduced_params = get_reduced_params(weights_history)

In [105]:
def get_grids(weights_history: dict[str: list[list[float]]],
              models: dict[str: LogRegNumpyMultiloss|LogRegNumpyMiniBatch],
              data: tuple[list[list[float]], list[int]],
              reduced_params: dict[str: Sequence[list[list[float]]], list[list[float]]], 
              res: int):
    
    result = dict()
    for name in reduced_params.keys():
        result[name] = LossGrid(weights_history[name], models[name], data, reduced_params[name][0], reduced_params[name][1], res, True)
        
    return result

In [None]:
loss_grids = get_grids(weights_history, models, (X_nll, y_nll), reduced_params, 50)

In [711]:
def plot_contour(reduced_params, loss_grid, coords, true_optim_point, animate=False, step=20, label='', color=palette[1]):
    fig, ax = plt.subplots(figsize=(8, 6))
    coords_x, coords_y = coords
    ax.contourf(coords_x, coords_y, loss_grid, levels=35, alpha=0.9, cmap="YlGnBu")

    ax.plot(true_optim_point[0], true_optim_point[1], 'k*', markersize=8, label="True Optimum")
    ax.text(true_optim_point[0] + 0.1, true_optim_point[1] + 0.1,
            f'({true_optim_point[0]:.2f}, {true_optim_point[1]:.2f})',
            fontsize=10, color='black')

    if not animate:
        w1s = [step[0] for step in reduced_params]
        w2s = [step[1] for step in reduced_params]
        ax.plot(w1s, w2s, lw=2, label=label, color=color)
    else:
        line, = ax.plot([], [], lw=2, label=label, color=color)
        w1s = [step[0] for step in reduced_params[::step]]
        w2s = [step[1] for step in reduced_params[::step]]

        def update(i):
            line.set_xdata(w1s[:i])
            line.set_ydata(w2s[:i])
            return line

        anim = animation.FuncAnimation(fig, update, frames=len(w1s), interval=100, blit=False, repeat=False)

    ax.legend()
    plt.title("Optimizer Path Comparison")
    plt.show()

In [None]:
for i, name in enumerate(model_names):
    plot_contour(reduced_params[name][0], 
                 loss_grids[name].loss_values_log_2d, 
                 loss_grids[name].coords, 
                 loss_grids[name].true_optim_point, False, 50, label=name, color=palette[i])

In [713]:
def plot_3d_surface(steps, loss_grid, coords, true_optim_point, title='', label='', color=palette[0], plot_engine='plotly'):
    coords_x, coords_y = coords

    w1s = [step[0] for step in steps]
    w2s = [step[1] for step in steps]
    ls  = [step[2] for step in steps]

    if plot_engine == 'plotly':
        # Add the loss surface
        surface = go.Surface(z=loss_grid, x=coords_x, y=coords_y, colorscale="YlGnBu", opacity=0.7, )

        # Add the optimizer path
        
        optim_path = go.Scatter3d(x=w1s, y=w2s, z=ls, mode='lines', line=dict(color=color, width=4), name=label)

        # Mark the true optimum point
        true_optim = go.Scatter3d(x=[true_optim_point[0]], y=[true_optim_point[1]], z=[loss_grid.min()],
                                mode='markers', marker=dict(color='black', size=10, symbol='cross'), name='True Optimum')

        layout = go.Layout(title=title,
                        scene=dict(xaxis_title='w0', yaxis_title='w1', zaxis_title='Loss'),
                        legend=dict(yanchor="top",y=0.99,xanchor="left",x=0.01),
                        height=800, width=800)

        fig = go.Figure(data=[surface, optim_path, true_optim], layout=layout)
        fig.show()
        
    elif plot_engine == 'matplotlib':
        
        fig = plt.figure(figsize=(10, 8))
        ax = fig.add_subplot(111, projection='3d')

        X, Y = np.meshgrid(coords_x, coords_y)
        ax.plot_surface(X, Y, loss_grid, cmap='YlGnBu', alpha=0.7)
        ax.plot(w1s, w2s, ls, color=color, linewidth=2, label=label)
        ax.scatter(true_optim_point[0], true_optim_point[1], np.min(loss_grid), color='black', s=100, marker='x', label='True Optimum')

        ax.set_title(title)
        ax.set_xlabel("w0")
        ax.set_ylabel("w1")
        ax.set_zlabel("Loss")
        ax.legend()

        plt.show()

In [None]:
for i, name in enumerate(model_names):
    path_3d = np.c_[reduced_params[name][0], loss_grids[name].loss_values_optimizer_log]
    plot_3d_surface(path_3d, 
                    loss_grids[name].loss_values_log_2d, 
                    loss_grids[name].coords, 
                    loss_grids[name].true_optim_point,
                    '',
                    name,
                    palette[i],
                    'plotly')

---