In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import random
import pytorch_lightning as pl
from torchmetrics import Accuracy
from torchvision.transforms import ToTensor
import torch.nn.functional as F
from pytorch_lightning.callbacks import ModelCheckpoint
import matplotlib.image as img
from PIL import Image

In [None]:
class MyDataset(Dataset):
    """Map-style dataset backed by a header-less CSV file.

    Each CSV row holds the label in its first column followed by the 784 pixel
    values of one image, which are reshaped to ``(1, 28, 28)``.
    """

    def __init__(self, csv_file):
        """Read the whole CSV file into memory.

        Args:
            csv_file: Path (or buffer) accepted by ``pandas.read_csv``.
        """
        self.df = pd.read_csv(csv_file, header=None, sep=",")

    def __len__(self):
        """Return the number of rows (samples) in the file."""
        return self.df.shape[0]

    def __getitem__(self, index):
        """Return the ``(image, label)`` pair stored at row ``index``.

        Returns:
            image: float tensor of shape ``(1, 28, 28)``; the raw values are
                divided by 255 (presumably 0-255 grey levels, giving [0, 1]).
            label: 0-dim tensor holding the value of the first column.
        """
        row = self.df.iloc[index]
        # Column 0 is the label, everything after it is the flattened image.
        pixels = torch.from_numpy(row.iloc[1:].to_numpy())
        image = pixels.reshape(1, 28, 28) / 255
        label = torch.tensor(row.iloc[0])
        return image, label

In [None]:
# Mini-batch size shared by the training and the test loader.
BATCH_SIZE = 100

# Training split: reshuffled every epoch.
train_dataset = MyDataset("mnist_train.csv")
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)

# Test split: kept in file order.
test_dataset = MyDataset("mnist_test.csv")
test_dataloader = DataLoader(test_dataset, shuffle=False, batch_size=BATCH_SIZE)

In [None]:
# This is almost exactly the class 'EncoderModel' from autoencoder model.ipynb
# With the addition of a pooling layer

class EncoderModel(nn.Module):
    """Convolutional encoder that compresses a 1x28x28 image into 10 values.

    ``forward`` returns the 10-dimensional code together with the max-pooling
    indices, which are needed to invert the pooling step with ``nn.MaxUnpool2d``.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 1 -> 28 channels; padding=1 keeps the 28x28 spatial size.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=28, kernel_size=3, stride=1, padding=1)
        self.act1 = nn.ReLU()
        self.drop1 = nn.Dropout(p=0.3)

        # Stage 2: 28 -> 28 channels, then 2x2 max-pooling down to 14x14.
        self.conv2 = nn.Conv2d(in_channels=28, out_channels=28, kernel_size=3, stride=1, padding=1)
        self.act2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2), return_indices=True)

        self.flat = nn.Flatten()

        # Stage 3: flattened 28 * 14 * 14 = 5488 features -> 512.
        self.fc3 = nn.Linear(in_features=5488, out_features=512)
        self.act3 = nn.ReLU()
        self.drop3 = nn.Dropout(p=0.5)

        # Stage 4: 512 -> 10 (the code).
        self.fc4 = nn.Linear(in_features=512, out_features=10)

    def forward(self, x):
        """Encode a batch of images.

        Args:
            x: tensor of shape ``(N, 1, 28, 28)``.

        Returns:
            Tuple ``(code, indices)``: ``code`` has shape ``(N, 10)`` and
            ``indices`` are the arg-max positions reported by the pooling layer,
            shape ``(N, 28, 14, 14)``.
        """
        # (N, 1, 28, 28) -> (N, 28, 28, 28)
        features = self.drop1(self.act1(self.conv1(x)))

        # (N, 28, 28, 28) -> (N, 28, 14, 14)
        pooled, indices = self.pool2(self.act2(self.conv2(features)))

        # (N, 28, 14, 14) -> (N, 5488) -> (N, 512)
        hidden = self.drop3(self.act3(self.fc3(self.flat(pooled))))

        # (N, 512) -> (N, 10)
        code = self.fc4(hidden)
        return code, indices

In [None]:
# Undoing everything done above

class DecoderModel(nn.Module):
    """Decoder that expands a 10-value code back into a 1x28x28 image.

    The layers mirror a conv/pool encoder in reverse order; the unpooling step
    needs the indices that the matching ``nn.MaxPool2d`` produced.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 10 -> 512.
        self.fc1 = nn.Linear(in_features=10, out_features=512)
        self.act1 = nn.ReLU()
        self.drop1 = nn.Dropout(p=0.3)

        # Stage 2: 512 -> 5488 (= 28 * 14 * 14).
        self.fc2 = nn.Linear(in_features=512, out_features=5488)
        self.act2 = nn.ReLU()

        # Restore the channel/spatial layout expected by the unpooling layer.
        self.unflat = nn.Unflatten(dim=1, unflattened_size=(28, 14, 14))

        # Stage 3: undo the 2x2 pooling (14x14 -> 28x28), then 28 -> 28 channels.
        self.unpool3 = nn.MaxUnpool2d(kernel_size=2, stride=2)
        self.deconv3 = nn.ConvTranspose2d(in_channels=28, out_channels=28, kernel_size=3, stride=1, padding=1)
        self.act3 = nn.ReLU()
        self.drop3 = nn.Dropout(p=0.5)

        # Stage 4: 28 -> 1 channel; no activation on the output.
        self.deconv4 = nn.ConvTranspose2d(in_channels=28, out_channels=1, kernel_size=3, stride=1, padding=1)

    def forward(self, x, indices):
        """Decode a batch of codes.

        Args:
            x: tensor of shape ``(N, 10)``.
            indices: max-pooling indices of shape ``(N, 28, 14, 14)`` used by
                ``nn.MaxUnpool2d`` to place the values.

        Returns:
            Tensor of shape ``(N, 1, 28, 28)``.
        """
        # (N, 10) -> (N, 512)
        hidden = self.drop1(self.act1(self.fc1(x)))

        # (N, 512) -> (N, 5488) -> (N, 28, 14, 14)
        grid = self.unflat(self.act2(self.fc2(hidden)))

        # (N, 28, 14, 14) -> (N, 28, 28, 28)
        upsampled = self.unpool3(grid, indices)
        features = self.drop3(self.act3(self.deconv3(upsampled)))

        # (N, 28, 28, 28) -> (N, 1, 28, 28)
        return self.deconv4(features)

In [None]:
# https://lightning.ai/docs/pytorch/stable/notebooks/course_UvA-DL/08-deep-autoencoders.html

class Autoencoder(pl.LightningModule):
    """Lightning wrapper that trains ``EncoderModel`` + ``DecoderModel`` end to end.

    The objective is the squared reconstruction error, summed over all pixels of
    an image and averaged over the batch.
    """

    def __init__(self):
        super().__init__()
        self.encoder = EncoderModel()
        self.decoder = DecoderModel()
        # reduction="none" keeps the per-pixel errors so they can be summed per
        # image before averaging over the batch.
        self.loss_fn = nn.MSELoss(reduction="none")

    def forward(self, x):
        """Encode ``x`` and decode it again, returning the reconstruction."""
        z, indices = self.encoder(x)
        x_hat = self.decoder(z, indices)
        return x_hat

    def _reconstruction_loss(self, batch):
        """Return the batch-mean of the per-image summed squared error.

        Shared by ``training_step`` and ``validation_step`` so both report
        exactly the same quantity. The labels in ``batch`` are ignored.
        """
        images, _ = batch
        autoencoded_images = self(images)
        loss = self.loss_fn(autoencoded_images, images)
        # Sum over channel/height/width, then average over the batch dimension.
        return loss.sum(dim=[1, 2, 3]).mean(dim=[0])

    def training_step(self, batch, batch_idx):
        """Compute, log (as ``train_loss``) and return the training loss."""
        loss = self._reconstruction_loss(batch)
        self.log("train_loss", loss, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss as ``val_loss``."""
        loss = self._reconstruction_loss(batch)
        self.log("val_loss", loss, on_epoch=True)

    def configure_optimizers(self):
        """Use Adam with a learning rate of 1e-3 over all parameters."""
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

In [None]:
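# Training run, kept commented out because the saved checkpoint is loaded in the next cell instead.
# Reference: https://www.youtube.com/watch?v=9Vc7tTWZark&list=PLaMu-SDt_RB6b4Z_kOUAlT0KI6jTMCKPL&index=2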
    
# callbacks = [ModelCheckpoint(save_top_k=-1, mode="min", monitor="val_loss")]
# lightning_model = Autoencoder()
# trainer = pl.Trainer(max_epochs=5, callbacks=callbacks)
# trainer.fit(model=lightning_model, train_dataloaders=train_dataloader, val_dataloaders=test_dataloader)

In [None]:
# Restore the trained autoencoder from the saved checkpoint.
# map_location="cpu": the cells below feed CPU tensors and call .numpy() on the
# output, so the weights must live on the CPU regardless of the training device.
lightning_model = Autoencoder.load_from_checkpoint(
    "lightning_logs/version_25/checkpoints/epoch=4-step=3000.ckpt",
    map_location="cpu",
)
# A freshly constructed module is in training mode; switch to eval mode so the
# Dropout layers are disabled when the model is used for inference below.
lightning_model = lightning_model.eval()

In [None]:
# Plot the per-epoch training and validation loss from the logged metrics.

metrics = pd.read_csv("lightning_logs/version_25/metrics.csv")

# Select the values by column name instead of by hard-coded row/column offsets:
# the offsets only hold for one particular dataset size, batch size and logging
# interval. Because "train_loss" is logged both per step and per epoch, Lightning
# stores the epoch aggregate under "train_loss_epoch"; rows belonging to other
# metrics are NaN in a given column and are dropped.
train_loss = metrics["train_loss_epoch"].dropna().to_numpy()
val_loss = metrics["val_loss"].dropna().to_numpy()

print(train_loss)
print(val_loss)

# Epochs are numbered from 1 on the x axis.
epochs = np.arange(1, len(train_loss) + 1)

plt.plot(epochs, train_loss, label="train_loss")
plt.plot(np.arange(1, len(val_loss) + 1), val_loss, label="val_loss")
plt.title(f"Training vs validation loss during first {len(epochs)} epochs")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
# Checking the results: reconstruct ten random training digits and record, for
# each one, the MSE between the image and its reconstruction.

# Inference must run in eval mode, otherwise the Dropout layers of the encoder
# and decoder randomly perturb every reconstruction and its loss.
lightning_model.eval()

# Sample from the real dataset length instead of a hard-coded 60000.
samples = random.sample(range(len(train_dataset)), 10)

image_label = []
autoencoded_loss = np.zeros(len(samples))
loss_fn = nn.MSELoss()  # loop-invariant, so it is built once

# data = [['MNIST image number ', 'Digit represented ', 'Average MSE loss for this image'],]

for counter, i in enumerate(samples):
    image, label = train_dataset[i]
    image_label.append(label.item())
    image = torch.reshape(image, (28, 28))

    plt.imshow(image, cmap="gray")  # "gray" is the colormap name every Matplotlib version accepts
    # plt.savefig(f"MNIST_image_number_{i}.png")
    plt.title(f"Real image number {i}")
    plt.show()

    # The model expects a (batch, channel, height, width) tensor.
    image_hat = torch.reshape(image, (1, 1, 28, 28))
    with torch.no_grad():  # no gradients are needed for evaluation
        autoencoded_image = lightning_model(image_hat)
    autoencoded_image = torch.reshape(autoencoded_image, (28, 28))

    # .item() turns the 0-dim tensor into a plain float before it is stored in
    # the NumPy array and printed.
    loss = loss_fn(autoencoded_image, image).item()
    autoencoded_loss[counter] = loss

    plt.imshow(autoencoded_image.detach().numpy(), cmap="gray")
    # plt.savefig(f"Autoencoded_MNIST_image_number_{i}.png")
    plt.title(f"Autoencoded image number {i}")
    plt.show()
    print(f"MSE Loss averaged over image = {loss}")
    # entry = [f"{i} ",f"{label} ",f"{loss}"]
    # data.append(entry)

# csv_file_path = 'testing_autoencoder_with_pooling_on_MNIST.csv'
# df = pd.DataFrame(data)
# df.to_csv(csv_file_path, index=False)

In [None]:
# Summary of the digit reconstructions: labels, per-image MSE and their mean,
# rounded to four decimals for display.
per_image_mse = np.round(autoencoded_loss, 4)
overall_mse = np.round(np.mean(autoencoded_loss), 4)

print(f"What pictures contained: {image_label}")
print(f"MSE loss: {per_image_mse}")
print(f"Overall mean loss: {overall_mse}")

In [None]:
# Comparing to non-digit test images: run letters/symbols through the
# autoencoder and record the reconstruction MSE of each one.

# Inference must run in eval mode, otherwise the Dropout layers of the encoder
# and decoder randomly perturb every reconstruction and its loss.
lightning_model.eval()

test_image_label = ["A", "C", "E", "H", "L", "Q", "W", "Y", "%", "&"]
test_autoencoded_loss = np.zeros(len(test_image_label))
loss_fn = nn.MSELoss()  # loop-invariant, so it is built once

# new_data = [['Letter ',' Autoencoder MSE loss'],]

for counter, symbol in enumerate(test_image_label):
    # Load as 8-bit greyscale ('L' mode), i.e. integer values in 0-255.
    image = Image.open(f"test_images/{symbol}.png").convert('L')
    image = np.array(image)
    plt.imshow(image, cmap="gray")  # "gray" is the colormap name every Matplotlib version accepts
    # plt.savefig(f"Original_image_of_{symbol}.png")
    plt.title(f"Original image of {symbol}")
    plt.show()

    # Scale to [0, 1] exactly like MyDataset does for the training data. Feeding
    # raw 0-255 values puts the input far outside the range the model was
    # trained on and inflates the MSE by several orders of magnitude, which
    # makes it incomparable to the digit losses.
    image = torch.tensor(image).float() / 255

    # The model expects a (batch, channel, height, width) tensor.
    image_hat = torch.reshape(image, (1, 1, 28, 28))
    with torch.no_grad():  # no gradients are needed for evaluation
        autoencoded_image = lightning_model(image_hat)
    autoencoded_image = torch.reshape(autoencoded_image, (28, 28))

    # .item() turns the 0-dim tensor into a plain float before it is stored in
    # the NumPy array and printed.
    loss = loss_fn(autoencoded_image, image).item()
    test_autoencoded_loss[counter] = loss

    plt.imshow(autoencoded_image.detach().numpy(), cmap="gray")
    # plt.savefig(f"Autoencoded_image_of_{symbol}.png")
    plt.title(f"Autoencoded image of {symbol}")
    plt.show()
    print(f"MSE Loss averaged over image = {loss}")
    # entry = [f"{symbol} ",f" {loss}"]
    # new_data.append(entry)

# csv_file_path = 'testing_autoencoder_with_pooling_on_letters.csv'
# new_df = pd.DataFrame(new_data)
# new_df.to_csv(csv_file_path, index=False)

In [None]:
# Summary of the non-digit reconstructions, reported in the same format and
# precision as the digit summary so the two sets of numbers can be compared
# directly (np.floor would truncate every loss below 1 to 0).
print(f"What pictures contained: {test_image_label}")
print(f"MSE loss: {np.round(test_autoencoded_loss, 4)}")
print(f"Overall mean loss: {np.round(np.mean(test_autoencoded_loss), 4)}")