# Install libraries

# Imports

In [1]:
from dataclasses import dataclass

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import tqdm
import tqdm.auto
from PIL import Image
from torch.distributions.normal import Normal
from torchvision import transforms

# Unzip

In [9]:
!unzip "/content/drive/MyDrive/Bachelor's Project/data/original/redfin_images.zip"

Archive:  /content/drive/MyDrive/Bachelor's Project/data/original/redfin_images.zip
replace redfin_images/186132643_5.jpg? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

# Config

In [10]:
@dataclass
class Config:
  """Settings shared by the preprocessing, model and training cells."""
  # Side length (pixels) every image is resized to; inputs are square.
  image_size: int
  # Size of the latent vector produced by the encoder.
  embedding_size: int
  # 3-tuple the decoder reshapes its dense output to before the transposed
  # convolutions. It is indexed as [0], [1], [2], so it must be a sequence,
  # not a single int.
  shape_before_flattening: tuple
  # Device string handed to ``.to(...)`` for the model and the batches.
  device: str
  # Number of passes over the training dataloader.
  epochs: int
  # Batch size used by both dataloaders.
  batch_size: int
  # Learning rate given to the optimizer.
  lr: float

In [29]:
# Settings for this run.
config = Config(
    image_size=512,                         # images are resized to 512 x 512
    embedding_size=128,                     # latent dimensionality
    shape_before_flattening=(128, 64, 64),  # 128 channels at 512 / 8 = 64 px per side
    device='cpu',
    epochs=5,
    batch_size=4,
    lr=1e-5,
)

# Preprocessing

In [30]:
class DataProcessor:
  """Converts an input image into the single-channel tensor fed to the model."""

  def __init__(self, config):
    """Keep the settings and build the transform pipeline once.

    Args:
      config: settings object; only ``image_size`` is read by this class.
    """
    self.config = config
    self.transformer = self.__get_image_transformer()

  def __get_image_transformer(self):
    """Build the resize -> grayscale -> tensor pipeline.

    Returns:
      A ``transforms.Compose`` applying the three steps in that order.
    """
    # Use the config passed to this instance rather than a module-level
    # ``config`` global, so the processor honours whatever it was built with
    # and does not fail when no such global exists.
    size = self.config.image_size
    transformer = transforms.Compose([
        transforms.Resize((size, size)),
        transforms.Grayscale(),
        transforms.ToTensor(),
    ])
    return transformer

  def process(self, input_image):
    """Apply the transform pipeline to ``input_image`` and return the result."""
    return self.transformer(input_image)

# Model Architecture

In [31]:
def vae_gaussian_kl_loss(mu, logvar):
    """Batch-mean KL divergence between N(mu, exp(logvar)) and N(0, I).

    Closed form from Appendix B of Kingma and Welling, "Auto-Encoding
    Variational Bayes", ICLR 2014 (https://arxiv.org/abs/1312.6114).

    Args:
        mu: latent means; summed over dim 1.
        logvar: latent log-variances, same shape as ``mu``.

    Returns:
        Scalar tensor: per-sample KL terms averaged over the remaining dims.
    """
    variance = logvar.exp()
    per_sample = (1 + logvar - mu.pow(2) - variance).sum(dim=1).mul(-0.5)
    return per_sample.mean()

def reconstruction_loss(x_reconstructed, x):
    """Mean binary cross-entropy between a reconstruction and its target.

    Args:
        x_reconstructed: predicted values in [0, 1] (``nn.BCELoss`` requires
            probabilities, e.g. the output of a sigmoid).
        x: target values in [0, 1], same shape as ``x_reconstructed``.

    Returns:
        Scalar tensor with the BCE averaged over every element.
    """
    bce_loss = nn.BCELoss()
    # reshape() behaves like view() on contiguous tensors but also accepts
    # non-contiguous ones (e.g. after a transpose/permute), where view() raises.
    x_reconstructed_flat = x_reconstructed.reshape(x_reconstructed.size(0), -1)
    x_flat = x.reshape(x.size(0), -1)
    return bce_loss(x_reconstructed_flat, x_flat)

def vae_loss(y_pred, y_true, recon_weight=5_000):
    """Weighted sum of the reconstruction loss and the KL regulariser.

    Args:
        y_pred: ``(mu, logvar, reconstruction)`` tuple, in that order.
        y_true: target images the reconstruction is compared against.
        recon_weight: multiplier on the reconstruction term relative to the
            KL term. Defaults to 5_000, the value previously hard-coded.

    Returns:
        ``recon_weight * reconstruction_loss + kl_loss``.
    """
    mu, logvar, recon_x = y_pred
    recon_loss = reconstruction_loss(recon_x, y_true)
    kld_loss = vae_gaussian_kl_loss(mu, logvar)
    return recon_weight * recon_loss + kld_loss

In [32]:
class Sampling(nn.Module):
    """Reparameterisation trick: z = mean + exp(0.5 * log_var) * eps, eps ~ N(0, 1)."""

    def forward(self, z_mean, z_log_var):
        """Draw one latent sample per element of ``z_mean``.

        Args:
            z_mean: means of the latent Gaussian.
            z_log_var: log-variances, same shape as ``z_mean``.

        Returns:
            Tensor shaped like ``z_mean`` holding the sampled latents.
        """
        # randn_like creates the noise directly with z_mean's shape, dtype and
        # device: no CPU sample followed by a host-to-device copy, and no
        # requirement that the input be exactly 2-D.
        epsilon = torch.randn_like(z_mean)
        std = torch.exp(0.5 * z_log_var)
        return z_mean + std * epsilon

In [35]:
class Encoder(nn.Module):
    """Convolutional VAE encoder for single-channel square images.

    Three stride-2 conv + batch-norm + ReLU stages (128 channels each) are
    followed by two linear heads giving the latent mean and log-variance.
    """

    def __init__(self, image_size, embedding_dim):
        """Build the layers.

        Args:
            image_size: side length of the square input images.
            embedding_dim: size of the latent vector.
        """
        super().__init__()
        self.conv1 = nn.Conv2d(1, 128, 3, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(128)
        self.conv2 = nn.Conv2d(128, 128, 3, stride=2, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        self.conv3 = nn.Conv2d(128, 128, 3, stride=2, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.flatten = nn.Flatten()

        # Each conv maps a side of n to (n + 2 - 3) // 2 + 1, i.e. ceil(n / 2).
        # That equals image_size // 8 only when image_size is a multiple of 8,
        # so derive the size from the conv arithmetic to support other sizes.
        feature_side = self._conv_output_side(image_size)
        flat_features = 128 * feature_side * feature_side
        self.fc_mean = nn.Linear(flat_features, embedding_dim)
        self.fc_log_var = nn.Linear(flat_features, embedding_dim)

        self.sampling = Sampling()

    @staticmethod
    def _conv_output_side(side, num_layers=3, kernel=3, stride=2, padding=1):
        """Side length left after ``num_layers`` identical strided convolutions."""
        for _ in range(num_layers):
            side = (side + 2 * padding - kernel) // stride + 1
        return side

    def forward(self, x):
        """Encode a batch of images.

        Args:
            x: batch with one channel, as required by ``conv1``.

        Returns:
            ``(z_mean, z_log_var, z)`` where ``z`` is drawn by ``self.sampling``.
        """
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.flatten(x)
        z_mean = self.fc_mean(x)
        z_log_var = self.fc_log_var(x)
        z = self.sampling(z_mean, z_log_var)
        return z_mean, z_log_var, z


In [36]:
class Decoder(nn.Module):
    """Transposed-convolution VAE decoder producing single-channel images in [0, 1].

    A linear layer expands the latent vector, the result is reshaped to
    ``shape_before_flattening``, and three stride-2 transposed convolutions
    each double the spatial size. The final activation is a sigmoid.
    """

    def __init__(self, embedding_dim, shape_before_flattening):
        """Build the layers.

        Args:
            embedding_dim: size of the latent vector.
            shape_before_flattening: 3-element sequence the dense output is
                reshaped to; its first entry is the channel count fed to the
                first transposed convolution.
        """
        super().__init__()
        self._feature_shape = tuple(shape_before_flattening)
        channels, height, width = self._feature_shape
        self.fc = nn.Linear(embedding_dim, channels * height * width)
        # Take the input channel count from the requested shape instead of
        # assuming 128, so other shapes work too.
        self.deconv1 = nn.ConvTranspose2d(channels, 128, 3, stride=2, padding=1, output_padding=1)
        self.bn1 = nn.BatchNorm2d(128)
        self.deconv2 = nn.ConvTranspose2d(128, 128, 3, stride=2, padding=1, output_padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        self.deconv3 = nn.ConvTranspose2d(128, 1, 3, stride=2, padding=1, output_padding=1)

    def reshape(self, x):
        """Reshape the dense output to ``(-1, *shape_before_flattening)``.

        This is a method rather than a lambda stored on the instance, because
        a lambda attribute cannot be pickled with the module.
        """
        return x.view(-1, *self._feature_shape)

    def forward(self, x):
        """Decode latent vectors ``x`` into images."""
        x = self.fc(x)
        x = self.reshape(x)
        x = F.relu(self.bn1(self.deconv1(x)))
        x = F.relu(self.bn2(self.deconv2(x)))
        x = torch.sigmoid(self.deconv3(x))  # squash to [0, 1] for the BCE loss
        return x


In [37]:
class VAE(nn.Module):
    """Variational autoencoder that chains an encoder and a decoder module."""

    def __init__(self, encoder, decoder):
        """Register the two sub-modules.

        Args:
            encoder: module returning ``(z_mean, z_log_var, z)`` for a batch.
            decoder: module mapping a latent batch ``z`` to a reconstruction.
        """
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x):
        """Encode ``x``, decode the sampled latent, and return all three parts.

        Returns:
            ``(z_mean, z_log_var, reconstruction)``.
        """
        latent_mean, latent_log_var, latent_sample = self.encoder(x)
        return latent_mean, latent_log_var, self.decoder(latent_sample)

# Dataset and Dataloader

In [38]:
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
import glob
import os

In [39]:
# Directory containing the extracted *.jpg images that the dataset indexes.
image_dir = '/content/redfin_images'

In [40]:
class ImageFolderDataset(Dataset):
    """Dataset over every ``*.jpg`` file directly inside one directory."""

    def __init__(self, image_dir, config):
        """Index the image files and build the preprocessing pipeline.

        Args:
            image_dir: directory searched (non-recursively) for ``*.jpg`` files.
            config: settings object forwarded to ``DataProcessor``.
        """
        self.image_dir = image_dir
        self.processor = DataProcessor(config)
        # glob returns files in arbitrary, filesystem-dependent order; sorting
        # makes index -> file stable across runs and machines, which any
        # seeded split or shuffle relies on.
        self.image_paths = sorted(glob.glob(os.path.join(image_dir, '*.jpg')))

    def __len__(self):
        """Number of indexed image files."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` from disk and return its processed form."""
        # The context manager closes the underlying file promptly instead of
        # leaving that to garbage collection.
        with Image.open(self.image_paths[idx]) as image:
            model_input = self.processor.process(image)

        return model_input

# Define variables

## Eval Function

In [41]:
def show_images(images, title="Images"):
    """Plot a batch of images as a single grid figure.

    Args:
        images: batch handed to ``torchvision.utils.make_grid``.
        title: text shown above the grid.
    """
    # Tile the batch five per row; normalize=True rescales values for display.
    grid = torchvision.utils.make_grid(images, nrow=5, normalize=True)
    plt.figure(figsize=(15, 15))
    # Move the first axis last before handing the grid to imshow.
    channels_last = grid.permute(1, 2, 0)
    plt.imshow(channels_last)
    plt.title(title)
    plt.axis('off')
    plt.show()

## Training Parameters

In [43]:
# Build both halves, wrap them in the VAE, then move the whole model (and
# therefore both sub-modules, in place) to the configured device.
encoder = Encoder(config.image_size, config.embedding_size)
decoder = Decoder(config.embedding_size, config.shape_before_flattening)
model = VAE(encoder, decoder).to(config.device)

In [44]:
# Index the *.jpg files in image_dir; each image is opened and preprocessed
# only when its item is requested.
dataset = ImageFolderDataset(image_dir=image_dir, config=config)

In [45]:
# Hold out 1% of the images for validation; everything else is training data.
num_images = len(dataset)
val_split = int(np.floor(0.01 * num_images))
train_split = num_images - val_split
# A seeded generator keeps the train/validation membership the same on every
# run, so validation losses from different runs are comparable.
train_dataset, val_dataset = random_split(
    dataset,
    [train_split, val_split],
    generator=torch.Generator().manual_seed(42),
)

In [47]:
# Loader settings. These were previously defined but never passed on, so the
# loaders silently ran with a hard-coded shuffle flag and no worker processes.
shuffle = True
num_workers = 2

# Training batches are reshuffled each epoch; validation order stays fixed so
# the first validation batch is the same every time it is visualised.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=config.batch_size,
    shuffle=shuffle,
    num_workers=num_workers,
)
val_dataloader = DataLoader(
    val_dataset,
    batch_size=config.batch_size,
    shuffle=False,
    num_workers=num_workers,
)

In [48]:
# Optimise the encoder and decoder parameters jointly with a single Adam instance.
trainable_parameters = [*encoder.parameters(), *decoder.parameters()]
optimizer = optim.Adam(trainable_parameters, lr=config.lr)
# Each scheduler.step() multiplies the learning rate by 0.95.
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

## Training Loop

In [49]:
# Number of training batches between validation passes and progress reports.
print_steps = 250

In [1]:
# One entry per report (every `print_steps` training batches).
train_losses = []
val_losses = []


def _mean_validation_loss(model, dataloader, device):
    """Average ``vae_loss`` over all batches of ``dataloader``; NaN if it has none."""
    if len(dataloader) == 0:
        # A tiny dataset can leave the 1% validation split empty; report NaN
        # instead of dividing by zero.
        return float('nan')
    total = 0.0
    with torch.no_grad():
        for batch in dataloader:
            batch = batch.to(device)
            total += vae_loss(model(batch), batch).item()
    return total / len(dataloader)


# tqdm.auto picks the widget bar in a notebook and the text bar elsewhere, and
# unlike `tqdm.notebook` after a bare `import tqdm` it is explicitly imported.
for epoch in tqdm.auto.tqdm(range(config.epochs), desc='Epoch'):
    model.train()
    running_loss = 0.0
    train_batches = tqdm.auto.tqdm(
        enumerate(train_dataloader),
        total=len(train_dataloader),
        leave=False,
        desc='Train Batch',
    )
    for batch_idx, data in train_batches:
        data = data.to(config.device)
        optimizer.zero_grad()
        pred = model(data)
        loss = vae_loss(pred, data)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        if (batch_idx + 1) % print_steps == 0:
            # Average training loss since the previous report.
            avg_train_loss = running_loss / print_steps
            train_losses.append(avg_train_loss)

            # Evaluate in eval mode, as the original loop did.
            model.eval()
            avg_val_loss = _mean_validation_loss(model, val_dataloader, config.device)
            val_losses.append(avg_val_loss)

            # Visualise reconstructions of the first validation batch.
            if len(val_dataloader) > 0:
                with torch.no_grad():
                    val_batch = next(iter(val_dataloader)).to(config.device)
                    recon_images = model(val_batch)[2]
                show_images(recon_images.cpu(), title=f"Reconstruction at Epoch {epoch+1}, Step {batch_idx+1}")

            print(f"Epoch [{epoch+1}/{config.epochs}], Step [{batch_idx+1}/{len(train_dataloader)}], "
                  f"Avg Train Loss: {avg_train_loss:.4f}, Avg Val Loss: {avg_val_loss:.4f}")

            # Reset running loss and switch back to training mode
            running_loss = 0.0
            model.train()

    # The scheduler was created but never stepped, so the learning rate never
    # decayed. Step it once per epoch, after that epoch's optimizer updates.
    scheduler.step()


In [2]:
# Loss curves: one point per report interval of the training loop.
fig, ax = plt.subplots()
ax.plot(train_losses, label='Training Loss')
ax.plot(val_losses, label='Validation Loss')
ax.set_xlabel('Print Steps')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

In [4]:
# Preview one batch of validation inputs. The preprocessing pipeline only
# resizes, converts to grayscale and calls ToTensor, so the tensors are already
# in [0, 1] with one channel. Undoing a 3-channel ImageNet normalisation that
# was never applied would broadcast them into falsely coloured images.
images = next(iter(val_dataloader))

# Display images (these come from the validation loader, hence the title)
show_images(images, title="Sample Validation Images")

In [3]:
# Reconstruct the first validation batch. The batch goes to config.device, the
# device the model lives on, rather than a hard-coded 'cuda'. eval() plus
# no_grad() gives detached outputs that matplotlib can convert to arrays.
model.eval()
with torch.no_grad():
    val_batch = next(iter(val_dataloader)).to(config.device)
    images = model(val_batch)[2].cpu()

# The decoder ends in a sigmoid, so the outputs are already in [0, 1]; no
# de-normalisation is needed before display.
show_images(images, title="Reconstructed Validation Images")

In [None]:
def display_decoder_output(model, data_loader, device='cpu', n_images=5):
    """Display original and reconstructed images from the model's decoder.

    Takes the first batch from ``data_loader``, keeps at most ``n_images`` of
    it, runs the model without gradients and shows two grids: the inputs and
    their reconstructions. The model is left in eval mode afterwards.

    Args:
        model: The VAE model; called as ``model(images)`` and expected to
            return a 3-tuple whose last element is the reconstruction.
        data_loader: DataLoader for providing batches of images.
        device: The device (CPU or GPU) model is running on.
        n_images: Number of images to display.
    """
    model.eval()

    images = next(iter(data_loader))
    images = images.to(device)[:n_images]  # Select a subset of images for display

    with torch.no_grad():
        _, _, reconstructions = model(images)

    # Show the tensors as they are. The former (x + 1) / 2 rescale assumed
    # data in [-1, 1], but the inputs come from ToTensor and the outputs from
    # a sigmoid, both already in [0, 1].
    show_images(images.cpu(), title="Original Images")

    show_images(reconstructions.cpu(), title="Reconstructed Images")

In [None]:
# Show up to 5 originals and their reconstructions from one training batch
# (the training loader shuffles, so the images differ from call to call).
display_decoder_output(model, train_dataloader, device=config.device, n_images=5)