In [2]:
import cv2
import matplotlib.pyplot as plt
import random
import zipfile
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from PIL import Image
import unittest
import time
from torch.autograd import Variable
from scipy.stats import shapiro
from tqdm import tqdm

In [None]:

# Directory holding the training images (placeholder: point this at the real dataset).
image_dir = 'images path'

# List the directory once and reuse the result for both the printout and the
# filter. os.listdir returns entries in arbitrary order, so sort them to make
# the file order (and therefore image_files[0]) reproducible across runs.
dir_entries = sorted(os.listdir(image_dir))

print("Contents of the 'data' directory:")
print(dir_entries)

# Compare the real file extension, case-insensitively. A bare
# endswith('png') would miss 'IMG.PNG' and accept names such as 'notes_png'.
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')
image_files = [
    f for f in dir_entries
    if os.path.splitext(f)[1].lower() in IMAGE_EXTENSIONS
]

num_images = len(image_files)
print(f'Number of images: {num_images}')

if num_images > 0:
    first_image_path = os.path.join(image_dir, image_files[0])
    first_image = cv2.imread(first_image_path)

    # cv2.imread reports an unreadable or corrupt file by returning None
    # instead of raising, so check before touching .shape.
    if first_image is None:
        print(f'Could not read first image: {first_image_path}')
    else:
        print(f'First image shape: {first_image.shape}')

    def plot_random_images(image_files, num_images=10):
        """Show a random selection of images from ``image_dir`` in a grid.

        Args:
            image_files: File names (relative to ``image_dir``) to draw from.
            num_images: Maximum number of images to show. Clamped to
                ``len(image_files)`` so small datasets do not make
                ``random.sample`` raise ``ValueError``.
        """
        count = min(num_images, len(image_files))
        if count <= 0:
            return
        random_files = random.sample(image_files, count)

        # At most five images per row; the default of 10 images keeps the
        # original 2x5 layout and (15, 8) figure size.
        n_cols = min(5, count)
        n_rows = (count + n_cols - 1) // n_cols
        plt.figure(figsize=(3 * n_cols, 4 * n_rows))

        for i, file in enumerate(random_files):
            img = cv2.imread(os.path.join(image_dir, file))
            plt.subplot(n_rows, n_cols, i + 1)
            if img is not None:
                # OpenCV loads images as BGR; matplotlib expects RGB.
                plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            # Unreadable files leave an empty, titled panel instead of crashing.
            plt.axis('off')
            plt.title(file)
        plt.show()

    plot_random_images(image_files, num_images=10)
else:
    print("No images found in the specified directory.")

# Preprocessing pipeline applied to each loaded image: resize to a fixed
# 256x256 resolution, then convert the image to a tensor.
transform = transforms.Compose(
    [transforms.Resize((256, 256)), transforms.ToTensor()]
)

class CustomImageDataset(torch.utils.data.Dataset):
    """Dataset of unlabeled RGB images loaded from disk by file name.

    Args:
        image_files: Sequence of image file names.
        transform: Optional callable applied to each loaded PIL image.
        root: Directory the file names are relative to. When ``None`` (the
            default) the module-level ``image_dir`` is looked up at load
            time, which matches the original behavior.
    """

    def __init__(self, image_files, transform=None, root=None):
        self.image_files = image_files
        self.transform = transform
        self.root = root

    def __len__(self):
        """Return the number of image files in the dataset."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and return it, transformed if requested."""
        base_dir = image_dir if self.root is None else self.root
        img_path = os.path.join(base_dir, self.image_files[idx])
        # Image.open keeps the underlying file open; convert() returns an
        # independent, fully loaded copy, so the handle can be closed right
        # away instead of lingering until garbage collection.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image


# Wrap the discovered image files in a dataset and serve them as shuffled
# mini-batches of 64 images.
custom_data = CustomImageDataset(image_files, transform=transform)
data_loader = DataLoader(dataset=custom_data, batch_size=64, shuffle=True)

In [None]:
H_DIM = 128  # default dimensionality of the latent vector z
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class VAE(nn.Module):
    """Convolutional variational autoencoder for 3x256x256 images.

    The encoder applies three stride-2 convolutions (3 -> 32 -> 64 -> 128
    channels), each halving the spatial size (256 -> 128 -> 64 -> 32), so the
    flattened feature map has the 32 * 32 * 128 entries expected by
    ``fc_mu`` / ``fc_logvar``. The decoder mirrors this with three stride-2
    transposed convolutions back to a 3x256x256 image in [0, 1].

    Args:
        h_dim: Size of the latent vector.
    """

    def __init__(self, h_dim=H_DIM):
        super(VAE, self).__init__()
        self.h_dim = h_dim

        # Encoder: kernel 4 / stride 2 / padding 1 exactly halves H and W,
        # mirroring the transposed convolutions used by the decoder.
        self.conv1 = nn.Conv2d(3, 32, kernel_size=4, padding=1, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, padding=1, stride=2)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=4, padding=1, stride=2)

        self.flatten = nn.Flatten()

        # 32 * 32 * 128 is the encoder output size for 256x256 inputs.
        self.fc_mu = nn.Linear(32 * 32 * 128, h_dim)
        self.fc_logvar = nn.Linear(32 * 32 * 128, h_dim)

        self.fc_decode = nn.Linear(h_dim, 32 * 32 * 128)

        # Decoder
        self.deconv1 = nn.ConvTranspose2d(128, 64, kernel_size=4, padding=1, stride=2)
        self.deconv2 = nn.ConvTranspose2d(64, 32, kernel_size=4, padding=1, stride=2)
        self.deconv3 = nn.ConvTranspose2d(32, 3, kernel_size=4, padding=1, stride=2)

    def encode(self, x):
        """Map a batch of images to the posterior parameters ``(mu, logvar)``."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = self.flatten(x)
        return self.fc_mu(x), self.fc_logvar(x)

    def decode(self, z):
        """Map latent vectors of shape ``(N, h_dim)`` to images in [0, 1]."""
        x = F.relu(self.fc_decode(z))
        x = x.view(-1, 128, 32, 32)
        x = F.relu(self.deconv1(x))
        x = F.relu(self.deconv2(x))
        return torch.sigmoid(self.deconv3(x))

    def reparameterize(self, mu, logvar):
        """Draw ``z = mu + std * eps`` with ``eps ~ N(0, I)``.

        Sampling through ``eps`` keeps ``z`` differentiable with respect to
        ``mu`` and ``logvar`` (the reparameterization trick).
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + std * eps

    def sample(self, num_samples, device='cpu'):
        """Generate ``num_samples`` images by decoding draws from N(0, I).

        Puts the module in eval mode. ``device`` must be the device the
        model's parameters live on (pass ``DEVICE`` for a model moved there).
        """
        self.eval()

        with torch.no_grad():
            z = torch.randn(num_samples, self.h_dim, device=device)
            generated_images = self.decode(z)

        return generated_images

    def forward(self, x):
        """Encode ``x``, sample a latent code, and decode it.

        Returns:
            Tuple ``(reconstruction, mu, logvar)``.
        """
        mu, logvar = self.encode(x)

        # sample z
        z = self.reparameterize(mu, logvar)

        return self.decode(z), mu, logvar


In [4]:

def loss_function(recon_x, x, mu, logvar):
    """Compute the VAE loss (negative ELBO) summed over the batch.

    Args:
        recon_x: Reconstructed images with values in [0, 1].
        x: Target images with values in [0, 1], same shape as ``recon_x``.
        mu: Posterior means.
        logvar: Posterior log-variances, same shape as ``mu``.

    Returns:
        Tuple ``(total, BCE, KLD)`` of scalar tensors, where ``total`` is
        ``BCE + KLD``.
    """
    # Summed (not averaged) so both terms are on the same per-sample scale;
    # callers divide by the dataset size to report per-sample averages.
    BCE = F.binary_cross_entropy(recon_x, x, reduction='sum')

    # Closed-form KL(N(mu, sigma^2) || N(0, I)):
    #   -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return BCE + KLD, BCE, KLD

In [5]:

def train(epoch, model, optimizer, train_loader, history):
    """Run one training epoch and record per-sample average losses.

    Args:
        epoch: Epoch number, used only in the progress message.
        model: Module whose forward pass returns ``(reconstruction, mu, logvar)``.
        optimizer: Optimizer over ``model``'s parameters.
        train_loader: Loader yielding image batches (or ``(images, ...)`` tuples).
        history: Dict with list values under ``'total_loss'``, ``'recon_loss'``
            and ``'kld_loss'``; one average per key is appended.
    """
    model.train()
    train_loss = 0
    recon_loss_total = 0
    kld_loss_total = 0

    for data in train_loader:
        # Loaders over labeled datasets yield (inputs, labels); keep the inputs.
        if isinstance(data, (list, tuple)):
            data = data[0]
        data = data.to(DEVICE).float()
        optimizer.zero_grad()

        recon_batch, mu, logvar = model(data)
        loss, recon_loss, kld_loss = loss_function(recon_batch, data, mu, logvar)

        loss.backward()
        train_loss += loss.item()
        recon_loss_total += recon_loss.item()
        kld_loss_total += kld_loss.item()

        optimizer.step()

    # The running totals are divided by the number of samples (not batches)
    # to report per-sample averages.
    num_samples_seen = len(train_loader.dataset)
    avg_loss = train_loss / num_samples_seen
    avg_recon = recon_loss_total / num_samples_seen
    avg_kld = kld_loss_total / num_samples_seen

    history['total_loss'].append(avg_loss)
    history['recon_loss'].append(avg_recon)
    history['kld_loss'].append(avg_kld)

    print(f'====> Epoch: {epoch} Average Loss: {avg_loss:.4f} (BCE: {avg_recon:.4f}, KL: {avg_kld:.4f})')

In [None]:
# Training hyperparameters. These are reasonable starting points, not tuned
# values: raise EPOCHS for sharper reconstructions, lower LR if the loss diverges.
EPOCHS = 20
LR = 1e-3

if __name__ == '__main__':
    model = VAE().to(DEVICE)
    optimizer = optim.Adam(model.parameters(), lr=LR)
    # Per-epoch averages appended by train(); useful for plotting loss curves.
    history = {'total_loss': [], 'recon_loss': [], 'kld_loss': []}

    print("Start training...")
    for epoch in range(1, EPOCHS + 1):
        train(epoch, model, optimizer, data_loader, history)

    print("Finish training.")

In [None]:
# Draw a few random images from the dataset for qualitative inspection.
# (`random` is already imported in the notebook's import cell.)
dataset = data_loader.dataset

# Example of picking a single random index into the dataset.
random_index = random.randint(0, len(dataset) - 1)

num_samples = 5
# random.sample raises ValueError when asked for more items than exist, so
# never request more samples than the dataset holds.
indices = random.sample(range(len(dataset)), min(num_samples, len(dataset)))

sampled_images = [dataset[i] for i in indices]

In [None]:
# Show the first sampled image. The array axes are reordered with
# transpose(1, 2, 0) before being handed to imshow.
plt.imshow(sampled_images[0].cpu().numpy().transpose(1, 2, 0))


In [None]:
#TODO: Generate reconstructions for the sampled images

    





In [None]:
#TODO: Generate new images by sampling from the latent space
