In [1]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset
import numpy as np
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

In [2]:
class CustomDataset(Dataset):
    def __init__(self, directory):
        self.names = list()
        fds = os.listdir(directory)
        for item in fds:
            if item.endswith(('.npy')):
                if item.split('_')[6] != "1slash2-1slash2":
                    self.names.append(item)
        self.dir = directory

    def __len__(self):
        return len(self.names)

    def __getitem__(self, idx):
        path = os.path.join(self.dir, self.names[idx])
        array = np.load(path).astype(np.float32)
        if self.names[idx].split('_')[6] == '1-0':
            label = 1.
        else:
            label = 0.
        return torch.tensor(array), label

In [3]:
class CustomCollator:
    def __call__(self, batch):
        objects, labels = zip(*batch)
        max_l = max([obj.size(0) for obj in objects])
        
        padded_objects = []
        for obj in objects:
            padding = torch.zeros(max_l - obj.size(0), obj.size(1))
            padded_obj = torch.cat([obj, padding], dim=0)
            padded_objects.append(padded_obj)
        
        return torch.stack(padded_objects), torch.tensor(labels)

In [11]:
dataset = CustomDataset('/kaggle/input/singulars/sing')
data_collator = CustomCollator()
train, test = torch.utils.data.random_split(dataset, [0.7, 0.3])
train_dataloader = DataLoader(train, batch_size=64, collate_fn=data_collator, shuffle=True)
test_dataloader = DataLoader(test, batch_size=64, collate_fn=data_collator)

In [12]:
import torch
import torch.nn as nn

class LinearBlock(nn.Module):
    def __init__(self, dim):
        super(LinearBlock, self).__init__()
        self.linear1 = nn.Linear(dim, dim)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(dim, dim)
        
    def forward(self, x):
        out = x
        out = self.linear1(out)
        out = self.relu(out)
        out = self.linear2(out)
        out = out + x
        out = self.relu(out)
        return out
        

class LinearModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(LinearModel, self).__init__()
        
        self.input_proj = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.layers = []
        for i in range(num_layers):
            self.layers.append(LinearBlock(hidden_size))
        self.layers = nn.Sequential(*self.layers)
        self.output_proj = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, x):
        x = self.relu(self.input_proj(x))
        x = self.layers(x)
        x = self.output_proj(x)
        x = x.squeeze(2)
        x = x.mean(1)
        x = self.sigmoid(x)
        return x

input_size = 32
hidden_size = 64
output_size = 1
num_layers = 3
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = LinearModel(input_size, hidden_size, output_size, num_layers).to(device)

In [13]:
# Цикл обучения
def train_model(model, train_dataloader, num_epochs=10):
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(num_epochs):
        model.train()  # Указываем, что модель находится в режиме обучения
        running_loss = 0.0

        for inputs, labels in train_dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            # Обнуление градиентов
            optimizer.zero_grad()
            
            # Прямое распространение
            outputs = model(inputs)
            loss = criterion(outputs.squeeze(), labels)  # Убедитесь, что размерность совпадает
            
            # Обратное распространение
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()

        epoch_loss = running_loss / len(train_dataloader)
        print(f'Эпоха {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}')

# Функция для тестирования
def test_model(model, test_dataloader):
    model.eval()  # Указываем, что модель находится в режиме оценки
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_dataloader:
            outputs = model(inputs)
            predicted = outputs.round().squeeze()  # Округляем вероятность до 0 или 1
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f'Точность на тесте: {accuracy:.2f}%')


train_model(model, train_dataloader, num_epochs=1000)
test_model(model, test_dataloader)

KeyboardInterrupt: 

In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.position_embedding = nn.Embedding(max_len, d_model)

    def forward(self, x):
        positions = torch.arange(0, x.size(1), device=x.device).unsqueeze(0).repeat(x.size(0), 1)
        pos_emb = self.position_embedding(positions)
        return x + pos_emb

class SimpleTransformer(nn.Module):
    def __init__(self, embed_dim, d_model, num_heads, num_classes, dim_feedforward=16, num_layers=1, dropout=0.1):
        super(SimpleTransformer, self).__init__()
        
        self.in_proj = nn.Linear(embed_dim, d_model)
        
        # Позиционное кодирование
        self.positional_encoding = PositionalEncoding(d_model=d_model)
        
        # Энкодер Transformer
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model,
                                                   nhead=num_heads,
                                                   dim_feedforward=dim_feedforward,
                                                   dropout=dropout,
                                                   batch_first=True)
        
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # Полносвязный слой для классификации
        self.classifier = nn.Linear(d_model, num_classes)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = F.relu(self.in_proj(x))
        
        # Добавляем позиционное кодирование
        x = self.positional_encoding(x)
        
        # Пропускаем через слои Transformer
        transformed = self.transformer_encoder(x)
        
        # Агрегация по временному измерению
        transformed = transformed.mean(dim=1)
        
        # Пропускаем через классификатор
        out = self.classifier(transformed)
        
        # Применяем softmax для вывода вероятности класса
        prob = self.sigmoid(out)
        
        return prob

# Пример использования
model = SimpleTransformer(embed_dim=32, d_model=32, num_heads=4, num_classes=1).to(device)

In [15]:

train_model(model, train_dataloader, num_epochs=1000)
test_model(model, test_dataloader)

Эпоха 1/1000, Loss: 0.6906
Эпоха 2/1000, Loss: 0.6895


KeyboardInterrupt: 

In [18]:
class RNNBinaryClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1):
        super(RNNBinaryClassifier, self).__init__()
        # Инициализация RNN слоя
        self.rnn = nn.RNN(
            input_size=input_size,   # размерность входа на каждом временном шаге
            hidden_size=hidden_size, # размерность скрытого состояния
            num_layers=num_layers,   # количество слоев RNN
            batch_first=True        # указывает, что первое измерение батча является размером батча
        )
        # Инициализация линейного слоя для классификации
        self.fc = nn.Linear(hidden_size, 1)  # один выходной нейрон, так как задача бинарной классификации

    def forward(self, x):
        # Проход через RNN, последний скрытый слой передаем на линейный слой
        output, hidden = self.rnn(x)  # выходы имеют размерность (batch_size, seq_len, hidden_size)
        last_hidden = output[:, -1, :]  # последнее скрытое состояние каждой последовательности в батче
        out = self.fc(last_hidden)     # пропускаем через линейный слой
        return torch.sigmoid(out)      # сигмоидальная функция активации для получения вероятности

    
# Инициализация модели
model = RNNBinaryClassifier(input_size=32, hidden_size=128, num_layers=3).to(device)


In [19]:

train_model(model, train_dataloader, num_epochs=1000)
test_model(model, test_dataloader)

Эпоха 1/1000, Loss: 0.6911
Эпоха 2/1000, Loss: 0.6907
Эпоха 3/1000, Loss: 0.6906
Эпоха 4/1000, Loss: 0.6907
Эпоха 5/1000, Loss: 0.6905
Эпоха 6/1000, Loss: 0.6906


KeyboardInterrupt: 

In [22]:
class CNN1DBinaryClassifier(nn.Module):
    def __init__(self, input_channels, num_features, output_size=20):
        super(CNN1DBinaryClassifier, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=input_channels, out_channels=num_features, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=num_features, out_channels=num_features * 2, kernel_size=3, padding=1)
        self.adaptive_pool = nn.AdaptiveMaxPool1d(output_size)  # Фиксированный размер перед FC слоем
        self.fc1 = nn.Linear(num_features * 2 * output_size, 100)
        self.fc2 = nn.Linear(100, 1)

    def forward(self, x):
        x = x.permute(0, 2, 1)  # (batch_size, 32, seq_len)
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = self.adaptive_pool(x)  # Адаптивный пулинг для стандартизации размера
        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x
    
model = CNN1DBinaryClassifier(32, 16).to(device)

In [23]:
train_model(model, train_dataloader, num_epochs=1000)
test_model(model, test_dataloader)

KeyboardInterrupt: 