In [None]:
import random
from copy import deepcopy

import pandas as pd
import numpy as np
from scipy.special import logsumexp

import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler, OneHotEncoder
from sklearn.metrics import accuracy_score, f1_score

In [None]:
# Small numeric constant kept at notebook level.
eps = 1e-9

# Seed both the stdlib and the legacy NumPy global generators so a
# Restart & Run All reproduces the same numbers.
SEED = 18092025
random.seed(SEED)
np.random.seed(SEED)

# Notebook-wide NumPy Generator seeded with SEED.
rng = np.random.default_rng(seed=SEED)

[Датасет](https://archive.ics.uci.edu/dataset/697/predict+students+dropout+and+academic+success)

In [None]:
# Location of the raw CSV, relative to the notebook's working directory.
# The file is read with ';' as the field separator.
DATA_PATH = '../datasets/data.csv'

try:
    df = pd.read_csv(DATA_PATH, delimiter=';')
except FileNotFoundError as err:
    # Fail loudly here: every later cell needs `df`, and swallowing the error
    # would only surface as a confusing NameError further down. Other errors
    # (parsing, permissions, ...) propagate unchanged instead of being
    # reported as a missing file.
    raise FileNotFoundError(
        f"Dataset not found at {DATA_PATH!r}. Download 'Predict Students' Dropout and "
        "Academic Success' from the UCI repository and place data.csv there."
    ) from err

In [None]:
# Remove the pandas limits on displayed columns, cell width and total width
# so wide frames are shown in full.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.width', None)

# Do not wrap a wide frame's repr across several stacked blocks.
pd.set_option('display.expand_frame_repr', False)


# Uncomment to restore the pandas display defaults:
# pd.reset_option('display.max_columns')
# pd.reset_option('display.max_colwidth')
# pd.reset_option('display.width')
# pd.reset_option('display.expand_frame_repr')

---

## EDA

In [None]:
def display_nans(df):
    """Report missing values per column as a styled table and a bar chart.

    Only columns containing at least one NaN are reported. When the frame has
    no missing values a short message is printed and nothing is drawn.

    Args:
        df: DataFrame to inspect.

    Returns:
        None. The table is rendered with IPython's ``display`` and the chart
        with matplotlib.
    """
    # One vectorised pass over the frame; the index keeps the full column
    # labels (a fixed-width 'U20' string dtype would truncate long names).
    nan_counts = df.isna().sum()
    nan_counts = nan_counts[nan_counts > 0].sort_values(kind='stable')

    if nan_counts.empty:
        print('No nans in the dataset')
        return

    counts = nan_counts.to_numpy()
    summary = pd.DataFrame({
        'col_name': nan_counts.index.astype(str),
        'nans': counts,
        'nans_perc': counts / df.shape[0] * 100,
    })

    # Table: worst columns first.
    df_show = summary.iloc[::-1].reset_index(drop=True)
    display(df_show.style.background_gradient(cmap='Blues'))

    # Chart: ascending order, so the worst column ends up at the top of barh.
    fig, ax = plt.subplots(1, 1, figsize=(8, 5))

    y_pos = np.arange(len(summary))

    ax.barh(y_pos, summary['nans_perc'], alpha=0.8, edgecolor='black', linewidth=1)
    ax.set_yticks(y_pos, labels=summary['col_name'].tolist())
    ax.set_xlabel('Nans, %', fontsize=14)
    ax.set_title('Nans rate for each column', fontsize=16)
    # Leave a 5-point margin to the right of the longest bar, capped at 100 %.
    ax.set_xlim(0, min(summary['nans_perc'].max() + 5.0, 100.0))
    ax.tick_params(axis='both', which='major', labelsize=11)
    ax.grid(axis='x', linestyle='--', linewidth=0.5)

    plt.show()

In [None]:
# First rows of the raw frame plus its (rows, columns) shape.
display(df.head())
print('Dataset shape: ', df.shape)

In [None]:
def col_names_transform(col_name: str) -> str:
    """Normalise a column label to a lowercase, underscore-separated name.

    Surrounding whitespace is stripped, tab characters are removed and every
    remaining space becomes an underscore.
    """
    # Single-pass character mapping: drop tabs, turn spaces into underscores.
    char_map = str.maketrans({"\t": None, " ": "_"})
    return col_name.strip().translate(char_map).lower()

In [None]:
# Normalise every column label with `col_names_transform`, then show the result.
df.columns = [col_names_transform(label) for label in df.columns]
df.columns

In [None]:
# Descriptive statistics of the frame's columns.
df.describe()

In [None]:
# TODO: add further descriptive statistics

In [None]:
# Missing-value report (table + chart, or a message when there are none).
display_nans(df)

In [None]:
# Column dtypes as inferred by pandas when reading the CSV.
df.dtypes

In [None]:
# Class balance of the target as proportions, laid out as a single row.
df['target'].value_counts(normalize=True).to_frame().T

---

## Подготовка данных

In [None]:
# Separate the feature columns from the `target` column.
X, y = df.drop(columns=['target']), df['target']
X.shape, y.shape

In [None]:
# Shuffled 80/20 split, stratified on the target and seeded with SEED.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=SEED, shuffle=True, stratify=y)
y_train.shape, y_test.shape

In [None]:
# Fit the scaler on the training split only and reuse its statistics for the
# test split, so no test information leaks into preprocessing.
std_scaler = StandardScaler()
X_train_scaled = std_scaler.fit_transform(X_train)
X_test_scaled  = std_scaler.transform(X_test)
X_train_scaled[0, :5]

In [None]:
# Encode the target labels as integers; the mapping is learned on the
# training split and applied unchanged to the test split.
label_encoder = LabelEncoder()
y_train_enc = label_encoder.fit_transform(y_train.values)
y_test_enc  = label_encoder.transform(y_test.values)
# Sanity check: raw labels, their codes, and the learned class order.
y_train[:5].values, y_train_enc[:5], label_encoder.classes_

---

## Модель линейной классификации

In [None]:
class RunningAverage:
    """Arithmetic mean maintained incrementally, one observation at a time."""

    def __init__(self):
        self.value = 0.0  # current mean (0.0 before any update)
        self.count = 0    # number of observations folded in so far

    def update(self, new_value):
        """Fold ``new_value`` into the mean and return the updated mean."""
        self.count += 1
        weight = 1 / self.count
        # Q_m = (1/m) * x_m + (1 - 1/m) * Q_{m-1}
        blended = weight * new_value + (1 - weight) * self.value
        self.value = blended
        return blended
    

class EMA:
    """Exponential moving average with smoothing factor ``l``."""

    def __init__(self, l=0.1):
        self.value = None  # None until the first observation arrives
        self.l = l         # weight given to the newest observation

    def update(self, new_value):
        """Fold ``new_value`` into the average and return the updated value."""
        if self.value is None:
            # The first observation seeds the average as-is.
            self.value = new_value
            return self.value

        # Q_m = l * x_m + (1 - l) * Q_{m-1}
        self.value = self.l * new_value + (1 - self.l) * self.value
        return self.value

In [None]:
class LogRegNumpy():
    """Multinomial (softmax) logistic regression trained by gradient descent.

    The model is ``logits = X @ weights + bias`` with ``weights`` of shape
    (n_features, n_classes) and ``bias`` of shape (1, n_classes). Labels are
    used directly as column indices into the logits, so they must be integers
    in ``0 .. n_classes - 1``.

    Attributes:
        weights, bias: model parameters (empty arrays until initialised).
        loss_values: one monitored loss value per completed epoch of the last
            ``fit`` call.
        rec_value, rec_count, rec_history: state and per-sample trace of the
            recursive loss estimate (running mean or EMA).
    """

    def __init__(
        self,
        initial_weights: list[list[float]] = None, # (n_features, n_classes)
        initial_bias:    list[float] = None,       # (1, n_classes)
        tolerance:       float = 1e-4,
        early_stop: bool = False,
        n_startup_rounds: int = 50,
        early_stop_rounds: int = 50,
        random_seed: int = SEED
    ):
        """Store hyper-parameters and optional starting parameters.

        Args:
            initial_weights: optional starting weights, (n_features, n_classes).
                When omitted they are drawn randomly on the first ``fit``.
            initial_bias: optional starting bias with n_classes values.
            tolerance: minimal epoch-to-epoch loss decrease that still counts
                as progress for early stopping.
            early_stop: enable early stopping.
            n_startup_rounds: epochs during which early stopping is not checked.
            early_stop_rounds: number of consecutive epochs without progress
                after which training stops.
            random_seed: seed for weight initialisation and sample shuffling.
        """
        self.weights = self._as_float_array(initial_weights)
        self.bias = self._as_float_array(initial_bias)
        if self.bias.size:
            # Keep the documented (1, n_classes) layout so in-place updates
            # never need to broadcast into a 1-D array.
            self.bias = self.bias.reshape(1, -1)
        self.tolerance = tolerance
        self.early_stop = early_stop
        self.n_startup_rounds = n_startup_rounds
        self.early_stop_rounds = early_stop_rounds
        self.random_seed = random_seed
        self.eps = 1e-9

        # Defined up-front so the attribute exists before the first fit().
        self.loss_values = []

        # State of the recursive loss estimate.
        self.rec_value = None
        self.rec_count = 0
        self.rec_history = []

    @staticmethod
    def _as_float_array(values) -> np.ndarray:
        """Copy ``values`` into a float array (an empty array for ``None``).

        Non-float input (e.g. integer lists) is promoted to float64 because
        the parameters are updated in place with float gradients.
        """
        if values is None:
            return np.array([])
        arr = np.array(values)
        if not np.issubdtype(arr.dtype, np.floating):
            arr = arr.astype(np.float64)
        return arr

    @staticmethod
    def _as_feature_matrix(features) -> np.ndarray:
        """Return ``features`` as a 2-D float32 array.

        A 1-D input is treated as a single sample. No squeezing is done, so
        an (n_samples, 1) matrix keeps its shape.

        Raises:
            ValueError: if the input is not 1-D or 2-D.
        """
        X = np.asarray(features, dtype=np.float32)
        if X.ndim == 1:
            X = X[np.newaxis, :]
        if X.ndim != 2:
            raise ValueError(f"features must be 1-D or 2-D, got {X.ndim} dimensions.")
        return X

    def fit(
        self,
        features:       list[list[float]],
        labels:         list[int],
        learning_rate:  float = 1e-3,
        epochs:         int = 100,
        shuffle:        bool = True,
        training_mode:  str = 'per_sample', # "per_sample" | "full"
        momentum:       float = 0.0,
        l2:             float = 0.0,
        return_weights_history:  bool = False,
        verbose:                 bool = False,
        rec_mode:                str = 'off',  # "off" | "mean" | "ema"
        ema_lambda:              float = 0.1,  # lambda of the EMA
        early_stop_use_rec:      bool = True,  # monitor the recursive estimate
    ) -> np.ndarray | None:
        """Train the model for exactly ``epochs`` passes over the data.

        Args:
            features: (n_samples, n_features) matrix.
            labels: integer class indices in ``0 .. n_classes - 1``.
            learning_rate: step size of the descent update.
            epochs: number of passes over the data.
            shuffle: reshuffle the sample order every epoch ('per_sample' only).
            training_mode: 'per_sample' (one update per sample) or 'full'
                (one update per epoch on the whole data set).
            momentum: coefficient of the exponentially averaged gradient.
            l2: L2 penalty coefficient applied to the weights (not the bias).
            return_weights_history: return the weights after every epoch.
            verbose: print the monitored loss roughly ten times per run.
            rec_mode: recursive estimate of the per-sample loss, 'off', 'mean'
                or 'ema'; only used in 'per_sample' mode. Any other value
                leaves the raw per-sample loss in ``rec_history``.
            ema_lambda: smoothing factor for ``rec_mode='ema'``.
            early_stop_use_rec: when a recursive estimate is active, monitor it
                instead of the mean epoch loss.

        Returns:
            Array of shape (n_recorded + 1, n_features, n_classes) holding the
            initial weights followed by the weights after each epoch when
            ``return_weights_history`` is true, otherwise ``None``.

        Raises:
            ValueError: unknown ``training_mode``, empty data, or a mismatch
                between the number of samples and labels.
        """
        # Validate before touching any state.
        if training_mode not in ('per_sample', 'full'):
            raise ValueError("training_mode must be 'full' or 'per_sample'.")

        X = self._as_feature_matrix(features)
        y = np.asarray(labels, dtype=np.int64).reshape(-1)
        n_samples = X.shape[0]
        if n_samples == 0:
            raise ValueError("Cannot fit on an empty dataset.")
        if y.shape[0] != n_samples:
            raise ValueError(
                f"features and labels disagree on the number of samples: "
                f"{n_samples} != {y.shape[0]}."
            )

        self._init_weights(X, y)
        weights_values = [self.weights.copy()] if return_weights_history else None

        # Momentum velocities.
        v_w = np.zeros_like(self.weights)
        v_b = np.zeros_like(self.bias)

        self._rec_reset()
        self.loss_values = []
        stale_epochs = 0
        log_every = max(1, epochs // 10)
        use_rec = rec_mode != 'off' and early_stop_use_rec

        # Own generator: shuffling depends only on random_seed, not on any
        # notebook-level state, so every fit() call is reproducible.
        shuffle_rng = np.random.default_rng(self.random_seed)

        for epoch in range(epochs):
            if training_mode == 'per_sample':
                order = np.arange(n_samples)
                if shuffle:
                    shuffle_rng.shuffle(order)
                mean_epoch_loss, v_w, v_b = self._run_sample_epoch(
                    X, y, order, v_w, v_b,
                    learning_rate, momentum, l2, rec_mode, ema_lambda,
                )
                monitored = self.rec_history[-1] if use_rec else mean_epoch_loss
            else:
                # Loss is measured before this epoch's update.
                logits = self.forward(X)
                monitored = float(self._loss_fn_opt(y, logits, reduction='mean'))
                w_grad, b_grad = self._gradient(X, y, logits)
                v_w, v_b = self._apply_update(
                    w_grad, b_grad, v_w, v_b, learning_rate, momentum, l2
                )

            self.loss_values.append(monitored)
            if weights_values is not None:
                # Recorded after the update, in both training modes.
                weights_values.append(self.weights.copy())

            if verbose and epoch % log_every == 0:
                print(f"epoch {epoch:4d} | loss={monitored:.6f}")

            if self.early_stop and epoch > self.n_startup_rounds and len(self.loss_values) > 1:
                improvement = self.loss_values[-2] - self.loss_values[-1]
                # Stagnation *and* deterioration both count as "no progress";
                # only a decrease of at least `tolerance` resets the patience.
                if improvement < self.tolerance:
                    stale_epochs += 1
                    if stale_epochs >= self.early_stop_rounds:
                        if verbose:
                            print(f"Early stopping at epoch {epoch}")
                        break
                else:
                    stale_epochs = 0

        if weights_values is not None:
            return np.array(weights_values)

    def _run_sample_epoch(self, X, y, order, v_w, v_b,
                          learning_rate, momentum, l2, rec_mode, ema_lambda):
        """Run one pass of per-sample updates in the given sample order.

        Returns:
            (mean per-sample loss over the epoch, updated v_w, updated v_b).
            Each loss is measured before the update made on that sample.
        """
        loss_sum = 0.0
        for i in order:
            xi = X[i:i + 1, :]  # (1, n_features)
            yi = y[i:i + 1]     # (1,)
            logits_i = self.forward(xi)
            # Plain Python float: accumulate in double precision.
            loss_i = float(self._loss_fn_opt(yi, logits_i)[0])
            loss_sum += loss_i

            self.rec_history.append(
                self._rec_update(loss_i, mode=rec_mode, ema_lambda=ema_lambda)
            )

            w_grad, b_grad = self._gradient(xi, yi, logits_i)
            v_w, v_b = self._apply_update(
                w_grad, b_grad, v_w, v_b, learning_rate, momentum, l2
            )
        return loss_sum / len(order), v_w, v_b

    def _apply_update(self, w_grad, b_grad, v_w, v_b, learning_rate, momentum, l2):
        """Apply L2, momentum and one descent step; return the new velocities."""
        if l2 > 0.0:
            w_grad = w_grad + l2 * self.weights

        v_w = momentum * v_w + (1 - momentum) * w_grad
        v_b = momentum * v_b + (1 - momentum) * b_grad

        self.weights -= learning_rate * v_w
        self.bias -= learning_rate * v_b
        return v_w, v_b

    def predict(self, features: list[list[float]]):
        """Return the index of the highest-scoring class for every sample.

        A 1-D input is treated as a single sample.
        """
        X = self._as_feature_matrix(features)
        # Softmax is monotone per row, so argmax over the logits is enough.
        return np.argmax(self.forward(X), axis=1)

    def _create_onehot_target(self, y: np.ndarray):
        """One-hot encode ``y`` using its sorted unique values as categories."""
        ohe_enc = OneHotEncoder(categories=[np.unique(y)], sparse_output=False)
        y_enc = ohe_enc.fit_transform(y.reshape(-1, 1))
        return y_enc # output -> (n_samples, n_classes)

    def _init_weights(self, X, y):
        """Draw standard-normal parameters for whichever of weights/bias is empty.

        The number of classes is the number of distinct values in ``y``.
        Parameters that are already set are left untouched.
        """
        rng_ = np.random.default_rng(seed=self.random_seed)
        n_unique_classes = np.unique(y).shape[0]
        if self.weights.size == 0:
            self.weights = rng_.standard_normal((X.shape[1], n_unique_classes), dtype=np.float32)
        if self.bias.size == 0:
            self.bias = rng_.standard_normal((1, n_unique_classes), dtype=np.float32)

    def _softmax(self, X: np.ndarray) -> np.ndarray:
        """Row-wise softmax; the row maximum is subtracted first for stability."""
        Z = X - np.max(X, axis=1, keepdims=True)
        numerator = np.exp(Z)
        denominator = np.sum(numerator, axis=1, keepdims=True)
        softmax_probs = numerator / denominator
        return softmax_probs # -> (n_samples, n_classes)

    def forward(self, X):
        """Compute the logits ``X @ weights + bias`` -> (n_samples, n_classes)."""
        # (n_samples, n_features) @ (n_features, n_classes)
        logits = np.matmul(X, self.weights) + self.bias
        return logits

    def _loss_fn_opt(self, y_true, logits, reduction=None):
        """Cross-entropy of the true class, computed from logits.

        Uses ``logsumexp(logits) - logits[true]``, which equals
        ``-log softmax(logits)[true]`` without exponentiating explicitly.

        Returns:
            Per-sample losses of shape (n_samples,), or their mean when
            ``reduction == 'mean'``.
        """
        lse = logsumexp(logits, axis=1, keepdims=True)
        nll = lse - logits
        loss = nll[np.arange(nll.shape[0]), y_true]
        if reduction == 'mean':
            loss = loss.mean()
        return loss

    def _rec_reset(self):
        """Forget the recursive loss estimate and its history."""
        self.rec_value = None
        self.rec_count = 0
        self.rec_history = []

    def _rec_update(self, xi, mode="off", ema_lambda=0.1):
        """Fold the loss ``xi`` into the recursive estimate and return it.

        ``mode='off'`` returns ``xi`` unchanged and keeps no state. The first
        value seen in any other mode seeds the estimate. A mode other than
        'mean' or 'ema' returns ``xi`` unchanged afterwards.
        """
        if mode == "off":
            return xi

        if self.rec_value is None:
            # First observation starts the sequence.
            self.rec_value = xi
            self.rec_count = 1
            return self.rec_value

        if mode == "mean":
            # Running mean: Q_m = (1/m) * xi_m + (1 - 1/m) * Q_{m-1}
            self.rec_count += 1
            m = self.rec_count
            self.rec_value = (1.0/m)*xi + (1.0 - 1.0/m)*self.rec_value
            return self.rec_value

        if mode == "ema":
            # EMA: Q_m = lambda * xi_m + (1 - lambda) * Q_{m-1}
            self.rec_value = ema_lambda * xi + (1.0 - ema_lambda) * self.rec_value
            return self.rec_value

        return xi

    def _gradient(self, X, y_true, logits):
        """Gradient of the mean cross-entropy w.r.t. weights and bias.

        Computes ``(softmax(logits) - onehot(y_true)) / n_samples`` and
        projects it through ``X``.

        Returns:
            (w_grad of shape (n_features, n_classes),
             b_grad of shape (1, n_classes)).
        """
        y_prob = self._softmax(logits)
        y_prob[np.arange(y_prob.shape[0]), y_true] -= 1
        y_prob /= y_prob.shape[0]
        w_grad = np.matmul(X.T, y_prob)
        b_grad = y_prob.sum(axis=0, keepdims=True)
        return w_grad, b_grad

    def calc_margins(self, X, y_true, plot: bool = False, eps=1e-3, **kwargs):
        """Compute per-sample margins and optionally plot the margin curve.

        The margin of a sample is the logit of its true class minus the
        largest logit among the other classes; it is positive exactly when
        the true class has the strictly highest logit.

        Args:
            X: (n_samples, n_features) matrix.
            y_true: integer class indices.
            plot: draw the sorted margins with coloured areas.
            eps: half-width of the band around zero that is coloured
                separately; values <= 0 disable that band.
            **kwargs: accepted for compatibility and ignored.

        Returns:
            Array of margins, one per sample, in the input order.
        """
        X = self._as_feature_matrix(X)
        y_true = np.asarray(y_true, dtype=np.int64).reshape(-1)

        logits = self.forward(X)
        rows = np.arange(logits.shape[0])
        true_logits = logits[rows, y_true]  # fancy indexing returns a copy
        # Mask the true class so the row maximum is the best *wrong* class.
        logits[rows, y_true] = -np.inf
        false_logits = logits.max(axis=1)
        margins = true_logits - false_logits

        if plot:
            sorted_margins = margins[np.argsort(margins)]
            plot_idx = np.arange(sorted_margins.shape[0])

            # Masks for the negative / near-zero / positive regions.
            if eps > 0.0:
                mask_zero = np.abs(sorted_margins) <= eps
                mask_pos  = sorted_margins >  eps
                mask_neg  = sorted_margins < -eps
            else:
                mask_zero = np.zeros_like(sorted_margins, dtype=bool)
                mask_pos  = sorted_margins > 0
                mask_neg  = sorted_margins < 0

            fig, ax = plt.subplots(figsize=(12, 7))
            ax.plot(plot_idx, sorted_margins, lw=2)
            ax.axhline(0.0, color='black', lw=1, alpha=0.7)

            regions = (
                (mask_neg, 'tab:red'),
                (mask_zero, 'gold'),
                (mask_pos, 'tab:green'),
            )
            for mask, color in regions:
                if np.any(mask):
                    ax.fill_between(plot_idx, sorted_margins, 0.0, where=mask,
                                    interpolate=True, alpha=0.25, color=color)

            ax.set_xlabel("sample index (sorted)")
            ax.set_ylabel("margin")
            ax.set_title("Margin curve with signed areas")
            fig.tight_layout()
            plt.show()

        return margins

In [None]:
# Build the model with default settings. `fit` calls `_init_weights` itself,
# so the explicit call below only makes the initial weights available early.
logreg = LogRegNumpy()
logreg._init_weights(X_train_scaled, y_train_enc)

In [None]:
# Per-sample training with momentum and L2. An EMA of the per-sample loss is
# tracked in `rec_history`, while `loss_values` holds the mean epoch loss
# because early_stop_use_rec=False.
logreg.fit(
    X_train_scaled, y_train_enc, 
    learning_rate=0.01, epochs=10,
    momentum=0.99, l2=0.001,
    shuffle=True, training_mode='per_sample',
    verbose=True, rec_mode='ema', early_stop_use_rec=False)

In [None]:
# Monitored loss, one point per training epoch.
fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(logreg.loss_values)
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
ax.set_title('Monitored training loss per epoch')
ax.grid(linestyle='--', linewidth=0.5)
plt.show()

In [None]:
# Recursive loss estimate, one point per per-sample update.
fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(logreg.rec_history)
ax.set_xlabel('update step')
ax.set_ylabel('recursive loss estimate')
ax.set_title('Recursive estimate of the per-sample loss')
ax.grid(linestyle='--', linewidth=0.5)
plt.show()

In [None]:
# Margin curve on the training split; margins within [-1, 1] are coloured as
# the near-zero band.
logreg.calc_margins(X_train_scaled, y_train_enc, plot=True, eps=1.0)