## **Cài đặt và import thư viện**

In [None]:
!pip install albumentations
!pip install timm
!pip install segmentation-models-pytorch

In [None]:
import torch
import torch.nn as nn
import glob
import albumentations as A
from albumentations.pytorch import ToTensorV2
import os
from tqdm.auto import tqdm
import torch.nn.functional as F
import timm
import numpy as np
import cv2
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import random
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
import gc
import segmentation_models_pytorch as smp
from segmentation_models_pytorch.metrics import get_stats, f1_score

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

## **Thiết lập seed (tăng khả năng reproduce)**

In [None]:
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # torch.cuda.manual_seed_all(seed)  # if using multi-GPU
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(31)

## **Khởi tạo batch size và epochs**

In [None]:
BATCH_SIZE = 4
EPOCHS = 20

## **Augmentation**
* Train Transform:
Áp dụng nhiều kỹ thuật biến đổi ảnh để tạo ra các biến thể khác nhau từ cùng một ảnh gốc:

Resize: chuẩn hóa kích thước ảnh về 512x512.

HorizontalFlip / VerticalFlip / RandomRotate90: tạo các góc nhìn khác nhau cho ảnh.

ShiftScaleRotate: dịch chuyển, phóng to/thu nhỏ, xoay ảnh nhẹ → tăng độ đa dạng hình học.

Color transforms (Brightness, Contrast, Hue, CLAHE): thay đổi màu sắc → mô hình không phụ thuộc màu quá nhiều.

Noise & Blur: mô phỏng điều kiện ảnh chụp kém chất lượng.

Distortion & Dropout: biến dạng và che một phần ảnh → giúp mô hình học được các đặc trưng quan trọng.

Normalize & ToTensor: chuẩn hóa giá trị pixel và chuyển ảnh về tensor để đưa vào mô hình.

* Test Transform:

Resize: giữ đồng nhất kích thước ảnh.

Normalize & ToTensor: chuẩn hóa giá trị và chuyển đổi định dạng dữ liệu.

In [None]:
trainsize = 512

train_transform = A.Compose([
    A.Resize(width=trainsize, height=trainsize),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=20, border_mode=0, p=0.5), 

    A.OneOf([
        A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3),
        A.CLAHE(),
        A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20),
    ], p=0.5),

    A.OneOf([
        A.GaussNoise(var_limit=(10.0, 50.0)),
        A.MotionBlur(blur_limit=3),
        A.MedianBlur(blur_limit=3),
    ], p=0.5),
    
    A.OneOf([
        A.ElasticTransform(p=0.3),
        A.GridDistortion(p=0.3),
        A.OpticalDistortion(p=0.3),
    ], p=0.3),
    
    A.CoarseDropout(max_holes=8, max_height=32, max_width=32, fill_value=0, p=0.5),

    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

test_transform = A.Compose([
    A.Resize(width=trainsize, height=trainsize),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0),
    ToTensorV2(), 
])

## **Chia tập train val**
Lớp SkinDataset chia dữ liệu thành 90% để huấn luyện và 10% để validation bằng train_test_split.
Việc chia này giúp mô hình học từ tập huấn luyện và được đánh giá công bằng trên tập validation — đảm bảo không đánh giá trên dữ liệu đã thấy.

In [None]:
class SkinDataset(Dataset):
    def __init__(self, images_path, masks_path, split, val_split = 0.2,transform=None):
        super().__init__()
        self.images_path = images_path
        self.masks_path = masks_path
        all_images = sorted(os.listdir(images_path))
        all_masks = sorted(os.listdir(masks_path))
        
        train_imgs, val_imgs, train_masks, val_masks = train_test_split(
            all_images, all_masks, test_size=val_split, random_state=33
        )

        if split == 'train':
            self.images = train_imgs
            self.masks = train_masks
        else:
            self.images = val_imgs
            self.masks = val_masks
            
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def mask_to_binary(self, mask):
        if mask.ndim == 3:
            mask = mask[:, :, 0]
        return (mask == 255).astype(np.float32)

    def __getitem__(self, idx):
        img_path = os.path.join(self.images_path, self.images[idx])
        mask_path = os.path.join(self.masks_path, self.masks[idx])

        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(mask_path)
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2RGB)
        mask = self.mask_to_binary(mask)

        if self.transform is not None:
            transformed = self.transform(image=image, mask=mask)
            image = transformed["image"]
            mask = transformed["mask"]

        return image, mask

In [None]:
traindataset = SkinDataset(images_path = "/kaggle/input/medical-segmentation/Dataset/Train/Image",
                          masks_path = "/kaggle/input/medical-segmentation/Dataset/Train/Mask",
                          split = 'train',
                          transform = train_transform)

valdataset = SkinDataset(images_path = "/kaggle/input/medical-segmentation/Dataset/Train/Image",
                          masks_path = "/kaggle/input/medical-segmentation/Dataset/Train/Mask",
                          split = 'val',
                          transform = test_transform)

In [None]:
trainloader = DataLoader(traindataset, batch_size = BATCH_SIZE, shuffle = True, num_workers = 2, drop_last = True)
valloader = DataLoader(valdataset, batch_size = BATCH_SIZE, shuffle = False, num_workers = 2, drop_last = True)

In [None]:
len(trainloader), len(valloader)

## **Hiển thị ảnh và mask sau khi augment**

In [None]:
test_img, test_mask = next(iter(trainloader))

In [None]:
class UnNormalize(object):
    def __init__(self, mean, std):
        self.mean = mean
        self.std = std
        
    def __call__(self, tensor):
        """
        Args:
            tensor (Tensor): Tensor image of size (C, H, W) to be normalized.
        Returns:
            Tensor: Normalized image.
        """
        for t, m, s in zip(tensor, self.mean, self.std):
            t.mul_(s).add_(m)
            # The normalize code -> t.sub_(m).div_(s)
        return tensor
    
unorm = UnNormalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))

In [None]:
def show_batch(images, masks, unorm, max_images=8):
    """
    images: Tensor of shape [B, C, H, W]
    masks: Tensor of shape [B, H, W]
    unorm: function to denormalize image
    """
    batch_size = min(images.size(0), max_images)
    plt.figure(figsize=(6, batch_size * 3))

    for i in range(batch_size):
        img = unorm(images[i]).permute(1, 2, 0).cpu().numpy()
        mask = masks[i].cpu().numpy()

        # Image subplot
        plt.subplot(batch_size, 2, 2 * i + 1)
        plt.imshow(img)
        plt.title(f"Image {i}")
        plt.axis('off')

        # Mask subplot
        plt.subplot(batch_size, 2, 2 * i + 2)
        plt.imshow(mask, cmap='gray')
        plt.title(f"Mask {i}")
        plt.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
show_batch(test_img, test_mask, unorm)

## **Mô hình huấn luyện**
### Kiến trúc tổng quan

Mô hình là một phiên bản cải tiến của kiến trúc U-Net cổ điển, sử dụng **ResNeSt269e** làm backbone encoder để trích xuất đặc trưng mạnh mẽ. Decoder được thiết kế bằng các khối `DoubleConv` đơn giản nhưng hiệu quả.

### Chi tiết kiến trúc

- **Encoder (ResNeSt269e pretrained)**: Trích xuất 5 mức đặc trưng `x1 → x5`, với độ sâu và số kênh tăng dần.
- **Bottleneck**: `DoubleConv(2048 → 1024)` xử lý đặc trưng từ tầng sâu nhất.
- **Decoder**:
  - 4 tầng giải mã.
  - Mỗi tầng: Upsample → Ghép nối với skip connection từ encoder → `DoubleConv`.
- **Output**: `Conv2d(64 → n_classes)` → Upsample cuối cùng để khôi phục kích thước ảnh gốc.

```mermaid
graph TD
    Input[Input Image (3xHxW)]
    E1[Encoder Stage 1 (x1): C=64]
    E2[Encoder Stage 2 (x2): C=128]
    E3[Encoder Stage 3 (x3): C=256]
    E4[Encoder Stage 4 (x4): C=1024]
    E5[Encoder Stage 5 (x5): C=2048]
    Neck[DoubleConv(2048 → 1024)]

    U1[Decoder Up1: Cat(x4, ↑x) → DoubleConv(2048 → 512)]
    U2[Decoder Up2: Cat(x3, ↑x) → DoubleConv(1024 → 256)]
    U3[Decoder Up3: Cat(x2, ↑x) → DoubleConv(512 → 128)]
    U4[Decoder Up4: Cat(x1, ↑x) → DoubleConv(256 → 64)]

    CLS[Conv2d(64 → n_classes)]
    Out[Upsample → Final Output (HxW)]

    Input --> E1 --> E2 --> E3 --> E4 --> E5 --> Neck
    Neck --> U1 --> U2 --> U3 --> U4 --> CLS --> Out
```


Ưu điểm trong bài toán segmentation:
* Giữ lại thông tin không gian: Nhờ skip connections, mô hình có thể phục hồi chi tiết vùng biên và cấu trúc đối tượng.
* Biểu diễn đặc trưng mạnh mẽ: Sử dụng ResNeSt269e, một backbone hiện đại với self-attention giúp mô hình học tốt hơn các đặc trưng không gian-phân bố.
* Decoder nhẹ nhưng hiệu quả: Các tầng DoubleConv và Upsample giúp khôi phục phân giải nhanh chóng mà không tốn quá nhiều tài nguyên.

In [None]:
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()

        self.block = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, 1, 1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, 1, 1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.block(x)

class Unet_Modern(nn.Module):
    def __init__(self, n_classes = 1):
        super().__init__()
        self.n_classes = n_classes
        self.encoder = timm.create_model("resnest269e", pretrained=True, features_only=True)
        self.upsample = nn.Upsample(scale_factor=2, mode="bilinear")
        self.block_neck = DoubleConv(2048, 1024)
        self.block_up1 = DoubleConv(1024+1024, 512)
        self.block_up2 = DoubleConv(512+512, 256)
        self.block_up3 = DoubleConv(256+256, 128)
        self.block_up4 = DoubleConv(128+128, 64)
        self.conv_cls = nn.Conv2d(64, self.n_classes, 1)

    def forward(self, x):
        # skip connections của resnet50
        x1, x2, x3, x4, x5 = self.encoder(x)
        # print(x1.shape)
        # print(x2.shape)
        # print(x3.shape)
        # print(x4.shape)
        # print(x5.shape)

        # bottleneck
        x = self.block_neck(x5) # x (B, 1024, 8, 8)

        # concat skip connections then halve the features channels
        x = torch.cat([x4, self.upsample(x)], dim=1) 
        # print("before block1", x.shape)
        x = self.block_up1(x)
        x = torch.cat([x3, self.upsample(x)], dim=1)
        # print("before block2",x.shape)
        x = self.block_up2(x)
        x = torch.cat([x2, self.upsample(x)], dim=1)
        # print("before block3",x.shape)
        x = self.block_up3(x)
        x = torch.cat([x1, self.upsample(x)], dim=1)
        # print("before block4", x.shape)
        x = self.block_up4(x)
        # classifier
        x = self.conv_cls(x) #size/2
        # scaling to return to the original size
        x = self.upsample(x)
        return x

## **Kết hợp Dice Loss và Focal Loss**
Dice Loss: tập trung tối đa vào vùng foreground (vùng có mask), hiệu quả trong xử lý mất cân bằng giữa nền và vật thể.

Focal Loss: giảm ảnh hưởng từ các mẫu dễ, tăng trọng số cho các điểm khó phân loại (vùng ranh giới, nhiễu).

In [None]:
class DiceFocalLoss(nn.Module):
    def __init__(self, alpha=0.7, beta=0.3):
        super().__init__()
        self.alpha = alpha
        self.beta = beta
        self.focal = smp.losses.FocalLoss(mode='binary', gamma=2.0)
        self.dice = smp.losses.DiceLoss(mode='binary', from_logits=True)

    def forward(self, pred, target):
        target = target.unsqueeze(1)  # đảm bảo shape là [B, 1, H, W]
        return self.alpha * self.dice(pred, target) + self.beta * self.focal(pred, target)

## **Hàm tính Dice coefficient**

In [None]:
def calc_f1(output, target, threshold=0.5):
    tp, fp, fn, tn = get_stats(output, target.unsqueeze(dim=1), mode='binary', threshold=threshold)
    f1 = f1_score(tp, fp, fn, tn)
    return f1.mean()

## **Hàm huấn luyện một epoch**

In [None]:
def train_one_epoch(model, trainloader, scaler, optimizer, loss_fn, device=device):
    model.train()
    pbar = tqdm(enumerate(trainloader), leave=True)
    total_loss = 0.0
    total_score = 0.0

    for batch_idx, (img, target_mask) in pbar:
        img, target_mask = img.to(device), target_mask.to(device)

        with torch.amp.autocast(device_type=device):
            logits_mask = model(img)
            loss = loss_fn(logits_mask, target_mask)

        probs = torch.sigmoid(logits_mask)
        score = calc_f1(probs, target_mask.to(torch.long))

        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item()
        total_score += score.item()
        avg_loss = total_loss / (batch_idx + 1)
        avg_score = total_score / (batch_idx + 1)

        pbar.set_postfix(avg_loss=avg_loss, dice_score=avg_score)

    print(f"Average loss: {avg_loss:.4f}, Average dice score: {avg_score:.4f}")

## **Tối ưu hoá mô hình**
AdamW: giống Adam nhưng tách riêng weight decay, giúp regularization tốt hơn, tránh overfitting.

ReduceLROnPlateau: giảm learning rate khi validation loss không cải thiện → giúp mô hình học chậm lại khi cần.

GradScaler (AMP): dùng Automatic Mixed Precision (AMP) giúp tăng tốc độ huấn luyện và giảm tiêu thụ VRAM.


In [None]:
model = Unet_Modern().to(device) 
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=1e-4,
    # weight_decay=1e-4 
)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,      
    patience=3,      
    threshold=1e-4,  
    min_lr=1e-12,
)
loss_fn = DiceFocalLoss().to(device)
scaler = torch.amp.GradScaler()

## **Kiểm tra tập val trên nhiều threshold**
* Ta đánh giá F1 (Dice) cho mỗi threshold rồi chọn threshold có điểm số tốt nhất để tăng độ chính xác khi phân đoạn ảnh.
* Ban đầu em kiểm tra trong khoảng 0.4 đến 0.8 với bước nhảy 0.05 nhưng sau nhiều lần thử nghiệm thì rút gọn lại được khoảng tốt nhất là 0.45 đến 0.55

In [None]:
def validate_one_epoch(model, valloader, loss_fn, scheduler, device=device):
    model.eval()
    pbar = tqdm(enumerate(valloader), leave=True)
    total_loss = 0.0
    # total_score = 0.0
    thresholds = [0.45, 0.5, 0.55]
    threshold_scores = {t: 0.0 for t in thresholds}
    print('current learning rate', scheduler.get_last_lr())
    with torch.inference_mode():
        for batch_idx, (img, target_mask) in pbar:
            img, target_mask = img.to(device), target_mask.to(device)

            with torch.amp.autocast(device_type=device):
                logits_mask = model(img)
                loss = loss_fn(logits_mask, target_mask)

            probs = torch.sigmoid(logits_mask)
            for t in thresholds:
                score = calc_f1(probs, target_mask.to(torch.long), threshold=t)
                threshold_scores[t] += score.item()
            # score = calc_f1(probs, target_mask.to(torch.long))

            total_loss += loss.item()
            # total_score += score.item()
            avg_loss = total_loss / (batch_idx + 1)
            # avg_score = total_score / (batch_idx + 1)
            avg_scores = {t: threshold_scores[t] / (batch_idx + 1) for t in thresholds}
            best_threshold = max(avg_scores, key=avg_scores.get)
            best_score = avg_scores[best_threshold]

            pbar.set_postfix(val_loss=avg_loss, best_dice=best_score)

    scheduler.step(avg_loss)  # Nếu bạn dùng ReduceLROnPlateau
    # current_lr = scheduler.optimizer.param_groups[0]["lr"]
    # for param_group in scheduler.optimizer.param_groups:
    #     param_group["weight_decay"] = current_lr
    # print(f"current weight decay", current_lr)
    print(f"Validation Average loss: {avg_loss:.4f}, Best dice score: {best_score:.4f} at threshold {best_threshold}")
    return avg_loss, best_score, best_threshold

## **Huấn luyện mô hình**

In [None]:
final_best_dice = float('-inf')
final_best_threshold = 0.5 

In [None]:
for epoch in tqdm(range(EPOCHS)):
    train_one_epoch(model, trainloader, scaler, optimizer, loss_fn)
    avg_loss, best_score, best_threshold = validate_one_epoch(model, valloader, loss_fn, scheduler)

    gc.collect()
    torch.cuda.empty_cache()

    if best_score > final_best_dice:
        torch.save(model.state_dict(), "best_model.pth")
        final_best_dice = best_score
        final_best_threshold = best_threshold
        print(f"✅ Saved Best Model at Epoch {epoch} with dice {best_score:.4f}")

## **Kiểm tra trên tập test**

In [None]:
test_model = Unet_Modern().to(device)

**Model tốt nhất trong tất cả các lần huấn luyện, được dùng để sinh csv nộp trên competition**

In [None]:
# test_model.load_state_dict(torch.load(
#     f'/kaggle/input/resnet-hopfully/pytorch/default/1/best_model.pth',
# ))

**Model tốt nhất được lưu trong một lần huấn luyện**

In [None]:
test_model.load_state_dict(torch.load(f'/kaggle/working/best_model.pth'))

## **Sinh file kết quả csv**
* Biểu diễn mặt nạ nhị phân thành chuỗi số (RLE), phù hợp để nộp kết quả trong các bài toán segmentation
* Show kết quả segmenatation của mô hình
* **Kết quả tốt nhất đạt được là 0.9142**

In [None]:
folder_path="/kaggle/input/medical-segmentation/Dataset/Test"
output_path="/kaggle/working/"
image_folder = os.path.join(folder_path, "Image")

image_files = glob.glob(os.path.join(image_folder, "*.jpg"))

test_transform = A.Compose([
    A.Resize(width=trainsize, height=trainsize),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])

def mask_to_rle(mask):
    pixels = mask.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return " ".join(map(str, runs))

results = {'ID': [], 'Mask': []}

test_model.eval()
test_model.to(device)

for img_path in image_files:
    img_id = os.path.basename(img_path).split('.')[0]
    
    image = cv2.imread(img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    original_size = (image.shape[1], image.shape[0])
    
    transformed = test_transform(image=image)
    image_tensor = transformed['image'].unsqueeze(0).to(device)
    
    with torch.inference_mode():
        output_mask = test_model(image_tensor)
    
    output_mask = output_mask.squeeze().cpu().numpy()  
    binary_mask = (output_mask > final_best_threshold).astype(np.uint8)
    
    pred_mask_resized = cv2.resize(binary_mask, (original_size[0], original_size[1]), 
                                  interpolation=cv2.INTER_NEAREST)
    
    fig, axs = plt.subplots(1, 2, figsize=(10, 5))
    
    axs[0].imshow(image)
    axs[0].set_title(f"Image ID: {img_id}")
    axs[0].axis('off')

    axs[1].imshow(pred_mask_resized, cmap='gray')
    axs[1].set_title("Predicted Mask")
    axs[1].axis('off')
    
    plt.tight_layout()
    plt.show()
    
    mask_rle = mask_to_rle(pred_mask_resized)
    results['ID'].append(img_id)
    results['Mask'].append(mask_rle)

results_df = pd.DataFrame(results)
results_df = results_df.sort_values(by='ID', key=lambda x: x.astype(int)).reset_index(drop=True)

output_file = os.path.join(output_path, "predictions.csv")
results_df.to_csv(output_file, index=False)

print(f"Hoàn thành dự đoán, lưu kết quả vào {output_file}")