# Загрузка данных


In [None]:
import requests
import zipfile
import gdown
import os
import shutil
from tqdm import tqdm

def merge_directories(dir1, dir2, output_dir):
    """
    Объединяет содержимое поддиректорий из dir1 и dir2 в одну структуру в output_dir.
    Используется один общий прогресс-бар для всех файлов.

    :param dir1: str, путь к первой директории.
    :param dir2: str, путь ко второй директории.
    :param output_dir: str, путь к результирующей директории.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    subdirs = [d for d in os.listdir(dir1) if os.path.isdir(os.path.join(dir1, d))]

    total_files = 0
    for subdir in subdirs:
        total_files += len(os.listdir(os.path.join(dir1, subdir)))
        if os.path.exists(os.path.join(dir2, subdir)):
            total_files += len(os.listdir(os.path.join(dir2, subdir)))

    with tqdm(total=total_files, desc="Объединение файлов") as pbar:
        for subdir in subdirs:
            dir1_subdir_path = os.path.join(dir1, subdir)
            dir2_subdir_path = os.path.join(dir2, subdir)
            output_subdir_path = os.path.join(output_dir, subdir)

            if not os.path.exists(dir2_subdir_path):
                print(f"Поддиректория {subdir} отсутствует в {dir2}. Пропуск.")
                continue

            if not os.path.exists(output_subdir_path):
                os.makedirs(output_subdir_path)

            for file_name in os.listdir(dir1_subdir_path):
                src_file = os.path.join(dir1_subdir_path, file_name)
                if os.path.isfile(src_file):
                    shutil.copy(src_file, output_subdir_path)
                    pbar.update(1)

            for file_name in os.listdir(dir2_subdir_path):
                src_file = os.path.join(dir2_subdir_path, file_name)
                if os.path.isfile(src_file):
                    shutil.copy(src_file, output_subdir_path)
                    pbar.update(1)

    print(f"Объединение завершено. Результат сохранен в {output_dir}.")


url = "https://fall.cv-gml.ru/task_file/15/additional_files"

headers = {
    "Cookie": "session=.eJwlzs1KBTEMQOF36dpFkuZnel9mSNMERVGY0ZXcd7fg9oMD57eddeX92h7lH3e-tPNttUfj5RC1hJU1M7APGUM3iodCmU9FDF-SLOQ6vc9pgIcw9-LOHANKBFkHroqQg0GqOKZhArGqrCQjWEyTUxAiJdC4wKIGtz3yc-f1f0PEY0vcV53fX-_5uc3Ra6ZBHgbeYbnKri2dRAgFyg-VgmzPP9v1QBE.Z4Tjug.mhYU5xb79OKI-ezR_Icya5Jght0"
}

response = requests.get(url, headers=headers)

if response.status_code == 200:
    with open("additional_files.zip", "wb") as f:
        f.write(response.content)
    print("Файл успешно скачан и сохранен как 'additional_files.zip'.")
else:
    print(f"Ошибка при скачивании. Код ответа: {response.status_code}")
    print(f"Тело ответа:\n{response.text}")

zip_file_path = "additional_files.zip"

output_dir = "/content/additional_files"

with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(output_dir)

print(f"Файлы распакованы в {output_dir}")

file_id = "1C1oQ_mXwPvUYKOjpvd0YTNQFSL6-OdZb"
url = f"https://drive.google.com/uc?export=download&id={file_id}"

dataset_path = "/content/synt.zip"
output_extract_path = "/content/dataset/"

gdown.download(url, dataset_path, quiet=False)

with zipfile.ZipFile(dataset_path, 'r') as zip_ref:
    zip_ref.extractall(output_extract_path)

print(f"Датасет распакован в {output_extract_path}")


dir1 = "/content/additional_files/cropped-train"
dir2 = "/content/dataset/synt"
output_dir = "/content/mixdataset"

merge_directories(dir1, dir2, output_dir)


# Dataloaders

In [None]:
import csv
import json
import os
import pickle
import random
import shutil
import typing
from concurrent.futures import ProcessPoolExecutor
import albumentations as A
import numpy as np
import scipy
import skimage
import skimage.filters
import skimage.io
import skimage.transform
import torch
import torchvision
import tqdm
from albumentations.pytorch import ToTensorV2
from PIL import Image
from sklearn.neighbors import KNeighborsClassifier
from torchvision import transforms
import pandas as pd
from albumentations import Compose, Resize, RandomCrop, HorizontalFlip, Normalize
from albumentations.pytorch import ToTensorV2


In [8]:
class DatasetRTSD(torch.utils.data.Dataset):
    """
    Класс для чтения и хранения датасета.

    :param root_folders: список путей до папок с данными
    :param path_to_classes_json: путь до classes.json
    """

    def __init__(
        self,
        root_folders: typing.List[str],
        path_to_classes_json: str,
    ) -> None:
        super().__init__()
        with open(path_to_classes_json, 'r') as file:
            self.classes_info = json.load(file)
        self.classes, self.class_to_idx = self.get_classes(path_to_classes_json)

        self.samples = []
        for folder in root_folders:
            for class_name in os.listdir(folder):
                class_folder = os.path.join(folder, class_name)
                if os.path.isdir(class_folder):
                    class_idx = self.class_to_idx.get(class_name, -1)
                    for img_name in os.listdir(class_folder):
                        img_path = os.path.join(class_folder, img_name)
                        if img_path.lower().endswith(('.png', '.jpg', '.jpeg')):
                            self.samples.append((img_path, class_idx))

        self.classes_to_samples = {idx: [] for idx in self.class_to_idx.values()}
        for idx, (_, class_idx) in enumerate(self.samples):
            self.classes_to_samples[class_idx].append(idx)

        self.transform = Compose([
            Resize(256, 256),
            RandomCrop(224, 224, p=0.5),
            HorizontalFlip(p=0.5),
            Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            Resize(224, 224),
            ToTensorV2(),
        ])



    def __getitem__(self, index: int) -> typing.Tuple[torch.Tensor, str, int]:
        """
        Возвращает тройку: тензор с картинкой, путь до файла, номер класса файла (если нет разметки, то "-1").
        """
        img_path, class_idx = self.samples[index]
        image = Image.open(img_path).convert("RGB")

        transformed = self.transform(image=np.array(image))
        image_tensor = transformed["image"]

        return image_tensor, img_path, class_idx

    @staticmethod
    def get_classes(
        path_to_classes_json,
    ) -> typing.Tuple[typing.List[str], typing.Mapping[str, int]]:
        """
        Считывает из classes.json информацию о классах.

        :param path_to_classes_json: путь до classes.json
        """
        with open(path_to_classes_json, 'r') as file:
            class_data = json.load(file)

        class_to_idx = {class_name: idx for idx, (class_name, _) in enumerate(class_data.items())}

        classes = [class_name for class_name in class_data.keys()]

        return classes, class_to_idx

    def __len__(self) -> int:
        """
        Возвращает размер датасета (количество сэмплов).
        """
        return len(self.samples)

    def get_rare_images(self) -> typing.List[str]:
        """
        Функция для нахождения путей к изображениями с типом "rare" и подсчета их количества.
        Возвращает список путей и количество изображений.
        """
        rare_images = []
        for img_path, class_idx in self.samples:
            class_name = self.classes[class_idx]

            if class_name in self.classes_info and self.classes_info[class_name].get("type") == "rare":
                rare_images.append(img_path)

        print(f"Количество изображений с типом 'rare': {len(rare_images)}",total)
        return rare_images


In [9]:

class TestData(torch.utils.data.Dataset):
    """
    Класс для чтения и хранения тестового датасета.

    :param root: путь до папки с картинками знаков
    :param path_to_classes_json: путь до classes.json
    :param annotations_file: путь до .csv-файла с аннотациями (опциональный)
    """

    def __init__(
        self,
        root: str,
        path_to_classes_json: str,
        annotations_file: str = None,
    ) -> None:
        super().__init__()
        self.root = root

        with open(path_to_classes_json, 'r') as f:
            self.classes_info = json.load(f)

        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes_info.keys())}

        self.samples = []
        for img_name in os.listdir(self.root):
            img_path = os.path.join(self.root, img_name)
            if img_name.endswith(".png"):
                self.samples.append(img_path)

        self.targets = None
        if annotations_file is not None:
            annotations = pd.read_csv(annotations_file)
            self.targets = {row["filename"]: self.class_to_idx[row["class"]] for _, row in annotations.iterrows()}

        self.transform = Compose([
            Resize(256, 256),
            #RandomCrop(224, 224, p=0.5),
            #HorizontalFlip(p=0.5),
            Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            Resize(224, 224),
            ToTensorV2(),
        ])

    def __getitem__(self, index: int) -> typing.Tuple[torch.Tensor, str, int, str]:
        """
        Возвращает кортеж: тензор с картинкой, путь до файла, номер класса файла (если нет разметки, то "-1"),
        аннотация (имя класса или "unknown" при отсутствии разметки).
        """
        img_path = self.samples[index]
        image = Image.open(img_path).convert("RGB")

        image = self.transform(image=np.array(image))["image"]

        if self.targets is not None:
            target = self.targets.get(os.path.basename(img_path), -1)
            annotation = (
                list(self.class_to_idx.keys())[list(self.class_to_idx.values()).index(target)]
                if target != -1
                else "unknown"
            )
        else:
            target = -1
            annotation = "unknown"

        return image, img_path, target, annotation


    def __len__(self) -> int:
        """
        Возвращает размер датасета (количество сэмплов).
        """
        return len(self.samples)

In [10]:
class TestDataRare(torch.utils.data.Dataset):
    """
    Класс для чтения и хранения тестового датасета, включающего только редкие (rare) классы.

    :param root: путь до папки с картинками знаков
    :param path_to_classes_json: путь до classes.json
    :param annotations_file: путь до .csv-файла с аннотациями (опциональный)
    """

    def __init__(
        self,
        root: str,
        path_to_classes_json: str,
        annotations_file: str = None,
    ) -> None:
        super().__init__()
        self.root = root

        with open(path_to_classes_json, 'r') as f:
            self.classes_info = json.load(f)

        self.classes_info = {cls: info for cls, info in self.classes_info.items() if info["type"] == "freq" }

        self.class_to_idx = {cls: info["id"] for cls, info in self.classes_info.items()}

        self.samples = []
        for img_name in os.listdir(self.root):
            img_path = os.path.join(self.root, img_name)
            if img_name.endswith(".png"):
                self.samples.append(img_path)

        self.targets = None
        if annotations_file is not None:
            annotations = pd.read_csv(annotations_file)
            self.targets = {
                row["filename"]: self.class_to_idx[row["class"]]
                for _, row in annotations.iterrows()
                if row["class"] in self.class_to_idx
            }

        self.transform = Compose([
            Resize(256, 256),
            #RandomCrop(224, 224, p=0.5),
            #HorizontalFlip(p=0.5),
            Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            Resize(224, 224),
            ToTensorV2(),
        ])

    def __getitem__(self, index: int) -> typing.Tuple[torch.Tensor, str, int, str]:
        """
        Возвращает кортеж: тензор с картинкой, путь до файла, номер класса файла (если нет разметки, то "-1"),
        аннотация (имя класса или "unknown" при отсутствии разметки).
        """
        img_path = self.samples[index]
        image = Image.open(img_path).convert("RGB")

        image = self.transform(image=np.array(image))["image"]

        if self.targets is not None:
            target = self.targets.get(os.path.basename(img_path), -1)
            annotation = (
                list(self.class_to_idx.keys())[list(self.class_to_idx.values()).index(target)]
                if target != -1
                else "unknown"
            )
        else:
            target = -1
            annotation = "unknown"

        return image, img_path, target, annotation

    def __len__(self) -> int:
        """
        Возвращает размер датасета (количество сэмплов).
        """
        return len(self.samples)





# Тестирование на редких классах

In [None]:
weights_path = "/Users/andreitsyrkunov/Downloads/simple1_model.pth"

state_dict = torch.load(weights_path, map_location=torch.device('mps'))

model = CustomNetwork(internal_features=512, num_classes=205)

model.load_state_dict(state_dict)

model.eval()

print("Веса успешно загружены!")


In [10]:
model.eval()

CustomNetwork(
  (resnet): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential(
     

In [20]:
root = "/content/additional_files/smalltest"
path_to_classes_json = "/content/additional_files/classes.json"
annotations_file = "/content/additional_files/smalltest_annotations.csv"


dataset_rare = TestDataRare(
    root=root,
    path_to_classes_json=path_to_classes_json,
    annotations_file=annotations_file
)

In [None]:
from tqdm import tqdm
import torch

root = "/content/additional_files/smalltest"
path_to_classes_json = "/content/additional_files/classes.json"
annotations_file = "/content/additional_files/smalltest_annotations.csv"


dataset_rare = TestDataRare(
    root=root,
    path_to_classes_json=path_to_classes_json,
    annotations_file=annotations_file
)

val_loader = DataLoader(dataset_rare, batch_size=1, shuffle=False)
correct = 0
total = 0
device = torch.device('cuda')
model.to(device)

with torch.no_grad():
    for images, _, labels, x in tqdm(val_loader, desc="Validation Progress", unit="batch"):
        images, labels = images.to(device), labels.to(device)
        if labels == -1:
          continue
        #print(labels)
        images = images.float()

        outputs = model(images)

        _, predicted = torch.max(outputs, 1)
        #print(predicted)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

val_accuracy = 100 * correct / total
print(f"Validation Rare Accuracy: {val_accuracy}%")


# Модель нейронное сети

In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torch.utils.data import DataLoader
from tqdm import tqdm
import os

class CustomNetwork(nn.Module):
    def __init__(self, num_classes: int, internal_features: int = 512):
        super(CustomNetwork, self).__init__()

        self.resnet = models.resnet50(pretrained=True)

        in_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(in_features, internal_features)

        self.relu = nn.ReLU()

        self.classifier = nn.Linear(internal_features, num_classes)

    def forward(self, x):
        x = self.resnet(x)
        x = self.relu(x)
        x = self.classifier(x)
        return x


# Trainloop

In [12]:
from torch.optim.lr_scheduler import CosineAnnealingLR

def train_simple_classifier(model, train_dataset, val_dataset, epochs=10, batch_size=32, learning_rate=0.001):
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    scheduler = CosineAnnealingLR(optimizer, T_max=epochs)


    device = torch.device("cuda")
    model.to(device)

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        train_loader_tqdm = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} Training", leave=False)
        for images, _, labels in train_loader_tqdm:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

            train_loader_tqdm.set_postfix(loss=running_loss / len(train_loader_tqdm), accuracy=100 * correct / total)
        train_accuracy = 100 * correct / total
        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss / len(train_loader)}, Train Accuracy: {train_accuracy}%")

        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0



        val_loader_tqdm = tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} Validation", leave=False)

        with torch.no_grad():
            for images, _, labels, __ in val_loader_tqdm:
                images, labels = images.to(device), labels.to(device)

                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

                val_loader_tqdm.set_postfix(loss=val_loss / len(val_loader_tqdm), accuracy=100 * correct / total)

        val_accuracy = 100 * correct / total
        print(f"Validation Loss: {val_loss / len(val_loader)}, Validation Accuracy: {val_accuracy}%")
        scheduler.step()

        current_lr = scheduler.get_last_lr()[0]
        print(f"Learning rate for epoch {epoch+1}: {current_lr}")
    torch.save(model.state_dict(), 'simple2_model.pth')
    print("Model saved as simple2_model.pth")

In [None]:
import gdown

file_id = "1UJqADXcfy21YbTXujvB7omC2IAnjdDnM"
url = f"https://drive.google.com/uc?export=download&id={file_id}"

output_path = "/content/FT_model.pth"

gdown.download(url, output_path, quiet=False)

model = CustomNetwork(internal_features=512, num_classes=205)
#model.load_state_dict(torch.load(output_path))
for name, param in model.resnet.named_parameters():
    if 'layer4' not in name and 'fc' not in name:
        param.requires_grad = False


In [16]:
import torch
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torch import nn




root_train = "/content/dataset/synt"
root_val = "/content/additional_files/smalltest"
classes_json = "/content/additional_files/classes.json"
annotations_file_val = "/content/additional_files/smalltest_annotations.csv"

train_dataset = DatasetRTSD(root_folders=[root_train], path_to_classes_json=classes_json)
val_dataset = TestData(root=root_val, path_to_classes_json=classes_json, annotations_file=annotations_file_val)



In [None]:
train_simple_classifier(model, train_dataset, val_dataset, epochs=5,batch_size=512)

In [39]:
model_path = "/content/FineTuneModelWithSyntRS.pth"
torch.save(model.state_dict(), model_path)

In [40]:
from google.colab import files
files.download(model_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Синтез данных

In [None]:
icons_dir = "/Users/andreitsyrkunov/Desktop/ML/CV(YSDA)/task 8(road znak)))/additonal_files/icons"
backgrounds_dir = "/Users/andreitsyrkunov/Desktop/ML/CV(YSDA)/task 8(road znak)))/additonal_files/background_images"
output_dir = "/Users/andreitsyrkunov/Desktop/ML/CV(YSDA)/task 8(road znak)))/additonal_files/synt"

generator = SignGenerator(icons_dir, backgrounds_dir, output_dir)
generator.generate_all_data(num_images_per_class=3)

In [None]:
import os
import random
import re
from pathlib import Path
from PIL import Image, ImageFilter, ImageEnhance
import numpy as np
from scipy.ndimage import convolve
from concurrent.futures import ThreadPoolExecutor

class SignGenerator:
    def __init__(self, icons_dir, backgrounds_dir, output_dir):
        self.icons_dir = icons_dir
        self.backgrounds_dir = backgrounds_dir
        self.output_dir = output_dir
        self.background_images = list(Path(backgrounds_dir).glob('*.jpg'))
        print(f"Найдено фонов: {len(self.background_images)}")
        self.icon_files = list(Path(icons_dir).glob('*.png'))
        Path(output_dir).mkdir(parents=True, exist_ok=True)

    def resize_icon(self, icon, min_size=16, max_size=128):
        size = random.randint(min_size, max_size)
        return icon.resize((size, size), Image.Resampling.LANCZOS)

    def add_padding(self, icon, max_padding_pct=0.15):
        icon_size = icon.size[0]
        padding = random.randint(0, int(icon_size * max_padding_pct))
        new_size = (icon_size + padding, icon_size + padding)
        new_icon = Image.new('RGBA', new_size, (255, 255, 255, 0))
        new_icon.paste(icon, (padding // 2, padding // 2))
        return new_icon

    def rotate_icon(self, icon, min_angle=-15, max_angle=15):
        angle = random.randint(min_angle, max_angle)
        return icon.rotate(angle, resample=Image.Resampling.BICUBIC, expand=True)

    def blur_icon(self, icon, min_angle=-90, max_angle=90):
        angle = random.randint(min_angle, max_angle)
        kernel = np.ones((3, 3)) / 9.0
        icon_array = np.array(icon.convert("RGBA"))
        icon_blurred = np.zeros_like(icon_array)
        for channel in range(4):
            icon_blurred[..., channel] = convolve(icon_array[..., channel], kernel, mode='reflect')
        return Image.fromarray(icon_blurred.astype(np.uint8))

    def apply_gaussian_blur(self, icon, radius=2):
        return icon.filter(ImageFilter.GaussianBlur(radius))

    def apply_random_dullness(self, icon):
        enhancer = ImageEnhance.Brightness(icon)
        dullness_factor = random.uniform(0.5, 0.8)
        return enhancer.enhance(dullness_factor)

    def embed_on_background(self, icon, background):
        icon_size = icon.size[0]
        bg_resized = background.resize((icon_size + random.randint(0, int(icon_size * 0.15)),
                                        icon_size + random.randint(0, int(icon_size * 0.15))))
        bg_resized.paste(icon, (random.randint(0, bg_resized.width - icon_size),
                                random.randint(0, bg_resized.height - icon_size)), icon)
        return bg_resized

    def generate_one_icon(self, icon_class):
        icon_files = [file for file in self.icon_files if re.search(f'{icon_class}', file.stem)]
        if not icon_files:
            raise ValueError(f"Нет иконок для класса {icon_class} в директории {self.icons_dir}")

        icon_path = random.choice(icon_files)
        icon = Image.open(icon_path).convert("RGBA")
        icon_resized = self.resize_icon(icon)
        icon_with_padding = self.add_padding(icon_resized)
        icon_rotated = self.rotate_icon(icon_with_padding)
        icon_blurred = self.blur_icon(icon_rotated)
        icon_gaussian_blurred = self.apply_gaussian_blur(icon_blurred)
        icon_dulled = self.apply_random_dullness(icon_gaussian_blurred)

        if not self.background_images:
            raise ValueError("Нет доступных фонов для встраивания. Проверьте папку с фонами.")

        background = random.choice(self.background_images)
        background_image = Image.open(background).convert("RGBA")
        final_image = self.embed_on_background(icon_dulled, background_image)
        return final_image


    def generate_samples_from_image(self, image_path, n=10):
        icon = Image.open(image_path).convert("RGBA")


        for i in range(n):
            if not self.background_images:
                raise ValueError("Нет доступных фонов в указанной директории.")
            icon_resized = self.resize_icon(icon)
            icon_with_padding = self.add_padding(icon_resized)
            icon_rotated = self.rotate_icon(icon_with_padding)
            icon_blurred = self.blur_icon(icon_rotated)
            icon_gaussian_blurred = self.apply_gaussian_blur(icon_blurred)
            icon_dulled = self.apply_random_dullness(icon_gaussian_blurred)

            background = random.choice(self.background_images)
            background_image = Image.open(background).convert("RGBA")

            final_image = self.embed_on_background(icon_dulled, background_image)

            output_image_path = Path(self.output_dir) / f"sample_{i}.png"
            final_image.save(output_image_path)

        print(f"Генерация {n} сэмплов завершена, сохранено в {self.output_dir}.")

    def generate_all_data(self, num_images_per_class=1000):
        icon_classes = set()
        for file in self.icon_files:
            class_name = file.stem
            icon_classes.add(class_name)

        if not icon_classes:
            raise ValueError("Не удалось извлечь классы из имен файлов.")

        with ThreadPoolExecutor() as executor:
            for icon_class in icon_classes:
                output_class_dir = Path(self.output_dir) / icon_class
                output_class_dir.mkdir(parents=True, exist_ok=True)
                futures = [executor.submit(self.generate_one_icon, icon_class) for _ in range(num_images_per_class)]
                for i, future in enumerate(futures):
                    image = future.result()
                    image.save(output_class_dir / f"{i}.png")

In [None]:
generator = SignGenerator(icons_dir, backgrounds_dir, output_dir)
generator.generate_all_data(num_images_per_class=1000)

# Метрическое обучение

In [13]:
import torch
import torch.nn as nn

class FeaturesLoss(nn.Module):
    def __init__(self, margin=2.0):
        super(FeaturesLoss, self).__init__()
        self.margin = margin

    def forward(self, features, labels):
        """
        Вычисляет contrastive loss.

        :param features: Tensor, размер (batch_size, feature_dim), векторы признаков объектов.
        :param labels: Tensor, размер (batch_size), реальные метки объектов.
        :return: Tensor, значение функции потерь.
        """
        batch_size = features.size(0)
        loss = 0.0
        positive_pairs = 0
        negative_pairs = 0

        for i in range(batch_size):
            for j in range(batch_size):
                if i != j:
                    distance = torch.norm(features[i] - features[j], p=2) ** 2
                    if labels[i] == labels[j]:
                        loss += distance
                        positive_pairs += 1
                    else:
                        loss += torch.clamp(self.margin - torch.sqrt(distance), min=0) ** 2
                        negative_pairs += 1

        if positive_pairs > 0:
            loss /= positive_pairs + negative_pairs

        return loss


In [15]:
import random
import torch
from torch.utils.data import Sampler
from tqdm import tqdm
from collections import defaultdict

class CustomBatchSampler(Sampler):
    def __init__(self, data_source, elems_per_class, classes_per_batch, use_tqdm=True):
        self.data_source = data_source
        self.elems_per_class = elems_per_class
        self.classes_per_batch = classes_per_batch
        self.use_tqdm = use_tqdm

        self.class_indices = defaultdict(list)

        if self.use_tqdm:
            print("Building class indices...")

        for idx, (_, _, label) in tqdm(enumerate(data_source), total=len(data_source), disable=not self.use_tqdm, desc="Building indices"):
            self.class_indices[label].append(idx)

        self.classes = list(self.class_indices.keys())

    def __iter__(self):
        batch_indices = []

        for _ in tqdm(range(len(self)), desc="Generating batches", disable=not self.use_tqdm):
            selected_classes = random.sample(self.classes, self.classes_per_batch)

            batch = []
            for c in selected_classes:
                class_samples = random.sample(self.class_indices[c], self.elems_per_class)
                batch.extend(class_samples)

            random.shuffle(batch)
            batch_indices.append(batch)

            yield torch.tensor(batch)

    def __len__(self):
        return len(self.data_source) // (self.elems_per_class * self.classes_per_batch)

In [16]:
root_train = "/content/mixdataset"
root_val = "/content/additional_files/smalltest"
classes_json = "/content/additional_files/classes.json"
annotations_file_val = "/content/additional_files/smalltest_annotations.csv"
train_dataset = DatasetRTSD(root_folders=[root_train], path_to_classes_json=classes_json)
root_val = "/content/additional_files/smalltest"
val_dataset = TestData(root=root_val, path_to_classes_json=classes_json, annotations_file=annotations_file_val)
batch_sampler = CustomBatchSampler(train_dataset, elems_per_class=4, classes_per_batch=32)


Building class indices...


Building indices: 100%|██████████| 284896/284896 [07:46<00:00, 610.90it/s]


In [18]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torch.utils.data import DataLoader
from tqdm import tqdm
import os

class CustomNetworkMetric(nn.Module):
    def __init__(self, num_classes: int, internal_features: int = 512):
        super(CustomNetworkMetric, self).__init__()

        self.resnet = models.resnet50(pretrained=True)

        in_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(in_features, internal_features)

        self.relu = nn.ReLU()

        self.classifier = nn.Linear(internal_features, num_classes)

    def forward(self, x):
        l = self.resnet(x)
        x = self.relu(l)
        x = self.classifier(x)
        return x, l


In [None]:
model = CustomNetworkMetric(internal_features=512, num_classes=205)
import gdown

file_id = "1UJqADXcfy21YbTXujvB7omC2IAnjdDnM"
url = f"https://drive.google.com/uc?export=download&id={file_id}"

output_path = "/content/FT_model.pth"

gdown.download(url, output_path, quiet=False)

model.load_state_dict(torch.load(output_path))

for name, param in model.resnet.named_parameters():
    if 'layer4' not in name and 'fc' not in name:
        param.requires_grad = False

In [28]:
def train_better_model(
    model,
    train_dataset,
    val_dataset,
    sampler,
    epochs=10,
    learning_rate=0.001,
    margin=2.0,
    synthetic_weight=0.5,
    use_tqdm=True
):
    """
    Обучает модель с использованием заданного сэмплера и комбинированной функции потерь.

    :param model: torch.nn.Module, модель для обучения.
    :param train_dataset: Dataset, тренировочный датасет.
    :param val_dataset: Dataset, валидационный датасет.
    :param sampler: Sampler, сэмплер для формирования батчей.
    :param epochs: int, количество эпох обучения.
    :param learning_rate: float, скорость обучения.
    :param margin: float, маржа для FeaturesLoss.
    :param synthetic_weight: float, вес синтетической части loss.
    :param use_tqdm: bool, использовать ли прогресс-бары.
    """
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_sampler=sampler)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False)

    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    classification_criterion = nn.CrossEntropyLoss()
    features_criterion = FeaturesLoss(margin=margin)
    scheduler = CosineAnnealingLR(optimizer, T_max=epochs)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        running_classification_loss = 0.0
        running_features_loss = 0.0
        correct = 0
        total = 0

        train_loader_tqdm = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} Training", leave=False, disable=not use_tqdm)
        for images, _, labels in train_loader_tqdm:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs, features = model(images)
            classification_loss = classification_criterion(outputs, labels)
            features_loss = features_criterion(features, labels)

            loss = classification_loss + synthetic_weight * features_loss
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            running_classification_loss += classification_loss.item()
            running_features_loss += features_loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

            train_loader_tqdm.set_postfix(
                loss=running_loss / len(train_loader_tqdm),
                classification_loss=running_classification_loss / len(train_loader_tqdm),
                features_loss=running_features_loss / len(train_loader_tqdm),
                accuracy=100 * correct / total
            )

        train_accuracy = 100 * correct / total
        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss / len(train_loader)}, Train Accuracy: {train_accuracy}%")

        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        val_loader_tqdm = tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} Validation", leave=False, disable=not use_tqdm)
        with torch.no_grad():
            for images, _, labels in val_loader_tqdm:
                images, labels = images.to(device), labels.to(device)

                outputs, features = model(images)
                classification_loss = classification_criterion(outputs, labels)
                features_loss = features_criterion(features, labels)
                loss = classification_loss + synthetic_weight * features_loss

                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

                val_loader_tqdm.set_postfix(
                    loss=val_loss / len(val_loader_tqdm),
                    accuracy=100 * correct / total
                )

        val_accuracy = 100 * correct / total
        print(f"Validation Loss: {val_loss / len(val_loader)}, Validation Accuracy: {val_accuracy}%")
        scheduler.step()

        current_lr = scheduler.get_last_lr()[0]
        print(f"Learning rate for epoch {epoch+1}: {current_lr}")

    torch.save(model.state_dict(), 'better_model.pth')
    print("Model saved as better_model.pth")


In [None]:
train_better_model(
    model,
    train_dataset,
    val_dataset,
    batch_sampler,
    epochs=5,
    learning_rate=0.001,
    margin=2.0,
    synthetic_weight=0.5,
    use_tqdm=True
)