In [10]:
import os
import numpy as np
import cv2
import random
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import SwinConfig, SwinModel

import albumentations as A
from albumentations.pytorch import ToTensorV2
IMG_SIZE = 256
BATCH_SIZE = 8
SEED = 42
np.random.seed(SEED)
random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Sử dụng thiết bị: {DEVICE}")

# Thay đổi đường dẫn thư mục tùy theo máy của bạn
train_img_dir = r'C:\Users\Admin\Documents\Python Project\DPL Crack detection\UDTIRI-Crack Detection\train\image'
train_mask_dir = r'C:\Users\Admin\Documents\Python Project\DPL Crack detection\UDTIRI-Crack Detection\train\label'
val_img_dir = r'C:\Users\Admin\Documents\Python Project\DPL Crack detection\UDTIRI-Crack Detection\val\image'
val_mask_dir = r'C:\Users\Admin\Documents\Python Project\DPL Crack detection\UDTIRI-Crack Detection\val\label'

# --- Thu thập đường dẫn tệp ảnh và mask ---
train_img_paths = sorted([os.path.join(train_img_dir, f) for f in os.listdir(train_img_dir)])
train_mask_paths = sorted([os.path.join(train_mask_dir, f) for f in os.listdir(train_mask_dir)])
val_img_paths = sorted([os.path.join(val_img_dir, f) for f in os.listdir(val_img_dir)])
val_mask_paths = sorted([os.path.join(val_mask_dir, f) for f in os.listdir(val_mask_dir)])


Sử dụng thiết bị: cuda


In [None]:
class CrackDetectionDataset(Dataset):
    def __init__(self, image_filenames, mask_filenames, augment=False):
        self.image_filenames = image_filenames
        self.mask_filenames = mask_filenames
        self.augment = augment

        if len(self.image_filenames) != len(self.mask_filenames):
            raise ValueError("Số lượng tệp ảnh và tệp mask không khớp.")


    def __len__(self):
        return len(self.image_filenames)
    # Hàm thực hiện resize ảnh và mask về kích thước mong muốn (ban đầu)
    def resize_image_and_mask(self, img, mask, target_size=IMG_SIZE):
        img = cv2.resize(img, (target_size, target_size), interpolation=cv2.INTER_AREA)
        mask = cv2.resize(mask, (target_size, target_size), interpolation=cv2.INTER_NEAREST)
        return img, mask

    # Hàm thực hiện cắt ngẫu nhiên cho ảnh và mask
    def random_crop_image_and_mask(self, img, mask, crop_size=IMG_SIZE, min_scale=0.7):
        h, w = img.shape[:2]
        current_min_dim = min(h, w)
        crop_h = crop_w = int(random.uniform(min_scale, 1.0) * crop_size)

        crop_h = min(crop_h, h)
        crop_w = min(crop_w, w)

        if h == crop_h and w == crop_w: # Không cần cắt nếu kích thước đã khớp
            return img, mask

        start_x = random.randint(0, w - crop_w)
        start_y = random.randint(0, h - crop_h)

        cropped_img = img[start_y:start_y + crop_h, start_x:start_x + crop_w]
        cropped_mask = mask[start_y:start_y + crop_h, start_x:start_x + crop_w]
        
        # Resize lại về kích thước IMG_SIZE sau khi cắt
        cropped_img = cv2.resize(cropped_img, (IMG_SIZE, IMG_SIZE), interpolation=cv2.INTER_AREA)
        cropped_mask = cv2.resize(cropped_mask, (IMG_SIZE, IMG_SIZE), interpolation=cv2.INTER_NEAREST)

        return cropped_img, cropped_mask

    def __getitem__(self, idx):
        img = cv2.imread(self.image_filenames[idx])
        if img is None:
            raise ValueError(f"Không thể đọc tệp ảnh: {self.image_filenames[idx]}")

        mask = cv2.imread(self.mask_filenames[idx], cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise ValueError(f"Không thể đọc tệp mask: {self.mask_filenames[idx]}")

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.augment:
            if img.shape[0] < IMG_SIZE or img.shape[1] < IMG_SIZE:
                img, mask = self.resize_image_and_mask(img, mask, target_size=IMG_SIZE)
            
            img, mask = self.random_crop_image_and_mask(img, mask, crop_size=IMG_SIZE, min_scale=0.7)

            if random.random() < 0.5:
                img = cv2.flip(img, 1)
                mask = cv2.flip(mask, 1)
            if random.random() < 0.5:
                img = cv2.flip(img, 0)
                mask = cv2.flip(mask, 0)
        else:
            img, mask = self.resize_image_and_mask(img, mask, target_size=IMG_SIZE)
        img = img.astype(np.float32) / 255.0
        mask = mask.astype(np.float32) / 255.0
        
        if mask.ndim == 2:
            mask = np.expand_dims(mask, axis=0) 
        elif mask.ndim == 3 and mask.shape[0] != 1:
            if mask.shape[0] == 3:
                mask = mask[0:1, :, :]
            else:
                raise ValueError(f"Mask có hình dạng không mong muốn {mask.shape} tại chỉ số {idx} sau khi biến đổi. Expected channel dim 1.")
        
        img_tensor = torch.from_numpy(img).permute(2, 0, 1)
        mask_tensor = torch.from_numpy(mask)

        return img_tensor, mask_tensor

In [None]:
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.block(x)

class DecoderBlock(nn.Module):
    def __init__(self, in_channels, skip_channels, out_channels):
        super().__init__()
        self.upsample = nn.Sequential(
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(in_channels, out_channels, kernel_size=1) 
        )
        
        self.conv_block = ConvBlock(out_channels + skip_channels, out_channels)

    def forward(self, x, skip_features=None):
        x = self.upsample(x)
        
        if skip_features is not None:
            if x.shape[2:] != skip_features.shape[2:]:
                x = nn.functional.interpolate(x, size=skip_features.shape[2:], mode='bilinear', align_corners=True)
            
            x = torch.cat([x, skip_features], dim=1)
        
        x = self.conv_block(x)
        return x


In [13]:

class SwinUNet(nn.Module):
    def __init__(self, input_channels=3, num_classes=1):
        super().__init__()
        self.IMG_SIZE = IMG_SIZE

        config = SwinConfig(image_size=self.IMG_SIZE, num_channels=input_channels, 
                            patch_size=4, embed_dim=96, depths=[2, 2, 6, 2], num_heads=[3, 6, 12, 24],
                            window_size=7, mlp_ratio=4., qkv_bias=True, hidden_dropout_prob=0.0, 
                            attention_probs_dropout_prob=0.0, drop_path_rate=0.1, 
                            hidden_act="gelu", use_absolute_embeddings=False, 
                            patch_norm=True, initializer_range=0.02, layer_norm_eps=1e-05,
                            out_features=["stage1", "stage2", "stage3", "stage4"])
        self.swin = SwinModel(config)

        self.bottleneck = ConvBlock(config.embed_dim * 8, config.embed_dim * 8)

        self.decoder4 = DecoderBlock(in_channels=config.embed_dim * 8, skip_channels=config.embed_dim * 4, out_channels=config.embed_dim * 4)
        self.decoder3 = DecoderBlock(in_channels=config.embed_dim * 4, skip_channels=config.embed_dim * 2, out_channels=config.embed_dim * 2)
        self.decoder2 = DecoderBlock(in_channels=config.embed_dim * 2, skip_channels=config.embed_dim * 1, out_channels=config.embed_dim * 1)
        
        self.decoder1 = DecoderBlock(in_channels=config.embed_dim * 1, skip_channels=0, out_channels=config.embed_dim // 2)

        self.final_upsample = DecoderBlock(in_channels=config.embed_dim // 2, skip_channels=0, out_channels=config.embed_dim // 4)

        self.final_conv = nn.Conv2d(config.embed_dim // 4, num_classes, kernel_size=1)


    def forward(self, x):
        outputs = self.swin(pixel_values=x, output_hidden_states=True)
        encoder_features = []

        hs0 = outputs.hidden_states[0]
        batch_size, num_patches, embed_dim = hs0.shape
        side = int(np.sqrt(num_patches))
        encoder_features.append(hs0.permute(0, 2, 1).reshape(batch_size, embed_dim, side, side))

        hs1 = outputs.hidden_states[1]
        batch_size, num_patches, embed_dim = hs1.shape
        side = int(np.sqrt(num_patches))
        encoder_features.append(hs1.permute(0, 2, 1).reshape(batch_size, embed_dim, side, side))

        hs2 = outputs.hidden_states[2]
        batch_size, num_patches, embed_dim = hs2.shape
        side = int(np.sqrt(num_patches))
        encoder_features.append(hs2.permute(0, 2, 1).reshape(batch_size, embed_dim, side, side))

        x_bottleneck = outputs.hidden_states[4]
        batch_size, num_patches, embed_dim = x_bottleneck.shape
        side = int(np.sqrt(num_patches))
        x_bottleneck = x_bottleneck.permute(0, 2, 1).reshape(batch_size, embed_dim, side, side)
        
        x = self.bottleneck(x_bottleneck)

        x = self.decoder4(x, encoder_features[2])
        x = self.decoder3(x, encoder_features[1])
        x = self.decoder2(x, encoder_features[0])
        x = self.decoder1(x)
        x = self.final_upsample(x)

        outputs = self.final_conv(x)

        return outputs

In [14]:
def calculate_metrics(predicted_masks, true_masks, smooth=1e-6):
    
    intersection = (predicted_masks * true_masks).sum()
    union = (predicted_masks + true_masks).sum() - intersection
    
    iou = (intersection + smooth) / (union + smooth)
    
    dice = (2. * intersection + smooth) / ((predicted_masks.sum() + true_masks.sum()) + smooth)
    f1_score = dice 

    return iou.item(), f1_score.item()

In [None]:
def train_model(model, train_loader, val_loader, optimizer, criterion, scheduler, num_epochs, callbacks_config, start_epoch=0, best_val_loss_so_far=float('inf')):
    best_val_loss = best_val_loss_so_far 
    patience_counter = 0
    model_checkpoint_path = callbacks_config.get('checkpoint_path', 'swin_unet_best_pytorch.pth')

    for epoch in range(start_epoch, num_epochs):
        model.train()
        running_loss = 0.0
        running_iou = 0.0
        running_f1 = 0.0
        print(f"Epoch {epoch+1}/{num_epochs} Bắt đầu...")
        for batch_idx, (images, masks) in enumerate(train_loader):
            images = images.to(DEVICE)
            masks = masks.to(DEVICE)

            optimizer.zero_grad()

            outputs = model(images)
            loss = criterion(outputs, masks)

            loss.backward()
            optimizer.step()

            running_loss += loss.item() * images.size(0)

            predicted_masks = (outputs > 0.5).float()
            
            batch_iou, batch_f1 = calculate_metrics(predicted_masks, masks)
            running_iou += batch_iou * images.size(0)
            running_f1 += batch_f1 * images.size(0)

            if (batch_idx + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{num_epochs}, Batch {batch_idx+1}/{len(train_loader)}, Loss: {loss.item():.4f}")

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_iou = running_iou / len(train_loader.dataset)
        epoch_f1 = running_f1 / len(train_loader.dataset)
        print(f"Epoch {epoch+1} Kết thúc - Mất mát Huấn luyện: {epoch_loss:.4f}, IoU Huấn luyện: {epoch_iou:.4f}, F1-Score Huấn luyện: {epoch_f1:.4f}")

        model.eval()
        val_loss = 0.0
        val_iou = 0.0
        val_f1 = 0.0

        with torch.no_grad():
            for images, masks in val_loader:
                images = images.to(DEVICE)
                masks = masks.to(DEVICE)

                outputs = model(images)
                loss = criterion(outputs, masks)

                val_loss += loss.item() * images.size(0)

                predicted_masks = (outputs > 0.5).float()
                
                batch_iou, batch_f1 = calculate_metrics(predicted_masks, masks)
                val_iou += batch_iou * images.size(0)
                val_f1 += batch_f1 * images.size(0)

        val_loss /= len(val_loader.dataset)
        val_iou /= len(val_loader.dataset)
        val_f1 /= len(val_loader.dataset)
        print(f"Mất mát Xác thực: {val_loss:.4f}, IoU Xác thực: {val_iou:.4f}, F1-Score Xác thực: {val_f1:.4f}")
        scheduler.step(val_loss) 
        current_lr = optimizer.param_groups[0]['lr']
        print(f"Learning Rate hiện tại: {current_lr:.8f}")
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            print(f"Mất mát xác thực tốt nhất được cập nhật: {best_val_loss:.4f}. Lưu mô hình và trạng thái...")
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_val_loss': best_val_loss,
            }, model_checkpoint_path)
        else:
            patience_counter += 1
            print(f"Mất mát xác thực không cải thiện. Sự kiên nhẫn: {patience_counter}/{callbacks_config['patience']}")
            if patience_counter >= callbacks_config['patience']:
                print("Dừng sớm!")
                break


In [None]:
import os
import cv2
import numpy as np
import torch
from tqdm import tqdm

def calculate_pos_weight_from_masks(mask_dir, device):

    total_pixels = 0
    total_positive_pixels = 0
    
    mask_files = [f for f in os.listdir(mask_dir) if f.endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff'))]
    
    if not mask_files:
        print(f"Cảnh báo: Không tìm thấy tệp mask nào trong thư mục '{mask_dir}'.")
        print("Sử dụng pos_weight mặc định 1.0. Vui lòng kiểm tra đường dẫn và định dạng tệp.")
        return torch.tensor(1.0, dtype=torch.float).to(device)

    print(f"Đang tính toán pos_weight từ {len(mask_files)} tệp mask trong '{mask_dir}'...")

    for filename in tqdm(mask_files, desc="Đang xử lý mask"):
        mask_path = os.path.join(mask_dir, filename)

        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        
        if mask is None:
            print(f"Cảnh báo: Không thể đọc tệp mask: {mask_path}. Bỏ qua.")
            continue
            
        binary_mask = (mask > 127).astype(np.float32) 
        
        total_pixels += binary_mask.size
        total_positive_pixels += np.sum(binary_mask == 1)

    total_negative_pixels = total_pixels - total_positive_pixels
    
    if total_positive_pixels == 0:
        print("Cảnh báo: Không tìm thấy pixel dương nào (vết nứt) trong toàn bộ mask.")
        print("Mô hình sẽ không thể học được lớp vết nứt. Đang đặt pos_weight rất cao để cảnh báo hoặc cần kiểm tra dữ liệu.")
        return torch.tensor(1000.0, dtype=torch.float).to(device)

    pos_weight = total_negative_pixels / total_positive_pixels
    
    print(f"Tổng số pixel được phân tích: {total_pixels}")
    print(f"Tổng số pixel vết nứt (dương): {total_positive_pixels}")
    print(f"Tổng số pixel nền (âm): {total_negative_pixels}")
    print(f"\nGiá trị pos_weight được tính toán: {pos_weight:.4f}")
    
    return torch.tensor(pos_weight, dtype=torch.float).to(device)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

MASK_DIRECTORY = r"C:\Users\Admin\Documents\Python Project\DPL Crack detection\UDTIRI-Crack Detection\train\label" 

calculated_pos_weight_tensor = calculate_pos_weight_from_masks(MASK_DIRECTORY, DEVICE)

criterion = nn.BCEWithLogitsLoss(pos_weight=calculated_pos_weight_tensor)

print(f"\nCriterion được khởi tạo với pos_weight: {criterion.pos_weight.item()}")


Đang tính toán pos_weight từ 1500 tệp mask trong 'C:\Users\Admin\Documents\Python Project\DPL Crack detection\UDTIRI-Crack Detection\train\label'...


Đang xử lý mask: 100%|██████████| 1500/1500 [00:00<00:00, 2304.05it/s]

Tổng số pixel được phân tích: 153600000
Tổng số pixel vết nứt (dương): 4430132
Tổng số pixel nền (âm): 149169868

Giá trị pos_weight được tính toán: 33.6717

Criterion được khởi tạo với pos_weight: 33.671653747558594





In [17]:
train_dataset = CrackDetectionDataset(train_img_paths, train_mask_paths, augment=True)
val_dataset = CrackDetectionDataset(val_img_paths, val_mask_paths, augment=False)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

# --- Khởi tạo mô hình, optimizer, criterion ---
model = SwinUNet(input_channels=3, num_classes=1).to(DEVICE)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-3)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

my_pos_weight = 11.33

pos_weight_tensor = torch.tensor(my_pos_weight, dtype=torch.float).to(DEVICE)

criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight_tensor)

print(model)

callbacks_config = {
    'patience': 20,
    'checkpoint_path': 'swin_unet_best_pytorchv2.pth'
}

start_epoch = 0
best_val_loss_so_far = float('inf')
checkpoint_path = callbacks_config['checkpoint_path']

if os.path.exists(checkpoint_path):
    print(f"Phát hiện checkpoint tại {checkpoint_path}. Đang tải để tiếp tục huấn luyện...")
    checkpoint = torch.load(checkpoint_path, map_location=DEVICE)
    
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch'] + 1 
    best_val_loss_so_far = checkpoint['best_val_loss']
    
    print(f"Đã tải checkpoint từ Epoch {start_epoch-1}. Tiếp tục huấn luyện từ Epoch {start_epoch}.")
    print(f"Mất mát xác thực tốt nhất trước đó: {best_val_loss_so_far:.4f}")
else:
    print("Không tìm thấy checkpoint. Bắt đầu huấn luyện từ đầu (Epoch 0).")

print("\nBắt đầu huấn luyện mô hình Swin-Unet...")
train_model(model, train_loader, val_loader, optimizer, criterion, scheduler,
            num_epochs=10000, callbacks_config=callbacks_config,
            start_epoch=start_epoch, best_val_loss_so_far=best_val_loss_so_far)

print("\nQuá trình huấn luyện đã hoàn tất.")

SwinUNet(
  (swin): SwinModel(
    (embeddings): SwinEmbeddings(
      (patch_embeddings): SwinPatchEmbeddings(
        (projection): Conv2d(3, 96, kernel_size=(4, 4), stride=(4, 4))
      )
      (norm): LayerNorm((96,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): SwinEncoder(
      (layers): ModuleList(
        (0): SwinStage(
          (blocks): ModuleList(
            (0): SwinLayer(
              (layernorm_before): LayerNorm((96,), eps=1e-05, elementwise_affine=True)
              (attention): SwinAttention(
                (self): SwinSelfAttention(
                  (query): Linear(in_features=96, out_features=96, bias=True)
                  (key): Linear(in_features=96, out_features=96, bias=True)
                  (value): Linear(in_features=96, out_features=96, bias=True)
                  (dropout): Dropout(p=0.0, inplace=False)
                )
                (output): SwinSelfOutput(
                  (dense):