<a href="https://colab.research.google.com/github/EthanW67/Machine-Learning-Research/blob/main/Research_Projects/Revised_Regular_CNN_6_27_24.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Using the resized 28x28 to modified Regular CNN architecture.
* Resized Original 28x28 NMIST dataset to 7x7 then resized 7x7 â†’ 28x28
* Compared and trained resized 28x28 with the original 28x28 NMIST dataset.


In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torchvision.transforms import Resize
from torchmetrics.image import StructuralSimilarityIndexMeasure, PeakSignalNoiseRatio
device = "cuda" if torch.cuda.is_available() else "cpu"
psnr = PeakSignalNoiseRatio()
psnr.to(device)
ssim = StructuralSimilarityIndexMeasure(data_range=1.0)
ssim.to(device)

## Load and Resize dataset

In [None]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Define the transformations
original_transforms = transforms.Compose([
    transforms.ToTensor(), # Convert to tensor
    transforms.Resize((28, 28)),
])

transform_down = transforms.Compose([
    transforms.ToTensor(),  # Convert to tensor
    transforms.Resize((7, 7))  # Resize to 7x7
])

transform_up = transforms.Compose([
    transforms.Resize((28, 28)),  # Resize back to 28x28
])

# Regular Dataset
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=original_transforms)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=original_transforms)

# Load MNIST dataset and apply the downscale transformation
train_dataset_down = datasets.MNIST(root='./data', train=True, download=True, transform=transform_down)
test_dataset_down = datasets.MNIST(root='./data', train=False, download=True, transform=transform_down)

# Apply the upscale transformation on the already downscaled datasets
train_dataset_up = datasets.MNIST(root='./data', train=True, download=True, transform=transforms.Compose([transform_down, transform_up]))
test_dataset_up = datasets.MNIST(root='./data', train=False, download=True, transform=transforms.Compose([transform_down, transform_up]))
batch_size = 32

# Original Dataset
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# Create DataLoader instances for the downscaled version
train_loader_down = DataLoader(dataset=train_dataset_down, batch_size=batch_size, shuffle=False)
test_loader_down = DataLoader(dataset=test_dataset_down, batch_size=batch_size, shuffle=False)

# Create DataLoader instances for the upscaled version from downscaled images
train_loader_up = DataLoader(dataset=train_dataset_up, batch_size=batch_size, shuffle=False)
test_loader_up = DataLoader(dataset=test_dataset_up, batch_size=batch_size, shuffle=False)


# Get the dimensions of the original dataset

for batch_idx, (data, target) in enumerate(train_loader):
    print(data.size())  # Should print torch.Size([64, 1, 28, 28])
    break
# Example of how you might check the sizes to confirm the process
for batch_idx, (data, target) in enumerate(train_loader_down):
    print(f"Size of batch {batch_idx + 1} in train_loader_down: {data.size()}")  # 7x7 images
    break
for batch_idx, (data, target) in enumerate(train_loader_up):
    print(f"Size of batch {batch_idx + 1} in train_loader_up: {data.size()}")  # 28x28 images
    break

## Create CNN Model


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim


# Define a simple super-resolution model using convolutional layers
class SuperResolutionModel(nn.Module):
    def __init__(self, dropout_rate = 0.5):
        super(SuperResolutionModel, self).__init__()
        # Encoder
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.dropout1 = nn.Dropout2d(p=dropout_rate)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.dropout2 = nn.Dropout2d(p=dropout_rate)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Instance of ReLU
        self.relu = nn.ReLU()

        # Decoder
        self.upconv1 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.bn5 = nn.BatchNorm2d(128)
        self.conv7 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.bn6 = nn.BatchNorm2d(64)
        self.dropout3 = nn.Dropout2d(p=dropout_rate)

        self.upconv2 = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
        self.bn7 = nn.BatchNorm2d(32)
        self.conv8 = nn.Conv2d(32, 1, kernel_size=3, padding=1)

        # Forward

    def forward(self, x):
        # Encoder
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.relu(self.bn2(self.conv2(x)))
        x = self.dropout1(x)
        x = self.pool1(x)

        x = self.relu(self.bn3(self.conv3(x)))
        x = self.relu(self.bn4(self.conv4(x)))
        x = self.dropout2(x)
        x = self.pool2(x)

        # Decoder
        x = self.relu(self.bn5(self.upconv1(x)))
        x = self.relu(self.bn6(self.conv7(x)))
        x = self.dropout3(x)
        x = self.relu(self.bn7(self.upconv2(x)))
        x = self.relu(self.conv8(x))
        return x

## CNN Model with more Conv Layers

In [None]:
"""
class SuperResolutionModel(nn.Module):
    def __init__(self, dropout_rate = 0.5):
        super(SuperResolutionModel, self).__init__()
        # Encoder
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.dropout1 = nn.Dropout2d(p=dropout_rate)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(256)
        self.conv5 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(512)
        self.conv6 = nn.Conv2d(512, 1024, kernel_size=3, padding=1)
        self.dropout2 = nn.Dropout2d(p=dropout_rate)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Instance of ReLU
        self.relu = nn.ReLU()

        # Decoder
        self.upconv1 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.bn5 = nn.BatchNorm2d(512)
        self.conv7 = nn.Conv2d(512, 256, kernel_size=3, padding=1)
        self.conv8 = nn.Conv2d(256, 128, kernel_size=3, padding=1)
        self.bn6 = nn.BatchNorm2d(128)
        self.dropout3 = nn.Dropout2d(p=dropout_rate)

        self.upconv2 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.bn7 = nn.BatchNorm2d(64)
        self.conv9 = nn.Conv2d(64, 32, kernel_size=3, padding=1)
        self.dropout4 = nn.Dropout2d(p=dropout_rate)

        self.conv10 = nn.Conv2d(32, 1, kernel_size=3, padding=1)

        # Forward

    def forward(self, x):
        # Encoder
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.relu(self.bn2(self.conv2(x)))
        x = self.relu(self.conv3(x))
        x = self.dropout1(x)
        x = self.pool1(x)

        x = self.relu(self.bn3(self.conv4(x)))
        x = self.relu(self.bn4(self.conv5(x)))
        x = self.relu(self.conv6(x))
        x = self.dropout2(x)
        x = self.pool2(x)

        # Decoder
        x = self.relu(self.bn5(self.upconv1(x)))
        x = self.relu(self.conv7(x))
        x = self.relu(self.bn6(self.conv8(x)))
        x = self.dropout3(x)
        x = self.relu(self.bn7(self.upconv2(x)))
        x = self.relu(self.conv9(x))
        x = self.dropout4(x)
        x = self.relu(self.conv10(x))
        return x
"""

## Test and Train Model


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Instantiate the model, loss function, and optimizer
model = SuperResolutionModel().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
#optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-5)

epoch_numbers = []
psnr_values = []
ssim_values = []

#training
def image_up_train(model, criterion, optimizer, num_epochs):
    """
    Compares the results with the original 28x28 MNISt dataset
    """
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for (images, _), (images_up, _ ) in zip(train_loader, train_loader_up):
            images = images.to(device)
            images_up = images.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward + backward + optimize
            outputs = model(images_up)
            if outputs.shape != images.shape:
                raise ValueError(f"Output shape {outputs.shape} does not match target shape {images.shape}")
            loss = criterion(outputs, images)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f'Epoch {epoch + 1}: Loss {running_loss / len(train_loader_up)}')

        PSNR_value, SSIM_value = PSNR_SSIM_Value(model, test_loader, test_loader_up)

        epoch_numbers.append(epoch + 1)
        psnr_values.append(PSNR_value)
        ssim_values.append(SSIM_value)


##Helpful Functions


In [None]:
import matplotlib.pyplot as plt


def validate(model, loader):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0.0
    criterion = nn.MSELoss()
    with torch.no_grad():  # No need to track gradients
        for images, _ in loader:
            images = images.to(device)

            outputs = model(images)
            loss = criterion(outputs, images)
            total_loss += loss.item()

    avg_loss = total_loss / len(loader)
    print(f'Validation Loss: {avg_loss}')
    return avg_loss




def PSNR_SSIM_Value(model, loader, loader_up):
    model.eval()
    with torch.inference_mode():
        for (images, _), (images_up, _) in zip(loader, loader_up):
            images = images.to(device)
            images_up = images_up.to(device)
            #print(images.shape)
            outputs = model(images_up)
            #print(outputs.shape)
            PSNR_value = psnr(outputs, images)
            SSIM_value = ssim(outputs, images)
            print(PSNR_value)
            print(SSIM_value)
            break
    return  PSNR_value.item(), SSIM_value.item()

## Visualize

In [None]:
def visualize_results_CNN(model, loader, loader_down, loader_up, criterion, optimizer, num_epochs):

    image_up_train(model, criterion, optimizer, num_epochs)

    # Plot the epoch numbers vs. PSNR and SSIM
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epoch_numbers, psnr_values, label='PSNR')
    print(epoch_numbers)
    print(psnr_values)
    plt.xlabel('Epoch Number')
    plt.ylabel('PSNR')
    plt.title('Epoch Number vs. PSNR')
    plt.grid(True)

    plt.subplot(1, 2, 2)
    plt.plot(epoch_numbers, ssim_values, label='SSIM')
    print(ssim_values)
    plt.xlabel('Epoch Number')
    plt.ylabel('SSIM')
    plt.title('Epoch Number vs. SSIM')
    plt.grid(True)




    model.eval()
    with torch.no_grad():
        for (images, _), (images_down, _), (images_up, _) in zip(loader, loader_down, loader_up):

            images = images.to(device)
            images_down = images_down.to(device)
            images_up = images_up.to(device)
            output = model(images_up)

            print(images.shape)
            print(images_down.shape)
            print(images_up.shape)
            print(output.shape)
            # print(transform_down)

            # Convert first 4 images in the batch from PyTorch tensors to NumPy arrays
            original_images = images[:5].cpu().numpy()
            seven_images = images_down[:5].cpu().numpy()
            twenty_eight_images = images_up[:5].cpu().numpy()
            reconstructed_images = output[:5].cpu().numpy()

            # Plotting
            fig, axes = plt.subplots(4, 5, figsize=(12, 9))
            for i in range(5):
                ax = axes[0][i]
                ax.imshow(original_images[i][0], cmap='gray', interpolation='none')
                ax.title.set_text('Original Image')
                ax.axis('off')

                ax = axes[1][i]
                ax.imshow(seven_images[i][0], cmap='gray', interpolation='none')
                ax.title.set_text('7x7 Images')
                ax.axis('off')

                ax = axes[2][i]
                ax.imshow(twenty_eight_images[i][0], cmap='gray', interpolation='none')
                ax.title.set_text('28x28 Images')
                ax.axis('off')

                ax = axes[3][i]
                ax.imshow(reconstructed_images[i][0], cmap='gray', interpolation='none')
                ax.title.set_text('Result Images')
                ax.axis('off')

            plt.show()
            break  # Only show one batch of images

visualize_results_CNN(model, test_loader, test_loader_down, test_loader_up, criterion, optimizer, num_epochs=30)
