In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, roc_curve, auc
from tqdm.auto import tqdm

In [8]:
X_test = torch.load("X_test.pt")
X_train = torch.load("X_train.pt")
y_test = torch.load("y_test.pt")
y_train = torch.load("y_train.pt")

In [42]:
class MyModel(nn.Module):
    def __init__(
        self,
        dataloader: DataLoader,
        input_dim:int,
        hidden_dim:int,
        output_dim:int,
        dropout_rate=0.5,
    ):
        super(MyModel, self).__init__()
        self.dataloader = dataloader
        self.first_linear = nn.Linear(input_dim, out_features=768)
        self.conv1d = nn.Conv1d(in_channels=1, out_channels=768, kernel_size=3, stride=1, padding=1)
        self.maxpooling1d = nn.MaxPool1d(kernel_size=2, stride=2)
        self.dropout1 = nn.Dropout(dropout_rate)  # Dropout после MaxPooling1D
        self.lstm = nn.LSTM(input_size=768, hidden_size=hidden_dim, batch_first=True)
        self.dropout2 = nn.Dropout(dropout_rate)  # Dropout после LSTM
        self.last_linear = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x:torch.Tensor):
        print(x.shape)
        if x.dim()==2:
            x = x.unsqueeze(1)
        
        print(x.shape)
        x = self.conv1d(x)
        x = self.maxpooling1d(x)
        x = self.dropout1(x)  # Применяем Dropout
        print(x.shape)
        x = x.permute(0, 2, 1)  # Перестановка для LSTM
        lstm_out, _ = self.lstm(x)
        lstm_out = lstm_out[:, -1, :]  # Берем последний выход LSTM
        lstm_out = self.dropout2(lstm_out)  # Применяем Dropout
        x = self.last_linear(lstm_out)
        x = self.sigmoid(x)
        return x

    def fit(
        self,
        num_epoch: int = 10,
        lr: float = 1e-10,
    ):
        self.epoch_loss = []
        self.batch_loss = []
        self.f1_score = []
        self.all_labels = []
        self.all_probs = []
        criterion = nn.BCELoss()  # Бинарная кросс-энтропия
        optimizer = optim.Adam(self.parameters(), lr=lr)
        for epoch in tqdm(range(num_epoch)):
            self.train()
            epoch_losses = []
            for batch_X, batch_y in self.dataloader:
                optimizer.zero_grad()
                outputs = self(batch_X)
                loss:torch.Tensor = criterion(outputs.squeeze(1), batch_y)
                loss.backward()
                optimizer.step()
                epoch_losses.append(loss.item())
                self.batch_loss.append(loss.item())
            
            self.epoch_loss.append(sum(epoch_losses)/len(epoch_losses))
            
            # Вычисление метрик
            self.eval()
            with torch.no_grad():
                self.all_preds = []
                self.all_labels = []
                for batch_X, batch_y in self.dataloader:
                    outputs = self(batch_X)
                    preds = (outputs > 0.5).float()
                    self.all_preds.extend(preds.cpu().numpy())
                    self.all_labels.extend(batch_y.cpu().numpy())

                f1 = f1_score(self.all_labels, self.all_preds)
                self.f1_score.append(f1)
                print(f"Epoch [{epoch+1}/{num_epoch}], Loss: {loss.item():.4f}, Accuracy: {f1:.4f}")
    
    def plot_metrics(self):
        # График лосса
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.plot(self.epoch_loss, label="Loss")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.title("Loss per Epoch")
        plt.legend()

        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.plot(self.batch_loss, label="Loss")
        plt.xlabel("Batches")
        plt.ylabel("Loss")
        plt.title("Loss per batch")
        plt.legend()

        # График F1-score
        plt.subplot(1, 2, 2)
        plt.plot(self.f1_score, label="F1-score", color="orange")
        plt.xlabel("Epoch")
        plt.ylabel("F1-score")
        plt.title("F1-score per Epoch")
        plt.legend()

        # ROC-кривая
        fpr, tpr, _ = roc_curve(self.all_labels, self.all_probs)
        roc_auc = auc(fpr, tpr)

        plt.figure()
        plt.plot(fpr, tpr, color="darkorange", lw=2, label=f"ROC curve (area = {roc_auc:.2f})")
        plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title("ROC Curve")
        plt.legend(loc="lower right")
        plt.show()

In [43]:
# Параметры модели
input_dim = X_train.shape[1]  # Размерность входных данных (например, 100 признаков)
hidden_dim = 64  # Размерность скрытого состояния LSTM
output_dim = 1   # Бинарная классификация
dropout_rate = 0.5  # Вероятность Dropout

# Создаем DataLoader
dataset = TensorDataset(X_train, y_train)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Создаем модель
model = MyModel(dataloader, input_dim, hidden_dim, output_dim, dropout_rate)


In [44]:
model.fit()

  0%|          | 0/10 [00:00<?, ?it/s]

torch.Size([32, 768])
torch.Size([32, 1, 768])


  0%|          | 0/10 [00:00<?, ?it/s]

torch.Size([32, 768, 384])





RuntimeError: Found dtype Int but expected Float