## import thư viện

In [None]:
!pip install timm



In [None]:
# Mount Google Drive at /content/drive so the dataset and model paths used
# further down resolve (google.colab is only available on Colab).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image, ImageSequence, ImageOps
from tqdm import tqdm

import timm
from timm import create_model

In [None]:
# Prefer the GPU whenever torch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

cuda


In [None]:
# Show only the pretrained ConvNeXt-V2 checkpoints (the family the model cell
# uses) instead of dumping the entire timm registry into the cell output.
timm.list_models('convnextv2*', pretrained=True)

['aimv2_1b_patch14_224.apple_pt',
 'aimv2_1b_patch14_336.apple_pt',
 'aimv2_1b_patch14_448.apple_pt',
 'aimv2_3b_patch14_224.apple_pt',
 'aimv2_3b_patch14_336.apple_pt',
 'aimv2_3b_patch14_448.apple_pt',
 'aimv2_huge_patch14_224.apple_pt',
 'aimv2_huge_patch14_336.apple_pt',
 'aimv2_huge_patch14_448.apple_pt',
 'aimv2_large_patch14_224.apple_pt',
 'aimv2_large_patch14_224.apple_pt_dist',
 'aimv2_large_patch14_336.apple_pt',
 'aimv2_large_patch14_336.apple_pt_dist',
 'aimv2_large_patch14_448.apple_pt',
 'bat_resnext26ts.ch_in1k',
 'beit3_base_patch16_224.in22k_ft_in1k',
 'beit3_base_patch16_224.indomain_in22k_ft_in1k',
 'beit3_base_patch16_224.indomain_pt',
 'beit3_base_patch16_224.pt',
 'beit3_large_patch16_224.in22k_ft_in1k',
 'beit3_large_patch16_224.indomain_in22k_ft_in1k',
 'beit3_large_patch16_224.indomain_pt',
 'beit3_large_patch16_224.pt',
 'beit_base_patch16_224.in22k_ft_in22k',
 'beit_base_patch16_224.in22k_ft_in22k_in1k',
 'beit_base_patch16_384.in22k_ft_in22k_in1k',
 'beit_l

## data reading

In [None]:
def get_image_path(folder, index):
    """Return the path of the image file named ``<index>.<ext>`` in ``folder``.

    Only the extensions jpeg, jpg, png and gif are tried, in that order, and
    the first file that exists is returned. Returns None when none exists.
    """
    candidates = (
        os.path.join(folder, f"{index}.{ext}")
        for ext in ("jpeg", "jpg", "png", "gif")
    )
    return next((path for path in candidates if os.path.exists(path)), None)

In [None]:
# Build the list of image paths and the matching list of labels.
def load_dataset(img_folder, label_file):
    """Read the label CSV and pair every row with its image file.

    Args:
        img_folder: Directory in which ``get_image_path`` looks for the image
            belonging to each row's ``index`` value.
        label_file: Path of a CSV file with an ``index`` and a ``label`` column.

    Returns:
        ``(img_paths, labels)``: two lists of equal length. Rows whose image
        cannot be found are reported with ``print`` and left out of both lists.
    """
    df = pd.read_csv(label_file)

    img_paths = []
    labels = []

    # Walk the two columns directly instead of DataFrame.iterrows(): iterrows
    # builds a Series per row and does not preserve each column's dtype.
    for raw_idx, raw_label in zip(df['index'], df['label']):
        idx = int(raw_idx)
        label = float(raw_label)  # labels are kept as floats for regression

        img_path = get_image_path(img_folder, idx)

        if img_path is None:
            print(f'Lỗi: Không tìm thấy ảnh index = {idx}')
            continue

        img_paths.append(img_path)
        labels.append(label)

    return img_paths, labels

## bannerDataset init

In [None]:
class BannerDataset(Dataset):
    """Dataset yielding ``(image, label)`` pairs for banner regression.

    Args:
        img_paths: Sequence of image file paths.
        labels: Sequence of numeric labels, parallel to ``img_paths``.
        transform: Optional callable applied to the RGB PIL image.

    Each item is ``(img, label)`` where ``label`` is a float tensor of shape
    ``(1,)`` and ``img`` is the transformed image, or the RGB PIL image when
    no transform is given.
    """

    def __init__(self, img_paths, labels, transform=None):
        self.img_paths = img_paths
        self.labels = labels  # original note: floats in 1-5 (not enforced here)
        self.transform = transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        # 1. Label as a 1-element float tensor.
        label = torch.tensor([float(self.labels[idx])], dtype=torch.float)

        # 2. Open the image. There is deliberately no try/except: an unreadable
        #    file raises here so the problem is noticed. The context manager
        #    closes the file handle once the pixel data has been converted.
        with Image.open(self.img_paths[idx]) as src:
            # 3. Animated images (e.g. GIF): use the first frame only.
            if getattr(src, "is_animated", False):
                src.seek(0)
            rgba = src.convert("RGBA")

        # 4. Flatten transparency onto a white background, then drop alpha.
        background = Image.new('RGBA', rgba.size, (255, 255, 255))
        img = Image.alpha_composite(background, rgba).convert("RGB")

        # 5. Optional transform.
        if self.transform:
            img = self.transform(img)

        return img, label

## Regression head

In [None]:
class RegressionHead(nn.Module):
    """MLP that maps an ``in_dim``-wide feature vector to a single value.

    Widths are in_dim -> 1028 -> 512 -> 256 -> 128 -> 1. Every hidden layer is
    followed by ReLU; the first three are also followed by Dropout(0.1).
    """

    def __init__(self, in_dim):
        super().__init__()

        # (hidden width, followed by dropout?) for each hidden layer, in order.
        # NOTE(review): 1028 may have been meant as 1024 - confirm before
        # changing, since it alters the parameter shapes.
        hidden_spec = [(1028, True), (512, True), (256, True), (128, False)]

        layers = []
        prev_width = in_dim
        for width, with_dropout in hidden_spec:
            layers.append(nn.Linear(prev_width, width))
            layers.append(nn.ReLU())
            if with_dropout:
                layers.append(nn.Dropout(0.1))
            prev_width = width
        layers.append(nn.Linear(prev_width, 1))

        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the result."""
        out = self.net(x)
        return out

## Resize ảnh

In [None]:
class ResizeWithPadding:
    """Fit an image inside ``target_size`` and centre it on a solid canvas.

    The image is resized with ``Image.thumbnail`` (aspect ratio preserved),
    then pasted in the middle of a new RGB image of exactly ``target_size``
    filled with ``fill_color``.

    Args:
        target_size: Either an int (square canvas) or a ``(width, height)`` pair.
        fill_color: RGB colour of the padding; defaults to black.
    """

    def __init__(self, target_size, fill_color=(0, 0, 0)):
        if isinstance(target_size, int):
            self.target_size = (target_size, target_size)
        else:
            self.target_size = target_size
        self.fill_color = fill_color

    def __call__(self, img):
        # thumbnail() resizes in place, so operate on a copy to leave the
        # caller's image object untouched.
        resized = img.copy()
        resized.thumbnail(self.target_size, Image.Resampling.BICUBIC)

        # Fresh canvas of the exact target size.
        canvas = Image.new("RGB", self.target_size, self.fill_color)

        # Offsets that centre the resized image on the canvas.
        left = (self.target_size[0] - resized.size[0]) // 2
        top = (self.target_size[1] - resized.size[1]) // 2
        canvas.paste(resized, (left, top))

        return canvas

# 2. Transform pipeline built on the padding-based resize defined above.
# Original intent: raise the input size to 448 or 512 so the model can see
# the lettering more clearly.
# NOTE(review): a later cell sets TARGET_SIZE = 224 and rebuilds
# readability_transform before the datasets are created, so this 448 pipeline
# is not the one the DataLoaders end up using.
TARGET_SIZE = 448

readability_transform = transforms.Compose([
    ResizeWithPadding(TARGET_SIZE), # custom class used instead of transforms.Resize
    transforms.ToTensor(),
    # presumably the ImageNet channel mean/std expected by the pretrained backbone - confirm
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

print("Đã cập nhật Transform với Padding & Size lớn!")

Đã cập nhật Transform với Padding & Size lớn!


## Model

In [None]:
class ConvNeXtV2FeatureExtractor(nn.Module):
    """Pretrained ConvNeXt-V2 Tiny backbone used as a feature extractor.

    The timm model is created with ``num_classes=0``, i.e. without a
    classification layer. All backbone parameters are frozen except those of
    the last stage (``stages[3]``), which stay trainable for fine-tuning.

    Attributes:
        backbone: The timm model.
        feature_dim: Width of the feature vector produced by ``forward``.
    """

    def __init__(self):
        super().__init__()
        self.backbone = create_model(
            'convnextv2_tiny.fcmae_ft_in22k_in1k',
            pretrained=True,
            num_classes=0
        )
        # Read the feature width from the backbone instead of hard-coding 768,
        # so it cannot drift from the model that was actually created.
        self.feature_dim = self.backbone.num_features

        # --- Fine-tuning strategy ---
        # 1. Freeze every backbone parameter first ...
        for param in self.backbone.parameters():
            param.requires_grad = False

        # 2. ... then unfreeze only the last stage.
        for param in self.backbone.stages[3].parameters():
            param.requires_grad = True

        # Puts the backbone in training mode. This only sets the mode flag;
        # which weights get updated is decided by requires_grad above.
        self.backbone.train()

    def forward(self, x):
        """Return the backbone output for the image batch ``x``."""
        return self.backbone(x)

## Mạng MLP

In [None]:
class ReadabilityRegressionHead(nn.Module):
    """MLP head that turns a feature vector into a single score.

    Layout: LayerNorm -> Dropout(0.4) -> Linear(in_features, 512) -> GELU ->
    Dropout(0.35) -> Linear(512, 256) -> GELU -> Dropout(0.3) -> Linear(256, 1).
    """

    def __init__(self, in_features=768):
        super().__init__()

        widths = (in_features, 512, 256)
        dropouts = (0.35, 0.3)

        # Input normalisation and dropout come first ...
        blocks = [nn.LayerNorm(in_features), nn.Dropout(0.4)]
        # ... followed by one Linear/GELU/Dropout group per hidden layer ...
        for fan_in, fan_out, drop_p in zip(widths[:-1], widths[1:], dropouts):
            blocks += [nn.Linear(fan_in, fan_out), nn.GELU(), nn.Dropout(drop_p)]
        # ... and the single-output projection.
        blocks.append(nn.Linear(widths[-1], 1))

        self.head = nn.Sequential(*blocks)

    def forward(self, x):
        """Apply the head to ``x`` and return its output."""
        out = self.head(x)
        return out

In [None]:
# Feature extractor and MLP head bundled into one end-to-end model.
class CompleteReadabilityModel(nn.Module):
    """ConvNeXtV2FeatureExtractor followed by ReadabilityRegressionHead."""

    def __init__(self):
        super().__init__()
        # 1. Backbone; it configures which of its parameters are trainable.
        extractor = ConvNeXtV2FeatureExtractor()
        self.feature_extractor = extractor

        # 2. Head sized from the extractor's reported feature width.
        self.head = ReadabilityRegressionHead(in_features=extractor.feature_dim)

    def forward(self, x):
        """Return the head's output for the backbone features of ``x``."""
        return self.head(self.feature_extractor(x))

# Build the combined model and move it to the selected device.
model = CompleteReadabilityModel().to(device)

## Hàm R2

In [None]:
def r2_score(preds, targets):
    """Coefficient of determination R^2 = 1 - SS_res / SS_tot.

    Args:
        preds: Tensor of predictions, e.g. shape (batch, 1).
        targets: Tensor of ground-truth values with the same number of elements.

    Returns:
        A 0-dim tensor. When all targets are identical (SS_tot == 0) the ratio
        is undefined; 1.0 is returned if the predictions are exact and 0.0
        otherwise, instead of NaN / -inf. Empty input still yields NaN.
    """
    # Flatten both sides so (batch,) vs (batch, 1) cannot silently broadcast
    # into a (batch, batch) difference matrix.
    preds = preds.reshape(-1)
    targets = targets.reshape(-1)

    ss_res = torch.sum((targets - preds) ** 2)
    ss_tot = torch.sum((targets - torch.mean(targets)) ** 2)

    if targets.numel() > 0 and ss_tot.item() == 0:
        if ss_res.item() == 0:
            return torch.ones_like(ss_res)
        return torch.zeros_like(ss_res)

    return 1 - ss_res / ss_tot

## Train function

In [None]:
def train_one_epoch(model, loader, optimizer, loss_fn):
    """Run one optimisation pass over ``loader``.

    Batches are moved to the module-level ``device``. Returns
    ``(avg_loss, avg_mae, epoch_r2)``: the loss averaged per sample over
    ``loader.dataset``, plus MAE and R2 computed once over all predictions
    collected during the epoch.
    """
    model.train()

    running_loss = 0
    pred_chunks, target_chunks = [], []

    for batch_imgs, batch_labels in loader:
        batch_imgs = batch_imgs.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        outputs = model(batch_imgs)
        batch_loss = loss_fn(outputs, batch_labels)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so the epoch figure is per sample.
        running_loss += batch_loss.item() * batch_imgs.size(0)

        # Keep CPU copies so the metrics are computed over the whole epoch.
        pred_chunks.append(outputs.detach().cpu())
        target_chunks.append(batch_labels.detach().cpu())

    epoch_preds = torch.cat(pred_chunks)
    epoch_targets = torch.cat(target_chunks)

    mean_loss = running_loss / len(loader.dataset)
    mean_abs_err = nn.L1Loss()(epoch_preds, epoch_targets).item()
    r2_value = r2_score(epoch_preds, epoch_targets).item()

    return mean_loss, mean_abs_err, r2_value

## Evaluate function

In [None]:
def evaluate(model, loader, loss_fn):
    """Score ``model`` on ``loader`` without computing gradients.

    Batches are moved to the module-level ``device``. Returns
    ``(avg_loss, avg_mae, epoch_r2)``: the loss averaged per sample over
    ``loader.dataset``, plus MAE and R2 computed over every prediction.
    """
    model.eval()

    running_loss = 0
    pred_chunks, target_chunks = [], []

    with torch.no_grad():
        for batch_imgs, batch_labels in loader:
            batch_imgs = batch_imgs.to(device)
            batch_labels = batch_labels.to(device)

            outputs = model(batch_imgs)
            batch_loss = loss_fn(outputs, batch_labels)

            # Weight by batch size so the final average is per sample.
            running_loss += batch_loss.item() * batch_imgs.size(0)

            pred_chunks.append(outputs.cpu())
            target_chunks.append(batch_labels.cpu())

    epoch_preds = torch.cat(pred_chunks)
    epoch_targets = torch.cat(target_chunks)

    # Metrics over the whole set rather than averaged per batch.
    mean_loss = running_loss / len(loader.dataset)
    mean_abs_err = nn.L1Loss()(epoch_preds, epoch_targets).item()
    r2_value = r2_score(epoch_preds, epoch_targets).item()

    return mean_loss, mean_abs_err, r2_value

## Early stopping

In [None]:
class EarlyStopping:
    """Flag when the validation loss has stopped improving.

    Call the instance once per epoch with that epoch's validation loss. A loss
    counts as an improvement only if it is lower than the best seen so far by
    more than ``min_delta``. After ``patience`` consecutive calls without an
    improvement, ``early_stop`` becomes True.
    """

    def __init__(self, patience=5, min_delta=1e-4):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0          # consecutive calls without improvement
        self.best_loss = None     # lowest accepted loss so far
        self.early_stop = False

    def __call__(self, val_loss):
        # The first observation only establishes the baseline.
        if self.best_loss is None:
            self.best_loss = val_loss
            return

        improved = val_loss < self.best_loss - self.min_delta
        if improved:
            self.best_loss = val_loss
            self.counter = 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

## main

In [None]:
# Load the dataset: collect image paths and their labels.
label_file = "/content/drive/MyDrive/DS201/Do an/readability_label.csv"   # the single label file (a CSV, read with pd.read_csv in load_dataset)
img_folder  = "/content/drive/MyDrive/DS201/Do an/DS201/data/extracted_images"        # the single folder holding all images

all_paths, all_labels = load_dataset(img_folder, label_file)

#### transform

In [None]:
# Transform actually used for training: this rebinds TARGET_SIZE and
# readability_transform, replacing the 448 version defined in the resize cell.
TARGET_SIZE = 224
readability_transform = transforms.Compose([
    ResizeWithPadding(TARGET_SIZE),
    #transforms.Resize((224, 224), interpolation=transforms.InterpolationMode.BICUBIC),  # original note: BICUBIC is better than the default
    transforms.ToTensor(),
    # presumably the ImageNet channel mean/std expected by the pretrained backbone - confirm
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


## Data split, DataLoaders, model and optimizer

In [None]:
# Split the data: 90% train / 10% held out, with a fixed seed for the split.
# NOTE(review): the held-out part is named test_* here but is used as the
# validation set below; there is no separate test set in this cell.
train_paths, test_paths, train_labels, test_labels = train_test_split(
    all_paths, all_labels, test_size=0.1, random_state=42
)

# Build the datasets (both use the same transform; no augmentation is applied).
train_ds = BannerDataset(train_paths, train_labels, transform=readability_transform)
val_ds   = BannerDataset(test_paths, test_labels, transform=readability_transform)

# Batch size: 16 or 32 depending on available VRAM (original note: larger
# input images need more VRAM).
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True,
                          num_workers=4, pin_memory=True)
val_loader   = DataLoader(val_ds, batch_size=32, shuffle=False,
                          num_workers=4, pin_memory=True)

# Create the model. This rebinds `model`, replacing the instance created in
# the cell that defines CompleteReadabilityModel.
model = CompleteReadabilityModel()
model = model.to(device)

# Loss: mean squared error on the regression output.
criterion = nn.MSELoss()

# Optimizer: two parameter groups, with a 10x smaller learning rate for the
# backbone than for the head. The backbone group is given every backbone
# parameter, including those set to requires_grad=False in
# ConvNeXtV2FeatureExtractor.
optimizer = optim.Adam([
    {'params': model.feature_extractor.backbone.parameters(), 'lr': 1e-5},
    {'params': model.head.parameters(), 'lr': 1e-4}
])




Setup hoàn tất! Sẵn sàng train End-to-End.


## Training loop

In [None]:
epochs = 20
early_stopping = EarlyStopping(patience=5, min_delta=0.001)

# Remember the weights of the epoch with the lowest validation loss. Early
# stopping only fires `patience` epochs after the best one, so without this
# the model left in memory (and saved afterwards) is the last epoch's.
best_val_loss = float("inf")
best_epoch = None
best_state = None

print("Starting training...")

for epoch in range(1, epochs + 1):
    train_loss, train_mae, train_r2 = train_one_epoch(model, train_loader, optimizer, criterion)
    val_loss, val_mae, val_r2 = evaluate(model, val_loader, criterion)

    print(f"Epoch {epoch:02d}/ {epochs} | "
          f"Train Loss: {train_loss:.4f}, MAE: {train_mae:.4f}, R2: {train_r2:.4f} | "
          f"Val Loss: {val_loss:.4f}, MAE: {val_mae:.4f}, R2: {val_r2:.4f}")

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_epoch = epoch
        # Snapshot on the CPU; clone() so later optimizer steps cannot alter it.
        best_state = {name: tensor.detach().cpu().clone()
                      for name, tensor in model.state_dict().items()}

    early_stopping(val_loss)
    if early_stopping.early_stop:
        print(f"Early stopping at epoch {epoch} due to no improvement in validation loss.")
        break

# Put the best weights back into the model. `epoch`, `train_loss` and
# `val_loss` keep their last-epoch values.
if best_state is not None:
    model.load_state_dict(best_state)
    print(f"Restored weights from best epoch {best_epoch} (val loss {best_val_loss:.4f}).")

print("Training finished.")

Starting training...
Epoch 01/ 20 | Train Loss: 2.6324, MAE: 1.1995, R2: -3.4670 | Val Loss: 0.6938, MAE: 0.6859, R2: -0.2677
Epoch 02/ 20 | Train Loss: 0.7700, MAE: 0.7082, R2: -0.3066 | Val Loss: 0.5618, MAE: 0.6159, R2: -0.0265
Epoch 03/ 20 | Train Loss: 0.6652, MAE: 0.6583, R2: -0.1289 | Val Loss: 0.6621, MAE: 0.6688, R2: -0.2098
Epoch 04/ 20 | Train Loss: 0.6522, MAE: 0.6593, R2: -0.1067 | Val Loss: 0.4847, MAE: 0.5649, R2: 0.1143
Epoch 05/ 20 | Train Loss: 0.5921, MAE: 0.6254, R2: -0.0048 | Val Loss: 0.5011, MAE: 0.5754, R2: 0.0844
Epoch 06/ 20 | Train Loss: 0.5530, MAE: 0.6037, R2: 0.0617 | Val Loss: 0.5475, MAE: 0.5983, R2: -0.0004
Epoch 07/ 20 | Train Loss: 0.5323, MAE: 0.5893, R2: 0.0967 | Val Loss: 0.4840, MAE: 0.5586, R2: 0.1157
Epoch 08/ 20 | Train Loss: 0.4979, MAE: 0.5702, R2: 0.1551 | Val Loss: 0.4484, MAE: 0.5363, R2: 0.1806
Epoch 09/ 20 | Train Loss: 0.4728, MAE: 0.5558, R2: 0.1976 | Val Loss: 0.4671, MAE: 0.5442, R2: 0.1465
Epoch 10/ 20 | Train Loss: 0.4491, MAE: 0.5

## Lưu model

In [None]:
# Destination on Google Drive (Drive is mounted at the top of the notebook).
model_path = "/content/drive/MyDrive/DS201/Do an/Save model/best_readability_model.pth"

# Create the target folder if needed so torch.save does not fail on a missing
# directory after training has already finished.
os.makedirs(os.path.dirname(model_path), exist_ok=True)

# Save only the model's state_dict (the weights currently held by `model`).
torch.save(model.state_dict(), model_path)
print(f"Đã lưu mô hình tại: {model_path}")

Đã lưu mô hình tại: /content/drive/MyDrive/DS201/Do an/Save model/best_readability_model.pth


In [None]:
# Full checkpoint: model and optimizer state plus the values that `epoch`,
# `train_loss` and `val_loss` hold after the training loop.
checkpoint = dict(
    epoch=epoch,
    model_state_dict=model.state_dict(),
    optimizer_state_dict=optimizer.state_dict(),
    train_loss=train_loss,
    val_loss=val_loss,
)

torch.save(checkpoint, "/content/drive/MyDrive/DS201/Do an/Save model/checkpoint.pth")