In [1]:
import os
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import random_split
import numpy as np
from torchvision import transforms
import matplotlib.pyplot as plt
import torch.optim as optim
from torch.optim import Adam
import torch.nn as nn
from torchvision import models
import torch.nn.functional as F
from tqdm import tqdm
from torchmetrics.classification import MulticlassJaccardIndex, MulticlassAccuracy
import copy

  from .autonotebook import tqdm as notebook_tqdm


In [8]:
class SemanticSegmentationDataset(Dataset):
    """Dataset of (image, class-index mask) pairs read from two directories.

    Images and labels are paired purely by position after sorting the file
    paths of each directory, so both directories must contain the same number
    of files with names that sort in the same order.

    Args:
        image_dir: Directory containing the input images.
        label_dir: Directory containing the RGB colour-coded label images.
        transform: Optional callable applied to the RGB image array
            (H, W, 3) returned by OpenCV. It is NOT applied to the label.
    """

    def __init__(self, image_dir, label_dir, transform=None):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transform = transform
        # os.listdir yields bare file names; join them with the directory to
        # get usable paths, and sort so that image i lines up with label i.
        self.image_paths = sorted(os.path.join(image_dir, name) for name in os.listdir(image_dir))
        self.label_paths = sorted(os.path.join(label_dir, name) for name in os.listdir(label_dir))
        if len(self.image_paths) != len(self.label_paths):
            # Pairing is positional, so a count mismatch means at least some
            # images would be matched with the wrong (or a missing) label.
            raise ValueError(
                f"Found {len(self.image_paths)} images in {image_dir!r} but "
                f"{len(self.label_paths)} labels in {label_dir!r}"
            )
        # RGB colour in the label image -> class index.
        # NOTE(review): the colour values presumably come from the annotation
        # palette of this dataset — verify (2, 0, 0) in particular.
        self.class_colors = {
            (2, 0, 0): 0,
            (127, 0, 0): 1,
            (248, 163, 191): 2
        }

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_paths)

    @staticmethod
    def _read_rgb(path):
        """Read an image file with OpenCV and return it as an RGB array."""
        bgr = cv2.imread(path)
        if bgr is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image file: {path}")
        # OpenCV decodes to BGR channel order; convert to RGB.
        return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

    def _encode_mask(self, label):
        """Convert an (H, W, 3) RGB label array into an (H, W) uint8 class mask.

        Pixels whose colour is not listed in ``self.class_colors`` keep the
        initial value 0, i.e. they end up in class 0.
        """
        label_mask = np.zeros(label.shape[:2], dtype=np.uint8)
        for rgb, class_id in self.class_colors.items():
            label_mask[np.all(label == rgb, axis=-1)] = class_id
        return label_mask

    def __getitem__(self, idx):
        """Return ``(image, label_mask)`` for sample ``idx``.

        ``image`` is the RGB array, passed through ``self.transform`` when one
        is set. ``label_mask`` is always a ``torch.long`` tensor of shape
        (H, W) holding class indices, regardless of whether an image transform
        is configured.

        Raises:
            FileNotFoundError: If the image or label file cannot be read.
        """
        image = self._read_rgb(self.image_paths[idx])
        label = self._read_rgb(self.label_paths[idx])

        label_mask = self._encode_mask(label)

        if self.transform:
            image = self.transform(image)
        # The target dtype must not depend on the image transform.
        label_mask = torch.from_numpy(label_mask).long()

        return image, label_mask

# Image preprocessing: the dataset hands over a numpy RGB array, which is
# converted to a PIL image first and then to a tensor.
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
])

# Build the paths with os.path.join instead of backslash literals:
# '\i' and '\l' are invalid escape sequences in a normal string literal, and
# hard-coded backslashes only work as separators on Windows.
dataset = SemanticSegmentationDataset(
    image_dir=os.path.join('kaggle', 'input', 'input'),
    label_dir=os.path.join('kaggle', 'input', 'label'),
    transform=train_transform)

In [9]:
def train_epoch(model, dataloader, criterion, optimizer, device, num_classes):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Network producing per-class scores with the class dimension at
            index 1 (predictions are taken with ``argmax(dim=1)``).
        dataloader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating the model parameters.
        device: Device the batches and metrics are moved to.
        num_classes: Number of classes passed to the torchmetrics metrics.

    Returns:
        Tuple ``(epoch_loss, mean_accuracy, mean_iou)``. ``epoch_loss`` is the
        sample-weighted average of the batch losses; the two metrics are the
        accumulated ``MulticlassAccuracy`` / ``MulticlassJaccardIndex`` values
        as numpy scalars.
    """
    model.train()
    running_loss = 0.0
    samples_seen = 0
    accuracy_metric = MulticlassAccuracy(num_classes=num_classes).to(device)
    iou_metric = MulticlassJaccardIndex(num_classes=num_classes).to(device)
    # tqdm progress bar over the batches, labelled "Training".
    pbar = tqdm(dataloader, desc='Training', unit='batch')
    for images, labels in pbar:
        # Move the batch to the compute device.
        images = images.to(device)
        labels = labels.to(device)
        # Forward pass, loss, backward pass, parameter update.
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # Weight the batch loss by the batch size so that a smaller final
        # batch does not skew the epoch average.
        # NOTE(review): assumes ``criterion`` returns a per-batch mean.
        batch_size = images.size(0)
        batch_loss = loss.item()
        running_loss += batch_loss * batch_size
        samples_seen += batch_size
        preds = torch.argmax(outputs, dim=1)
        # update() only accumulates state; the running value is read with
        # compute() below, so the extra batch-level result of forward() is
        # not needed.
        accuracy_metric.update(preds, labels)
        iou_metric.update(preds, labels)
        pbar.set_postfix({
            'Batch Loss': f'{batch_loss:.4f}',
            'Mean Accuracy': f'{accuracy_metric.compute():.4f}',
            'Mean IoU': f'{iou_metric.compute():.4f}',
        })
    # Divide by the samples actually processed rather than the dataset size,
    # which would be wrong with drop_last=True or a sub-sampling sampler.
    epoch_loss = running_loss / max(samples_seen, 1)
    mean_accuracy = accuracy_metric.compute().cpu().numpy()
    mean_iou = iou_metric.compute().cpu().numpy()

    return epoch_loss, mean_accuracy, mean_iou

In [10]:
def evaluate(model, dataloader, criterion, device, num_classes):
    """Evaluate ``model`` on ``dataloader`` without updating any weights.

    The model is switched to eval mode and the whole loop runs under
    ``torch.no_grad()``.

    Args:
        model: Network producing per-class scores with the class dimension at
            index 1 (predictions are taken with ``argmax(dim=1)``).
        dataloader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        device: Device the batches and metrics are moved to.
        num_classes: Number of classes passed to the torchmetrics metrics.

    Returns:
        Tuple ``(epoch_loss, mean_accuracy, mean_iou)``. ``epoch_loss`` is the
        sample-weighted average of the batch losses; the two metrics are the
        accumulated ``MulticlassAccuracy`` / ``MulticlassJaccardIndex`` values
        as numpy scalars.
    """
    model.eval()
    running_loss = 0.0
    samples_seen = 0
    accuracy_metric = MulticlassAccuracy(num_classes=num_classes).to(device)
    iou_metric = MulticlassJaccardIndex(num_classes=num_classes).to(device)
    pbar = tqdm(dataloader, desc='Evaluating', unit='batch')
    with torch.no_grad():
        for images, labels in pbar:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            # Weight by batch size so a smaller final batch does not skew
            # the average.
            # NOTE(review): assumes ``criterion`` returns a per-batch mean.
            batch_size = images.size(0)
            batch_loss = loss.item()
            running_loss += batch_loss * batch_size
            samples_seen += batch_size
            preds = torch.argmax(outputs, dim=1)
            # update() only accumulates state; compute() below reads the
            # running value for the progress bar.
            accuracy_metric.update(preds, labels)
            iou_metric.update(preds, labels)
            pbar.set_postfix({
                'Batch Loss': f'{batch_loss:.4f}',
                'Mean Accuracy': f'{accuracy_metric.compute():.4f}',
                'Mean IoU': f'{iou_metric.compute():.4f}',
            })

    # Divide by the samples actually processed rather than the dataset size,
    # which would be wrong with drop_last=True or a sub-sampling sampler.
    epoch_loss = running_loss / max(samples_seen, 1)
    mean_accuracy = accuracy_metric.compute().cpu().numpy()
    mean_iou = iou_metric.compute().cpu().numpy()

    return epoch_loss, mean_accuracy, mean_iou

In [14]:
class MultiScaleConvBlock(nn.Module):
    """Three parallel Conv-BN-ReLU branches (3x3, 5x5, 7x7) concatenated on channels.

    ``out_channels`` is split across the branches: the 3x3 and 5x5 branches
    each get ``out_channels // 3`` channels and the 7x7 branch takes whatever
    is left, so the concatenated output has exactly ``out_channels`` channels.
    Each branch pads by ``kernel_size // 2``, which keeps the spatial size.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        third = out_channels // 3
        remainder = out_channels - 2 * third  # absorbs the rounding leftover

        # Branches are created in 3x3 -> 5x5 -> 7x7 order.
        self.branch3x3 = self._make_branch(in_channels, third, 3)
        self.branch5x5 = self._make_branch(in_channels, third, 5)
        self.branch7x7 = self._make_branch(in_channels, remainder, 7)

    @staticmethod
    def _make_branch(in_channels, branch_channels, kernel_size):
        """Build one size-preserving Conv2d -> BatchNorm2d -> ReLU branch."""
        return nn.Sequential(
            nn.Conv2d(in_channels, branch_channels,
                      kernel_size=kernel_size, padding=kernel_size // 2),
            nn.BatchNorm2d(branch_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """Apply all three branches to ``x`` and concatenate along dim 1."""
        small = self.branch3x3(x)
        medium = self.branch5x5(x)
        large = self.branch7x7(x)
        return torch.cat([small, medium, large], dim=1)

class SpatialAttention(nn.Module):
    """Spatial attention gate built from channel-wise mean and max maps.

    The input is reduced over the channel dimension with both mean and max,
    the two single-channel maps are stacked, passed through one bias-free
    convolution and a sigmoid, and the resulting (B, 1, H, W) map rescales the
    input.

    Args:
        kernel_size: Size of the convolution kernel; padding is
            ``kernel_size // 2``.
    """

    def __init__(self, kernel_size=7):
        super().__init__()
        self.conv = nn.Conv2d(
            2, 1, kernel_size=kernel_size, padding=kernel_size // 2, bias=False
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return ``x`` multiplied by its spatial attention map."""
        channel_mean = x.mean(dim=1, keepdim=True)       # (B, 1, H, W)
        channel_max = x.max(dim=1, keepdim=True)[0]      # (B, 1, H, W)
        pooled = torch.cat([channel_mean, channel_max], dim=1)  # (B, 2, H, W)
        gate = self.sigmoid(self.conv(pooled))           # (B, 1, H, W)
        return x * gate


# Định nghĩa mô hình CNN cải tiến
class myModel(nn.Module):
    """Small encoder-decoder segmentation network with two skip connections.

    Encoder: a plain 3x3 conv stage (16 ch), then two multi-scale stages
    (32 ch with spatial attention, 64 ch), each preceded by 2x max pooling.
    Decoder: bilinear upsampling back to the resolution of the matching
    encoder stage, concatenation with that stage, and a 3x3 conv.

    Args:
        n_classes: Number of output channels of the final 1x1 convolution.

    The forward pass returns a tensor of per-class scores with ``n_classes``
    channels and the same spatial size as the input.
    """

    def __init__(self, n_classes):
        super().__init__()

        # Encoder
        self.enc1 = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=True)
        )
        self.pool1 = nn.MaxPool2d(2)

        self.enc2 = nn.Sequential(
            MultiScaleConvBlock(16, 32),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            SpatialAttention()
        )
        self.pool2 = nn.MaxPool2d(2)

        self.enc3 = nn.Sequential(
            MultiScaleConvBlock(32, 64),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )

        # Decoder (upsampling itself is parameter-free and done in forward)
        self.dec2 = nn.Sequential(
            nn.Conv2d(64 + 32, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True)
        )

        self.dec1 = nn.Sequential(
            nn.Conv2d(32 + 16, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=True)
        )

        self.output_conv = nn.Conv2d(16, n_classes, kernel_size=1)

    def forward(self, x):
        """Compute per-class scores for a batch of 3-channel images ``x``."""
        # Encoder
        e1 = self.enc1(x)
        e2 = self.enc2(self.pool1(e1))
        e3 = self.enc3(self.pool2(e2))

        # Decoder + skip connections.
        # Upsample to the exact spatial size of the skip tensor instead of a
        # fixed factor of 2: max pooling floors odd sizes, so a fixed 2x
        # upsample would not match the skip tensor (and torch.cat would fail)
        # whenever the input height/width is not a multiple of 4. For sizes
        # that are, the target size is exactly 2x.
        d2 = F.interpolate(e3, size=e2.shape[2:], mode='bilinear', align_corners=False)
        d2 = self.dec2(torch.cat([d2, e2], dim=1))

        d1 = F.interpolate(d2, size=e1.shape[2:], mode='bilinear', align_corners=False)
        d1 = self.dec1(torch.cat([d1, e1], dim=1))

        return self.output_conv(d1)

In [15]:
# --- Configuration ---------------------------------------------------------
SEED = 42                                                           # fixes the train/val split, weight init and shuffling order
MODEL_PATH = "NgoTranQuocBao_22139004_VoXuanLoc_22139040.pt"        # TorchScript export target
torch.manual_seed(SEED)

# --- 80/20 train/validation split -----------------------------------------
total_size = len(dataset)
train_size = int(0.8 * total_size)
val_size = total_size - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
print(f"Train size: {len(train_dataset)}, Validation size: {len(val_dataset)}")

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# --- Model -----------------------------------------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
classes = 3                                                         # number of output classes
model = myModel(classes)
def count_parameters(model):
    """Return the total number of elements across all parameters of ``model``."""
    return sum(p.numel() for p in model.parameters())
total_params = count_parameters(model)
print(f"Total parameters: {total_params}")
model.to(device)
model = nn.DataParallel(model)                                      # lets the model use several GPUs when available

criterion = nn.CrossEntropyLoss()                                   # multi-class classification loss
optimizer = Adam(model.parameters(), lr=0.001)                      # Adam with learning rate 0.001
num_epochs = 1

epoch_saved = 0                                                     # epoch (1-based) of the best model so far
best_val_mAcc = 0.0                                                 # best validation mean accuracy so far
best_model_state = None

# --- Training loop with best-checkpoint tracking ---------------------------
for epoch in range(num_epochs):
    epoch_loss_train, mAcc_train, mIoU_train = train_epoch(model, train_dataloader, criterion, optimizer, device, classes)
    epoch_loss_val, mAcc_val, mIoU_val = evaluate(model, val_dataloader, criterion, device, classes)

    print(f"Epoch {epoch + 1}/{num_epochs}")
    print(f"Train Loss: {epoch_loss_train:.4f}, Mean Accuracy: {mAcc_train:.4f}, Mean IoU: {mIoU_train:.4f}")
    print(f"Validation Loss: {epoch_loss_val:.4f}, Mean Accuracy: {mAcc_val:.4f}, Mean IoU: {mIoU_val:.4f}")

    # ">=" means that on a tie the most recent epoch is kept.
    if mAcc_val >= best_val_mAcc:
        epoch_saved = epoch + 1
        best_val_mAcc = mAcc_val
        # deepcopy: state_dict() returns references to the live parameters,
        # which later epochs would otherwise overwrite.
        best_model_state = copy.deepcopy(model.state_dict())

print("===================")
print(f"Best Model at epoch : {epoch_saved}")
if best_model_state is None:
    # Only reachable when the loop body never ran (num_epochs < 1).
    raise RuntimeError("No epoch was run; there is no model state to save.")
model.load_state_dict(best_model_state)
# Unwrap DataParallel so the exported module is the plain network.
if isinstance(model, torch.nn.DataParallel):
    model = model.module
model_save = torch.jit.script(model)
model_save.save(MODEL_PATH)
# Check again: reload the exported file and re-run validation on it.
# map_location makes the reload land on the device used for evaluation.
model = torch.jit.load(MODEL_PATH, map_location=device)
epoch_loss_val, mAcc_val, mIoU_val = evaluate(model, val_dataloader, criterion, device, classes)
print(f"Validation Loss: {epoch_loss_val:.4f}, Mean Accuracy: {mAcc_val:.4f}, Mean IoU: {mIoU_val:.4f}")

Train size: 4800, Validation size: 1200
Total parameters: 108005


Training:   2%|▏         | 3/150 [00:19<16:05,  6.57s/batch, Batch Loss=0.9151, Mean Accuracy=0.4194, Mean IoU=0.2068]


KeyboardInterrupt: 