<a href="https://colab.research.google.com/github/dtanlocc/offine-signature-verification-with-gan/blob/main/Test_SignGAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **PREPARE KAGGLE DATASET**

### Lấy API key kaggle từ máy tính

In [1]:
from google.colab import files

files.upload()
#Cài đặt kaggle api client
!pip install -q kaggle

In [2]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/

#Set permision for file
!chmod 600 ~/.kaggle/kaggle.json

### Download Dataset

In [3]:
!kaggle datasets download -d 'sobirthday/dataset-fold-user'

Dataset URL: https://www.kaggle.com/datasets/sobirthday/dataset-fold-user
License(s): unknown
dataset-fold-user.zip: Skipping, found more recently modified local copy (use --force to force download)


In [4]:
!unzip -q dataset-fold-user.zip -d dataset

replace dataset/BHSig260-Bengali/BHSig260-Bengali/1/B-S-1-F-01.tif? [y]es, [n]o, [A]ll, [N]one, [r]ename: None


## **IMPORT MODULES**

In [5]:
import os
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import pandas as pd
from PIL import Image
import numpy as np
from typing import Optional, List, Tuple
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
import torchvision.utils as vutils
import seaborn as sns
from torch.utils.data import Dataset, DataLoader
from tqdm.notebook import tqdm
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import json
import yaml
import argparse
from torchvision import transforms
from torch.amp import autocast, GradScaler

## **DATA HANDLING & UTILITIES**

### *Create SignatureDataset class*

In [6]:
class SignatureDataset(Dataset):
  """class signatureDataset for using """

  def __init__(self, file_csv = None,  path_root = None, transform=None, data_frame=None) -> None:
    """
    :param file_csv: path to CSV files with path images, labels
    :param path_root: path to all images
    :param transform: transform to be applied to a sample
    """
    self.transform = transform
    self.path_root = path_root
    if file_csv is not None:
      self.data = pd.read_csv(file_csv)
    if data_frame is not None:
      self.data = data_frame

  def __len__(self):
    return len(self.data)

  def __getitem__(self, index: int):
    image_1_path = os.path.join(self.path_root, self.data.loc[index, 'image_1'])
    image_2_path = os.path.join(self.path_root, self.data.loc[index, 'image_2'])
    label =self.data.loc[index, 'label']
    image_1 = Image.open(image_1_path).convert('L')
    image_2 = Image.open(image_2_path).convert('L')
    sample = {'image_1': image_1, 'image_2': image_2, 'label': label}
    if self.transform:
      sample['image_1'] = self.transform(image_1)
      sample['image_2'] = self.transform(image_2)
    return sample

### *Create Meter Utillity Classes*

In [7]:
class CatMeter:

    """A utility class to concatenate PyTorch Tensors over time.
    This meter is useful for accumulating a sequence of tensors (e.g., model
    outputs or labels from different batches) into a single, larger tensor.
    The tensors are concatenated along the first dimension (dim=0).
    """
    def __init__(self) -> None:
      """Init a CatMeter"""
      self.val: Optional[torch.Tensor] = None

    def reset(self) -> None:
      self.val = None

    def update(self, value: Optional[torch.Tensor]) -> None:
      """Attend a torch"""
      if self.val is None:
        self.val = value
      else:
        self.val = torch.cat([self.val, value], dim=0)

    def get_val(self):
      """To get value of CatMeter by torch"""
      return self.val

    def get_val_np(self):
      """To get value of CatMeter by numpy"""
      return self.val.data.cpu().numpy()


In [8]:
class AverageMeter:
  """A utility class to calculator average numpy, when cal loss train..."""
  def __init__(self) -> None:
    self.n = 0
    self.sum = 0.0
    self.mean = np.nan

  def reset(self) -> None:
    self.n = 0
    self.sum = 0.0
    self.mean = np.nan

  def update(self, val, n=1) -> None:
    self.sum += val
    self.n += n
    if n>0:
      self.mean = self.sum/self.n

  def get_mean(self):
    return self.mean

## **MODEL ARCHITECTURE**

### *Class Discriminator*

In [9]:
class Discriminator(nn.Module):
  def __init__(self, **kwargs) -> None:
    super(Discriminator, self).__init__()
    backbone_name = kwargs["backbone"]
    output_dim = kwargs["output_dim"]
    pretrained = kwargs["pretrained"]

    if pretrained:
      if backbone_name == 'resnet18':
        weight_arg = models.ResNet18_Weights.DEFAULT
      if backbone_name == 'resnet34':
        weight_arg = models.ResNet34_Weights.DEFAULT
    if backbone_name == 'resnet18':
      self.backbone = models.resnet18(weights=weight_arg)
    if backbone_name == 'resnet34':
      self.backbone = models.resnet34(weights=weight_arg)
    if pretrained:
      original_weights = self.backbone.conv1.weight.data
      self.backbone.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
      self.backbone.conv1.weight.data = original_weights.mean(dim=1, keepdim=True)
    else:
      self.backbone.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)

    feature_dim = self.backbone.fc.in_features
    self.backbone.fc = nn.Identity()

    self.fc = nn.Sequential(
        nn.Linear(feature_dim, feature_dim//2),
        nn.BatchNorm1d(feature_dim//2),
        nn.ReLU(inplace=True),
        nn.Linear(feature_dim//2, output_dim)
    )

  def forward_one(self, x):
    features = self.fc(self.backbone(x))
    return features

  def forward(self, image_1, image_2):
    out1 = self.forward_one(image_1)
    out2 = self.forward_one(image_2)

    out1 = F.normalize(out1, p=2, dim=1)
    out2 = F.normalize(out2, p=2, dim=1)

    cosine_similarity = F.pairwise_distance(out1, out2, p=2)
    return cosine_similarity


### *class Generator*

#### class Block on UNet

In [10]:
class Block(nn.Module):
  """Block Unet have AdaIN"""
  def __init__(self, channels, w_dim=512) -> None:
    super().__init__()

    self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
    self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)

    self.instance_norm = nn.InstanceNorm2d(channels, affine=False)
    self.style_scale_transform = nn.Linear(w_dim, channels)
    self.style_bias_transform = nn.Linear(w_dim, channels)

    self.leaky_relu = nn.LeakyReLU(0.2, inplace=True)

  def forward(self, x, w):
    shortcut = x

    x = self.conv1(x)
    style_scale = self.style_scale_transform(w).unsqueeze(2).unsqueeze(3)
    style_bias = self.style_bias_transform(w).unsqueeze(2).unsqueeze(3)

    x = self.instance_norm(x)
    x = style_scale*x + style_bias

    x = self.leaky_relu(x)
    x = self.conv2(x)

    return x + shortcut

In [11]:

class MappingNetwork(nn.Module):
  """
mapping input vector noise z, output: vector style w
"""
  def __init__(self, z_dim, w_dim) -> None:
    super().__init__()
    self.network = nn.Sequential(
        nn.Linear(z_dim, w_dim), nn.LeakyReLU(0.2, inplace=True),
        nn.Linear(w_dim, w_dim), nn.LeakyReLU(0.2, inplace=True),
        nn.Linear(w_dim, w_dim), nn.LeakyReLU(0.2, inplace=True),
        nn.Linear(w_dim, w_dim)
    )

  def forward(self, z):
    return self.network(z)

In [12]:
class Generator(nn.Module):
    """
    Generator đã được tái cấu trúc hoàn chỉnh:
    - Kiến trúc đối xứng, khởi tạo và forward nhất quán.
    - Logic Style Mixing được kiểm soát từ bên ngoài.
    """
    def __init__(self, in_channels: int = 1, z_dim: int = 512, w_dim: int = 512, base_channels: int = 64):
        super().__init__()

        self.z_dim = z_dim
        self.w_dim = w_dim
        c = base_channels
        self.num_decoder_layers = 3 # Số tầng decoder để style mixing

        # --- 1. CÁC LINH KIỆN ---
        self.mapping_network = MappingNetwork(z_dim, w_dim)
        self.initial_conv = nn.Conv2d(in_channels, c, kernel_size=3, padding=1)

        # Encoder (3 tầng xử lý, 3 tầng downsampling)
        self.enc1 = Block(channels=c, w_dim=w_dim)
        self.down1 = nn.Conv2d(c, c*2, kernel_size=3, stride=2, padding=1)
        self.enc2 = Block(channels=c*2, w_dim=w_dim)
        self.down2 = nn.Conv2d(c*2, c*4, kernel_size=3, stride=2, padding=1)
        self.enc3 = Block(channels=c*4, w_dim=w_dim)
        self.down3 = nn.Conv2d(c*4, c*8, kernel_size=3, stride=2, padding=1)

        self.bottleneck = Block(channels=c*8, w_dim=w_dim)

        # Decoder (3 tầng xử lý, 3 tầng upsampling)
        self.up1 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec1_conv = nn.Conv2d(in_channels=c*8 + c*4, out_channels=c*4, kernel_size=3, padding=1) # up(b) + e3
        self.dec1_block = Block(channels=c*4, w_dim=w_dim)

        self.up2 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec2_conv = nn.Conv2d(in_channels=c*4 + c*2, out_channels=c*2, kernel_size=3, padding=1) # d1 + e2
        self.dec2_block = Block(channels=c*2, w_dim=w_dim)

        self.up3 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec3_conv = nn.Conv2d(in_channels=c*2 + c, out_channels=c, kernel_size=3, padding=1)   # d2 + e1
        self.dec3_block = Block(channels=c, w_dim=w_dim)

        self.out_conv = nn.Conv2d(c, in_channels, kernel_size=3, padding=1)
        self.out_tanh = nn.Tanh()


    def forward(self, img: torch.Tensor, w1: torch.Tensor, skip_alpha: float = 1.0,
                w2: torch.Tensor = None, crossover_layer: int = 999):
        """
        Hàm forward được thiết kế lại, nhận trực tiếp các vector phong cách w.
        """
        # --- ENCODER (CHỈ TIÊM 1 PHONG CÁCH CHÍNH w1) ---
        e1 = self.enc1(self.initial_conv(img), w1)
        e2 = self.enc2(self.down1(e1), w1)
        e3 = self.enc3(self.down2(e2), w1)
        b = self.bottleneck(self.down3(e3), w1)

        # --- DECODER với Style Mixing và Weighted Skip Connection ---
        w_main = w1
        w_secondary = w2 if w2 is not None else w_main # Nếu không có w2, dùng w1

        # Tầng 1
        current_w = w_secondary if crossover_layer <= 1 else w_main
        d1 = self.up1(b)
        d1 = self.dec1_block(self.dec1_conv(torch.cat([d1, e3 * skip_alpha], dim=1)), current_w)

        # Tầng 2
        current_w = w_secondary if crossover_layer <= 2 else w_main
        d2 = self.up2(d1)
        d2 = self.dec2_block(self.dec2_conv(torch.cat([d2, e2 * skip_alpha], dim=1)), current_w)

        # Tầng 3
        current_w = w_secondary if crossover_layer <= 3 else w_main
        d3 = self.up3(d2)
        d3 = self.dec3_block(self.dec3_conv(torch.cat([d3, e1 * skip_alpha], dim=1)), current_w)

        # --- OUTPUT ---
        out = self.out_tanh(self.out_conv(d3))

        return out

## **LOSS FUNCTIONS**

In [13]:
class TripletLoss(nn.Module):
  def __init__(self, margin=1.0) -> None:
    super().__init__()
    self.margin = margin

  def forward(self, pos_dist, neg_dist):
    loss = F.relu(pos_dist - neg_dist + self.margin)
    return loss.mean()

In [14]:
class ContrastiveLoss(nn.Module):
  def __init__(self, margin=2.0) -> None:
    super().__init__()
    self.margin = margin

  def forward(self, euclidean_distance, label):
    loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                  label * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))
    return loss_contrastive

## **VISUALIZATION & EVALUATION**

### *Test Genrator each batch*

In [15]:
def test_generator(img_list, save_folder):
  fig = plt.figure(figsize=(8,8))

  plt.axis('off')
  ims = [[plt.imshow(np.transpose(i, (1,2,0)), animated=True)] for i in img_list]

  ani = animation.ArtistAnimation(fig, ims, interval=1000, repeat_delay=1000, blit=True)

  if save_folder is not None:
    ani.save(f"{save_folder}/animation_generator.gif", writer="pillow", fps=1)
  display(HTML(ani.to_jshtml()))
  plt.close()

In [16]:
def plot_confusion_matrix(cm, class_names, save_dir):
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Prediction')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix')

    os.makedirs(os.path.join(save_dir, "output"), exist_ok=True)
    save_dir = os.path.join(save_dir, "output", "confusion_matrix.png")

    plt.savefig(save_dir, dpi=300)
    plt.close()

In [17]:
def plot_confusion_matrix(cm, class_names, save_dir):
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Prediction')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix')

    os.makedirs(os.path.join(save_dir, "output"), exist_ok=True)
    save_dir = os.path.join(save_dir, "output", "confusion_matrix.png")

    plt.savefig(save_dir, dpi=300)
    plt.close()

### *Calculator Metric such as far, frr, accuracy*

In [18]:
def calculator_metric(confusion_matrix):
  tn, fp, fn, tp = confusion_matrix.ravel()
  far = fp / (fp + tn)
  frr = fn / (fn + tp)
  accuracy = (tp + tn)/ (tp + tn + fn + fp)
  return far, frr, accuracy

## **TRAINING LOGIC**

### *Create Dataloader for train and test*

In [19]:
def create_train_test_loaders(
    csv_files: List[str],
    path_root: str,
    transform=None,
    batch_size: int = 64,
    idx_fold_test: int = 0,
    num_workers: int = 4
) -> Tuple[DataLoader, DataLoader]:
    """
    Tạo DataLoader cho train và test dựa trên phương pháp K-Fold Cross-Validation.

    Args:
        csv_files (List[str]): Danh sách các đường dẫn tới file CSV, mỗi file là một fold.
        path_root (str): Đường dẫn gốc tới thư mục chứa ảnh.
        transform (callable, optional): Các phép biến đổi áp dụng cho ảnh.
        batch_size (int, optional): Kích thước batch. Mặc định là 64.
        idx_fold_test (int, optional): Index của fold được dùng làm tập test. Mặc định là 0.
        num_workers (int, optional): Số luồng tải dữ liệu. Mặc định là 4.

    Returns:
        Tuple[DataLoader, DataLoader]: Một tuple chứa (train_loader, test_loader).
    """
    if not 0 <= idx_fold_test < len(csv_files):
        raise ValueError(f"idx_fold_test phải nằm trong khoảng [0, {len(csv_files)-1}]")

    print(f"--- Đang tạo loaders cho Fold {idx_fold_test + 1}/{len(csv_files)} ---")
    print(f"Test Fold: {csv_files[idx_fold_test]}")

    # [CẢI TIẾN] Lấy đường dẫn file test và train
    test_csv_path = csv_files[idx_fold_test]
    train_csv_paths = [path for i, path in enumerate(csv_files) if i != idx_fold_test]

    # [CẢI TIẾN] Đọc và kết hợp train data một cách rõ ràng
    train_df_list = [pd.read_csv(file) for file in train_csv_paths]
    train_dataframe = pd.concat(train_df_list, ignore_index=True)

    # [CẢI TIẾN] Đọc test data một cách rõ ràng (để dùng trong khởi tạo dataset)
    test_dataframe = pd.read_csv(test_csv_path)

    # Tạo dataset từ DataFrame để nhất quán
    # Giả định SignatureDataset của bạn chấp nhận DataFrame
    train_dataset = SignatureDataset(data_frame=train_dataframe, path_root=path_root, transform=transform)
    test_dataset = SignatureDataset(data_frame=test_dataframe, path_root=path_root, transform=transform)

    print(f"Số lượng mẫu huấn luyện: {len(train_dataset)}")
    print(f"Số lượng mẫu kiểm thử: {len(test_dataset)}")

    # Tạo DataLoader
    train_loader = DataLoader(
        train_dataset,
        num_workers=num_workers,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True
    )

    test_loader = DataLoader(
        test_dataset,
        num_workers=num_workers,
        batch_size=batch_size,
        shuffle=False
    )

    return train_loader, test_loader

### *BaseTrainer*

In [20]:
from abc import ABC, abstractmethod
class BaseTrainer(ABC):
  """
  Base train:
  """
  def __init__(self, config: dict, fold: int, train_dataloader: DataLoader, test_dataloader: DataLoader) -> None:
    self.config = config
    self.fold = fold
    self.train_dataloader = train_dataloader
    self.test_dataloader = test_dataloader
    self.epochs = self.config['training']['epochs']
    # self.epochs = 1
    self.img_list = []

    torch.manual_seed(self.config['project']['seed'])
    self.device = torch.device(self.config['optimization']['device'])

    print(f"[BaseTrainer] Fold {self.fold} - Sử dụng Device: {self.device}")

    self._build_paths()
    self._create_folders()
    self._init_models()
    self._init_optimizers_and_schedulers()
    self._init_criterions()

    self.d_loss = []
    self.g_loss = []
    self.best_accuracy = 0.0
    self.best_epoch = 0
    self.final_results = {}

  @abstractmethod
  def _init_models(self):
    """Khởi tạo tất cả mô hình"""
    pass

  @abstractmethod
  def _init_optimizers_and_schedulers(self):
    """Khởi tạo optimizer và schedulers"""
    pass

  @abstractmethod
  def _init_criterions(self):
    """khởi tạo loss"""
    pass

  @abstractmethod
  def _train_one_epoch(self, epoch: int) -> dict:
    """Logic train 1 epoch. Return dict chứa các loss"""
    pass

  @abstractmethod
  def _validate_one_epoch(self, epoch: int) -> dict:
    """Logic đánh giá trên tập validation. Return dict metric"""
    pass

  @abstractmethod
  def _load_best_model(self, epoch):
    pass

  @abstractmethod
  def _get_checkpoint_state(self) -> dict:
    pass

  def _build_paths(self):
    exp_path = os.path.join(
        self.config['logging']['output_dir'],
        self.config['project']['experiment_name'],
        f"fold_{self.fold}"
    )

    self.experiment_path = exp_path
    self.checkpoints_dir = os.path.join(exp_path, self.config['logging']['checkpoints']['base_folder'])
    self.results_dir = os.path.join(exp_path, self.config['logging']['results']['folder_name'])
    self.log_dir = os.path.join(exp_path, self.config['logging']['log']['folder_name'])
    # self.discriminator_dir = os.path.join(exp_path, self.config['logging']['checkpoints']['discriminator_folder'], self.config['logging']['checkpoints']['base_folder'])
    # self.generator_dir = os.path.join(exp_path, self.config['logging']['checkpoints']['generator_dir'], self.config['logging']['checkpoints']['base_folder'])

  def _create_folders(self):
    os.makedirs(self.results_dir, exist_ok=True)
    os.makedirs(self.checkpoints_dir, exist_ok=True)
    os.makedirs(self.log_dir, exist_ok=True)

  def _save_checkpoint(self, epoch: int, is_best: bool = False):
    state = self._get_checkpoint_state()
    state['epoch'] = epoch
    filename = f'checkpoint_last.pth'
    save_path = os.path.join(self.checkpoints_dir, filename)
    torch.save(state, save_path)
    print(f"Đã lưu checkpoint: {save_path}")

    if is_best:
      best_path = os.path.join(self.checkpoints_dir, 'model_best.pth')
      torch.save(state, best_path)
      print(f"Epoch {epoch}: Đã cập nhật Best Model tại {best_path}")

  def train(self):
    print(f"\n=== BẮT ĐẦU HUẤN LUYỆN FOLD {self.fold} ===")

    for epoch in range(self.epochs):
        # Huấn luyện
        train_results = self._train_one_epoch(epoch)
        val_results = self._validate_one_epoch(epoch)
        self.d_loss.append(train_results['Loss D'])
        self.g_loss.append(train_results['Loss G'])

        with torch.no_grad():
          w = self.model_g.mapping_network(self.fixed_noise)
          fake = self.model_g(self.fixed_image_1, self.fixed_noise, skip_alpha=1.0).to(self.device).detach().cpu()
          fake_vis_normalized = (fake * 0.5 + 0.5).detach().cpu()

        grid = vutils.make_grid(fake_vis_normalized, padding=2)

        # Thêm grid mới vào danh sách
        self.img_list.append(grid)
        log_message = f"EPOCH {epoch}:\tLoss D: {train_results['Loss D']}\tLoss G: {train_results['Loss G']}\tAccuracy: {val_results['Accuracy']}\n"
        print(log_message)
        log_file = os.path.join(self.log_dir, f'train_log_fold{self.fold}.txt')
        with open(log_file, 'a') as log:
            log.write(log_message)

        # log_message = f"Epoch [{epoch+1}/{self.epochs}]"
        # for k, v in {**train_results, **val_results}.items():
        #     log_message += f" | {k}: {v}"
        # print(log_message)
        test_generator(self.img_list, save_folder=None)

        current_accuracy = val_results['Accuracy']
        if current_accuracy > self.best_accuracy:
            self.best_metric = current_accuracy
            self.best_epoch = epoch
            self._save_checkpoint(epoch, is_best=True)
        else:
              self._save_checkpoint(epoch, is_best=False)

    print(f"--- Huấn luyện hoàn tất. Best Accuracy: {self.best_metric} tại Epoch {self.best_epoch} ---")

    if self.best_epoch > 0:
      print("\n=== BẮT ĐẦU PHÂN TÍCH CUỐI CÙNG TRÊN MODEL TỐT NHẤT ===")
      self._run_final_analysis()
    else:
      print("Không tìm thấy model nào tốt hơn, bỏ qua phân tích cuối cùng.")
    test_generator(self.img_list, save_folder=None)

  @abstractmethod
  def _run_final_analysis(self):
    # Placeholder, lớp con sẽ implement chi tiết
    pass

### *SignGAN Train*

In [21]:
class SignGanTrainer(BaseTrainer):
  def __init__(self, config: dict, fold: int, train_dataloader: DataLoader, test_dataloader: DataLoader) -> None:
    super().__init__(config, fold, train_dataloader, test_dataloader)

    self._setup_visualization()
    self.results = {}

  def _setup_visualization(self):
    """Lấy 1 batch cố định để hiển thị so sánh Generator"""
    fixed_batch_data = next(iter(self.test_dataloader))

    cfg_vis = self.config['logging']['visualization']
    self.fixed_image_1 = fixed_batch_data['image_1'][:cfg_vis['num_test_images']].to(self.device)

    self.fixed_noise = torch.randn(cfg_vis['num_test_images'], self.config['model']['generator']['z_dim']).to(self.device)
    real_images_for_grid = (self.fixed_image_1.cpu() * 0.5 + 0.5).clamp(0, 1)

    # Tạo grid ảnh
    grid = vutils.make_grid(real_images_for_grid, padding=2, normalize=False)

    # [SỬA LỖI QUAN TRỌNG]
    # Chỉ append TENSOR gốc có shape (C, H, W) vào danh sách
    self.img_list.append(grid)

  def _init_models(self):
    cfg_g = self.config['model']['generator']
    cfg_d = self.config['model']['discriminator']
    self.model_g = Generator(in_channels=self.config['data']['channels'], **cfg_g)
    self.model_d = Discriminator(**cfg_d)
    if self.config['optimization']['use_compile']:
      print("[Trainer] Bật torch.compile() cho các mô hình.")
      self.model_g = torch.compile(self.model_g)
      self.model_d = torch.compile(self.model_d)
    self.model_g.to(self.device)
    self.model_d.to(self.device)

  def _init_optimizers_and_schedulers(self):
    cfg_g_opt = self.config['training']['optimizer_g']
    cfg_d_opt = self.config['training']['optimizer_d']
    self.optimizer_g = torch.optim.Adam(self.model_g.parameters(), lr=float(cfg_g_opt['lr']), betas=tuple(cfg_g_opt['betas']))
    self.optimizer_d = torch.optim.RMSprop(self.model_d.parameters(), lr=float(cfg_d_opt['lr']))

    self.scheduler_g = torch.optim.lr_scheduler.ReduceLROnPlateau(self.optimizer_g, mode='min', factor=0.5, patience=5)
    self.scheduler_d = torch.optim.lr_scheduler.ReduceLROnPlateau(self.optimizer_d, mode='min', factor=0.5, patience=5)
    self.schedulers = {'g': self.scheduler_g, 'd': self.scheduler_d}

  def _init_criterions(self):
    marggin_contrastive = self.config['training']['loss']['margin_contrastive']
    margin_triplet = self.config['training']['loss']['margin_triplet']

    self.contrastive_loss = ContrastiveLoss(margin=marggin_contrastive)
    self.triplet_loss = TripletLoss(margin=margin_triplet)
    self.content_loss = nn.L1Loss()

    use_amp = self.config['optimization']['use_amp']
    # self.content_loss = VGGPerceptualLoss(device=self.device)

    self.g_scaler = GradScaler(device=self.device, enabled=use_amp)
    self.d_scaler = GradScaler(device=self.device, enabled=use_amp)
    print("[SignGanTrainer] Đã khởi tạo Custom Criterions (Contrastive, Triplet, L1).")

  def _get_checkpoint_state(self) -> dict:
    return {
        'model_g_state_dict' : self.model_g.state_dict(),
        'model_d_state_dict' : self.model_d.state_dict(),
    }

  def _load_model_from_checkpoint(self, checkpoint_path):
    """Tải trọng số cho cả G và D từ một checkpoint."""
    checkpoint = torch.load(checkpoint_path, map_location=self.device)
    self.model_g.load_state_dict(checkpoint['model_g_state_dict'])
    self.model_d.load_state_dict(checkpoint['model_d_state_dict'])
    print(f"Đã tải model từ {checkpoint_path}")

  def _train_one_epoch(self, epoch: int) -> dict:
    self.model_d.train()
    self.model_g.train()

    g_loss_meter = AverageMeter()
    d_loss_meter = AverageMeter()
    device_type = self.device.type
    use_amp = self.config['optimization']['use_amp']

    g_cfg = self.config['model']['generator']
    loss_cfg = self.config['training']['loss']
    style_mixing_prob = loss_cfg['style_mixing_prob']
    skip_alpha = loss_cfg['skip_alpha']
    lambda_content = loss_cfg['lambda_content']
    lambda_diversity = loss_cfg['lambda_diversity']

    progress_bar = tqdm(self.train_dataloader, desc=f"Epoch {epoch+1}/{self.epochs}", leave=False)

    for batch in progress_bar:
      real_A = batch['image_1'].to(self.device)
      real_B = batch['image_2'].to(self.device)

      label_for_contrastive = (1 - batch['label'].to(self.device)).float()

      batch_size = real_A.size(0)

      #--------------------------------------------
      #Train Discriminator
      with autocast(device_type=device_type, enabled=use_amp):
        with torch.no_grad():
          z = torch.randn(batch_size, g_cfg['z_dim'], device=self.device)
          # D chỉ cần xem một loại ảnh giả, không cần mixing khi train D
          w = self.model_g.mapping_network(z)
          fake_B = self.model_g(real_A, w, skip_alpha=1.0) # Dùng skip đầy đủ khi tạo fake

        dist_real_pair = self.model_d(real_A, real_B)
        dist_fake_pair = self.model_d(real_A, fake_B.detach())

        loss_d_contrastive = self.contrastive_loss(dist_real_pair, label_for_contrastive)

        positive_distances = dist_real_pair[label_for_contrastive == 0]
        negative_distances = dist_fake_pair[label_for_contrastive == 0]

        if positive_distances.size(0) >0:
          loss_d_triplet = self.triplet_loss(positive_distances, negative_distances)

        loss_d = loss_d_contrastive + loss_d_triplet
      self.optimizer_d.zero_grad(set_to_none=True)
      self.d_scaler.scale(loss_d).backward()
      self.d_scaler.step(self.optimizer_d)
      self.d_scaler.update()

      #----------------------------------------------------------
      #Train Generator
      with autocast(device_type=device_type, enabled=use_amp):
        # --- Chuẩn bị Style Mixing ---
        z1 = torch.randn(batch_size, g_cfg['z_dim'], device=self.device)
        z2 = torch.randn(batch_size, g_cfg['z_dim'], device=self.device) # Luôn tạo z2

        w1 = self.model_g.mapping_network(z1)
        w2 = self.model_g.mapping_network(z2)
        crossover_layer = 999 # Mặc định không mix

        if random.random() < style_mixing_prob:
            crossover_layer = random.randint(1, self.model_g.num_decoder_layers)

        # --- Tạo 2 ảnh giả ---
        # Ảnh 1 dùng w1
        fake_B1 = self.model_g(real_A, w1, skip_alpha)
        # Ảnh 2 có mix style
        fake_B2 = self.model_g(real_A, w1, skip_alpha, w2, crossover_layer)

        # --- Tính Loss Đối kháng ---
        # Lừa D rằng cả hai ảnh giả đều là cặp "giống nhau"
        dist_g_fake1 = self.model_d(real_A, fake_B1)
        dist_g_fake2 = self.model_d(real_A, fake_B2)
        target_adv = torch.zeros_like(dist_g_fake1)
        loss_g_adv = (self.contrastive_loss(dist_g_fake1, target_adv) +
                      self.contrastive_loss(dist_g_fake2, target_adv)) / 2

        # --- Tính Loss Nội dung (với PerceptualLoss là tốt nhất) ---
        loss_g_content = (self.content_loss(fake_B1, real_A) +
                          self.content_loss(fake_B2, real_A)) / 2 * lambda_content

        # --- Tính Loss Đa dạng ---
        # loss_g_diversity = -self.content_loss(fake_B1, fake_B2) * lambda_diversity

        # --- Loss tổng ---
        loss_g = loss_g_adv + loss_g_content

      self.optimizer_g.zero_grad(set_to_none=True)
      self.g_scaler.scale(loss_g).backward()
      self.g_scaler.step(self.optimizer_g)
      self.g_scaler.update()

      # ... (cập nhật meter) ...
      d_loss_meter.update(loss_d.item(), batch_size)
      g_loss_meter.update(loss_g.item(), batch_size)
      progress_bar.set_postfix({'Loss_D': d_loss_meter.get_mean(), 'Loss_G': g_loss_meter.get_mean()})

    # results_dict = self._validate_one_epoch(epoch=epoch)
    d_loss_avg = d_loss_meter.get_mean()
    g_loss_avg = g_loss_meter.get_mean()
    self.scheduler_d.step(d_loss_avg)
    self.scheduler_g.step(g_loss_avg)

    return {'Loss D': d_loss_avg, 'Loss G': g_loss_avg}

  def accuracy(self, distances, labels, step=0.001):
    """
    Tìm ngưỡng tốt nhất để tối đa hóa balanced accuracy.
    (Hàm này được bê nguyên từ code cũ của bạn).
    """
    # Lưu ý: Chắc chắn rằng label của bạn ở đây là 0 và 1 gốc,
    # không phải label đã bị đảo ngược cho contrastive loss.
    dmax = torch.max(distances).item()
    dmin = torch.min(distances).item()

    # Đảm bảo nsame và ndiff không bằng 0 để tránh lỗi chia cho 0
    nsame = torch.sum(labels == 1).clamp(min=1) # label=1 là "giống nhau"
    ndiff = torch.sum(labels == 0).clamp(min=1) # label=0 là "khác nhau"

    max_acc = 0
    best_thresh = (dmax + dmin) / 2 # Khởi tạo giá trị mặc định

    for d_thresh in torch.arange(dmin, dmax + step, step):
        # Dự đoán: nếu khoảng cách < ngưỡng -> là cùng loại (1)
        #           nếu khoảng cách > ngưỡng -> là khác loại (0)
        pred_labels = (distances < d_thresh).float()

        # True Positive Rate (TPR): Tỷ lệ cặp dương được dự đoán đúng là dương
        tpr = (pred_labels[labels == 1] == 1).float().sum() / nsame
        # True Negative Rate (TNR): Tỷ lệ cặp âm được dự đoán đúng là âm
        tnr = (pred_labels[labels == 0] == 0).float().sum() / ndiff

        # Balanced Accuracy
        acc = 0.5 * (tpr + tnr)

        if acc > max_acc:
            max_acc = acc
            best_thresh = d_thresh

    return max_acc, best_thresh

  def _validate_one_epoch(self, epoch: int, verbose: bool = False) -> dict:
    self.model_d.eval()

    distances_meter = CatMeter()
    labels_meter = CatMeter()

    print(f"\nEpoch {epoch+1}: Bắt đầu đánh giá trên tập validation...")
    with torch.no_grad():
      for sample in tqdm(self.test_dataloader, desc=f"Epoch {epoch+1} Evaluating", leave=False):
        image_1 = sample['image_1'].to(self.device)
        image_2 = sample['image_2'].to(self.device)

        label = sample['label'].to(self.device).float()

        dist =  self.model_d(image_1, image_2)

        distances_meter.update(dist)
        labels_meter.update(label)

    all_distances = distances_meter.get_val()
    all_labels = labels_meter.get_val()

    balanced_acc, best_threshold = self.accuracy(all_distances, all_labels)

    # Chuyển về CPU/Numpy để dùng sklearn
    all_distances_cpu = all_distances.cpu().numpy()
    all_labels_cpu = all_labels.cpu().numpy()

    # Dự đoán lại nhãn dựa trên ngưỡng tốt nhất
    predicted_labels = (all_distances_cpu < best_threshold.cpu().numpy()).astype(int)

    final_accuracy_score = accuracy_score(all_labels_cpu, predicted_labels)
    cm = confusion_matrix(all_labels_cpu, predicted_labels)
    class_report_dict = classification_report(all_labels_cpu, predicted_labels, output_dict=True)

    far, frr, acc = calculator_metric(cm)
    if verbose:
      print(f"--- KẾT QUẢ ĐÁNH GIÁ CHI TIẾT (Epoch {epoch}) ---")
      print(f"Accuracy: {final_accuracy_score}")
      print("Confusion Matrix:\n", cm)
    results_dict = {
        'Accuracy': final_accuracy_score,
        'BalancedAcc': balanced_acc.item(),
        'Threshold': best_threshold.item(),
        'FAR': far,
        'FRR': frr,
        'ConfusionMatrix' : cm,
        'ClassReport' : class_report_dict,
    }
    return results_dict

  def _load_best_model(self):
    """Hàm tiện ích để tải trọng số từ checkpoint tốt nhất."""
    best_model_path = os.path.join(self.checkpoints_dir, 'model_best.pth')
    if not os.path.exists(best_model_path):
        print(f"Không tìm thấy file best model tại: {best_model_path}")
        return False

    print(f"Đang tải best model từ epoch {self.best_epoch} tại: {best_model_path}")
    checkpoint = torch.load(best_model_path, map_location=self.device)
    self.model_d.load_state_dict(checkpoint['model_d_state_dict'])
    self.model_g.load_state_dict(checkpoint['model_g_state_dict'])
    return True

  def _run_final_analysis(self):
    """
    Chạy test lần cuối, tính toán, trực quan hóa và lưu kết quả cuối cùng.
    Đây chính là logic trong hàm train() cũ của bạn.
    """
    if not self._load_best_model():
        return

    # Hàm _validate_one_epoch có thể được tái sử dụng để chạy đánh giá
    # Hoặc em có thể viết lại logic test ở đây cho rõ ràng
    print("Đang tính toán lại các metric trên tập test với model tốt nhất...")

    # Chúng ta tái sử dụng hàm validate nhưng với cờ để in chi tiết
    final_val_results = self._validate_one_epoch(self.best_epoch, verbose=True)

    # Trích xuất các giá trị cần thiết từ dict kết quả
    accuracy = final_val_results['Accuracy']
    far = final_val_results['FAR']
    frr = final_val_results['FRR']
    cm = final_val_results['ConfusionMatrix'] # Giả sử _validate trả về cm
    # class_report_dict = final_val_results['ClassReport']

    # XÂY DỰNG LẠI CẤU TRÚC KẾT QUẢ CỦA BẠN
    # (Nên có một cấu trúc dict đơn giản hơn, nhưng ta sẽ theo cấu trúc cũ của bạn)
    final_summary = {
        'GAN': {
            f'fold_{self.fold}': {
                'best_epoch': self.best_epoch,
                'best_accuracy': self.best_metric,
                'final_test_metrics': {
                    'acc': accuracy,
                    'far': far,
                    'frr': frr
                },
                'training_history': {
                    'g_loss': self.g_loss,
                    'd_loss': self.d_loss,
                } # Lưu lại toàn bộ lịch sử huấn luyện
            }
        }
    }
    self.results = final_summary

    # Lưu file JSON
    self.save_final_results(self.results)

    # VẼ BIỂU ĐỒ VÀ TRỰC QUAN HÓA
    # Các đường dẫn được xây dựng trong BaseTrainer._build_paths()
    vis_path_prefix = os.path.join(self.results_dir, "final")

    print("Đang vẽ các biểu đồ...")
    plot_confusion_matrix(cm=cm, save_dir=vis_path_prefix)

  def save_final_results(self, results):
    save_path = os.path.join(self.results_dir, f'results_fold{self.fold}.json')
    with open(save_path, 'w') as f:
        json.dump(results, f, indent=4)
    print(f"Saved results to {save_path}")

## **MAIN**

In [22]:
def main(config_path: str):
    """
    Hàm chính để chạy toàn bộ quá trình huấn luyện và đánh giá.
    """
    # 1. Đọc file cấu hình YAML
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        print("--- Đã tải file config thành công ---")
    except FileNotFoundError:
        print(f"Lỗi: Không tìm thấy file config tại '{config_path}'")
        return
    except Exception as e:
        print(f"Lỗi khi đọc file config: {e}")
        return

    # 2. Thiết lập các phép biến đổi (Transforms) cho ảnh
    # Các thông số này được lấy từ config
    data_cfg = config['data']
    image_size = data_cfg['image_size']

    # Định nghĩa transform cho tập train (có thể thêm data augmentation)
    train_transform = transforms.Compose([
        transforms.Resize((image_size, image_size)),
        transforms.CenterCrop((image_size, image_size)),
        transforms.Grayscale(num_output_channels=1),
        # transforms.RandomAffine(degrees=5, translate=(0.05, 0.05)), # Ví dụ data augmentation
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]) # Chuẩn hóa về [-1, 1]
    ])

    # Định nghĩa transform cho tập test (không có data augmentation)
    test_transform = transforms.Compose([
        transforms.Resize((image_size, image_size)),
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])
    ])

    # 3. Chạy K-Fold Cross-Validation
    # num_folds = len(data_cfg['csv_files'])
    num_folds = 1
    all_fold_results = [] # Lưu kết quả của mỗi fold

    for i in range(num_folds):
        current_fold = i + 1

        # 4. Tạo Dataloaders cho fold hiện tại
        try:
            train_loader, test_loader = create_train_test_loaders(
                csv_files=data_cfg['csv_files'],
                path_root=data_cfg['path_root'],
                transform=train_transform,  # Sử dụng train_transform cho cả 2 để nhất quán size, sau này có thể tách ra
                batch_size=data_cfg['batch_size'],
                idx_fold_test=i,
                num_workers=data_cfg['num_workers']
            )
        except Exception as e:
            print(f"Lỗi khi tạo Dataloader cho fold {current_fold}: {e}")
            continue # Bỏ qua fold này và tiếp tục

        # 5. Khởi tạo và chạy Trainer
        print(f"\n>>> BẮT ĐẦU HUẤN LUYỆN FOLD {current_fold}/{num_folds} <<<\n")

        # Tạo instance của SignGanTrainer
        trainer = SignGanTrainer(
            config=config,
            fold=current_fold,
            train_dataloader=train_loader,
            test_dataloader=test_loader
        )

        # Bắt đầu quá trình huấn luyện cho fold này
        # Hàm .train() của BaseTrainer sẽ tự động xử lý mọi thứ
        trainer.train()




In [23]:
def process_config(config: dict) -> dict:
  """Xử lý config, xây dựng các đường dẫn động."""
  env = config['environment']
  paths_cfg = config['paths'][env]

  config['data']['path_root'] = paths_cfg['data_root']
  config['data']['csv_files'] = [
      os.path.join(paths_cfg['data_root'],config['data']['dataset_name'], f)
      for f in config['data']['csv_files']
  ]
  config['logging']['output_dir'] = paths_cfg['output_dir']

  if config['optimization']['cudnn_benchmark']:
      torch.backends.cudnn.benchmark = True
      print("[Config] Đã bật cudnn.benchmark.")

  return config

In [None]:

"""Hàm chính để chạy toàn bộ quá trình huấn luyện."""
# 1. Xử lý config để có các đường dẫn cuối cùng

config_path = '/content/config.yaml'
try:
  with open(config_path, 'r') as f:
      config = yaml.safe_load(f)
  print("--- Đã tải file config thành công ---")
except FileNotFoundError:
  print(f"Lỗi: Không tìm thấy file config tại '{config_path}'")

except Exception as e:
  print(f"Lỗi khi đọc file config: {e}")

config = process_config(config)

# 2. Thiết lập transforms
data_cfg = config['data']
image_size = data_cfg['image_size']
transform = transforms.Compose([
  transforms.Resize((image_size, image_size)),
  transforms.Grayscale(num_output_channels=1),
  transforms.ToTensor(),
  transforms.Normalize(mean=[0.5], std=[0.5])
])

# 3. Chạy K-Fold Cross-Validation
# num_folds = len(data_cfg['csv_files'])
num_folds = 5
all_fold_results = [] # Lưu kết quả của mỗi fold
path_root = os.path.join(data_cfg['path_root'],config['data']['dataset_name'],config['data']['dataset_name'])
print(path_root)
# for i in range(num_folds):
  # current_fold = i + 1
current_fold = 0
  # 4. Tạo Dataloaders cho fold hiện tại
try:
  train_loader, test_loader = create_train_test_loaders(
      csv_files=data_cfg['csv_files'],
      path_root=path_root,
      transform=transform,
      batch_size=data_cfg['batch_size'],
      idx_fold_test=current_fold,
      num_workers=data_cfg['num_workers']
  )
except Exception as e:
  print(f"Lỗi khi tạo Dataloader cho fold {current_fold}: {e}")

# 5. Khởi tạo và chạy Trainer
print(f"\n>>> BẮT ĐẦU HUẤN LUYỆN FOLD {current_fold}/{num_folds} <<<\n")

# Tạo instance của SignGanTrainer
trainer = SignGanTrainer(
  config=config,
  fold=current_fold,
  train_dataloader=train_loader,
  test_dataloader=test_loader
)
trainer.train()




--- Đã tải file config thành công ---
[Config] Đã bật cudnn.benchmark.
/content/dataset/CEDAR/CEDAR
--- Đang tạo loaders cho Fold 1/5 ---
Test Fold: /content/dataset/CEDAR/fold_user/data_fold_1.csv
Số lượng mẫu huấn luyện: 24288
Số lượng mẫu kiểm thử: 6072

>>> BẮT ĐẦU HUẤN LUYỆN FOLD 0/5 <<<

[BaseTrainer] Fold 0 - Sử dụng Device: cuda
[SignGanTrainer] Đã khởi tạo Custom Criterions (Contrastive, Triplet, L1).

=== BẮT ĐẦU HUẤN LUYỆN FOLD 0 ===


Epoch 1/25:   0%|          | 0/253 [00:00<?, ?it/s]


Epoch 1: Bắt đầu đánh giá trên tập validation...


Epoch 1 Evaluating:   0%|          | 0/64 [00:00<?, ?it/s]

EPOCH 0:	Loss D: 0.00198949516731853	Loss G: 0.025172446435992268	Accuracy: 0.9089262187088274



Đã lưu checkpoint: /content/outputs/Run_CEDAR_Res18_b16_lr1e4/fold_0/checkpoints/checkpoint_last.pth
Epoch 0: Đã cập nhật Best Model tại /content/outputs/Run_CEDAR_Res18_b16_lr1e4/fold_0/checkpoints/model_best.pth


Epoch 2/25:   0%|          | 0/253 [00:00<?, ?it/s]

In [None]:
test_generator(trainer.img_list, None)