In [2]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset


%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
# Определение класса датасета
class EMGDataset(Dataset):
    def __init__(self, pkl_path, fixed_length=515):
        self.data = pd.read_pickle(pkl_path)
        self.emgs = self.data['emg'].values
        self.labels = self.data['stimulus'].values - 1
        self.fixed_length = fixed_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        emg = self.emgs[idx]

        if emg.shape[0] > self.fixed_length:
            emg = emg[:self.fixed_length, :]
        elif emg.shape[0] < self.fixed_length:
            pad_size = self.fixed_length - emg.shape[0]
            emg = np.pad(emg, ((0, pad_size), (0, 0)), mode='constant')

        emg_tensor = torch.tensor(emg, dtype=torch.float32)
        emg_tensor = emg_tensor.unsqueeze(0)
        emg_tensor = emg_tensor.permute(2, 0, 1)

        label = self.labels[idx]
        return emg_tensor, label

In [4]:
# Функция для коллации батчей с паддингом
def collate_fn(batch):
    inputs, labels = zip(*batch)

    inputs = torch.stack(inputs)
    labels = torch.tensor(labels, dtype=torch.long)
    return inputs, labels

In [5]:
class InceptionBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(InceptionBlock, self).__init__()

        self.branch1 = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True)
        )

        self.branch2 = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )

        self.branch3 = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, kernel_size=5, padding=2),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )

        self.branch4 = nn.Sequential(
            nn.AvgPool2d(kernel_size=3, stride=1, padding=1),
            nn.Conv2d(in_channels, 32, kernel_size=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True)
        )

        self.conv_reduce = nn.Conv2d(32 + 64 + 64 + 32, out_channels, kernel_size=1)

    def forward(self, x):
        branch1 = self.branch1(x)
        branch2 = self.branch2(x)
        branch3 = self.branch3(x)
        branch4 = self.branch4(x)

        out = torch.cat([branch1, branch2, branch3, branch4], 1)

        out = self.conv_reduce(out)
        return out

In [6]:
class EMGHandNet(nn.Module):
    def __init__(self, num_classes=53):
        super(EMGHandNet, self).__init__()

        self.features = nn.Sequential(
            nn.Conv2d(10, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.AdaptiveMaxPool2d((1, 257)),

            InceptionBlock(32, 64),
            InceptionBlock(64, 128),
            InceptionBlock(128, 128),

            nn.AdaptiveAvgPool2d((1, 1))
        )

        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(128, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

In [7]:
# Инициализация модели
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EMGHandNet(num_classes=53).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [8]:
# Загрузка данных
train_dataset = EMGDataset(r'C:\Users\Administrator\OneDrive\Рабочий стол\projects\prostheses\data\ninaprodb1train.pkl')
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    collate_fn=collate_fn
)

  self.labels = self.data['stimulus'].values - 1


In [None]:
# Цикл обучения
num_epochs = 50
log_interval = 10
running_loss = 0
total = 0
correct = 0

for epoch in range(num_epochs):
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        labels[labels == 255] = 0  # Заменяем 255 на 0
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Выводим информацию о потере
        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_idx * len(inputs)}/{len(train_loader.dataset)} "
                  f"({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device).long()

        labels[labels == 255] = 0

        optimizer.zero_grad()
        outputs = model(inputs)

        num_classes = 40
        assert labels.max() < num_classes, f"Target value {labels.max()} exceeds the number of classes!"

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / total
    epoch_acc = correct / total
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}')

# Сохранение модели
torch.save(model.state_dict(), 'emghandnet.pth')