# ❗❗❗Внимание❗❗❗
### Данный ноутбук составлял участник нашей команды вне окружения проекта.

### Здесь используется нестандартное окружение, в связи с этим, вероятно возникнут трудности с запуском.

### Ноутбук предоставляется «как есть» и не несет важной составляющей инференса или обучения моделей.

### Для работы может потребоваться установка дополнительных библиотек и настройка окружения.

## Импорты

In [3]:
import os
import shutil
import sys
from pathlib import Path
from typing import Tuple, Iterable, Callable, Any, Optional, Literal

import numpy as np
import torch
import torch.nn.functional as F
from torch import Tensor
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModel
from huggingface_hub import hf_hub_download
from PIL import Image

current_dir = os.getcwd()
parent_dir = os.path.join(current_dir, '..')
sys.path.append(parent_dir)
import scripts.utils.net as net

## Константы

In [None]:
HF_TOKEN = None

IMAGE_DATASET_DIR = r"..\image_dataset"                     # путь к датасету в виде пар фотографий

CHECKPOINT_PATH = r"..\ckpt_eer_epoch2_batch209000.ckpt"    # путь к весам модели

FGSM_SAVE_DIR = r"\check_attack_score"                      # путь к папке для сохранения датасета с FGSM атакой

## Переменные

In [18]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

aligner_path = os.path.expanduser('~/.cvlface_cache/minchul/cvlface_DFA_resnet50')
aligner_repo = 'minchul/cvlface_DFA_resnet50'

adaface_models = {
    'ir_101': CHECKPOINT_PATH
}

test_dir = IMAGE_DATASET_DIR

## Функция для загрузки модели-выравнивателя с HuggingFace

In [19]:
def download(repo_id: str, path: str, HF_TOKEN: Optional[str] = None) -> None:
    """
    :param repo_id: ID репозитория на Hugging Face.
    :param path: Локальный путь для сохранения файлов.
    :param HF_TOKEN: Токен аутентификации (если требуется).
    """
    os.makedirs(path, exist_ok=True)
    files_path = os.path.join(path, 'files.txt')
    
    if not os.path.exists(files_path):
        hf_hub_download(
            repo_id, 'files.txt', token=HF_TOKEN, 
            local_dir=path, local_dir_use_symlinks=False
        )
    
    with open(files_path, 'r', encoding='utf-8') as f:
        files = f.read().splitlines()
    
    additional_files = ['config.json', 'wrapper.py', 'model.safetensors']
    
    for file in [f for f in files if f] + additional_files:
        full_path = os.path.join(path, file)
        if not os.path.exists(full_path):
            hf_hub_download(
                repo_id, file, token=HF_TOKEN, 
                local_dir=path, local_dir_use_symlinks=False
            )

## Загрузка модели из локального пути

In [20]:
def load_model_from_local_path(path: str, HF_TOKEN: Optional[str] = None):
    """
    :param path: Путь к модели.
    :param HF_TOKEN: Токен аутентификации (если требуется).
    :return: Загруженная модель.
    """
    cwd = os.getcwd()
    os.chdir(path)
    sys.path.insert(0, path)
    
    model = AutoModel.from_pretrained(path, trust_remote_code=True, token=HF_TOKEN)
    
    os.chdir(cwd)
    sys.path.pop(0)
    return model

## Загрузка модели по идентификатору репозитория.

In [21]:
def load_model_by_repo_id(repo_id: str, save_path: str, HF_TOKEN: Optional[str] = None, force_download: bool = False):
    """
    :param repo_id: ID репозитория на Hugging Face.
    :param save_path: Путь для сохранения модели.
    :param HF_TOKEN: Токен аутентификации (если требуется).
    :param force_download: Принудительная загрузка (удаляет существующую директорию перед загрузкой).
    :return: Загруженная модель.
    """
    if force_download and os.path.exists(save_path):
        shutil.rmtree(save_path)
    
    download(repo_id, save_path, HF_TOKEN)
    return load_model_from_local_path(save_path, HF_TOKEN)

## Загрузка предобученной модели указанной архитектуры.

In [22]:
def load_pretrained_model(architecture: Literal['ir_101'] = 'ir_101'):
    """
    :param architecture: Название архитектуры модели (по умолчанию 'ir_101').
    :return: Загруженная и подготовленная к использованию модель.
    """
    assert architecture in adaface_models, f"Архитектура {architecture} не поддерживается."
    
    model_ = net.build_model(architecture)
    statedict = torch.load(
        adaface_models[architecture], map_location=torch.device('cpu')
    )['model_state_dict']
    
    model_.load_state_dict(statedict)
    model_.eval()
    
    return model_

## Загрузка состояния модели, оптимизатора и планировщика обучения из чекпоинта.

In [23]:
def load_checkpoint(
    filepath: str, 
    model: torch.nn.Module, 
    optimizer: Optional[torch.optim.Optimizer] = None, 
    scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None
) -> torch.nn.Module:
    """
    :param filepath: Путь к файлу чекпоинта.
    :param model: Модель, в которую загружается состояние.
    :param optimizer: Опционально, оптимизатор для загрузки состояния.
    :param scheduler: Опционально, планировщик обучения для загрузки состояния.
    :return: Модель с загруженными весами.
    """
    checkpoint = torch.load(filepath, map_location=torch.device('cpu'))
    
    model.load_state_dict(checkpoint["model_state_dict"])
    
    if optimizer is not None and "optimizer_state_dict" in checkpoint:
        optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    
    if scheduler is not None and "scheduler_state_dict" in checkpoint:
        scheduler.load_state_dict(checkpoint["scheduler_state_dict"])
    
    epoch = checkpoint.get("epoch", -1)
    global_batch = checkpoint.get("global_batch", -1)
    
    print(f"Загружен чекпоинт: эпоха {epoch}, глобальный батч {global_batch}")
    
    return model

## Загрузка предобученной модели

In [24]:
model = load_pretrained_model('ir_101').to(device)
model = load_checkpoint(CHECKPOINT_PATH, model).to(device)

Загружен чекпоинт: эпоха 2, глобальный батч 209000


## Преобразование PIL-изображение (RGB) в тензор для модели AdaFace.

In [25]:
def to_input(pil_rgb_image: Image.Image) -> torch.Tensor:
    """
    :param pil_rgb_image: Входное изображение формата PIL (RGB).
    :return: Тензор изображения в формате BGR с нормализацией.
    """
    np_img = np.array(pil_rgb_image)
    bgr_img = np_img[:, :, ::-1]  # Преобразование RGB -> BGR
    bgr_img_norm = (bgr_img / 255.0 - 0.5) / 0.5  # Нормализация
    tensor = torch.tensor(bgr_img_norm.transpose(2, 0, 1), dtype=torch.float32)
    
    return tensor.unsqueeze(0)  # Добавление размерности батча

## Преобразование тензора  в изображение PIL

In [26]:
def to_output(tensor: torch.Tensor) -> Image.Image:
    tensor = tensor.squeeze(0).cpu().numpy()
    bgr_img = (tensor * 0.5 + 0.5) * 255.0
    bgr_img = bgr_img.transpose(1, 2, 0).astype(np.uint8)
    rgb_img = bgr_img[:, :, ::-1]
    return Image.fromarray(rgb_img)

## Dataset для тестовых пар

In [27]:
class TestPairsDataset(Dataset):
    """Датасет для тестирования пар предварительно выровненных изображений.
    
    Attributes:
        test_dir: Путь к директории с тестовыми данными
        pair_ids: Отсортированный список идентификаторов пар изображений
    """
    
    def __init__(self, test_dir: str) -> None:
        self.test_dir = Path(test_dir)
        self.pair_ids = sorted(os.listdir(self.test_dir))

    def __len__(self) -> int:
        """Возвращает общее количество пар изображений."""
        return len(self.pair_ids)
    
    def __getitem__(self, idx: int) -> Tuple[str, Tensor, Tensor]:
        """Загружает и предобрабатывает пару изображений.
        
        Args:
            idx: Индекс пары изображений в датасете
            
        Returns:
            Кортеж с:
            - идентификатором пары
            - тензор первого изображения
            - тензор второго изображения
        """
        pair_id = self.pair_ids[idx]
        pair_path = self.test_dir / pair_id
        
        # Загрузка и преобразование изображений
        img0 = self._load_image(pair_path / "0.jpg")
        img1 = self._load_image(pair_path / "1.jpg")
        
        return pair_id, img0, img1

    def _load_image(self, path: Path) -> Tensor:
        """Внутренний метод для загрузки и преобразования изображения."""
        image = Image.open(path).convert("RGB")
        tensor = to_input(image).squeeze(0)
        return tensor

## Создание датасета из пар изображений

In [28]:
test_dataset = TestPairsDataset(test_dir)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=0, pin_memory=True)
print(f"Количество изображений в тестовом датасете: {len(test_dataloader.dataset)}")

Количество изображений в тестовом датасете: 9


## FGSM атака

### Функция для FGSM атаки

In [29]:
def fgsm_attack(
    image: Tensor,
    epsilon: float,
    data_grad: Tensor,
) -> Tensor:
    """Реализация атаки Fast Gradient Sign Method (FGSM).
    
    Args:
        image: Исходный тензор изображения (формата [C, H, W])
        epsilon: Коэффициент силы атаки (максимальное отклонение пикселя)
        data_grad: Градиенты loss по входным данным
    
    Returns:
        Tensor: Возмущенное изображение с ограниченными значениями [0, 1]
    """
    sign_data_grad = data_grad.sign()
    perturbed_image = image + epsilon * sign_data_grad
    perturbed_image = torch.clamp(perturbed_image, 0.0, 1.0)
    
    return perturbed_image

### Создание датасета с FGSM атакой

In [None]:
def process_dataset(
    test_dataset: Iterable[Tuple[str, torch.Tensor, torch.Tensor]],
    device: torch.device,
    model: torch.nn.Module,
    fgsm_attack: Callable[[torch.Tensor, float, torch.Tensor], torch.Tensor],
    to_output: Callable[[torch.Tensor], Any],
    images_dir: str,
    check_attack_score_dir: str,
    epsilon: float = 1.5,
) -> None:

    for folder, tensor0, tensor1 in test_dataset:
        tensor0 = F.interpolate(
            tensor0.unsqueeze(0), size=(112, 112),
            mode='bilinear', align_corners=False
        ).to(device)
        tensor1 = F.interpolate(
            tensor1.unsqueeze(0), size=(112, 112),
            mode='bilinear', align_corners=False
        ).to(device).detach()
        tensor1.requires_grad = True

        emb0, _ = model(tensor0)
        emb1, _ = model(tensor1)

        emb0 = F.normalize(emb0, p=2, dim=1)
        emb1 = F.normalize(emb1, p=2, dim=1)

        sims = torch.sum(emb0 * emb1, dim=1)
        loss = -sims.mean()
        model.zero_grad()
        loss.backward()

        perturbed_image = fgsm_attack(tensor1, epsilon, tensor1.grad.data)
        img = to_output(perturbed_image.detach())
        perturbed_embedding, _ = model(perturbed_image)
        perturbed_embedding = F.normalize(perturbed_embedding, p=2, dim=1)
        sims_new = torch.sum(emb0 * perturbed_embedding, dim=1)

        target_folder = os.path.join(check_attack_score_dir, folder)
        os.makedirs(target_folder, exist_ok=True)
        shutil.copy(
            os.path.join(images_dir, folder, "0.jpg"),
            os.path.join(target_folder, "0.jpg")
        )
        img.save(os.path.join(target_folder, "9.jpg"))

process_dataset(test_dataset, device, model, fgsm_attack, to_output, images_dir=IMAGE_DATASET_DIR, check_attack_score_dir=FGSM_SAVE_DIR)