# 1. Import thư viện

In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torchsummary import summary
from torch.utils.data import DataLoader, Dataset
import cv2
import numpy as np
import os
from PIL import Image
from tqdm import tqdm
import time
import matplotlib.pyplot as plt

In [16]:
import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128'
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 2. Tạo Mô hình SR

In [None]:
import torch
import torch.nn as nn
import cv2
import numpy as np

class CannyFilterOpenCV(nn.Module):
    def __init__(self, low_threshold=100, high_threshold=200):
        super(CannyFilterOpenCV, self).__init__()
        self.low_threshold = low_threshold
        self.high_threshold = high_threshold

    def forward(self, x):
        x_np = x.cpu().detach().numpy()
        canny_edges_batch = []
        for img in x_np:
            img_np = img.transpose(1, 2, 0)
            img_np = np.uint8(img_np * 255)
            canny_edges = cv2.Canny(img_np, self.low_threshold, self.high_threshold)
            canny_edges = canny_edges / 255.0
            canny_edges_batch.append(canny_edges[np.newaxis, ...])
        canny_edges_tensor = torch.from_numpy(np.array(canny_edges_batch)).float().to(x.device)
        return canny_edges_tensor
    
    def get_output_channels(self):
        return 1  # Canny filter luôn trả về 1 kênh
class SobelFilterOpenCV(nn.Module):
    def __init__(self):
        super(SobelFilterOpenCV, self).__init__()

    def forward(self, x):
        x_np = x.cpu().detach().numpy()
        sobel_edges_batch = []
        for img in x_np:
            img_np = img.transpose(1, 2, 0)
            img_np = np.uint8(img_np * 255)
            sobel_x = cv2.Sobel(img_np, cv2.CV_64F, 1, 0, ksize=3)
            sobel_y = cv2.Sobel(img_np, cv2.CV_64F, 0, 1, ksize=3)
            sobel_edges = np.sqrt(sobel_x ** 2 + sobel_y ** 2)
            sobel_edges = cv2.normalize(sobel_edges, None, 0, 1, cv2.NORM_MINMAX)
            sobel_edges_batch.append(sobel_edges.transpose(2, 0, 1))
        sobel_edges_tensor = torch.from_numpy(np.array(sobel_edges_batch)).float().to(x.device)
        return sobel_edges_tensor

    def get_output_channels(self, input_channels):
        return input_channels  # Sobel filter giữ nguyên số kênh đầu vào
class SR(nn.Module):
    def __init__(self, num_channels=3, scale_factor=4, use_canny=False, use_sobel=False):
        super(SR, self).__init__()
        self.scale_factor = scale_factor
        self.use_canny = use_canny
        self.use_sobel = use_sobel
        
        self.canny_filter = CannyFilterOpenCV() if use_canny else None
        self.sobel_filter = SobelFilterOpenCV() if use_sobel else None
        
        if use_canny:
            additional_channels = 1
        if use_sobel:
            additional_channels = 3
        self.input_conv = nn.Conv2d(num_channels+additional_channels, 64, kernel_size=3, stride=1, padding=1)
        
        self.residual_layers = nn.Sequential(
            *[ResidualCatBlock(64) for _ in range(16)]
        )
        self.upsample = nn.Sequential(
            nn.Conv2d(64, 64*scale_factor**2, kernel_size=3, stride=1, padding=1),
            nn.PixelShuffle(scale_factor),
            nn.Conv2d(64, num_channels, kernel_size=3, stride=1, padding=1)
        )
        
    def forward(self, x):
        edge_maps = []
        if self.use_canny:
            edge_maps.append(self.canny_filter(x))
        if self.use_sobel:
            edge_maps.append(self.sobel_filter(x))
        
        if edge_maps:
            x_with_edges = torch.cat([x] + edge_maps, dim=1)
        else:
            x_with_edges = x
        
        x = self.input_conv(x_with_edges)
        res = self.residual_layers(x)
        out = res + x
        out = self.upsample(out)
        return out

class ResidualCatBlock(nn.Module):
    def __init__(self, num_channels):
        super(ResidualCatBlock, self).__init__()
        self.block = nn.Sequential(
            nn.Conv2d(num_channels, num_channels, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(num_channels, num_channels, kernel_size=3, stride=1, padding=1)
        )
        self.conv = nn.Conv2d(num_channels*2, num_channels, kernel_size=1, stride=1)
        
    def forward(self, x):
        out = self.block(x)
        out = torch.cat((x, out), 1)
        out = self.conv(out)
        return x + out

# Usage example:
# scale = 4
# model = SR(scale_factor=scale, use_/=True, use_sobel=True)
# model = nn.DataParallel(model).cuda()
# summary(model, (3, 150, 150))

In [None]:
class ImageDataset(Dataset):
    def __init__(self, lr_dir, hr_dir):
        self.lr_files = sorted(os.listdir(lr_dir))
        self.hr_files = sorted(os.listdir(hr_dir))
        self.lr_dir = lr_dir
        self.hr_dir = hr_dir

    def __len__(self):
        return len(self.lr_files)

    def __getitem__(self, idx):
        lr_image = cv2.imread(os.path.join(self.lr_dir, self.lr_files[idx]))
        hr_image = cv2.imread(os.path.join(self.hr_dir, self.hr_files[idx]))
        lr_image = cv2.cvtColor(lr_image, cv2.COLOR_BGR2RGB)
        hr_image = cv2.cvtColor(hr_image, cv2.COLOR_BGR2RGB)
        
        transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.ToTensor()
        ])
        
        lr_image = transform(lr_image)
        hr_image = transform(hr_image)
        return lr_image, hr_image

# 3. Tạo Hyperparameter

In [None]:
# Đường dẫn tới bộ dữ liệu
train_lr_dir = 'Train/LR'
train_hr_dir = 'Train/HR'
valid_lr_dir = 'Test/LR'
valid_hr_dir = 'Test/HR'
# test_hr_dir  = '/kaggle/input/srdataset/sr_data/test/HR'
# test_lr_dir  = '/kaggle/input/srdataset/sr_data/test/LR'

# print(torch.cuda.memory_allocated())
# print(torch.cuda.memory_reserved())

In [17]:
from torch.amp import autocast, GradScaler
from torchsummary import summary
scaler = GradScaler()

# Khởi tạo dataset và dataloader
train_dataset = ImageDataset(train_lr_dir, train_hr_dir)
train_loader = DataLoader(train_dataset, batch_size = 16, shuffle=True)

valid_dataset = ImageDataset(valid_lr_dir, valid_hr_dir)
valid_loader = DataLoader(valid_dataset)
print(len(train_loader))
# Khởi tạo mô hình, loss function và optimizer
scale = 4
sobelsr = SR(scale_factor = scale, use_sobel = True).to(device)
# sobelsr = nn.DataParallel(sobelsr).to(device)
sobelsr.load_state_dict(torch.load('best_sobel_srx4_model.pth'))
criterion = nn.MSELoss()
optim_sobel = optim.Adam(sobelsr.parameters(), lr=1e-4,betas =(0.9, 0.999))
scheduler_sobel = optim.lr_scheduler.StepLR(optim_sobel, step_size=10**5, gamma=0.5)
summary(sobelsr.cuda(), input_size=(3, 510, 339), device='cuda')
scale = 4
cannysr = SR(scale_factor = scale, use_canny = True).to(device)
# cannysr = nn.DataParallel(cannysr).to(device)
cannysr.load_state_dict(torch.load('best_canny_srx4_model.pth'))
criterion = nn.MSELoss()
optim_canny = optim.Adam(cannysr.parameters(), lr=1e-4,betas =(0.9, 0.999))
scheduler_canny = optim.lr_scheduler.StepLR(optim_canny, step_size=10**5, gamma=0.5)

12500


  sobelsr.load_state_dict(torch.load('best_sobel_srx4_model.pth'))


OutOfMemoryError: CUDA out of memory. Tried to allocate 86.00 MiB. GPU 0 has a total capacity of 10.71 GiB of which 87.94 MiB is free. Process 84006 has 850.00 MiB memory in use. Process 88444 has 7.10 GiB memory in use. Including non-PyTorch memory, this process has 1.60 GiB memory in use. Of the allocated memory 1.31 GiB is allocated by PyTorch, and 106.27 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
def calculate_psnr(img1, img2):
    mse = torch.mean((img1 - img2) ** 2)
    if mse == 0:
        return float('inf')
    max_pixel = 1.0
    psnr = 20 * torch.log10(max_pixel / torch.sqrt(mse))
    return psnr.item()

# 4. Training

In [None]:
num_epochs = 24

best_psnr_sobel = float('-inf')
best_psnr_canny = float('-inf')
torch.cuda.empty_cache()

losses_sobel = []
losses_canny = []
avg_psnr_sobel = []
avg_psnr_canny = []

val_avg_psnr_sobel = []  # Validation PSNR
val_avg_psnr_canny = []

patience = 50
epochs_no_improve = 0
log_file = open('training_log.txt', 'a')
scaler = GradScaler()

for epoch in range(num_epochs):
    sobelsr.train()
    cannysr.train()

    epoch_loss_sobel = 0
    psnr_values_sobel = 0
    epoch_loss_canny = 0
    psnr_values_canny = 0
    start_time = time.time()

    # Training loop
    for (lr_images, hr_images) in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', unit='batch'):
        lr_images = lr_images.cuda()
        hr_images = hr_images.cuda()

        # Sobel SR training
        optim_sobel.zero_grad()  
        with autocast(device_type='cuda'):
            outputs_sobel = sobelsr(lr_images)
            loss_sobel = criterion(outputs_sobel, hr_images)
        psnr_sobel = calculate_psnr(outputs_sobel, hr_images)
            
        scaler.scale(loss_sobel).backward()
        scaler.step(optim_sobel)
        scaler.update()
        scheduler_sobel.step()

        # Canny SR training
        optim_canny.zero_grad()  
        with autocast(device_type='cuda'):
            outputs_canny = cannysr(lr_images)
            loss_canny = criterion(outputs_canny, hr_images)
        psnr_canny = calculate_psnr(outputs_canny, hr_images)

        scaler.scale(loss_canny).backward()
        scaler.step(optim_canny)
        scaler.update()
        scheduler_canny.step()
        
        # Update metrics
        epoch_loss_sobel += loss_sobel.item()
        psnr_values_sobel += psnr_sobel
        epoch_loss_canny += loss_canny.item()
        psnr_values_canny += psnr_canny

    # Calculate average training metrics
    avg_epoch_loss_sobel = epoch_loss_sobel / len(train_loader)
    average_psnr_sobel = psnr_values_sobel / len(train_loader)
    losses_sobel.append(avg_epoch_loss_sobel)
    avg_psnr_sobel.append(average_psnr_sobel)

    avg_epoch_loss_canny = epoch_loss_canny / len(train_loader)
    average_psnr_canny = psnr_values_canny / len(train_loader)
    losses_canny.append(avg_epoch_loss_canny)
    avg_psnr_canny.append(average_psnr_canny)

    # Validation step
    sobelsr.eval()
    cannysr.eval()

    val_psnr_values_sobel = 0
    val_psnr_values_canny = 0

    with torch.no_grad():  # No gradients during validation
        for (lr_images, hr_images) in tqdm(valid_loader, desc=f'Validation Epoch {epoch+1}/{num_epochs}', unit='batch'):
            lr_images = lr_images.cuda()
            hr_images = hr_images.cuda()

            # Sobel SR validation (no loss, only PSNR)
            outputs_sobel = sobelsr(lr_images)
            psnr_sobel = calculate_psnr(outputs_sobel, hr_images)

            # Canny SR validation (no loss, only PSNR)
            outputs_canny = cannysr(lr_images)
            psnr_canny = calculate_psnr(outputs_canny, hr_images)

            # Update validation PSNR
            val_psnr_values_sobel += psnr_sobel
            val_psnr_values_canny += psnr_canny

    # Calculate average validation PSNR
    val_average_psnr_sobel = val_psnr_values_sobel / len(valid_loader)
    val_avg_psnr_sobel.append(val_average_psnr_sobel)

    val_average_psnr_canny = val_psnr_values_canny / len(valid_loader)
    val_avg_psnr_canny.append(val_average_psnr_canny)

    end_time = time.time()

    # Logging results
    log_string = (f"Epoch {epoch+1}/{num_epochs}, Loss sobel: {avg_epoch_loss_sobel:.4f}, "
                  f"Loss canny: {avg_epoch_loss_canny:.4f}, Time training: {end_time - start_time:.4f}s, "
                  f"PSNR sobel: {average_psnr_sobel:.2f} dB, PSNR canny: {average_psnr_canny:.2f} dB, "
                  f"Val PSNR sobel: {val_average_psnr_sobel:.2f} dB, Val PSNR canny: {val_average_psnr_canny:.2f} dB")
    print(log_string)
    log_file.write(log_string + '\n')
    log_file.flush()

    # Save best models based on validation PSNR
    if val_average_psnr_sobel > best_psnr_sobel:
        best_psnr_sobel = val_average_psnr_sobel
        torch.save(sobelsr.state_dict(), f'best_sobel_srx{scale}_model.pth')
        print(f"Saved Sobel SR model with PSNR {best_psnr_sobel:.4f}")
        epochs_no_improve=0
    

    if val_average_psnr_canny > best_psnr_canny:
        best_psnr_canny = val_average_psnr_canny
        torch.save(cannysr.state_dict(), f'best_canny_srx{scale}_model.pth')
        print(f"Saved Canny SR model with PSNR {best_psnr_canny:.4f}")
        epochs_no_improve=0
    
    if (val_average_psnr_sobel < best_psnr_sobel) and (val_average_psnr_canny < best_psnr_canny):
        epochs_no_improve+=1
    if epochs_no_improve >= patience:
        print(f"PSNR did not improve for 50 epochs. Early stopping at epoch {epoch+1}")
        break
    # Clear cache and optionally save models at each epoch
    torch.cuda.empty_cache()
    torch.save(sobelsr.state_dict(), 'sobel_sr.pth')
    torch.save(cannysr.state_dict(), 'canny_sr.pth')

# Close log file after training
log_file.close()

# Plotting results
plt.figure(figsize=(12, 10))

# Plot loss
plt.subplot(2, 1, 1)
plt.plot(losses_sobel, label='Sobel SR Loss (Train)')
plt.plot(losses_canny, label='Canny SR Loss (Train)')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training Loss')

# Plot PSNR
plt.subplot(2, 1, 2)
plt.plot(avg_psnr_sobel, label='Sobel SR PSNR (Train)')
plt.plot(val_avg_psnr_sobel, label='Sobel SR PSNR (Val)')
plt.plot(avg_psnr_canny, label='Canny SR PSNR (Train)')
plt.plot(val_avg_psnr_canny, label='Canny SR PSNR (Val)')
plt.xlabel('Epoch')
plt.ylabel('PSNR (dB)')
plt.legend()
plt.title('Average PSNR (Train and Val)')

plt.tight_layout()
plt.show()

# 5. Testing

In [18]:
cannysr = cannysr.cpu()
sobelsr = sobelsr.cpu()
sobelsr.eval()
cannysr.eval()

val_psnr_values_sobel = 0
val_psnr_values_canny = 0
torch.cuda.empty_cache()
with torch.no_grad():  # No gradients during validation
        for (lr_images, hr_images) in tqdm(valid_loader, desc=f'Validation Epoch {epoch+1}/{num_epochs}', unit='batch'):
                lr_images = lr_images.cpu()
                hr_images = hr_images.cpu()

                # Sobel SR validation (no loss, only PSNR)
                outputs_sobel = sobelsr(lr_images)
                psnr_sobel = calculate_psnr(outputs_sobel, hr_images)

                # Canny SR validation (no loss, only PSNR)
                outputs_canny = cannysr(lr_images)
                psnr_canny = calculate_psnr(outputs_canny, hr_images)

                # Update validation PSNR
                val_psnr_values_sobel += psnr_sobel
                val_psnr_values_canny += psnr_canny

        # Calculate average validation PSNR
        val_average_psnr_sobel = val_psnr_values_sobel / len(valid_loader)

        val_average_psnr_canny = val_psnr_values_canny / len(valid_loader)
        print(val_average_psnr_canny, val_average_psnr_sobel)

Validation Epoch 1/24: 100%|██████████| 10/10 [00:41<00:00,  4.11s/batch]

27.73695487976074 27.6855562210083



