In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os

class DenoisingDataset(Dataset):
    def __init__(self, degraded_dir, mask_dir, gt_dir, transform=None):
        self.degraded_dir = degraded_dir
        self.mask_dir = mask_dir
        self.gt_dir = gt_dir
        self.transform = transform
        self.images = os.listdir(degraded_dir)
        
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        degraded_img = Image.open(os.path.join(self.degraded_dir, self.images[idx])).convert('RGB')
        mask_img_name = self.images[idx].replace('.png', '_mask.png')
        mask_img = Image.open(os.path.join(self.mask_dir, mask_img_name)).convert('L')
        gt_img = Image.open(os.path.join(self.gt_dir, self.images[idx])).convert('RGB')
        
        if self.transform:
            degraded_img = self.transform(degraded_img)
            mask_img = self.transform(mask_img)
            gt_img = self.transform(gt_img)
        
        return degraded_img, mask_img, gt_img


class MaskedAutoencoder(nn.Module):
    def __init__(self):
        super(MaskedAutoencoder, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(True),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.ReLU(True),
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(259, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(True),
            nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid(),
        )

    def forward(self, x, mask):
        unmasked_input = x * (1 - mask)
        encoded_unmasked = self.encoder(unmasked_input)
        
        masked_input = x * mask
        encoded_unmasked_upsampled = nn.functional.interpolate(
            encoded_unmasked, size=(masked_input.size(2), masked_input.size(3)), mode='bilinear', align_corners=False
        )

        combined_decoder_input = torch.cat((encoded_unmasked_upsampled, masked_input), dim=1)
        decoded = self.decoder(combined_decoder_input)


        decoded = nn.functional.interpolate(decoded, size=x.size()[2:], mode='bilinear', align_corners=False)
        
        output = decoded * (1 - mask) + x * mask
        return output



def train_autoencoder(model, dataloader, criterion, optimizer, num_epochs=1):
    for epoch in range(num_epochs):
        for degraded, mask, gt in dataloader:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            degraded, mask, gt = degraded.to(device), mask.to(device), gt.to(device)
            
            # Forward pass
            outputs = model(degraded, mask)
            loss = criterion(outputs, gt)
            
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
    return model

if __name__ == "__main__":
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])
    
    degraded_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\bottle\Train\Degraded_image\broken_small"
    mask_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\bottle\Train\Defect_mask\broken_small"
    gt_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\bottle\Train\GT_clean_image\broken_small"
    
    dataset = DenoisingDataset(degraded_dir, mask_dir, gt_dir, transform)
    dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = MaskedAutoencoder().to(device) 
    criterion = nn.MSELoss() 
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    model_trained= train_autoencoder(model, dataloader, criterion, optimizer)


KeyboardInterrupt: 

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import os

class DenoisingDataset(Dataset):
    def __init__(self, degraded_dir, mask_dir, gt_dir, transform=None):
        self.degraded_dir = degraded_dir
        self.mask_dir = mask_dir
        self.gt_dir = gt_dir
        self.transform = transform
        self.images = os.listdir(degraded_dir)
        
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        degraded_img = Image.open(os.path.join(self.degraded_dir, self.images[idx])).convert('RGB')
        mask_img_name = self.images[idx].replace('.png', '_mask.png')
        mask_img = Image.open(os.path.join(self.mask_dir, mask_img_name)).convert('L')
        gt_img = Image.open(os.path.join(self.gt_dir, self.images[idx])).convert('RGB')
        
        if self.transform:
            degraded_img = self.transform(degraded_img)
            mask_img = self.transform(mask_img)
            gt_img = self.transform(gt_img)
        
        return degraded_img, mask_img, gt_img


class MaskedAutoencoder(nn.Module):
    def __init__(self):
        super(MaskedAutoencoder, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(True),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.ReLU(True),
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(259, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(True),
            nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid(),
        )

    def forward(self, x, mask):
        unmasked_input = x * (1 - mask)
        encoded_unmasked = self.encoder(unmasked_input)
        
        masked_input = x * mask
        encoded_unmasked_upsampled = nn.functional.interpolate(
            encoded_unmasked, size=(masked_input.size(2), masked_input.size(3)), mode='bilinear', align_corners=False
        )

        combined_decoder_input = torch.cat((encoded_unmasked_upsampled, masked_input), dim=1)
        decoded = self.decoder(combined_decoder_input)

        decoded = nn.functional.interpolate(decoded, size=x.size()[2:], mode='bilinear', align_corners=False)
        
        output = decoded * (1 - mask) + x * mask
        return output


def train_autoencoder(model, dataloader, criterion, optimizer, num_epochs=1):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for epoch in range(num_epochs):
        for degraded, mask, gt in dataloader:
            degraded, mask, gt = degraded.to(device), mask.to(device), gt.to(device)
            
            # Forward pass
            outputs = model(degraded, mask)
            loss = criterion(outputs, gt)
            
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
    return model

if __name__ == "__main__":
    # Transformation with resizing to 256x256
    transform = transforms.Compose([
        transforms.Resize((256, 256)),  # Resize images to 256x256
        transforms.ToTensor(),
    ])
    
    degraded_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\bottle\Train\Degraded_image\broken_small"
    mask_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\bottle\Train\Defect_mask\broken_small"
    gt_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\bottle\Train\GT_clean_image\broken_small"
    
    dataset = DenoisingDataset(degraded_dir, mask_dir, gt_dir, transform)
    dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
    
    model = MaskedAutoencoder() 
    criterion = nn.MSELoss() 
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    model_trained = train_autoencoder(model, dataloader, criterion, optimizer)


Epoch [1/1], Loss: 0.0506


In [4]:
import os
import shutil
import torch
from PIL import Image
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset

class InferenceDataset(Dataset):
    def __init__(self, degraded_dir, transform=None):
        self.degraded_dir = degraded_dir
        self.transform = transform
        self.images = os.listdir(degraded_dir)
        
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img_name = self.images[idx]
        degraded_img = Image.open(os.path.join(self.degraded_dir, img_name)).convert('RGB')
        
        if self.transform:
            degraded_img = self.transform(degraded_img)
        
        return degraded_img, img_name


def save_predictions(model, device, input_dir, output_dir, transform):
    # Ensure the output directory has the same structure as the input directory
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    shutil.copytree(input_dir, output_dir, ignore=shutil.ignore_patterns("*.png"))

    # Iterate over objects in the dataset (e.g., 'bottle', 'cable')
    for object_type in os.listdir(input_dir):
        object_path = os.path.join(input_dir, object_type, "Val", "Degraded_image")
        
        # Iterate over defect types (e.g., 'broken_large', 'broken_small')
        for defect_type in os.listdir(object_path):
            defect_path = os.path.join(object_path, defect_type)

            # Create a DataLoader for the current defect type
            dataset = InferenceDataset(defect_path, transform)
            dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

            # Directory to save predictions
            output_defect_path = os.path.join(output_dir, object_type, "Val", "Degraded_image", defect_type)

            # Run inference
            model.eval()
            with torch.no_grad():
                for degraded_img, img_name in dataloader:
                    degraded_img = degraded_img.to(device)
                    mask = torch.zeros_like(degraded_img)  # assuming no mask provided in val
                    output = model(degraded_img, mask)

                    # Save output image
                    output_img = transforms.ToPILImage()(output.squeeze(0).cpu())
                    output_img.save(os.path.join(output_defect_path, img_name[0]))

# Parameters
input_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val"
output_dir = r"F:\KLA Problem Statement\Denoising_Dataset_results"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Image transformation
transform = transforms.Compose([
    transforms.ToTensor(),
])


save_predictions(model, device, input_dir, output_dir, transform)


KeyboardInterrupt: 

In [None]:
import os
from pathlib import Path
from torchvision.utils import save_image

val_dir = "F:/KLA Problem Statement/Denoising_Dataset_train_val"
results_dir = "F:/KLA Problem Statement/Denoising_Dataset_results"

Path(results_dir).mkdir(parents=True, exist_ok=True)

def process_and_save_images(model, val_dir, results_dir, transform):
    model.eval() 
    with torch.no_grad():
        for obj in os.listdir(val_dir):
            obj_dir = os.path.join(val_dir, obj, 'Val', 'Degraded_image')
            if not os.path.isdir(obj_dir):
                continue
            
            for defect_type in os.listdir(obj_dir):
                defect_dir = os.path.join(obj_dir, defect_type)
                save_defect_dir = os.path.join(results_dir, obj, 'Val', defect_type)
                Path(save_defect_dir).mkdir(parents=True, exist_ok=True)
                for img_name in os.listdir(defect_dir):
                    if img_name.endswith('.png'):
                        degraded_img_path = os.path.join(defect_dir, img_name)
                        mask_img_path = os.path.join(
                            val_dir, obj, 'Val', 'Defect_mask', defect_type, img_name.replace('.png', '_mask.png')
                        )

                        degraded_img = Image.open(degraded_img_path).convert('RGB')
                        mask_img = Image.open(mask_img_path).convert('L')
                        degraded_img = transform(degraded_img).to(device)
                        mask_img = transform(mask_img).to(device)
                        mask_img = mask_img.unsqueeze(0)  
                        output = model(degraded_img.unsqueeze(0), mask_img).squeeze(0)
                        output_path = os.path.join(save_defect_dir, img_name)
                        save_image(output, output_path)

transform = transforms.Compose([
    transforms.ToTensor(),
])


process_and_save_images(model_trained, val_dir, results_dir, transform)




In [None]:
from skimage.metrics import structural_similarity as ssim
import torchmetrics
import numpy as np
import matplotlib.pyplot as plt

# Function to calculate PSNR using torchmetrics
psnr_metric = torchmetrics.functional.peak_signal_noise_ratio

def calculate_psnr_ssim(output, gt):
    """Calculate PSNR and SSIM between the output and ground truth."""
    # Convert tensors to NumPy arrays for SSIM calculation
    output_np = output.permute(0, 2, 3, 1).cpu().numpy()  # Convert to (N, H, W, C)
    gt_np = gt.permute(0, 2, 3, 1).cpu().numpy()  # Convert to (N, H, W, C)

    psnr_list = []
    ssim_list = []

    for i in range(len(output_np)):
        # PSNR
        psnr_value = psnr_metric(output[i], gt[i], data_range=1.0)  # Normalized to [0,1]
        psnr_list.append(psnr_value.item())
        
        # SSIM: Set `win_size` to a smaller value to ensure it fits within the image dimensions
        ssim_value = ssim(gt_np[i], output_np[i], multichannel=True, data_range=1.0, win_size=3)
        ssim_list.append(ssim_value)

    return psnr_list, ssim_list


def visualize_results_with_metrics(model, dataloader, device):
    model.eval()  # Set model to evaluation mode
    
    with torch.no_grad():
        for degraded, mask, gt in dataloader:
            # Move data to the device (GPU or CPU)
            degraded, mask, gt = degraded.to(device), mask.to(device), gt.to(device)
            
            # Forward pass through the model
            output = model(degraded, mask)
            
            # Calculate PSNR and SSIM
            psnr_list, ssim_list = calculate_psnr_ssim(output, gt)
            
            # Convert torch tensors to numpy arrays for plotting
            degraded_np = degraded.cpu().numpy()
            mask_np = mask.cpu().numpy()
            gt_np = gt.cpu().numpy()
            output_np = output.cpu().numpy()
            
            # Plot the first few images in the batch
            for i in range(min(len(degraded_np), 5)):  # Visualize up to 5 images
                fig, axs = plt.subplots(1, 4, figsize=(12, 4))
                
                axs[0].imshow(degraded_np[i].transpose(1, 2, 0))  # Degraded image
                axs[0].set_title('Degraded Image')
                axs[0].axis('off')
                
                axs[1].imshow(mask_np[i].squeeze(), cmap='gray')  # Mask image
                axs[1].set_title('Mask Image')
                axs[1].axis('off')
                
                axs[2].imshow(gt_np[i].transpose(1, 2, 0))  # Ground Truth image
                axs[2].set_title('Ground Truth')
                axs[2].axis('off')
                
                axs[3].imshow(output_np[i].transpose(1, 2, 0))  # Model Output
                axs[3].set_title(f'Reconstructed Image\nPSNR: {psnr_list[i]:.2f} SSIM: {ssim_list[i]:.2f}')
                axs[3].axis('off')
                
                plt.show()

# Directories for test dataset
test_degraded_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\\bottle\Val\Degraded_image\broken_small"
test_mask_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\bottle\Val\Defect_mask\broken_small"
test_gt_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\bottle\Val\GT_clean_image\broken_small"

# Load test dataset
test_dataset = DenoisingDataset(test_degraded_dir, test_mask_dir, test_gt_dir, transform)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Visualize results with PSNR and SSIM
visualize_results_with_metrics(model, test_dataloader, device)


In [None]:
import os
from skimage.metrics import structural_similarity as ssim
import torchmetrics
import numpy as np
import matplotlib.pyplot as plt

# Function to calculate PSNR using torchmetrics
psnr_metric = torchmetrics.functional.peak_signal_noise_ratio

def calculate_psnr_ssim(output, gt):
    """Calculate PSNR and SSIM between the output and ground truth."""
    # Convert tensors to NumPy arrays for SSIM calculation
    output_np = output.permute(0, 2, 3, 1).cpu().numpy()  # Convert to (N, H, W, C)
    gt_np = gt.permute(0, 2, 3, 1).cpu().numpy()  # Convert to (N, H, W, C)

    psnr_list = []
    ssim_list = []

    for i in range(len(output_np)):
        # PSNR
        psnr_value = psnr_metric(output[i], gt[i], data_range=1.0)  # Normalized to [0,1]
        psnr_list.append(psnr_value.item())
        
        # SSIM: Set `win_size` to a smaller value to ensure it fits within the image dimensions
        ssim_value = ssim(gt_np[i], output_np[i], multichannel=True, data_range=1.0, win_size=3)
        ssim_list.append(ssim_value)

    return psnr_list, ssim_list

def save_results_with_metrics(model, dataloader, device, save_dir):
    model.eval()  # Set model to evaluation mode
    os.makedirs(save_dir, exist_ok=True)  # Create the directory if it doesn't exist
    
    with torch.no_grad():
        for batch_idx, (degraded, mask, gt) in enumerate(dataloader):
            # Move data to the device (GPU or CPU)
            degraded, mask, gt = degraded.to(device), mask.to(device), gt.to(device)
            
            # Forward pass through the model
            output = model(degraded, mask)
            
            # Calculate PSNR and SSIM
            psnr_list, ssim_list = calculate_psnr_ssim(output, gt)
            
            # Convert torch tensors to numpy arrays for saving
            degraded_np = degraded.cpu().numpy()
            mask_np = mask.cpu().numpy()
            gt_np = gt.cpu().numpy()
            output_np = output.cpu().numpy()
            
            # Save the first few images in the batch
            for i in range(len(degraded_np)):  # Save all images in the batch
                fig, axs = plt.subplots(1, 4, figsize=(12, 4))
                
                axs[0].imshow(degraded_np[i].transpose(1, 2, 0))  # Degraded image
                axs[0].set_title('Degraded Image')
                axs[0].axis('off')
                
                axs[1].imshow(mask_np[i].squeeze(), cmap='gray')  # Mask image
                axs[1].set_title('Mask Image')
                axs[1].axis('off')
                
                axs[2].imshow(gt_np[i].transpose(1, 2, 0))  # Ground Truth image
                axs[2].set_title('Ground Truth')
                axs[2].axis('off')
                
                axs[3].imshow(output_np[i].transpose(1, 2, 0))  # Model Output
                axs[3].set_title(f'Reconstructed Image\nPSNR: {psnr_list[i]:.2f} SSIM: {ssim_list[i]:.2f}')
                axs[3].axis('off')
                
                # Save the figure
                save_path = os.path.join(save_dir, f'batch_{batch_idx}_image_{i}.png')
                plt.savefig(save_path)
                plt.close(fig)  # Close the figure to avoid memory leaks

# Directories for test dataset
test_degraded_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\\bottle\Val\Degraded_image\broken_small"
test_mask_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\bottle\Val\Defect_mask\broken_small"
test_gt_dir = r"F:\KLA Problem Statement\Denoising_Dataset_train_val\bottle\Val\GT_clean_image\broken_small"

# Load test dataset
test_dataset = DenoisingDataset(test_degraded_dir, test_mask_dir, test_gt_dir, transform)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Directory to save images
save_dir = r"F:\KLA Problem Statement\Saved_Results"

# Save the results with PSNR and SSIM metrics
save_results_with_metrics(model, test_dataloader, device, save_dir)
