# 2.1 Кастомный Dataset класс 

In [123]:
import torch
from torch.utils.data import Dataset
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
#без знания pandas и возможностей sklearn я не знаю как решить задачу
class CustomDataset(Dataset):
    def __init__(self, file_path, target_col, numerical_cols=None, categorical_cols=None, transform=True,
                sep = None):
        """
        :param file_path: путь к .csv файлу
        :param target_col: имя целевой переменной
        :param numerical_cols: список числовых признаков
        :param categorical_cols: список категориальных признаков
        :param transform: применять ли нормализацию и кодирование
        """
        self.data = pd.read_csv(file_path, sep=sep)
        
        self.target_col = target_col
        
        # Разделение признаков и целевой переменной
        self.features = self.data.drop(columns=[target_col])
        self.targets = self.data[target_col].values
        
        # Автоматическое определение типов признаков
        if numerical_cols is None and categorical_cols is None:
            numerical_cols = []
            categorical_cols = []
            for col in self.features.columns:
                if self.features[col].dtype == 'O' or len(self.features[col].unique()) < 20:
                    categorical_cols.append(col)
                else:
                    numerical_cols.append(col)
        self.numerical_cols = numerical_cols
        self.categorical_cols = categorical_cols
        # Преобразование данных
        self.transform = transform
        if transform:
            self._apply_transform()

    def _apply_transform(self):
        """
        Применяет нормализацию и one-hot кодирование
        """
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', StandardScaler(), self.numerical_cols),
                ('cat', OneHotEncoder(handle_unknown='ignore'), self.categorical_cols)
            ],
            remainder='passthrough'
        )

        X_transformed = preprocessor.fit_transform(self.features)
        # Если матрица разреженная — переводим в плотный формат
        if hasattr(X_transformed, "toarray"):
            X_transformed = X_transformed.toarray()

        X_transformed = np.nan_to_num(X_transformed, nan=0.0, posinf=0.0, neginf=0.0)

        self.X_tensor = torch.tensor(X_transformed, dtype=torch.float32)
        self.y_tensor = torch.tensor(self.targets, dtype=torch.float32).view(-1, 1)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.X_tensor[idx], self.y_tensor[idx]
        
def mse(y_pred: torch.Tensor, y_true:torch.Tensor) -> torch.Tensor:
    return ((y_pred - y_true)**2).mean()
        
def log_epoch(epoch, avg_loss, **metrics):
    message = f'Epoch: {epoch}\t loss:{avg_loss}'
    for k, v in metrics.items():
        message += f'\t{k}: {v:.4f}'
    print(message)

def accuracy(y_pred, y_true):
    y_pred_bin = (y_pred>0.5).float()
    return (y_pred_bin == y_true).float().mean().item()

# 2.2 Эксперименты с различными датасетами

In [126]:
import torch

class LinearRegressionTorch(torch.nn.Module):
    def __init__(self, in_features):
        super().__init__()
        self.linear = torch.nn.Linear(in_features, 1)
    
    def forward(self, x):
        return self.linear.forward(x)

In [128]:
import torch

class LogisticRegressionTorch(torch.nn.Module):
    def __init__(self, in_features, n_classes=1):
        super().__init__()
        self.linear = torch.nn.Linear(in_features, n_classes)
    
    def forward(self, x):
        return self.linear.forward(x)
    

In [130]:
url_linear = 'https://raw.githubusercontent.com/thainazanfolin/min_sleep_efficiency/refs/heads/main/Sleep_Efficiency_table.csv'
import kagglehub

url_logict = kagglehub.dataset_download("yasserh/titanic-dataset") #Взял все датасеты из своих наработок
#выбираем колонку по которой будем предиктить
# print(pd.read_csv(url_linear, sep=';'))
reg_dataset = CustomDataset(url_linear, target_col='Sleep efficiency', sep=';')

clf_dataset = CustomDataset(url_logict+'\\Titanic-Dataset.csv', target_col='Survived')

train_size_reg = int(0.8 * len(reg_dataset))
val_size_reg = len(reg_dataset) - train_size_reg
train_reg, val_reg = random_split(reg_dataset, [train_size_reg, val_size_reg])

train_loader_reg = DataLoader(train_reg, batch_size=32, shuffle=True)
val_loader_reg = DataLoader(val_reg, batch_size=32)

# То же самое для классификации
train_size_clf = int(0.8 * len(clf_dataset))
val_size_clf = len(clf_dataset) - train_size_clf
train_clf, val_clf = random_split(clf_dataset, [train_size_clf, val_size_clf])

train_loader_clf = DataLoader(train_clf, batch_size=32, shuffle=True)
val_loader_clf = DataLoader(val_clf, batch_size=32)



  self.data = pd.read_csv(file_path, sep=sep)


In [131]:
EPOCHS = 100

# Решил разделить датасет на тренировочную и тестовую выборки, так как по определению early_stop ориентируется на ошибку 
# валидационной выборки
train_size = int(0.8 * len(reg_dataset)) # train
val_size = len(reg_dataset) - train_size # validate
train_dataset, val_dataset = random_split(reg_dataset, [train_size, val_size])

# dataloader = DataLoader(
#     dataset,
#     batch_size = 128,
#     shuffle = True,
# )

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)

L1_coefficient = 0.01 # Добавили коэфициент влияния штрафа на Лассо
L2_coefficient = 0.01 # Добавили коэфициент влияния штрафа на Ridge 
Lasso = 0
Ridge = 0

early_stop = EPOCHS*0.4 # этот параметр нужен, чтобы иметь лимит при обучении, в случае увеличения эпох, качество ошибки не меняется или ниже
no_improvement = 0
best_val_loss = 1e-9
delta_number = 0.0001 # Добавил, чтобы модель учитывала не все улучшения, а только значимые. А то 1^(-10) улучшение как то несерьезно


lr = 0.5
epochs = 100
model = LinearRegressionTorch(reg_dataset.X_tensor.shape[1])

loss_fn = torch.nn.MSELoss() 

optimizer = torch.optim.SGD(model.parameters(),lr=lr)

for epoch in range(1, EPOCHS+1):
    total_loss = 0
    
    for i, (batch_x, batch_y) in enumerate(train_loader):
        optimizer.zero_grad()
        y_pred = model.forward(batch_x)
        
        
        if L1_coefficient != 0 or L2_coefficient != 0:    
            Lasso = 0
            Ridge = 0
            for name, param in model.named_parameters():
                if 'bias' not in name: # в формуле Лассо и Ridge мы суммируем только веса
                    # Ошибка по формуле sum(|wi|) * coef, а после param.abs ставим .sum, так как param - тензор
                    Lasso += param.abs().sum() 
                    # Ridge это то же Лассо, только веса беруться в квадрате
                    Ridge += (param ** 2).sum()
                    
        '''
        Сделал универсальный способ учета ошибки.
        Теперь это и LassoRegression в случае если будет указан только L1, и RidgeRegression если указывать только L2, 
        и ElasticNetRegression если указан и L1 и L2, и обычная линейная регрессия если ничего не указывать 
        
        Все по формуле ElasticNet = MSE + sum(|wi|) * coef_l1 + sum(wi^2) * coef_l2 
        '''
        loss = loss_fn.forward(y_pred, batch_y) + L1_coefficient * Lasso + L2_coefficient * Ridge
        
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        
    avg_train_loss = total_loss / len(train_loader) #ошибка на обучающей выборке 

    model.eval() #переключаем модель в режим оценки 
    total_val_loss = 0
    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            y_pred = model(batch_x)
            loss = loss_fn(y_pred, batch_y) # Здесь штрафы не нужны, так как мы проверям качество модели в чистом виде
            total_val_loss += loss.item()
    avg_val_loss = total_val_loss / len(val_loader)

    if avg_val_loss < best_val_loss-delta_number:
        no_improvement = 0
        best_val_loss = avg_val_loss
    else:
        no_improvement += 1

    if epoch % 10 == 0:
        log_epoch(epoch, avg_val_loss)
 
    if early_stop <= no_improvement:
        print(f"Early stopping на эпохе - {epoch}")
        break
    # print(model.linear.weight.data, model.linear.bias.data)

Epoch: 10	 loss:1.9074078928555633e+24
Epoch: 20	 loss:inf
Epoch: 30	 loss:inf
Epoch: 40	 loss:nan
Early stopping на эпохе - 40


In [139]:
#Сделаю только для бинарной классификации, сложно как то для 1.5 часа решения домашки
def accuracy(y_pred, y_true): #вынес в отдельную функцию
    y_pred = (torch.sigmoid(y_pred) > 0.5).float()
    return (y_pred == y_true).float().mean().item()

def precision(y_pred, y_true):
    y_pred = (y_pred.sigmoid() > 0.5).float()#Применяем sigmoid, чтобы получить вероятности от 0 до 1
    #это случаи (tp), когда модель сказала да (y_pred == 1) и на самом деле - да (y_true == 1) 
    tp = (y_pred * y_true).sum().item()
    #это случаи (fp), когда модель сказала да (y_pred == 1), а на самом деле - нет (y_true == 0).
    fp = ((y_pred == 1) & (y_true == 0)).sum().item()
    if tp + fp == 0:
        return 0.0
    return tp / (tp + fp)

def recall(y_pred, y_true):
    y_pred = (y_pred.sigmoid() > 0.5).float()
    tp = (y_pred * y_true).sum().item()
    fn = ((y_pred == 0) & (y_true == 1)).sum().item()
    if tp + fn == 0:
        return 0.0
    return tp / (tp + fn)

def f1_score(y_pred, y_true):
    p = precision(y_pred, y_true)
    r = recall(y_pred, y_true)
    if p + r == 0:
        return 0.0
    return 2*p*r / (p + r) #изначально по формуле 2/(1/p + 1/r), но я раскрыл скобки просто

def roc_auc(y_pred, y_true):
    y_scores = y_pred.sigmoid().detach().numpy()#выход модели до sigmoid
    y_true = y_true.numpy()#реальные метки [0, 1]
    
    #cортировка по уверенности модели
    #мы делаем список пар (score, true_label) и сортируем его по убыванию уверенности модели.
    y_scores = y_scores.reshape(-1)  #делаем 1d (на случай, если пришёл 2d)
    y_true = y_true.reshape(-1)
    scores = np.column_stack((y_scores, y_true))  # связываем предсказания и правду
    scores = scores[scores[:, 0].argsort()][::-1]  # сортировка по убыванию уверенности
    
    #Общее число положительных и отрицательных примеров
    total_pos = y_true.sum()
    total_neg = len(y_true) - total_pos

    tp = fp = 0
    tpr_list = [0]
    fpr_list = [0]

    for score in scores:
        if score[1] == 1:
            tp += 1
        else:
            fp += 1
        tpr = tp / total_pos
        fpr = fp / total_neg
        tpr_list.append(tpr)
        fpr_list.append(fpr)

    #Вычисление площади под кривой (AUC)
    auc = 0
    for i in range(1, len(tpr_list)):
        #Интегрирование методом трапеций
        auc += (fpr_list[i] - fpr_list[i - 1]) * (tpr_list[i] + tpr_list[i - 1]) / 2

    return auc


def print_confusion_matrix(y_pred, y_true):
    y_pred = (torch.sigmoid(y_pred) > 0.5).float().numpy().flatten()# Преобразуем в numpy для удобства
    y_true = y_true.numpy().flatten()

    tp = np.sum((y_pred == 1) & (y_true == 1))
    tn = np.sum((y_pred == 0) & (y_true == 0))
    fp = np.sum((y_pred == 1) & (y_true == 0))
    fn = np.sum((y_pred == 0) & (y_true == 1))

    # Выводим confusion matrix
    print("Confusion Matrix:")
    print("               | Predicted Pos | Predicted Neg |")
    print("---------------|--------------|--------------|")
    print(f"Actual Pos     |     {tp:4d}    |     {fn:4d}    |")
    print(f"Actual Neg     |     {fp:4d}    |     {tn:4d}    |")
    print("\n")

    # Считаем метрики
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    # Выводим метрики
    print(f"Accuracy : {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall   : {recall:.4f}")
    print(f"F1 Score : {f1:.4f}")

In [141]:

EPOCHS = 100
lr = 0.1
epochs = 100

n_classes = 2# количество разнообразных классов
print(n_classes,'классы')

if n_classes > 2:
    model = LogisticRegressionTorch(X.shape[1], n_classes)# выбираем модель с несколькими классами
    loss_fn = torch.nn.CrossEntropyLoss() # Взял функцию для высчитывания ошибки в случае, если берем многоклассовый датасет
    '''
    С многоклассовым датасетом лучше работает Adam оптимизатор, 
    чем с SGD (SGD тоже работает нормально, но может застрять в локальных минимумах)
    '''
    optimizer = torch.optim.Adam(model.parameters(),lr=lr)
else:
    model = LogisticRegressionTorch(clf_dataset.X_tensor.shape[1])# выбираем модель с несколькими классами
    loss_fn = torch.nn.BCEWithLogitsLoss()
    optimizer = torch.optim.SGD(model.parameters(),lr=lr)

# Разделение на train/val
train_size = int(0.8 * len(clf_dataset))
val_size = len(clf_dataset) - train_size
train_dataset, val_dataset = random_split(clf_dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16)
    
for epoch in range(1, EPOCHS+1):
    model.train()#включаем режим обучения
    total_loss = 0
    total_acc = 0
    total_prec = 0
    total_rec = 0
    total_f1 = 0
    total_auc = 0
    for i, (batch_x, batch_y) in enumerate(train_loader):
        optimizer.zero_grad()
        y_pred = model.forward(batch_x)
        loss = loss_fn.forward(y_pred, batch_y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        if n_classes > 2:
            preds = y_pred.argmax(dim=1)#Для accuracy сравниваем argmax предсказаний и реальных меток
            total_acc += (preds == batch_y).float().mean().item()
        else:
            probs = torch.sigmoid(y_pred)
            preds = (probs > 0.5).float()
            total_acc += (preds == batch_y).float().mean().item()
            
    avg_loss = total_loss / len(train_loader)
    avg_acc = total_acc/ len(train_loader)
    
    model.eval()#включаем для обучения
    all_preds = []
    all_probs = []
    all_true = []
    
    
    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            y_pred_logits = model(batch_x)
            probs = torch.sigmoid(y_pred_logits)
            preds = (probs > 0.5).float().flatten()

            all_preds.extend(preds)
            all_probs.extend(probs)
            all_true.extend(batch_y)

    all_preds = torch.tensor(all_preds)
    all_probs = torch.tensor(all_probs)
    all_true = torch.tensor(all_true)

    acc = accuracy(all_probs, all_true)
    prec = precision(all_probs, all_true)
    rec = recall(all_probs, all_true)
    f1 = f1_score(all_probs, all_true)
    auc = roc_auc(all_probs, all_true)
    
    
    if epoch % 10 == 0:
        print_confusion_matrix(torch.tensor(all_probs), torch.tensor(all_true))
        log_epoch(epoch, avg_loss, accuracy=avg_acc)

print(f"Epoch {epoch} | Loss: {avg_train_loss:.4f} | "
              f"Acc: {acc:.4f}, Prec: {prec:.4f}, Rec: {rec:.4f}, F1: {f1:.4f}, AUC: {auc:.4f}")

2 классы


  print_confusion_matrix(torch.tensor(all_probs), torch.tensor(all_true))


Confusion Matrix:
               | Predicted Pos | Predicted Neg |
---------------|--------------|--------------|
Actual Pos     |       78    |        0    |
Actual Neg     |      101    |        0    |


Accuracy : 0.4358
Precision: 0.4358
Recall   : 1.0000
F1 Score : 0.6070
Epoch: 10	 loss:0.40193198521931967	accuracy: 0.8278
Confusion Matrix:
               | Predicted Pos | Predicted Neg |
---------------|--------------|--------------|
Actual Pos     |       78    |        0    |
Actual Neg     |      101    |        0    |


Accuracy : 0.4358
Precision: 0.4358
Recall   : 1.0000
F1 Score : 0.6070
Epoch: 20	 loss:0.3782078969809744	accuracy: 0.8403
Confusion Matrix:
               | Predicted Pos | Predicted Neg |
---------------|--------------|--------------|
Actual Pos     |       78    |        0    |
Actual Neg     |      101    |        0    |


Accuracy : 0.4358
Precision: 0.4358
Recall   : 1.0000
F1 Score : 0.6070
Epoch: 30	 loss:0.355729270974795	accuracy: 0.8625
Confusion 