In [18]:
import torch
import torch.nn as nn

class ConvolutionalAutoencoder(nn.Module):
    def __init__(self):
        super(ConvolutionalAutoencoder, self).__init__()
        
        # Encoder: Feature extraction
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 4, kernel_size=3, stride=1, padding=1),  
            nn.ReLU(),
            nn.Conv2d(4, 8, kernel_size=3, stride=1, padding=1),  
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Downsample (24x24)

            nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1),  
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),  
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Downsample (12x12)

            nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),  
            nn.ReLU(),
        )

        # Decoder: Reconstruction
        self.decoder = nn.Sequential(
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),  # Upsample to (24,24)
            nn.Conv2d(32, 16, kernel_size=3, stride=1, padding=1),  # Refining the upsampled feature map
            nn.ReLU(),

            
            nn.Conv2d(16, 8, kernel_size=3, stride=1, padding=1),  
            nn.ReLU(),

            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),  # Upsample to (48,48)
            nn.Conv2d(8, 4, kernel_size=3, stride=1, padding=1),  
            nn.ReLU(),
            
            nn.Conv2d(4, 1, kernel_size=3, stride=1, padding=1),  # Final reconstruction
            nn.Sigmoid()  # Output pixel values in the range [0, 1]
        )

    def forward(self, x):
        x = self.encoder(x)  
        x = self.decoder(x)  
        return x

# Test
model = ConvolutionalAutoencoder()
x = torch.randn(1, 1, 48, 48)  # Sample input
output = model(x)
print(output.shape)  # Should be [1, 1, 48, 48]


torch.Size([1, 1, 48, 48])


In [3]:
!pip install opencv-python-headless torchsummary # For install cv2

Collecting opencv-python-headless
  Using cached opencv_python_headless-4.11.0.86-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Collecting torchsummary
  Using cached torchsummary-1.5.1-py3-none-any.whl.metadata (296 bytes)
Using cached opencv_python_headless-4.11.0.86-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (50.0 MB)
Using cached torchsummary-1.5.1-py3-none-any.whl (2.8 kB)
Installing collected packages: torchsummary, opencv-python-headless
Successfully installed opencv-python-headless-4.11.0.86 torchsummary-1.5.1


In [19]:
from torchsummary import summary
import torch

# Initialize the model
model = ConvolutionalAutoencoder()

# Move the model to the GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Create a dummy input tensor (assuming grayscale image 48x48)
input_tensor = torch.randn(1, 1, 48, 48).to(device)  # Move input tensor to the same device

# Print the model summary
summary(model, (1, 48, 48))


----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1            [-1, 4, 48, 48]              40
              ReLU-2            [-1, 4, 48, 48]               0
            Conv2d-3            [-1, 8, 48, 48]             296
              ReLU-4            [-1, 8, 48, 48]               0
         MaxPool2d-5            [-1, 8, 24, 24]               0
            Conv2d-6           [-1, 16, 24, 24]           1,168
              ReLU-7           [-1, 16, 24, 24]               0
            Conv2d-8           [-1, 32, 24, 24]           4,640
              ReLU-9           [-1, 32, 24, 24]               0
        MaxPool2d-10           [-1, 32, 12, 12]               0
           Conv2d-11           [-1, 32, 12, 12]           9,248
             ReLU-12           [-1, 32, 12, 12]               0
         Upsample-13           [-1, 32, 24, 24]               0
           Conv2d-14           [-1, 16,

In [20]:
import os
import time
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import torch.cuda.amp as amp

# Fix random seeds for reproducibility
seed = 1
torch.manual_seed(seed)
np.random.seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # for multi-GPU
torch.use_deterministic_algorithms(True)

class PatchDataset(Dataset):
    def __init__(self, data_path):
        """
        Dataset for loading pre-extracted patches.
        """
        self.original_patches = np.load(os.path.join(data_path, 'original_patches.npy')) # Original patch
        self.ground_truth_patches = np.load(os.path.join(data_path, 'ground_truth_patches.npy')) # Ground truth patch
        self.num_samples = len(self.original_patches)

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        original_patch = self.original_patches[idx]
        ground_truth_patch = self.ground_truth_patches[idx]

        # Convert to tensors and normalize to [0, 1]
        original_patch = torch.tensor(original_patch, dtype=torch.float32).unsqueeze(0) / 255.0
        ground_truth_patch = torch.tensor(ground_truth_patch, dtype=torch.float32).unsqueeze(0) / 255.0

        return original_patch, ground_truth_patch

# Initialize the dataset
train_data_path = 'data/train_patches_48x48x25'
train_dataset = PatchDataset(train_data_path)

# Define a seed
seed = 1

# Worker initialization function
def worker_init_fn(worker_id):
    # Seed each worker with a combination of the base seed and the worker ID
    np.random.seed(seed + worker_id)
    torch.manual_seed(seed + worker_id)

# Create DataLoader
train_loader = DataLoader(
    train_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=8,
    pin_memory=True,
    worker_init_fn=worker_init_fn
)

# Model setup (example: AttU_Net)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model =  ConvolutionalAutoencoder().to(device)

# Loss and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Training loop
epochs = 10
scaler = amp.GradScaler()

print(f"Total patches: {len(train_dataset)}")
print(f"Using device: {device}")
if device.type == 'cuda':
    print(f"GPU available: {torch.cuda.get_device_name(0)}")

for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    start_time = time.time()

    print(f"Epoch {epoch+1}/{epochs}")
    for batch_idx, (original_patches, ground_truth_patches) in enumerate(train_loader):
        inputs = original_patches.to(device)
        targets = ground_truth_patches.to(device)

        optimizer.zero_grad()

        # Forward pass with mixed precision
        with amp.autocast():
            outputs = model(inputs)
            loss = criterion(outputs, targets)

        # Backpropagation with mixed precision
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()

        # Batch progress and memory usage
        if batch_idx % 1000 == 0:  # Log every 1000 batches
            print(f"Batch {batch_idx}/{len(train_loader)} - Loss: {loss.item():.4f}")
            if device.type == 'cuda':
                gpu_memory = torch.cuda.memory_allocated(device) / 1e6  # Convert to MB
                print(f"    GPU Memory Usage: {gpu_memory:.2f} MB")

    epoch_loss = running_loss / len(train_loader)
    epoch_time = time.time() - start_time

    print(f"Epoch [{epoch+1}/{epochs}] completed in {epoch_time:.2f}s")
    print(f"Average Loss: {epoch_loss:.4f}")
    print("-" * 50)

# Save the trained model
os.makedirs('model', exist_ok=True)
torch.save(model.state_dict(), 'model_48x48x25/valy_cae_trained_48x48x25.pth')
print("Model training completed!")


Total patches: 340534
Using device: cuda
GPU available: NVIDIA A10G
Epoch 1/10
Batch 0/5321 - Loss: 0.2725
    GPU Memory Usage: 306.92 MB
Batch 1000/5321 - Loss: 0.1376
    GPU Memory Usage: 306.92 MB
Batch 2000/5321 - Loss: 0.0001
    GPU Memory Usage: 306.92 MB
Batch 3000/5321 - Loss: 0.0529
    GPU Memory Usage: 306.92 MB
Batch 4000/5321 - Loss: 0.0105
    GPU Memory Usage: 306.92 MB
Batch 5000/5321 - Loss: 0.0583
    GPU Memory Usage: 306.92 MB
Epoch [1/10] completed in 63.78s
Average Loss: 0.0700
--------------------------------------------------
Epoch 2/10
Batch 0/5321 - Loss: 0.0007
    GPU Memory Usage: 306.92 MB
Batch 1000/5321 - Loss: 0.0659
    GPU Memory Usage: 306.92 MB
Batch 2000/5321 - Loss: 0.0003
    GPU Memory Usage: 306.92 MB
Batch 3000/5321 - Loss: 0.0239
    GPU Memory Usage: 306.92 MB
Batch 4000/5321 - Loss: 0.0097
    GPU Memory Usage: 306.92 MB
Batch 5000/5321 - Loss: 0.0494
    GPU Memory Usage: 306.92 MB
Epoch [2/10] completed in 63.71s
Average Loss: 0.0452
-

In [23]:
import os
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from skimage.metrics import structural_similarity as ssim
from skimage import img_as_ubyte
import cv2  # Import OpenCV
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the trained model
model = ConvolutionalAutoencoder().to(device)
model.load_state_dict(torch.load('model_48x48x25/valy_cae_trained_48x48x25.pth'))
model.eval()

# Define the dataset and DataLoader
class PatchDataset(Dataset):
    def __init__(self, data_path):
        """
        Dataset for loading pre-extracted patches.
        """
        self.original_patches = np.load(os.path.join(data_path, 'original_patches.npy'))
        self.ground_truth_patches = np.load(os.path.join(data_path, 'ground_truth_patches.npy'))
        self.num_samples = len(self.original_patches)

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        original_patch = self.original_patches[idx]
        ground_truth_patch = self.ground_truth_patches[idx]

        # Convert to tensors and normalize to [0, 1]
        original_patch = torch.tensor(original_patch, dtype=torch.float32).unsqueeze(0) / 255.0
        ground_truth_patch = torch.tensor(ground_truth_patch, dtype=torch.float32).unsqueeze(0) / 255.0

        return original_patch, ground_truth_patch

# Helper function for SSIM
def calculate_ssim(output, target):
    output_np = output.squeeze().cpu().numpy()
    target_np = target.squeeze().cpu().numpy()
    output_np = img_as_ubyte(np.clip(output_np, 0, 1))
    target_np = img_as_ubyte(np.clip(target_np, 0, 1))
    return ssim(output_np, target_np, data_range=255.0)

# Initialize the dataset and dataloader
data_path = 'data/test_patches_48x48x25'
patch_dataset = PatchDataset(data_path)
patch_loader = DataLoader(patch_dataset, batch_size=1, shuffle=False)

# Metrics storage and image reconstruction
mse_list = []
psnr_list = []
ssim_list = []
reconstructed_patches = []

# Iterate through the test data
for i, (original_patch, ground_truth_patch) in enumerate(patch_loader):
    original_patch = original_patch.to(device)
    ground_truth_patch = ground_truth_patch.to(device)

    # Predict the output from the model
    with torch.no_grad():
        output_patch = model(original_patch)

    # Convert tensors to numpy arrays for metric calculations
    output_np = output_patch.squeeze().cpu().numpy()
    ground_truth_np = ground_truth_patch.squeeze().cpu().numpy()

    # Calculate MSE
    mse_value = np.mean((output_np - ground_truth_np) ** 2)

    # Calculate PSNR using OpenCV
    psnr_value = cv2.PSNR(output_np, ground_truth_np)

    # Calculate SSIM
    ssim_value = calculate_ssim(output_patch, ground_truth_patch)

    mse_list.append(mse_value)
    psnr_list.append(psnr_value)
    ssim_list.append(ssim_value)

    # Collect patches for reconstruction
    reconstructed_patches.append(output_np)

# Calculate and print the average metrics
average_mse = sum(mse_list) / len(mse_list)
average_psnr = sum(psnr_list) / len(psnr_list)
average_ssim = sum(ssim_list) / len(ssim_list)

print(f"Average MSE: {average_mse:.4f}")
print(f"Average PSNR: {average_psnr:.4f}")
print(f"Average SSIM: {average_ssim:.4f}")


Average MSE: 0.0386
Average PSNR: 70.8928
Average SSIM: 0.8195


In [24]:
import os
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from skimage.metrics import structural_similarity as ssim
from skimage import img_as_ubyte
import matplotlib.pyplot as plt
import math

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the trained model

model = ConvolutionalAutoencoder().to(device)
model.load_state_dict(torch.load('model_48x48x25/valy_cae_trained_48x48x25.pth'))
model.eval()

# Define the dataset and DataLoader
class PatchDataset(Dataset):
    def __init__(self, data_path):
        """
        Dataset for loading pre-extracted patches.
        """
        self.original_patches = np.load(os.path.join(data_path, 'original_patches.npy'))
        self.ground_truth_patches = np.load(os.path.join(data_path, 'ground_truth_patches.npy'))
        self.num_samples = len(self.original_patches)

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        original_patch = self.original_patches[idx]
        ground_truth_patch = self.ground_truth_patches[idx]
        # Convert to tensors and normalize to [0, 1]
        original_patch = torch.tensor(original_patch, dtype=torch.float32).unsqueeze(0) / 255.0
        ground_truth_patch = torch.tensor(ground_truth_patch, dtype=torch.float32).unsqueeze(0) / 255.0
        return original_patch, ground_truth_patch

# Helper function for SSIM (using skimage)
def calculate_ssim_custom(output, target):
    output_np = output.squeeze().cpu().numpy()
    target_np = target.squeeze().cpu().numpy()
    output_np = img_as_ubyte(np.clip(output_np, 0, 1))
    target_np = img_as_ubyte(np.clip(target_np, 0, 1))
    return ssim(output_np, target_np, data_range=255.0)

##############################################
# Custom MSE and PSNR Functions from Scratch
##############################################

def calculate_mse(output, target):
    """
    Compute the Mean Squared Error between two images.
    Args:
        output (np.ndarray): The output image.
        target (np.ndarray): The ground truth image.
    Returns:
        float: The MSE value.
    """
    return np.mean((output - target) ** 2)

def calculate_psnr_from_scratch(output, target, max_pixel=1.0):
    """
    Compute the Peak Signal-to-Noise Ratio using MSE.
    Args:
        output (np.ndarray): The output image.
        target (np.ndarray): The ground truth image.
        max_pixel (float): The maximum possible pixel value (default: 1.0 for normalized images).
    Returns:
        float: The PSNR value in decibels.
    """
    mse_value = calculate_mse(output, target)
    if mse_value == 0:
        return float('inf')
    return 10 * math.log10((max_pixel ** 2) / mse_value)

##############################################
# Evaluation on Test Data
##############################################

data_path = 'data/test_patches_48x48x25'
patch_dataset = PatchDataset(data_path)
patch_loader = DataLoader(patch_dataset, batch_size=1, shuffle=False)

# Metrics storage and image reconstruction
mse_list = []
psnr_list = []
ssim_list = []
reconstructed_patches = []

for i, (original_patch, ground_truth_patch) in enumerate(patch_loader):
    original_patch = original_patch.to(device)
    ground_truth_patch = ground_truth_patch.to(device)

    # Predict the output from the model
    with torch.no_grad():
        output_patch = model(original_patch)

    # Convert tensors to numpy arrays for metric calculations
    output_np = output_patch.squeeze().cpu().numpy()
    ground_truth_np = ground_truth_patch.squeeze().cpu().numpy()

    # Calculate MSE using our custom function
    mse_value = calculate_mse(output_np, ground_truth_np)
    
    # Calculate PSNR using our custom function
    psnr_value = calculate_psnr_from_scratch(output_np, ground_truth_np, max_pixel=1.0)
    
    # Calculate SSIM
    ssim_value = calculate_ssim_custom(output_patch, ground_truth_patch)
    
    mse_list.append(mse_value)
    psnr_list.append(psnr_value)
    ssim_list.append(ssim_value)
    
    # Collect patches for reconstruction if needed
    reconstructed_patches.append(output_np)

# Calculate and print the average metrics
average_mse = np.mean(mse_list)
average_psnr = np.mean(psnr_list)
average_ssim = np.mean(ssim_list)

print(f"Average MSE: {average_mse:.4f}")
print(f"Average PSNR: {average_psnr:.4f} dB")
print(f"Average SSIM: {average_ssim:.4f}")


Average MSE: 0.0386
Average PSNR: 22.7620 dB
Average SSIM: 0.8195


In [7]:
import os
import numpy as np
import cv2

def extract_patches(image, patch_size, stride):
    """
    Extracts overlapping patches from an image.
    """
    patches = []
    h, w = image.shape
    for y in range(0, h - patch_size + 1, stride[1]):
        for x in range(0, w - patch_size + 1, stride[0]):
            patch = image[y:y + patch_size, x:x + patch_size]
            patches.append(patch)
    return np.array(patches)

In [21]:
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import os

# Move to the device (GPU/CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define the function to reconstruct an image from patches
def reconstruct_image(patches, image_shape, patch_size=48, stride=(12, 12)):
    reconstructed_image = np.zeros(image_shape)
    count_matrix = np.zeros(image_shape)

    index = 0
    for y in range(0, image_shape[0] - patch_size + 1, stride[1]):
        for x in range(0, image_shape[1] - patch_size + 1, stride[0]):
            reconstructed_image[y:y + patch_size, x:x + patch_size] += patches[index].reshape(patch_size, patch_size)
            count_matrix[y:y + patch_size, x:x + patch_size] += 1
            index += 1

    # Avoid division by zero
    count_matrix[count_matrix == 0] = 1
    reconstructed_image /= count_matrix
    return reconstructed_image

# Load the trained model
model = ConvolutionalAutoencoder().to(device)
model.load_state_dict(torch.load('model_48x48x25/valy_cae_trained_48x48x25.pth'))

# Load the test image
test_image_path = 'data/train/original/008_nl_a161_304_01.jpg'
test_image = cv2.imread(test_image_path, cv2.IMREAD_GRAYSCALE)

# Extract patches from the test image
patch_size = 48
stride = (12, 12)
test_patches = extract_patches(test_image, patch_size, stride)

# Normalize patches and convert to a tensor
test_patches = test_patches.astype('float32') / 255.0
test_patches = torch.tensor(test_patches).unsqueeze(1).to(device)  # Add channel dimension

# Perform batch inference on patches
batch_size = 64  # Adjust based on available GPU memory
reconstructed_patches = []

with torch.no_grad():
    for i in range(0, len(test_patches), batch_size):
        batch = test_patches[i:i + batch_size]  # Create a batch of patches
        output_batch = model(batch)  # Run the batch through the model
        reconstructed_patches.append(output_batch.cpu().numpy())  # Move to CPU and convert to NumPy

# Convert reconstructed patches to a NumPy array
reconstructed_patches = np.concatenate(reconstructed_patches, axis=0)

# Reconstruct the full image
reconstructed_image = reconstruct_image(reconstructed_patches, test_image.shape, patch_size, stride)

# Save the reconstructed image
output_path = '008_nl_a161_304_01_recon.jpg'
cv2.imwrite(output_path, (reconstructed_image * 255).astype('uint8'))
print(f"Reconstructed image saved at {output_path}")


Reconstructed image saved at 008_nl_a161_304_01_recon.jpg
