# Cài đặt triển khai mô hình toàn diện với kiến trúc đề xuất theo Pipeline (Khởi tạo kiến trúc, Train, Test)
###Bài toán cài đặt:
#### Nhận diện chữ số viết tay (bộ dữ liệu gồm 50k mẫu train, 10k mẫu Test)

In [1]:
# Các thư viện cần dùng cho dự án
import sys
sys.path.append("../")

import torch
import torchvision
import random
import numpy as np
import faiss

from fgi import *
from torch import nn, optim
from torchmetrics import Accuracy
from pytorch_metric_learning import losses, miners, samplers
from lightning.pytorch import Trainer
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint
from lightning.pytorch.loggers import TensorBoardLogger

In [2]:
# Các Constant cài đặt mặc định
IMG_SHAPE = (1, 28, 28) # C, H, W (ảnh xám mặc định)
PHI_DIM = 128
OPTIONS = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]
NUM_CLASSES = len(OPTIONS)
DEFAULT_LR = 0.2
MAX_EXAMPLES = 1000 # Dùng cho embedding
NORMALIZE_IMAGE = (0.1307,), (0.3081,)
NUM_WORKERS = 7
BATCH_SIZE = 32
STORAGE_DATA = "../data/mnist_digit"
PROBLEM_ID = "digit_classifier"
EXPERIMENT_TENSORBOARD_NAME = PROBLEM_ID
EXPRIMENT_TENSORBOARD_PATH = "../experiment/"
SEED_CODE = 131006 # Random id dùng cho sinh dữ liệu
TRAIN_SIZE = 0.8 # 80% dữ liệu train sẽ được đem đi đào tạo, 20% cho validation
NUM_SAMPLE_PER_CLASS = 4 # Số sample mỗi label dành cho học không gian embedding
DROPOUT = 0.2 # Công dụng regularier, cấu hình cho nn.Dropout
TRIPLER_MARGIN = 28.6 # Độ khác biệt tối thiểu giữa anchor và positive, dùng cho biệt hoá không gian embedding
STRATEGY_ANCHOR_POSITIVE = "hard"
DEVICE = "cpu"
PATIENCE = 3 # Số lượt đợi không cải thiện
LAMBDA_CLASSIFY = 1. # Trọng số đánh giá tầm quan trọng mục tiêu phân loại
LAMBDA_EMBEDDING = 1. # Trọng số đánh giá tầm quan trọng mục tiêu biệt hoá không gian
ILLUSTRATION_EXAMPLES = 1000 # Giới hạn số mẫu dùng cho show projector trong tensorboard
MAX_EPOCHS = 30

In [3]:
# Cấu hình pytorch đảm bảo thí nghiệm
torch.manual_seed(SEED_CODE)
random.seed(SEED_CODE)
np.random.seed(SEED_CODE)
torch.cuda.manual_seed(SEED_CODE)
torch.cuda.manual_seed_all(SEED_CODE)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

## Chuẩn bị dữ liệu huấn luyện.

In [4]:
# Chuyển đổi và tăng cường dữ liêụ
# Định dạng transform mặc định
default_transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(*NORMALIZE_IMAGE)
])

In [5]:
full_dataset = torchvision.datasets.MNIST(STORAGE_DATA, train=True, download=True, transform=default_transform)

# Chia tập train và validation
train_size = int(len(full_dataset) * TRAIN_SIZE)
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [train_size, val_size])

# Lấy bộ dữ liệu test
test_dataset = torchvision.datasets.MNIST(STORAGE_DATA, train=False, transform=default_transform, download=True)

In [6]:
# In thử thông tin bộ dữ liệu
print(f"Kích thước toàn bộ dữ liệu đào tạo: {len(full_dataset)}")
print(f"Kích thước dữ liệu cho training: {train_size}")
print(f"Kích thước dữ liệu cho validation: {val_size}")
print(f"Kích thước dữ liệu cho test: {len(test_dataset)}")

Kích thước toàn bộ dữ liệu đào tạo: 60000
Kích thước dữ liệu cho training: 48000
Kích thước dữ liệu cho validation: 12000
Kích thước dữ liệu cho test: 10000


In [7]:
# Tiến hành tạo các DataLoader
# Lấy tập train_labels dành cho sampler hỗ trợ học ko gian biểu diễn
train_labels = full_dataset.targets[train_dataset.indices]
embedding_sampler = samplers.MPerClassSampler(train_labels, m=NUM_SAMPLE_PER_CLASS, batch_size=BATCH_SIZE, length_before_new_iter=train_size)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=embedding_sampler, num_workers=NUM_WORKERS, persistent_workers=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, persistent_workers=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS)

In [8]:
# In thử train_labels, soát lỗi
print(train_labels)

tensor([6, 5, 6,  ..., 0, 9, 6])


## Kiến trúc mô hình

In [9]:
# Lớp dùng cho khai thác kết quả
class DigitExploiter(Exploiter):
    def __init__(self, digit, *args, **kwargs):
        super().__init__(*args, **kwargs)  
        self.digit = digit

    def __repr__(self):
        return f"{self.metadata["type"]}(digit={self.digit})"

In [10]:
# Lớp hành vi của vấn đề
class DigitClassifier(NonCodeProblem):
    def __init__(self, *args, **kwargs):
        super().__init__(PROBLEM_ID, DigitExploiter, *args, **kwargs)
        self._r = RepresentLayer([ 
            ImageRepresent(img_shape=IMG_SHAPE, patch_size=7, num_heads=1, phi_dim=PHI_DIM),  
            ImageRepresent(img_shape=IMG_SHAPE, patch_size=7, num_heads=1, phi_dim=PHI_DIM)
        ], output_dim=PHI_DIM)
        self._en = EnhanceRepresentUnit(phi_dim=PHI_DIM)
        self._po = PropertyUnit(phi_dim=PHI_DIM)
        self._spe = ChooseOptions(1, options=OPTIONS, property_name="digit", phi_dim=PHI_DIM)
    
    def recognize_unknown(self, x, *args, **kwargs):
        x = self._r(x)
        x = x.mean(dim = 1)
        x = self._en.embedding(x)
        return x

    def forward(self, x, skip_avatar : bool = False, *args, **kwargs):
        """
        Lan truyền, đại diện cho cách suy luận, cách giải quyết vấn đề,
        skip_avatar (bool) : bỏ qua việc xuất kết quả đại diện vấn đề
        """
        q1 = self._r(x)
        q1 = q1.mean(dim = 1)
        
        q_root = self._en(q1)
        q = self._po(q_root)
        q = self._spe(q_root + q)
        
        if skip_avatar:
            return q
        
        return q, self._en.embedding(q1)

## Cấu hình huấn luyện mô hình

In [11]:
# Viết lớp Learner dành riêng cho việc học đào tạo vấn đề
class DigitClassifierLearner(LightningLearner):
    def __init__(self, problem, *args, **kwargs):
        super().__init__(problem, *args, **kwargs)
        self._classify = nn.CrossEntropyLoss()
        self._specialized_space = losses.ArcFaceLoss(num_classes=NUM_CLASSES, embedding_size=PHI_DIM, margin=TRIPLER_MARGIN)
        # self._miner = miners.TripletMarginMiner(margin=TRIPLER_MARGIN, type_of_triplets=STRATEGY_ANCHOR_POSITIVE)
        self._train_accuracy = Accuracy(task="multiclass", num_classes=NUM_CLASSES)
        self._val_accuracy = Accuracy(task="multiclass", num_classes=NUM_CLASSES)
    
    def configure_optimizers(self):
        # Kết hợp thêm chiến lược scheduler
        optimizer = optim.AdamW(self._problem.parameters(), lr=0.001)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=PATIENCE - 2)
        return { "optimizer" : optimizer, "lr_scheduler" : scheduler, "monitor" : "val/loss" }

    def training_step(self, batch, batch_idx, *args, **kwargs):
        x, y = batch
        y_predicted = self(x)

        loss, ce, triplet = self._aggerate_loss(y_predicted, y)
        self._train_accuracy.update(y_predicted[0], y)
        
        self.log("train/loss", loss, prog_bar=True, logger=True, on_epoch=True, on_step=True)
        self.log("train/ce", ce, prog_bar=True, logger=True, on_epoch=True)
        self.log("train/triplet", triplet, prog_bar=True, logger=True, on_epoch=True)
        self.log("train/acc", self._train_accuracy, prog_bar=True, on_epoch=True, logger=True)

        return loss

    def validation_step(self, batch, batch_idx, *args, **kwargs):
        x, y = batch
        y_predicted = self(x)

        loss, ce, triplet = self._aggerate_loss(y_predicted, y)
        self._val_accuracy.update(y_predicted[0], y)

        self.log("val/loss", loss, prog_bar=True, logger=True, on_epoch=True)
        self.log("val/acc", self._val_accuracy, prog_bar=True, logger=True, on_epoch=True)
        self.log("val/ce", ce, prog_bar=True, logger=True, on_epoch=True)
        self.log("val/triplet", triplet, prog_bar=True, logger=True, on_epoch=True)

    def on_train_batch_end(self, outputs, batch, batch_idx):
        lr = self.trainer.optimizers[0].param_groups[0]['lr']
        self.log("lr", lr, logger=True, on_epoch=True)

    def _aggerate_loss(self, y_predicted, y, *args, **kwargs):
        y_hat, emb = y_predicted

        ce = self._classify(y_hat, y)
        triplet = self._specialized_space(emb, y)

        loss = LAMBDA_CLASSIFY * ce + LAMBDA_EMBEDDING * triplet

        return loss, ce, triplet
    
    def test_step(self, batch, *args, **kwargs):
        x, y = batch
        y_predicted = self(x)

        loss, __, __ = self._aggerate_loss(y_predicted, y)
        self._val_accuracy.update(y_predicted[0], y)

        self.log("test/loss", loss, prog_bar=True, on_epoch=True, logger=True)
        self.log("test/acc", self._val_accuracy, prog_bar=True, logger=True, on_epoch=True)
    
    def _get_embedding(self, iterators):
        # Hiển thị thử embedding
        collected = 0

        all_imgs = []
        all_labels = []
        all_embeds = []

        self.eval()
        with torch.no_grad():
            for batch in iterators:
                x, y = batch
                x = x.to(self.device)
                y = y.to(self.device)

                # Encode ảnh thành vector đặc trưng (ví dụ: self._problem.encode)
                embedding = self._problem.recognize_unknown(x)

                all_imgs.append(x.cpu())
                all_labels.append(y.cpu())
                all_embeds.append(embedding.cpu())

                collected += x.size(0)
                if collected >= ILLUSTRATION_EXAMPLES:
                    break  # Dừng sớm nếu vượt quá max_examples
        
        all_imgs = torch.cat(all_imgs, dim=0)[:ILLUSTRATION_EXAMPLES]
        all_labels = torch.cat(all_labels, dim=0)[:ILLUSTRATION_EXAMPLES]
        all_embeds = torch.cat(all_embeds, dim=0)[:ILLUSTRATION_EXAMPLES]

        return all_imgs, all_labels, all_embeds

    def on_train_end(self):
        all_imgs, all_labels, all_embeds = self._get_embedding(self.trainer.val_dataloaders)
        self.logger.experiment.add_embedding(
            mat=all_embeds,
            metadata=[str(label.item()) for label in all_labels],
            label_img=all_imgs,
            global_step=self.global_step,
            tag="train/embedding"
        )

    def on_test_end(self):
        all_imgs, all_labels, all_embeds = self._get_embedding(self.trainer.test_dataloaders)
        self.logger.experiment.add_embedding(
            mat=all_embeds,
            metadata=[str(label.item()) for label in all_labels],
            label_img=all_imgs,
            global_step=self.global_step,
            tag="test/embedding"
        )

In [12]:
# Hiển thị thử mối quan hệ kế thừa
LightningLearner.__mro__

(fgi.learner.LightningLearner.LightningLearner,
 fgi.learner.Learner,
 lightning.pytorch.core.module.LightningModule,
 abc.ABC,
 lightning.fabric.utilities.device_dtype_mixin._DeviceDtypeModuleMixin,
 lightning.pytorch.core.mixins.hparams_mixin.HyperparametersMixin,
 lightning.pytorch.core.hooks.ModelHooks,
 lightning.pytorch.core.hooks.DataHooks,
 lightning.pytorch.core.hooks.CheckpointHooks,
 torch.nn.modules.module.Module,
 object)

# Phần thao tác chính, phối hợp các thành phần ở trên

In [13]:
# Khởi tạo bộ giải quyết
solver = DigitClassifier()
# Thử xem các cấu hình metadata
solver.metadata

{'type': 'DigitClassifier',
 'default_exploiter': 'DigitExploiter',
 'call_update': True,
 'layers': ['_r'],
 'units': ['fc2b842a-e054-48da-8c38-65a6ee5a8f67',
  '14369b48-48a5-4f13-a437-fb9f9a7a2ab6',
  'a15005e4-a400-4a93-95b1-29094bd88b6e',
  'd0a4a325-d60b-4b1d-bc17-b892b299d84d',
  'bfd1b5c0-4109-44ad-84d5-6069bfcd2812'],
 'properties': ['digit']}

In [14]:
# Khởi tạo bộ học
learner = DigitClassifierLearner(solver)
learner.compile()
# Dùng cho vẽ đồ thị lan truyền
learner.example_input_array = torch.randn(1, 1, 28, 28, device=DEVICE)

In [15]:
# Thử hiển thị và chạy thử solver
y_predicted = solver(learner.example_input_array)
print(y_predicted)

(tensor([[-0.0349,  0.4046,  0.2125,  0.0635, -0.2107,  0.0188, -0.3013,  0.5342,
          0.0666, -0.1598]], grad_fn=<AddmmBackward0>), tensor([[ 1.2526e-01, -1.1781e-01,  5.3821e-02,  2.5770e-02, -6.7324e-02,
          2.7504e-02,  9.0435e-03, -1.0721e-01, -9.9266e-02, -1.6187e-01,
          2.0328e-02, -1.3367e-02,  9.8793e-02, -2.9920e-02, -7.0345e-02,
          1.7222e-02,  9.5359e-02,  8.3620e-02, -1.1969e-01,  6.8430e-02,
          2.0422e-02,  1.2440e-02,  5.5009e-02, -6.1053e-02,  5.3340e-02,
         -1.2076e-01,  8.4601e-02,  1.8055e-01, -1.9966e-02,  4.7596e-02,
          3.7765e-02, -3.7254e-02, -8.8597e-02, -5.1497e-02,  1.5733e-01,
         -1.9636e-03, -4.5517e-02, -1.0543e-01, -2.2419e-01,  9.0068e-02,
         -2.7879e-02, -1.1419e-02,  7.1065e-02, -4.0827e-02, -1.1615e-02,
         -8.7144e-02, -1.2128e-02,  1.2616e-01, -1.5050e-01, -6.7234e-02,
         -1.6145e-01,  1.7606e-01, -8.5293e-02,  1.0005e-01,  1.4408e-02,
         -2.2719e-02,  1.1610e-01,  1.1428e-01, 

In [16]:
# Cấu hình logger, callbacks
logger = TensorBoardLogger(EXPRIMENT_TENSORBOARD_PATH, EXPERIMENT_TENSORBOARD_NAME, log_graph=True)
early = EarlyStopping("val/loss", patience=PATIENCE, verbose=True)
best_checkpoint = ModelCheckpoint(dirpath=f"../database/{PROBLEM_ID}", filename="best", monitor="val/loss", verbose=True, save_weights_only=True)

In [17]:
# Huấn luyện mô hình
trainer = Trainer(accelerator="auto", max_epochs=MAX_EPOCHS, logger=logger, callbacks=[early, best_checkpoint])
trainer.fit(learner, train_loader, val_loader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
e:\simulations\implementations\env\Lib\site-packages\lightning\pytorch\callbacks\model_checkpoint.py:658: Checkpoint directory E:\simulations\implementations\database\digit_classifier exists and is not empty.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name               | Type               | Params | Mode  | In sizes | Out sizes
-----------------------------------------------------------------------------------------
0 | _problem           | DigitClassifier    | 184 K  | train | ?        | ?        
1 | _classify          | CrossEntropyLoss   | 0      | train | ?        | ?        
2 | _specialized_space | ArcFaceLoss        | 1.3 K  | train | ?        | ?        
3 | _train_accuracy    | MulticlassAccuracy | 0      | train | ?        | ?        
4 | _val_accuracy      | MulticlassAccuracy | 0      | train | ?        | ?        
--------------------------------------

Epoch 0: 100%|██████████| 1500/1500 [02:52<00:00,  8.71it/s, v_num=12, train/loss_step=12.60, train/ce_step=0.616, train/triplet_step=12.00, val/loss=12.80, val/acc=0.804, val/ce=0.633, val/triplet=12.20, train/loss_epoch=20.00, train/ce_epoch=1.060, train/triplet_epoch=18.90, train/acc_epoch=0.653]

Metric val/loss improved. New best score: 12.799
Epoch 0, global step 1500: 'val/loss' reached 12.79865 (best 12.79865), saving model to 'E:\\simulations\\implementations\\database\\digit_classifier\\best-v2.ckpt' as top 1


Epoch 1:  82%|████████▏ | 1232/1500 [02:17<00:29,  8.94it/s, v_num=12, train/loss_step=10.20, train/ce_step=0.503, train/triplet_step=9.730, val/loss=12.80, val/acc=0.804, val/ce=0.633, val/triplet=12.20, train/loss_epoch=20.00, train/ce_epoch=1.060, train/triplet_epoch=18.90, train/acc_epoch=0.653]


Detected KeyboardInterrupt, attempting graceful shutdown ...


NameError: name 'exit' is not defined

In [None]:
trainer.test(learner, test_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
e:\simulations\implementations\env\Lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:420: Consider setting `persistent_workers=True` in 'test_dataloader' to speed up the dataloader worker initialization.


Testing DataLoader 0: 100%|██████████| 313/313 [00:06<00:00, 49.90it/s]


[{'test/loss': 5.092170715332031, 'test/acc': 0.930400013923645}]

In [None]:
y_predicted = solver.forward(learner.example_input_array, skip_avatar=True)
y_predicted

tensor([[ 4.2225, -4.2506,  1.1206, -4.2685, -1.6954,  4.0609,  1.6806, -3.7513,
         -1.2111, -0.6855]], grad_fn=<AddmmBackward0>)

In [None]:
# Tiến hành lưu lại mô hình, lưu phân rã các thành phần mô hình
solver.save(f"../database/{PROBLEM_ID}")

### Thử khai thác tính chất OOP của solver

In [None]:
# Thử lấy một lớp khai thác
instance, _ = solver.as_instance(y_predicted)

In [None]:
instance

DigitExploiter(digit=0)

### Thử load solver đã giải quyết

### Thử nghiệm nhận diện mẫu chưa đào tạo, tận dụng kết quả từ vector đại diện học được trong ArcFaceLoss

In [None]:
type(learner)

__main__.DigitClassifierLearner

In [None]:
protos = learner._specialized_space.W.detach().numpy().astype("float32").T
index = faiss.IndexFlatIP(PHI_DIM)
index.add(protos)

(10, 128)


In [None]:
D, I = index.search(solver.forward(learner.example_input_array)[1].detach().numpy(), k=1)
print(D)
print(I)

[[2.0357275]]
[[2]]
