# **# Import packages and libraries**

In [None]:
# Mount Google Drive at /content/drive so files stored there can be read
# through the local filesystem.
# NOTE: `google.colab` is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Seed the Python, NumPy and PyTorch random generators for reproducibility.
# The imports are repeated here because this cell sits before the main
# import cell; without them a top-to-bottom run fails with NameError.
import random

import numpy as np
import torch

random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

In [None]:
# Import Pytorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.models as models
import torch.utils.model_zoo as model_zoo
import torchvision.transforms as transforms
import torchvision
import torchvision.transforms.functional as TF

# import wandb
import random
import json
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import math

from piq import ssim
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.optim import Adam
from sklearn.model_selection import train_test_split
from glob import glob
from PIL import Image

# **# Pre-processing**

In [None]:
# Load the annotation index. Replace the path below with the location of
# your own [FILE].json if the dataset lives elsewhere.
# The encoding is given explicitly so decoding does not depend on the
# platform's default locale.
with open('/content/drive/MyDrive/UTBT_DATASET/OTU_2D/OTU_2D_annotation.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

In [None]:
# Root folder that the relative paths stored in the annotation file hang off.
path = "/content/drive/MyDrive/UTBT_DATASET/"

# One (image paths, annotation paths) pair of lists per dataset split.
train_x, train_y = [], []
test_x, test_y = [], []
valid_x, valid_y = [], []

# Maps the value of an entry's 'split' field to the lists it is appended to.
_split_targets = {
    'train': (train_x, train_y),
    'test': (test_x, test_y),
    'validation': (valid_x, valid_y),
}

for entry in data:
    targets = _split_targets.get(entry['split'])
    if targets is None:
        # Entries with any other split label are ignored.
        continue
    image_list, annotation_list = targets
    image_list.append(path + str(entry['file_path_img']))
    annotation_list.append(path + str(entry['file_path_ann']))

In [None]:
# All image / annotation paths in train, validation, test order.
images_arr = [*train_x, *valid_x, *test_x]
annotations_arr = [*train_y, *valid_y, *test_y]

# **# Import function**

In [None]:
# Hyper-parameters used by the dataset, the loaders and the training loop.
IMAGE_SIZE = 256   # side length (pixels) that images and masks are resized to
EPOCHS = 30        # number of training epochs
BATCH = 8          # samples per mini-batch
LR = 0.0001        # optimizer learning rate (1e-4)

class Augment(torch.nn.Module):
    """Apply one random flip/rotation identically to an image and its mask.

    The transform pipeline is a horizontal flip (p=0.5), a vertical flip
    (p=0.5) and a rotation of up to 20 degrees. The same random draw is used
    for `inputs` and `labels` by re-seeding PyTorch's global generator with
    the same value before each of the two applications.

    Args:
        seed: Value passed to `torch.manual_seed` at construction time.
            Note that this seeds the *global* PyTorch generator.
    """

    def __init__(self, seed=42):
        super().__init__()
        torch.manual_seed(seed)
        self.augment = transforms.Compose([
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomVerticalFlip(p=0.5),
            transforms.RandomRotation(20),
        ])

    def forward(self, inputs, labels):
        """Return `(inputs, labels)` after the same random augmentation."""
        # Draw a new seed on every call. Re-using torch.random.initial_seed()
        # would return the last seed that was set, so every call would
        # re-seed with the same value and apply the exact same transform.
        seed = int(torch.randint(0, 2**31 - 1, (1,)).item())

        torch.manual_seed(seed)
        inputs = self.augment(inputs)

        # Same seed again so the mask receives the identical flip/rotation.
        torch.manual_seed(seed)
        labels = self.augment(labels)
        return inputs, labels

class CustomDataset(Dataset):
    """Dataset yielding `(image, mask)` float tensors read from file paths.

    Args:
        image_paths: Sequence of image file paths.
        mask_paths: Sequence of mask file paths, index-aligned with
            `image_paths`.
        transform: Optional callable taking `(image, mask)` tensors and
            returning the transformed pair. When `None` (the default) the
            tensors are returned unchanged.
    """

    def __init__(self, image_paths, mask_paths, transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load, resize and normalise the sample at position `idx`.

        Raises:
            FileNotFoundError: If the image or mask file cannot be read.
        """
        img_path = self.image_paths[idx]
        mask_path = self.mask_paths[idx]

        # cv2.imread signals failure by returning None instead of raising,
        # which would otherwise surface as an obscure error in cv2.resize.
        # NOTE(review): OpenCV loads colour images in BGR channel order;
        # confirm whether the model expects RGB.
        image = cv2.imread(img_path, cv2.IMREAD_COLOR)
        if image is None:
            raise FileNotFoundError(f"Could not read image file: {img_path}")
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Could not read mask file: {mask_path}")

        image = cv2.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
        mask = cv2.resize(mask, (IMAGE_SIZE, IMAGE_SIZE))

        # Divide pixel values by 255 before the tensor conversion.
        image = image / 255.0
        mask = mask / 255.0

        mask = np.expand_dims(mask, axis=-1)  # Add channel dimension to mask

        image = TF.to_tensor(image).float()
        mask = TF.to_tensor(mask).float()

        # The transform was previously stored but never used.
        if self.transform is not None:
            image, mask = self.transform(image, mask)

        return image, mask

def get_dataloader(image_paths, mask_paths, batch_size=8, shuffle=True):
    """Build a `DataLoader` over a `CustomDataset` of the given paths.

    Args:
        image_paths: Sequence of image file paths.
        mask_paths: Sequence of mask file paths aligned with `image_paths`.
        batch_size: Number of samples per batch.
        shuffle: Whether the loader reshuffles the samples every epoch.

    Returns:
        The configured `DataLoader`.
    """
    dataset = CustomDataset(image_paths, mask_paths)
    # Forward the caller's choice; it used to be hard-coded to False, which
    # silently ignored the `shuffle` argument.
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
    return dataloader

# Build one loader per split, all with the same batch size and no shuffling.
train_dataloader, valid_dataloader, test_dataloader = (
    get_dataloader(split_x, split_y, batch_size=BATCH, shuffle=False)
    for split_x, split_y in (
        (train_x, train_y),
        (valid_x, valid_y),
        (test_x, test_y),
    )
)

In [None]:
# Print the three loader objects, one per line.
for loader in (train_dataloader, valid_dataloader, test_dataloader):
    print(loader)

<torch.utils.data.dataloader.DataLoader object at 0x7837ffbb0850>
<torch.utils.data.dataloader.DataLoader object at 0x7837ffbb3f70>
<torch.utils.data.dataloader.DataLoader object at 0x7837ffbb3670>


# **# Model**

## **Model**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import vgg16

class ConvBlock(nn.Module):
    """Two 3x3 convolutions, each followed by batch norm and ReLU.

    Padding of 1 keeps the spatial size unchanged; only the channel count
    goes from `in_channels` to `out_channels`.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(in_channels, out_channels, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        """Run both conv -> norm -> ReLU stages in sequence."""
        stages = ((self.conv1, self.bn1), (self.conv2, self.bn2))
        for conv, norm in stages:
            x = F.relu(norm(conv(x)))
        return x

# class EncoderBlock(nn.Module):
#     def __init__(self, in_channels, out_channels):
#         super(EncoderBlock, self).__init__()
#         self.conv_block = ConvBlock(in_channels, out_channels)
#         self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

#     def forward(self, x):
#         conv = self.conv_block(x)
#         pooled = self.pool(conv)
#         return conv, pooled

class AttentionGate(nn.Module):
    """Additive attention gate that rescales a skip tensor `s` using `g`.

    Both inputs are projected to `inter_channels` with 1x1 conv + batch
    norm, summed, passed through ReLU and reduced to a single-channel
    sigmoid map that multiplies `s`.
    """

    def __init__(self, g_channels, s_channels, inter_channels):
        super().__init__()

        def project(channels):
            # 1x1 convolution + batch norm onto the shared intermediate width.
            return nn.Sequential(
                nn.Conv2d(channels, inter_channels, kernel_size=1, padding=0),
                nn.BatchNorm2d(inter_channels)
            )

        self.Wg = project(g_channels)
        self.Ws = project(s_channels)
        self.psi = nn.Sequential(
            nn.Conv2d(inter_channels, 1, kernel_size=1, padding=0),
            nn.BatchNorm2d(1),
            nn.Sigmoid()
        )

    def forward(self, g, s):
        """Return `s` multiplied by the attention map computed from `g` and `s`."""
        combined = F.relu(self.Wg(g) + self.Ws(s))
        attention = self.psi(combined)
        return s * attention

class DecoderBlock(nn.Module):
    """Decoder stage: 2x upsample, gate the skip tensor, concatenate, convolve.

    `in_channels` is the channel count of the tensor being upsampled,
    `s_channels` that of the skip tensor, and `out_channels` both the
    attention gate's intermediate width and the stage's output width.
    """

    def __init__(self, in_channels, s_channels, out_channels):
        super().__init__()
        self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.att_gate = AttentionGate(in_channels, s_channels, out_channels)
        self.conv_block = ConvBlock(in_channels + s_channels, out_channels)

    def forward(self, x, s):
        """Upsample `x`, gate `s` with it, and fuse both along the channel axis."""
        upsampled = self.up(x)
        gated_skip = self.att_gate(upsampled, s)
        fused = torch.cat([upsampled, gated_skip], dim=1)
        return self.conv_block(fused)

class SPPFModule(nn.Module):
    """Spatial Pyramid Pooling - Fast (SPPF) block.

    A 1x1 convolution halves the channel count, three stride-1 max-pools are
    applied one after another (so each sees a larger receptive field), and
    the reduced map plus the three pooled maps are concatenated and projected
    back to `in_channels` with a second 1x1 convolution.

    Args:
        in_channels: Channel count of the input (and of the output).
        k: Max-pool kernel size. With an odd `k` the padding of `k // 2`
            keeps the spatial size unchanged.
    """

    def __init__(self, in_channels, k=5):
        super(SPPFModule, self).__init__()
        hidden_channels = in_channels // 2
        self.cv1 = nn.Sequential(
            nn.Conv2d(in_channels, hidden_channels, kernel_size=1, padding=0),
            nn.BatchNorm2d(hidden_channels),
            nn.SiLU()
        )
        self.mp1 = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)
        self.mp2 = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)
        self.mp3 = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)
        self.final_conv = nn.Sequential(
            nn.Conv2d(hidden_channels * 4, in_channels, kernel_size=1, padding=0),
            nn.BatchNorm2d(in_channels),
            nn.SiLU()
        )

    def forward(self, x):
        """Return a tensor with the same shape as `x`."""
        cv1 = self.cv1(x)
        # Chain the pools: applying all three to `cv1` directly would yield
        # three identical maps.
        mp1 = self.mp1(cv1)
        mp2 = self.mp2(mp1)
        mp3 = self.mp3(mp2)
        # Four maps are concatenated, matching the `hidden_channels * 4`
        # input width of `final_conv` (three maps caused a channel mismatch).
        out = torch.cat([cv1, mp1, mp2, mp3], dim=1)
        out = self.final_conv(out)
        return out


class VGG16AttentionUNet(nn.Module):
    """Attention U-Net with a pretrained VGG16 encoder and an SPPF bottleneck.

    The VGG16 feature extractor is split into five stages. Each stage after
    the first starts with the max-pool that follows the previous stage, so
    the skip tensors are taken *before* pooling and have the resolution the
    matching decoder stage produces after its 2x upsample:

        s1: 64 ch  @ H        s2: 128 ch @ H/2     s3: 256 ch @ H/4
        s4: 512 ch @ H/8      bottleneck: 512 ch @ H/16

    The input height and width should therefore be divisible by 16.

    Args:
        num_classes: Number of output channels of the final 1x1 convolution.
    """

    def __init__(self, num_classes=1):
        super(VGG16AttentionUNet, self).__init__()
        vgg16_model = vgg16(pretrained=True)
        vgg16_features = vgg16_model.features

        # Indices 4, 9, 16 and 23 of `features` are the max-pool layers. They
        # open the next stage instead of closing the current one; otherwise
        # every skip tensor is half the size of the upsampled decoder tensor
        # and the attention gate fails with a shape mismatch.
        self.feature1 = nn.Sequential(*vgg16_features[:4])
        self.feature2 = nn.Sequential(*vgg16_features[4:9])
        self.feature3 = nn.Sequential(*vgg16_features[9:16])
        self.feature4 = nn.Sequential(*vgg16_features[16:23])
        self.encoder = nn.Sequential(*vgg16_features[23:30])

        self.sppf_module = SPPFModule(in_channels=512)

        self.decoder1 = DecoderBlock(512, 512, 512)
        self.decoder2 = DecoderBlock(512, 256, 256)
        self.decoder3 = DecoderBlock(256, 128, 128)
        self.decoder4 = DecoderBlock(128, 64, 64)

        self.final_conv = nn.Conv2d(64, num_classes, kernel_size=1, padding=0)

    def forward(self, x):
        """Return per-pixel sigmoid probabilities at the input resolution."""
        # Encoder path; s1..s4 are kept as skip connections.
        s1 = self.feature1(x)
        s2 = self.feature2(s1)
        s3 = self.feature3(s2)
        s4 = self.feature4(s3)
        b1 = self.encoder(s4)
        b2 = self.sppf_module(b1)

        # Decoder path, deepest skip first.
        d4 = self.decoder1(b2, s4)
        d3 = self.decoder2(d4, s3)
        d2 = self.decoder3(d3, s2)
        d1 = self.decoder4(d2, s1)

        out = self.final_conv(d1)
        return torch.sigmoid(out)

# Smoke test with a random input: build the model, run one forward pass on
# a single 3-channel 256x256 tensor and print the shape of the result.
model = VGG16AttentionUNet(num_classes=1)
x = torch.randn(1, 3, 256, 256)
output = model(x)
print(output.shape)


# **# Loss functions and metrics**

In [None]:
# Module-level constants for the loss and metric definitions.
alpha = beta = 0.25
gamma = 2

def focal_loss_with_logits(logits, labels, alpha, gamma, y_pred):
    """Return the element-wise focal loss.

    Args:
        logits: Raw scores corresponding to `y_pred`.
        labels: Target values, same shape as `logits`.
        alpha: Weight of the positive term; negatives get `1 - alpha`.
        gamma: Focusing exponent applied to the probabilities.
        y_pred: Probabilities corresponding to `logits`.

    Returns:
        A tensor of per-element losses (no reduction is applied).
    """
    positive_weight = alpha * (1 - y_pred).pow(gamma) * labels
    negative_weight = (1 - alpha) * y_pred.pow(gamma) * (1 - labels)
    # log(1 + exp(-|z|)) + max(-z, 0) is a stable form of log(1 + exp(-z)).
    stable_term = torch.log1p(torch.exp(-logits.abs())) + F.relu(-logits)
    return stable_term * (positive_weight + negative_weight) + logits * negative_weight

def focal_loss(y_true, y_pred, alpha=0.25, gamma=2):
    """Return the mean focal loss between targets and predicted probabilities.

    `y_pred` is clamped to [1e-7, 1 - 1e-7] and converted back to logits
    before the element-wise loss is computed and averaged.
    """
    eps = 1e-7
    probs = torch.clamp(y_pred, min=eps, max=1 - eps)
    # Inverse sigmoid: recover logits from the clamped probabilities.
    logits = torch.log(probs / (1 - probs))
    per_element = focal_loss_with_logits(
        logits=logits, labels=y_true, alpha=alpha, gamma=gamma, y_pred=probs
    )
    return per_element.mean()

def dice_coef(y_true, y_pred, smooth=1e-5, threshold=0.5):
    """Return the smoothed Dice coefficient of the thresholded inputs.

    Both tensors go through `apply_threshold` (defined outside this section;
    presumably it binarises at `threshold` - confirm) and are flattened
    before the overlap is measured. `smooth` is added to numerator and
    denominator.
    """
    truth = torch.flatten(apply_threshold(y_true, threshold))
    pred = torch.flatten(apply_threshold(y_pred, threshold))
    overlap = torch.sum(truth * pred)
    total = torch.sum(truth) + torch.sum(pred)
    return (2. * overlap + smooth) / (total + smooth)

def dice_loss(y_true, y_pred, threshold=0.5):
    """Return one minus the Dice coefficient computed at `threshold`."""
    coefficient = dice_coef(y_true, y_pred, threshold=threshold)
    return 1 - coefficient

def jacard_similarity(y_true, y_pred, threshold=0.5, smooth=1e-5):
    """Return the smoothed Jaccard index (IoU) of the thresholded inputs.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Prediction tensor of the same shape.
        threshold: Value passed to `apply_threshold` for both tensors
            (`apply_threshold` is defined outside this section; presumably
            it binarises at `threshold` - confirm).
        smooth: Small constant added to numerator and denominator, as in
            `dice_coef`. Without it an empty prediction together with an
            empty target gives 0 / 0 = NaN, which would poison any loss or
            running average built on this value.

    Returns:
        A scalar tensor.
    """
    y_true_pos = apply_threshold(y_true, threshold)
    y_pred_pos = apply_threshold(y_pred, threshold)
    y_true_f = torch.flatten(y_true_pos)
    y_pred_f = torch.flatten(y_pred_pos)
    intersection = torch.sum(y_true_f * y_pred_f)
    union = torch.sum((y_true_f + y_pred_f) - (y_true_f * y_pred_f))
    return (intersection + smooth) / (union + smooth)

def jacard_loss(y_true, y_pred, threshold=0.5):
    """Return one minus the Jaccard similarity computed at `threshold`."""
    similarity = jacard_similarity(y_true, y_pred, threshold=threshold)
    return 1 - similarity

def ssim_loss(y_true, y_pred):
    """Return one minus the mean SSIM between prediction and target.

    SSIM is computed with `data_range=1.0`.
    """
    return 1 - ssim(y_pred, y_true, data_range=1.0).mean()

def joint_loss1(y_true, y_pred, threshold=0.5):
    """Return the unweighted mean of the focal, SSIM and Jaccard losses.

    `threshold` is forwarded to the Jaccard term only.
    """
    focal_term = focal_loss(y_true, y_pred)
    ssim_term = ssim_loss(y_true, y_pred)
    jaccard_term = jacard_loss(y_true, y_pred, threshold=threshold)
    return (focal_term + ssim_term + jaccard_term) / 3

def evaluate_metrics(y_true, y_pred, threshold=0.5):
    """Return `(dice, jaccard, precision, recall)` computed at `threshold`.

    `precision` and `recall` are defined outside this section.
    """
    return (
        dice_coef(y_true, y_pred, threshold=threshold),
        jacard_similarity(y_true, y_pred, threshold=threshold),
        precision(y_true, y_pred, threshold=threshold),
        recall(y_true, y_pred, threshold=threshold),
    )

In [None]:
if __name__ == "__main__":
    # Random stand-ins for a ground-truth tensor and a prediction tensor.
    y_true = torch.rand((1, 1, 256, 256))
    # y_true[y_true > 0] = 255
    # y_true = y_true / 255

    y_pred = torch.rand((1, 1, 256, 256))

    # Evaluate the metrics at a threshold of 0.5. The prediction is compared
    # against the ground truth; passing `y_true` twice made every metric
    # trivially 1.0 and left `y_pred` unused.
    dc, js, pre, rec = evaluate_metrics(y_true, y_pred, threshold=0.5)

    print(f'Dice Coefficient: {dc.item()}')
    print(f'IoU: {js.item()}')
    print(f'Precision: {pre.item()}')
    print(f'Recall: {rec.item()}')


Dice Coefficient: 1.0
IoU: 1.0
Precision: 1.0
Recall: 1.0


# **# Compile Model**

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)

# Build the network and move its parameters to the selected device.
model = VGG16AttentionUNet()
model = model.to(device)

loss_function = joint_loss1
num_classes = 1
optimizer = optim.Adam(model.parameters(), lr=LR)

# Scheduler configured to cut the learning rate by 10x after 5 epochs
# without improvement of the monitored (minimised) value.
scheduler_options = dict(mode='min', factor=0.1, patience=5, verbose=True)
scheduler = ReduceLROnPlateau(optimizer, **scheduler_options)

# Metric callables evaluated as metric(labels, outputs) during training.
metrics = [dice_coef, jacard_similarity, precision, recall]



In [None]:
checkpoint_dir = '/content/drive/MyDrive'
os.makedirs(checkpoint_dir, exist_ok=True)

# Offset added to the epoch counter. The original loop hard-coded `+ 30`,
# presumably to continue the numbering of an earlier run - set to 0 for a
# fresh run. The header now shows the matching total instead of "31/30".
start_epoch = 30
last_epoch = start_epoch + EPOCHS

for epoch in range(start_epoch, last_epoch):
    print(f"EPOCH: {epoch + 1}/{last_epoch}")

    # ---- Training pass ----
    model.train()

    train_loss = 0.0
    train_metrics = {metric.__name__: 0.0 for metric in metrics}

    for inputs, labels in train_dataloader:
        optimizer.zero_grad()
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        loss_total = loss_function(labels, outputs)

        loss_total.backward()
        optimizer.step()

        train_loss += loss_total.item()

        # Metrics are for reporting only: compute them without autograd and
        # store plain floats so no graph is kept alive across batches.
        with torch.no_grad():
            detached_outputs = outputs.detach()
            for metric in metrics:
                metric_value = metric(labels, detached_outputs)
                train_metrics[metric.__name__] += float(metric_value)

    # Guard against an empty loader to avoid a division by zero.
    num_train_batches = max(len(train_dataloader), 1)
    train_loss /= num_train_batches

    # Average the accumulated metrics (this used to *assign* the batch
    # count, so the number of batches was reported as the metric).
    for metric_name in train_metrics:
        train_metrics[metric_name] /= num_train_batches

    print("Train_loss: {:.4f} - Train_Dice: {:.4f} - Train_Jaccard : {:.4f} - Train_Precision: {:.4f} - Train_Recall: {:.4f}".format(train_loss, train_metrics['dice_coef'], train_metrics['jacard_similarity'], train_metrics['precision'], train_metrics['recall']))

    # ---- Validation pass ----
    model.eval()
    val_loss = 0.0
    val_metrics = {metric.__name__: 0.0 for metric in metrics}

    with torch.no_grad():  # No gradients are needed for evaluation
        for inputs, labels in valid_dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss_total = loss_function(labels, outputs)

            val_loss += loss_total.item()

            for metric in metrics:
                metric_value = metric(labels, outputs)
                val_metrics[metric.__name__] += float(metric_value)

    # Average the loss and metrics (the metric division used to be a bare
    # expression whose result was discarded, so sums were printed).
    num_val_batches = max(len(valid_dataloader), 1)
    val_loss /= num_val_batches
    for metric_name in val_metrics:
        val_metrics[metric_name] /= num_val_batches

    print("val_loss: {:.4f} - val_Dice: {:.4f} - val_Jaccard : {:.4f} - val_Precision: {:.4f} - val_Recall: {:.4f}".format(val_loss, val_metrics['dice_coef'], val_metrics['jacard_similarity'], val_metrics['precision'], val_metrics['recall']))

    # The plateau scheduler only acts when it is fed the monitored value;
    # it was created but never stepped.
    scheduler.step(val_loss)

    # checkpoint_path = os.path.join(checkpoint_dir, f'Epoch_{epoch+1}.pth')
    # torch.save(model.state_dict(), checkpoint_path)

In [None]:

# Evaluate the model on the test split and report the averaged loss and
# metrics.
model.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model.to(device)

test_loss = 0.0
test_metrics = {metric.__name__: 0.0 for metric in metrics}

with torch.no_grad():  # No gradients are needed for evaluation
    for inputs, labels in test_dataloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)

        loss_total = loss_function(labels, outputs)

        test_loss += loss_total.item()

        for metric in metrics:
            metric_value = metric(labels, outputs)
            # Store plain floats so the values format cleanly below.
            test_metrics[metric.__name__] += float(metric_value)

# Average the loss and metrics; guard against an empty loader.
num_test_batches = max(len(test_dataloader), 1)
test_loss /= num_test_batches
for metric_name in test_metrics:
    test_metrics[metric_name] /= num_test_batches

# The first placeholder was missing its opening brace ("test_loss: :.4f}"),
# which made str.format raise ValueError.
print("test_loss: {:.4f} - test_Dice: {:.4f} - test_Jaccard : {:.4f} - test_Precision: {:.4f} - test_Recall: {:.4f}".format(test_loss, test_metrics['dice_coef'], test_metrics['jacard_similarity'], test_metrics['precision'], test_metrics['recall']))