In [1]:
import os
import time
import csv
from PIL import Image
from tqdm import tqdm 
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Dataset, Subset, random_split
from sklearn.metrics import f1_score
import matplotlib.pyplot as plt
from torchvision.transforms.v2 import ToDtype
from torchvision.transforms import Normalize
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split

In [None]:
#Преобразования для данных
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ColorJitter(brightness=1.5),  # Уменьшение яркости
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Загрузка категорий
activity_categories = pd.read_csv(activity_categories_file)
category_map = dict(zip(activity_categories['id'], activity_categories['category']))

# Класс для пользовательского датасета
class CustomDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.data_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, str(self.data_frame.iloc[idx, 0]) + ".jpg")  # ID изображений в первом столбце
        image = Image.open(img_name).convert('RGB')
        label = self.data_frame.iloc[idx, 1]  # Метки во втором столбце

        if self.transform:
            image = self.transform(image)

        return image, label

# Загрузка данных
train_dataset = CustomDataset(csv_file=csv_train_ans, root_dir=root_dir_train, transform=transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)

train_indices, val_indices = train_test_split(range(len(train_dataset)), test_size=0.2, random_state=42)

train_subset = Subset(train_dataset, train_indices)
val_subset = Subset(train_dataset, val_indices)

# Создание загрузчиков для обучающего и валидационного наборов
train_loader = DataLoader(dataset=train_subset, batch_size=64, shuffle=True)
val_loader = DataLoader(dataset=val_subset, batch_size=64, shuffle=False)

# Определение модели (ResNet18)
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 64
        
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

def ResNet18(num_classes=10):
    return ResNet(BasicBlock, [2, 2, 2, 2], num_classes)

# Инициализация модели
num_classes = len(activity_categories)  # Количество классов в вашем датасете
model = ResNet18(num_classes=num_classes)

if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # Используем CUDA или CPU

print("Для обучения выбран девайс {}".format(device))

model.to(device)

# Определение функции потерь и оптимизатора
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-8, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

num_epochs = 120  # Увеличьте количество эпох по мере необходимости

# Инициализация списков для хранения значений потерь и точности
losses = []
accuracies = []
f1_scores = []
val_losses = []  # Инициализация списка для валидационных потерь
val_accuracies = []  # Инициализация списка для валидационных точностей

# Обучение модели
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    all_labels = []
    all_preds = []
    correct = 0
    total = 0

    start_time = time.time()  # Запомнить время начала эпохи

    for images, labels in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
        images, labels = images.to(device), labels.to(device)

        # Обнуляем градиенты
        optimizer.zero_grad()

        # Прямой проход
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Обратный проход и оптимизация
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Сохранение меток и предсказаний для F1 Score
        _, predicted = torch.max(outputs.data, 1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

        # Обновление correct и total
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    end_time = time.time()  # Запомнить время конца эпохи
    epoch_time = end_time - start_time  # Вычислить время эпохи

    # Вычисление среднего лосса за эпоху
    epoch_loss = running_loss / len(train_loader)
    losses.append(epoch_loss)

    # Вычисление точности и F1 Score
    accuracy = correct / total
    accuracies.append(accuracy)
    f1 = f1_score(all_labels, all_preds, average='weighted')
    f1_scores.append(f1)

    # Валидация
    model.eval()
    val_running_loss = 0.0
    val_correct = 0
    val_total = 0
    val_all_labels = []
    val_all_preds = []

    with torch.no_grad():  # Отключаем градиенты для валидации
        for val_images, val_labels in val_loader:
            val_images, val_labels = val_images.to(device), val_labels.to(device)

            val_outputs = model(val_images)
            val_loss = criterion(val_outputs, val_labels)

            val_running_loss += val_loss.item()

            # Сохранение меток и предсказаний для F1 Score
            _, val_predicted = torch.max(val_outputs.data, 1)
            val_all_labels.extend(val_labels.cpu().numpy())
            val_all_preds.extend(val_predicted.cpu().numpy())

            # Обновление val_correct и val_total
            val_total += val_labels.size(0)
            val_correct += (val_predicted == val_labels).sum().item()

    # Вычисление валидационной потери и точности
    val_epoch_loss = val_running_loss / len(val_loader)
    val_losses.append(val_epoch_loss)

    val_accuracy = val_correct / val_total
    val_accuracies.append(val_accuracy)

    # Вывод результатов для текущей эпохи
    print(f"Epoch [{epoch + 1}/{num_epochs}], "
          f"Train Loss: {epoch_loss:.4f}, Train Accuracy: {accuracy:.4f}, Train F1 Score: {f1:.4f}, "
          f"Val Loss: {val_epoch_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")


    # Сохранение чекпоинта каждые 5 эпох
    if (epoch + 1) % 5 == 0:
        checkpoint_path = f"checkpoint_epoch_{epoch + 1}.pth"
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': epoch_loss,
        }, checkpoint_path)
        print(f"Чекпоинт сохранен: {checkpoint_path}")

print("Обучение завершено!")



# Функция для тестирования модели
class CustomTestDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.data_frame = os.listdir(root_dir)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.data_frame[idx])
        image = Image.open(img_name).convert('RGB')
        label = self.data_frame[idx].replace(".jpg", '')  # Извлечение ID из имени файла

        if self.transform:
            image = self.transform(image)

        return image, int(label)

def get_result(model: torch.nn.Module, transform: transforms.Compose, root_dir_test: str, category_map: dict):
    # Создание тестового датасета и загрузчика
    dataset = CustomTestDataset(root_dir=root_dir_test, transform=transform)
    dl = DataLoader(dataset, batch_size=128)

    model.eval()  # Установка модели в режим оценки
    ans = []

    # Перебор данных в загрузчике
    for img, label in tqdm(dl):
        img = img.to(device)
        label = label.to(device)  # Перемещение изображений на устройство
        pred = model(img)  # Предсказания модели
        preds = torch.argmax(pred, dim=1)

        # Конкатенация меток и предсказаний
        res = torch.cat((label.unsqueeze(1), preds.unsqueeze(1)), dim=1)
        ans.extend(res.cpu())
    
    # Проверка, есть ли данные в ans
    if not ans:
        print("Ошибка: массив ans пуст. Проверьте процесс предсказания.")
        return

    # Преобразование ans в список
    ans = [[element.item() for element in row] for row in ans]

    # Создание списка для сохранения результатов с номерами категорий
    results_with_categories = []
    
    for id, pred in ans:
        results_with_categories.append([id, pred])  # Сохраняем ID и номер предсказанной категории

    # Убедитесь, что файл будет создан в текущей директории
    output_file = 'result.csv'  # Сохраняем в текущей директории

    # Запись результатов в CSV файл
    try:
        with open(output_file, 'w', newline="") as out_file:
            writer = csv.writer(out_file, delimiter=',')
            writer.writerow(['id', 'target_feature'])  # Заголовки
            writer.writerows(results_with_categories)  # Запись данных
        print("Результаты успешно сохранены в", output_file)
    except Exception as e:
        print(f"Ошибка при записи в файл: {e}")

# Вызов функции для получения результатов на тестовых данных
get_result(model, transform, root_dir_test, category_map)

In [None]:
#Преобразования для данных
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ColorJitter(brightness=1.5),  # Уменьшение яркости
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Загрузка категорий
activity_categories = pd.read_csv(activity_categories_file)
category_map = dict(zip(activity_categories['id'], activity_categories['category']))

In [None]:
# Класс для пользовательского датасета
class CustomDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.data_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, str(self.data_frame.iloc[idx, 0]) + ".jpg")  # ID изображений в первом столбце
        image = Image.open(img_name).convert('RGB')
        label = self.data_frame.iloc[idx, 1]  # Метки во втором столбце

        if self.transform:
            image = self.transform(image)

        return image, label

# Загрузка данных
train_dataset = CustomDataset(csv_file=csv_train_ans, root_dir=root_dir_train, transform=transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)

train_indices, val_indices = train_test_split(range(len(train_dataset)), test_size=0.2, random_state=42)

train_subset = Subset(train_dataset, train_indices)
val_subset = Subset(train_dataset, val_indices)

# Создание загрузчиков для обучающего и валидационного наборов
train_loader = DataLoader(dataset=train_subset, batch_size=64, shuffle=True)
val_loader = DataLoader(dataset=val_subset, batch_size=64, shuffle=False)

In [None]:
# Определение модели (ResNet18)
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 64
        
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

def ResNet18(num_classes=10):
    return ResNet(BasicBlock, [2, 2, 2, 2], num_classes)

# Инициализация модели
num_classes = len(activity_categories)  # Количество классов в вашем датасете
model = ResNet18(num_classes=num_classes)

if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # Используем CUDA или CPU

print("Для обучения выбран девайс {}".format(device))

model.to(device)

In [None]:
# Обучение модели
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    all_labels = []
    all_preds = []
    correct = 0
    total = 0

    start_time = time.time()  # Запомнить время начала эпохи

    for images, labels in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
        images, labels = images.to(device), labels.to(device)

        # Обнуляем градиенты
        optimizer.zero_grad()

        # Прямой проход
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Обратный проход и оптимизация
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Сохранение меток и предсказаний для F1 Score
        _, predicted = torch.max(outputs.data, 1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

        # Обновление correct и total
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    end_time = time.time()  # Запомнить время конца эпохи
    epoch_time = end_time - start_time  # Вычислить время эпохи

    # Вычисление среднего лосса за эпоху
    epoch_loss = running_loss / len(train_loader)
    losses.append(epoch_loss)

    # Вычисление точности и F1 Score
    accuracy = correct / total
    accuracies.append(accuracy)
    f1 = f1_score(all_labels, all_preds, average='weighted')
    f1_scores.append(f1)

    # Валидация
    model.eval()
    val_running_loss = 0.0
    val_correct = 0
    val_total = 0
    val_all_labels = []
    val_all_preds = []

    with torch.no_grad():  # Отключаем градиенты для валидации
        for val_images, val_labels in val_loader:
            val_images, val_labels = val_images.to(device), val_labels.to(device)

            val_outputs = model(val_images)
            val_loss = criterion(val_outputs, val_labels)

            val_running_loss += val_loss.item()

            # Сохранение меток и предсказаний для F1 Score
            _, val_predicted = torch.max(val_outputs.data, 1)
            val_all_labels.extend(val_labels.cpu().numpy())
            val_all_preds.extend(val_predicted.cpu().numpy())

            # Обновление val_correct и val_total
            val_total += val_labels.size(0)
            val_correct += (val_predicted == val_labels).sum().item()

    # Вычисление валидационной потери и точности
    val_epoch_loss = val_running_loss / len(val_loader)
    val_losses.append(val_epoch_loss)

    val_accuracy = val_correct / val_total
    val_accuracies.append(val_accuracy)

    # Вывод результатов для текущей эпохи
    print(f"Epoch [{epoch + 1}/{num_epochs}], "
          f"Train Loss: {epoch_loss:.4f}, Train Accuracy: {accuracy:.4f}, Train F1 Score: {f1:.4f}, "
          f"Val Loss: {val_epoch_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")


    # Сохранение чекпоинта каждые 5 эпох
    if (epoch + 1) % 5 == 0:
        checkpoint_path = f"checkpoint_epoch_{epoch + 1}.pth"
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': epoch_loss,
        }, checkpoint_path)
        print(f"Чекпоинт сохранен: {checkpoint_path}")

print("Обучение завершено!")



# Функция для тестирования модели
class CustomTestDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.data_frame = os.listdir(root_dir)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.data_frame[idx])
        image = Image.open(img_name).convert('RGB')
        label = self.data_frame[idx].replace(".jpg", '')  # Извлечение ID из имени файла

        if self.transform:
            image = self.transform(image)

        return image, int(label)

def get_result(model: torch.nn.Module, transform: transforms.Compose, root_dir_test: str, category_map: dict):
    # Создание тестового датасета и загрузчика
    dataset = CustomTestDataset(root_dir=root_dir_test, transform=transform)
    dl = DataLoader(dataset, batch_size=128)

    model.eval()  # Установка модели в режим оценки
    ans = []

    # Перебор данных в загрузчике
    for img, label in tqdm(dl):
        img = img.to(device)
        label = label.to(device)  # Перемещение изображений на устройство
        pred = model(img)  # Предсказания модели
        preds = torch.argmax(pred, dim=1)

        # Конкатенация меток и предсказаний
        res = torch.cat((label.unsqueeze(1), preds.unsqueeze(1)), dim=1)
        ans.extend(res.cpu())
    
    # Проверка, есть ли данные в ans
    if not ans:
        print("Ошибка: массив ans пуст. Проверьте процесс предсказания.")
        return

    # Преобразование ans в список
    ans = [[element.item() for element in row] for row in ans]

    # Создание списка для сохранения результатов с номерами категорий
    results_with_categories = []
    
    for id, pred in ans:
        results_with_categories.append([id, pred])  # Сохраняем ID и номер предсказанной категории

    # Убедитесь, что файл будет создан в текущей директории
    output_file = 'result.csv'  # Сохраняем в текущей директории

    # Запись результатов в CSV файл
    try:
        with open(output_file, 'w', newline="") as out_file:
            writer = csv.writer(out_file, delimiter=',')
            writer.writerow(['id', 'target_feature'])  # Заголовки
            writer.writerows(results_with_categories)  # Запись данных
        print("Результаты успешно сохранены в", output_file)
    except Exception as e:
        print(f"Ошибка при записи в файл: {e}")

# Вызов функции для получения результатов на тестовых данных
get_result(model, transform, root_dir_test, category_map)