In [34]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision.datasets import ImageFolder
import numpy as np

# Set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the path to the dataset
dataset_path = "datasets/sixray/train_data"

# Define the transformation to resize the input images
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize the image to 256x256
    transforms.ToTensor()  # Convert the image to a tensor
])

# Create the dataset
dataset = ImageFolder(dataset_path, transform=transform)

class Preprocessed_data(Dataset):
    def __init__(self,dataset):
        self.dataset = dataset
        self.patch_size = (56,56)

    def __getitem__(self,index):
        original_img = self.dataset[index][0]
        noise = np.random.normal(0,10**5,size=original_img.shape)
        noisy_img = original_img+noise

        row, col, ch =  noisy_img.shape
        patches = []
        for i in range(row):
            for j in range(col):
                if (i+1)*56 <= row and (j+1)*56 <= col:
                    patch = noisy_img[(i)*56:(i+1)*56,(j)*56:(j+1)*56,:]
                    patches.append(patch)


        return patches
    
    def __len__(self):
        return(len(self.dataset))

preproc_dataset = Preprocessed_data(dataset)
# Create the data loader
batch_size = 32
data_loader = DataLoader(preproc_dataset, batch_size=batch_size, shuffle=True)
print(len(data_loader))


14


In [18]:
import torch
import torch.nn as nn

# Define the autoencoder model
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),  # Layer 1
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Max pooling layer 1
            
            nn.Conv2d(16, 8, kernel_size=3, stride=1, padding=1),  # Layer 2
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Max pooling layer 2
            
            nn.Conv2d(8, 8, kernel_size=3, stride=1, padding=1),  # Layer 3
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)  # Max pooling layer 3
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Conv2d(8, 8, kernel_size=3, stride=1, padding=1),  # Layer 1
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='nearest'),  # Upsampling layer 1
            
            nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1),  # Layer 2
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='nearest'),  # Upsampling layer 2
            
            nn.Conv2d(16, 3, kernel_size=3, stride=1, padding=1),  # Layer 3
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='nearest')  # Upsampling layer 3
        )
        
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# Create an instance of the autoencoder model
model = Autoencoder()


In [19]:
# Create an instance of the autoencoder model
model = Autoencoder().to(device)

# Define the loss function
criterion = nn.MSELoss()

# Define the optimizer
learning_rate = 0.001
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Set the number of training epochs
num_epochs = 30



In [20]:
# Training loop
for epoch in range(num_epochs):
    running_loss = 0.0
    for images, _ in data_loader:
        # Move images to the device
        images = images.to(device)
        
        # Forward pass
        outputs = model(images)
        #loss = criterion(outputs, images)
        # Calculate MAE loss
        mae_loss = nn.L1Loss()(outputs, images)
        
        # Calculate MSE loss
        mse_loss = nn.MSELoss()(outputs, images)
        
        # Compute the combined loss
        loss = 0.7 * mae_loss + 0.3 * mse_loss
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
    epoch_loss = running_loss / len(data_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")

# Save the trained model
torch.save(model.state_dict(), "autoencoder_model.pth")

Epoch [1/30], Loss: 0.6248
Epoch [2/30], Loss: 0.3546
Epoch [3/30], Loss: 0.1560
Epoch [4/30], Loss: 0.1283
Epoch [5/30], Loss: 0.1155
Epoch [6/30], Loss: 0.1037
Epoch [7/30], Loss: 0.0962
Epoch [8/30], Loss: 0.0920
Epoch [9/30], Loss: 0.0872
Epoch [10/30], Loss: 0.0845
Epoch [11/30], Loss: 0.0806
Epoch [12/30], Loss: 0.0764
Epoch [13/30], Loss: 0.0754
Epoch [14/30], Loss: 0.0738
Epoch [15/30], Loss: 0.0703
Epoch [16/30], Loss: 0.0711
Epoch [17/30], Loss: 0.0712
Epoch [18/30], Loss: 0.0685
Epoch [19/30], Loss: 0.0666
Epoch [20/30], Loss: 0.0650
Epoch [21/30], Loss: 0.0625
Epoch [22/30], Loss: 0.0603
Epoch [23/30], Loss: 0.0590
Epoch [24/30], Loss: 0.0572
Epoch [25/30], Loss: 0.0572
Epoch [26/30], Loss: 0.0562
Epoch [27/30], Loss: 0.0557
Epoch [28/30], Loss: 0.0543
Epoch [29/30], Loss: 0.0558
Epoch [30/30], Loss: 0.0544


In [35]:
import cv2 as cv
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

# Define the path to the test dataset
test_dataset_path = "datasets/sixray/test_data"

# Define the transformation to resize the input images
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize the image to 256x256
    transforms.ToTensor()  # Convert the image to a tensor
])

# Create the test dataset
test_dataset = ImageFolder(test_dataset_path, transform=transform)
preproc_test = Preprocessed_data(test_dataset)

# Create the data loader for test images
batch_size = 1  # We process one image at a time
test_data_loader = DataLoader(preproc_test, batch_size=batch_size)

# Define the autoencoder model
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),  # Layer 1
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Max pooling layer 1
            
            nn.Conv2d(16, 8, kernel_size=3, stride=1, padding=1),  # Layer 2
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Max pooling layer 2
            
            nn.Conv2d(8, 8, kernel_size=3, stride=1, padding=1),  # Layer 3
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)  # Max pooling layer 3
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Conv2d(8, 8, kernel_size=3, stride=1, padding=1),  # Layer 1
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='nearest'),  # Upsampling layer 1
            
            nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1),  # Layer 2
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='nearest'),  # Upsampling layer 2
            
            nn.Conv2d(16, 3, kernel_size=3, stride=1, padding=1),  # Layer 3
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='nearest')  # Upsampling layer 3
        )
        
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# Create an instance of the autoencoder model
model = Autoencoder().to(device)

# Load the trained model
model.load_state_dict(torch.load("autoencoder_model.pth"))
model.eval()

# Reconstruct and plot the images
with torch.no_grad():
    i = 0
    for images, _ in test_data_loader:
        # Move images to the device
        images = images.to(device)
        
        # Reconstruct images
        outputs = model(images)

        disparity = images - outputs
        
        
        # Move images and outputs back to CPU
        images = images.cpu()
        outputs = outputs.cpu()
        disparity = disparity.cpu()
        
        # Plot the original and reconstructed images
        original_img = transforms.ToPILImage()(images[0])
        reconstructed_img = transforms.ToPILImage()(outputs[0])
        disp = transforms.ToPILImage()(disparity[0])
        
        original_img = np.array(original_img)
        original_img = cv.cvtColor(original_img,cv.COLOR_RGB2BGR)
        cv.imwrite("datasets/sixray/results/real/"+ str(i) + ".jpg",original_img)

        reconstructed_img = np.array(reconstructed_img)
        reconstructed_img = cv.cvtColor(reconstructed_img,cv.COLOR_RGB2BGR)
        cv.imwrite("datasets/sixray/results/fake/"+ str(i) + ".jpg",reconstructed_img)


        i = i+1

        fig, axs = plt.subplots(1, 3, figsize=(10, 5))
        axs[0].imshow(original_img)
        axs[0].set_title("Original Image")
        axs[0].axis("off")
        
        axs[1].imshow(reconstructed_img)
        axs[1].set_title("Reconstructed Image")
        axs[1].axis("off")

        axs[2].imshow(disp)
        axs[2].set_title("Disparity Maps")
        axs[2].axis("off")



ValueError: not enough values to unpack (expected 2, got 0)