<a href="https://colab.research.google.com/github/Yousif-A2/Conv_AutoEncoder_Pytorch/blob/main/Part2_Yousif_Alnasser_Conv_AutoEncoder_Pytorch_CelebA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [58]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import os
import random
import time

In [59]:
import torch
import torch.nn as nn
from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader
from torch.utils.data import SubsetRandomSampler
from torch.utils.data import sampler
import torch.nn.functional as F

In [60]:

# cuBLAS workspace size setting; PyTorch's reproducibility notes list ':4096:8'
# or ':16:8' as required on CUDA >= 10.2 once torch.use_deterministic_algorithms(True)
# is switched on (done in a later cell).
os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'  # or ':16:8'

In [61]:
# Index of the GPU to use when CUDA is available
cuda_device_num = 0

# Fall back to the CPU when no CUDA device can be used
if torch.cuda.is_available():
    cuda_device = torch.device(f'cuda:{cuda_device_num}')
else:
    cuda_device = torch.device('cpu')

print('Device:', cuda_device)

Device: cuda:0


In [62]:

# Make PyTorch raise instead of silently using a non-deterministic kernel
torch.use_deterministic_algorithms(True)

# The cuDNN switches are only touched when a GPU is present
if torch.cuda.is_available():
    # Same input -> same output for cuDNN operations
    torch.backends.cudnn.deterministic = True
    # Disable the cuDNN auto-tuner so the same algorithms are used across runs
    torch.backends.cudnn.benchmark = False

In [63]:
# One seed shared by every random number generator used in the notebook
my_seed = 101
os.environ["PL_GLOBAL_SEED"] = str(my_seed)

# Python, NumPy, PyTorch (CPU) and PyTorch (all CUDA devices), in that order
for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    seed_fn(my_seed)

In [64]:
# Put the Kaggle API token (kaggle.json, expected in the working directory) in ~/.kaggle
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
# Owner-only read/write on the token file
!chmod 600 ~/.kaggle/kaggle.json
# Download the CelebA archive from Kaggle
!kaggle datasets download -d jessicali9530/celeba-dataset

Dataset URL: https://www.kaggle.com/datasets/jessicali9530/celeba-dataset
License(s): other
celeba-dataset.zip: Skipping, found more recently modified local copy (use --force to force download)


In [65]:
import zipfile

# Unpack the downloaded CelebA archive into ./data
with zipfile.ZipFile('celeba-dataset.zip', mode='r') as archive:
    archive.extractall(path='./data')

In [66]:
from torch.utils.data import random_split, DataLoader
from torchvision import transforms, datasets

# Seed for the train/val/test partition only. A dedicated generator keeps the
# split reproducible without overwriting the global seed configured earlier
# (calling torch.manual_seed here would replace it for everything that follows).
split_seed = 42
split_generator = torch.Generator().manual_seed(split_seed)

# Batch size shared by the three loaders
loader_batch_size = 128

# Resize to 64x64, convert to a tensor in [0, 1], then map each channel to [-1, 1]
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# Load dataset
dataset = datasets.ImageFolder(root='data/img_align_celeba', transform=transform)

# Split sizes: 70% train, 15% validation, remainder (about 15%) test
total_len = len(dataset)
train_len = int(0.7 * total_len)
val_len = int(0.15 * total_len)
test_len = total_len - train_len - val_len

# Split the dataset
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_len, val_len, test_len], generator=split_generator)

# Dataloaders: only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=loader_batch_size, shuffle=True)
valid_loader = DataLoader(val_dataset, batch_size=loader_batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=loader_batch_size, shuffle=False)


In [67]:
lr_rate = 0.0005   # learning rate handed to the Adam optimizers
batch_size = 128   # matches the batch size the DataLoaders are built with
num_epochs = 20    # training epochs per model

In [68]:
# Dataset sanity checks: print the shapes and the first ten labels of the
# first batch of every split.
for split_header, split_loader in (('Training Set:\n', train_loader),
                                   ('\nValidation Set:', valid_loader),
                                   ('\nTesting Set:', test_loader)):
    print(split_header)
    for images, labels in split_loader:
        print('Image batch dimensions:', images.size())
        print('Image label dimensions:', labels.size())
        print(labels[:10])
        break  # one batch per split is enough

Training Set:

Image batch dimensions: torch.Size([128, 3, 64, 64])
Image label dimensions: torch.Size([128])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

Validation Set:
Image batch dimensions: torch.Size([128, 3, 64, 64])
Image label dimensions: torch.Size([128])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

Testing Set:
Image batch dimensions: torch.Size([128, 3, 64, 64])
Image label dimensions: torch.Size([128])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])


In [69]:
# Helper Class 1: Reshape is used to reshape the embeddings to the correct image size
class Reshape(nn.Module):
    """Wrap ``Tensor.view`` in a module so a reshape can sit inside ``nn.Sequential``.

    The positional arguments passed at construction are stored as the target
    shape and applied to every input.
    """

    def __init__(self, *args):
        super().__init__()
        self.shape = args

    def forward(self, x):
        target_shape = self.shape
        return x.view(*target_shape)

In [70]:
# Helper Class 2: Trim is used to remove the excess pixels at the output of the decoder block (28x28 as MNIST size)
class Trim(nn.Module):
    """Crop the two trailing (spatial) dimensions of a 4-D tensor.

    Args:
        *args: Accepted and ignored, as before, so existing calls keep working.
        height: Number of rows to keep (default 28, the MNIST size).
        width: Number of columns to keep (default 28, the MNIST size).
    """

    def __init__(self, *args, height=28, width=28):
        super().__init__()
        self.height = height
        self.width = width

    def forward(self, x):
        # Slicing past the end is a no-op, so smaller inputs pass through unchanged
        return x[:, :, :self.height, :self.width]

In [71]:
class Conv_AutoEncoder(nn.Module):
    """Convolutional autoencoder for 3x64x64 images.

    The encoder applies four 3x3 convolutions (the last three with stride 2)
    and a linear layer that produces an `embedding_size`-dimensional code.
    The decoder mirrors it with a linear layer, three stride-2 transposed
    convolutions and a final convolution followed by Tanh.

    Args:
        embedding_size: Dimensionality of the latent code.
    """

    def __init__(self, embedding_size):
        super().__init__()
        self.embedding_size = embedding_size

        # Feature map at the encoder/decoder boundary for a 64x64 input: 256 x 8 x 8
        bottleneck_channels, bottleneck_side = 256, 8
        flat_features = bottleneck_channels * bottleneck_side * bottleneck_side

        # Encoder (input: 3x64x64)
        encoder_layers = [
            nn.Conv2d(3, 32, kernel_size=3, padding=1),                # 32x64x64
            nn.LeakyReLU(0.01),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),     # 64x32x32
            nn.LeakyReLU(0.01),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),    # 128x16x16
            nn.LeakyReLU(0.01),
            nn.Conv2d(128, bottleneck_channels, kernel_size=3, stride=2, padding=1),  # 256x8x8
            nn.LeakyReLU(0.01),
            nn.Flatten(),
            nn.Linear(flat_features, embedding_size),
        ]
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder (output: 3x64x64)
        decoder_layers = [
            nn.Linear(embedding_size, flat_features),
            Reshape(-1, bottleneck_channels, bottleneck_side, bottleneck_side),
            nn.ConvTranspose2d(bottleneck_channels, 128, kernel_size=3, stride=2,
                               padding=1, output_padding=1),           # 128x16x16
            nn.LeakyReLU(0.01),
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2,
                               padding=1, output_padding=1),           # 64x32x32
            nn.LeakyReLU(0.01),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2,
                               padding=1, output_padding=1),           # 32x64x64
            nn.LeakyReLU(0.01),
            nn.Conv2d(32, 3, kernel_size=3, padding=1),                # 3x64x64
            nn.Tanh(),  # outputs in [-1, 1], matching the input normalization
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode `x` and return its reconstruction."""
        return self.decoder(self.encoder(x))

In [72]:
# Autoencoder with a 2-dimensional latent code; print its layer layout
model = Conv_AutoEncoder(embedding_size=2)
print(model)
model = model.to(cuda_device)

# Adam over all model parameters
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr_rate)

Conv_AutoEncoder(
  (encoder): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.01)
    (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.01)
    (4): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (5): LeakyReLU(negative_slope=0.01)
    (6): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (7): LeakyReLU(negative_slope=0.01)
    (8): Flatten(start_dim=1, end_dim=-1)
    (9): Linear(in_features=16384, out_features=2, bias=True)
  )
  (decoder): Sequential(
    (0): Linear(in_features=2, out_features=16384, bias=True)
    (1): Reshape()
    (2): ConvTranspose2d(256, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.01)
    (4): ConvTranspose2d(128, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (5): LeakyReLU(negat

In [73]:
def compute_epoch_loss_autoencoder(model, data_loader, loss_fn, device):
    """Return the reconstruction loss per example over a whole data loader.

    The model is put in eval mode. `loss_fn` is called with
    ``reduction='sum'`` for every batch and the summed losses are divided by
    the number of examples seen.

    Args:
        model: Module mapping a batch of inputs to reconstructions.
        data_loader: Iterable of ``(features, target)`` pairs; targets are ignored.
        loss_fn: Functional loss accepting a ``reduction`` keyword.
        device: Device the feature batches are moved to.

    Returns:
        The accumulated loss divided by the number of examples.
    """
    model.eval()
    total_loss = 0.
    examples_seen = 0

    with torch.no_grad():
        for batch, _ in data_loader:
            batch = batch.to(device)
            reconstruction = model(batch)
            total_loss += loss_fn(reconstruction, batch, reduction = 'sum')
            examples_seen += batch.size(0)

    return total_loss / examples_seen

In [74]:
def train_autoencoder_v1(num_epochs, model, optimizer, device,
                         train_loader, loss_fn = None,
                         logging_interval = 100,
                         skip_epoch_stats = False,
                         save_model = None):
    """Train an autoencoder to reconstruct its inputs and log the losses.

    Args:
        num_epochs: Number of passes over `train_loader`.
        model: Module mapping a batch to its reconstruction.
        optimizer: Optimizer that updates `model`'s parameters.
        device: Device the feature batches are moved to.
        train_loader: Iterable of ``(features, target)`` pairs; targets are ignored.
        loss_fn: Reconstruction loss; ``F.mse_loss`` when None.
        logging_interval: Print the batch loss every this many batches.
        skip_epoch_stats: When True, skip the full-pass loss evaluation after
            each epoch.
        save_model: Optional path; the state dict is saved there after training.

    Returns:
        dict with 'train_loss_per_batch' (one float per optimization step) and
        'train_loss_per_epoch' (one float per epoch unless stats are skipped).
    """
    batch_history, epoch_history = [], []
    log_dict = {'train_loss_per_batch': batch_history,
                'train_loss_per_epoch': epoch_history}

    criterion = F.mse_loss if loss_fn is None else loss_fn

    def minutes_since_start():
        return (time.time() - started_at) / 60

    started_at = time.time()
    for epoch_no in range(1, num_epochs + 1):

        model.train()
        for step, (inputs, _) in enumerate(train_loader):
            inputs = inputs.to(device)

            # Forward pass and reconstruction loss
            reconstruction = model(inputs)
            batch_loss = criterion(reconstruction, inputs)

            # Backward pass and parameter update
            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

            batch_history.append(batch_loss.item())

            if step % logging_interval == 0:
                print('Epoch: %03d/%03d | Batch %04d/%04d | Loss: %.4f'
                      % (epoch_no, num_epochs, step,
                          len(train_loader), batch_loss))

        if not skip_epoch_stats:
            model.eval()
            # No gradients are needed for the evaluation pass
            with torch.no_grad():
                epoch_loss = compute_epoch_loss_autoencoder(
                    model, train_loader, criterion, device)
                print('***Epoch: %03d/%03d | Loss: %.3f' % (
                      epoch_no, num_epochs, epoch_loss))
                epoch_history.append(epoch_loss.item())

        print('Time elapsed: %.2f min' % minutes_since_start())

    print('Total Training Time: %.2f min' % minutes_since_start())
    if save_model is not None:
        torch.save(model.state_dict(), save_model)

    return log_dict

In [None]:
# Train the 2-dimensional model; the per-epoch evaluation pass is skipped and
# the batch loss is printed every 250 batches.
log_dict_2e = train_autoencoder_v1(
    num_epochs=num_epochs,
    model=model,
    optimizer=optimizer,
    device=cuda_device,
    train_loader=train_loader,
    skip_epoch_stats=True,
    logging_interval=250,
)

Epoch: 001/020 | Batch 0000/1108 | Loss: 0.3599


In [None]:
# Save only the learned parameters (state dict) of the 2-dimensional model
torch.save(model.state_dict(), '/content/conv_ae_model.pth')

In [None]:
# Save the whole module object rather than just its parameters.
# NOTE(review): per the PyTorch serialization docs this pickles the module, so
# loading it needs the Conv_AutoEncoder class to be importable; confirm this
# file is actually needed next to the state-dict checkpoint.
torch.save(model, '/content/conv_ae_model_complete.pth')

In [None]:
def plot_training_loss(minibatch_losses, num_epochs, averaging_iterations = 100, custom_label = ''):
    """Plot the per-minibatch loss, its running average and a secondary epoch axis.

    Args:
        minibatch_losses: Sequence of per-batch loss values in training order.
        num_epochs: Number of epochs the losses span; positions the epoch ticks.
        averaging_iterations: Window length of the running average. It is
            clamped to the number of available losses.
        custom_label: Suffix appended to both legend labels.

    Raises:
        ValueError: If `minibatch_losses` is empty.
    """
    num_iterations = len(minibatch_losses)
    if num_iterations == 0:
        raise ValueError('`minibatch_losses` must contain at least one value')

    iter_per_epoch = num_iterations // num_epochs

    plt.figure()
    ax1 = plt.subplot(1, 1, 1)
    ax1.plot(range(num_iterations),
             (minibatch_losses), label = f'Minibatch Loss{custom_label}')
    ax1.set_xlabel('Iterations')
    ax1.set_ylabel('Loss')

    # The y-limit ignores the early, usually much larger, losses. `<=` keeps
    # the slice below non-empty when there are exactly 1000 losses.
    if num_iterations <= 1000:
        num_losses = num_iterations // 2
    else:
        num_losses = 1000

    ax1.set_ylim([
        0, np.max(minibatch_losses[num_losses:])*1.5
        ])

    # A window longer than the series would make mode='valid' return a
    # meaningless curve, so clamp it to the data length.
    window = max(1, min(averaging_iterations, num_iterations))
    ax1.plot(np.convolve(minibatch_losses,
                         np.ones(window,)/window,
                         mode = 'valid'),
             label = f'Running Average{custom_label}')
    ax1.legend()

    ###################
    # Set second x-axis (epochs), drawn below the iteration axis
    ax2 = ax1.twiny()
    newlabel = list(range(num_epochs+1))

    newpos = [e*iter_per_epoch for e in newlabel]

    # One tick every 10 epochs
    ax2.set_xticks(newpos[::10])
    ax2.set_xticklabels(newlabel[::10])

    ax2.xaxis.set_ticks_position('bottom')
    ax2.xaxis.set_label_position('bottom')
    ax2.spines['bottom'].set_position(('outward', 45))
    ax2.set_xlabel('Epochs')
    ax2.set_xlim(ax1.get_xlim())
    ###################

    plt.tight_layout()

In [None]:
# Minibatch loss curve of the 2-dimensional model
plot_training_loss(log_dict_2e['train_loss_per_batch'], num_epochs)
plt.show()

In [None]:
def plot_generated_images(data_loader, model, device,
                          unnormalizer = None,
                          figsize = (20, 2.5), n_images = 15, modeltype = 'autoencoder'):
    """Show originals (top row) and model reconstructions (bottom row).

    Only the first batch of `data_loader` is used.

    Args:
        data_loader: Iterable of ``(features, target)`` batches.
        model: Autoencoder returning reconstructions, or a VAE returning
            ``(encoded, z_mean, z_log_var, decoded)``.
        device: Device the batch is moved to before the forward pass.
        unnormalizer: Optional callable applied to every image before it is
            drawn, e.g. the inverse of the dataset normalization. Without it
            the images are drawn exactly as they come out of the loader/model.
        figsize: Figure size passed to ``plt.subplots``.
        n_images: Number of columns; fewer are filled if the batch is smaller.
        modeltype: 'autoencoder' or 'VAE'.

    Raises:
        ValueError: If `modeltype` is not supported.
    """
    if modeltype not in ('autoencoder', 'VAE'):
        raise ValueError('`modeltype` not supported')

    # squeeze=False keeps `axes` two-dimensional even when n_images == 1
    fig, axes = plt.subplots(nrows = 2, ncols = n_images, squeeze = False,
                             sharex = True, sharey = True, figsize = figsize)

    features, _ = next(iter(data_loader))
    features = features.to(device)

    color_channels = features.shape[1]
    image_height = features.shape[2]
    image_width = features.shape[3]

    with torch.no_grad():
        if modeltype == 'autoencoder':
            decoded_images = model(features)
        else:
            encoded, z_mean, z_log_var, decoded_images = model(features)

    # Slice the image tensors themselves (not the VAE output tuple)
    orig_images = features[:n_images]
    decoded_images = decoded_images[:n_images]

    # The batch may hold fewer than n_images examples
    for i in range(orig_images.shape[0]):
        for ax, img in zip(axes, [orig_images, decoded_images]):
            curr_img = img[i].detach().to(torch.device('cpu'))
            if unnormalizer is not None:
                curr_img = unnormalizer(curr_img)

            if color_channels > 1:
                # imshow expects channels last
                ax[i].imshow(np.transpose(np.asarray(curr_img), (1, 2, 0)))
            else:
                ax[i].imshow(curr_img.view((image_height, image_width)), cmap = 'binary')

In [None]:
# The images were normalized to [-1, 1] (mean 0.5, std 0.5) and the decoder ends in
# Tanh, so map them back to [0, 1]; otherwise imshow clips the RGB data.
plot_generated_images(data_loader = train_loader, model = model, device = cuda_device,
                      unnormalizer = transforms.Normalize(mean = [-1, -1, -1], std = [2, 2, 2]))

In [None]:
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import numpy as np
from torchvision import datasets

def plot_latent_space(model, test_loader, dataset, device, attribute_idx=31, n_samples=1000):
    """Scatter-plot PCA and t-SNE projections of encoder embeddings.

    Points are coloured by the labels the loader yields. The second element
    of each batch is the label of each example, not its index in the dataset,
    so it is used directly.

    Args:
        model: Model exposing an ``encoder`` sub-module.
        test_loader: Iterable of ``(images, labels)`` batches.
        dataset: Unused; kept so existing calls keep working.
        device: Device the images are moved to for encoding.
        attribute_idx: Unused; kept so existing calls keep working.
        n_samples: Maximum number of embeddings to project.
    """
    # NOTE(review): with ImageFolder the labels are folder class indices, not
    # CelebA attributes. Colouring by an attribute such as "Smiling" would need
    # the attribute annotations to be loaded separately.
    embeddings = []
    labels = []
    n_collected = 0
    model.eval()
    with torch.no_grad():
        for images, batch_labels in test_loader:
            embeddings.append(model.encoder(images.to(device)).cpu().numpy())
            labels.append(np.asarray(batch_labels))

            # Stop as soon as enough examples have been gathered
            n_collected += images.size(0)
            if n_collected >= n_samples:
                break

    embeddings = np.concatenate(embeddings)
    labels = np.concatenate(labels)

    # Random subset for faster computation
    rand_idx = np.random.choice(len(embeddings), size=min(n_samples, len(embeddings)), replace=False)
    embeddings = embeddings[rand_idx]
    labels = labels[rand_idx]

    # PCA Projection
    pca = PCA(n_components=2)
    pca_results = pca.fit_transform(embeddings)

    # t-SNE Projection; perplexity must stay below the number of samples
    perplexity = min(30, max(1, len(embeddings) - 1))
    try:
        tsne = TSNE(n_components=2, perplexity=perplexity, max_iter=300)
    except TypeError:
        # Older scikit-learn releases only know the previous name `n_iter`
        tsne = TSNE(n_components=2, perplexity=perplexity, n_iter=300)
    tsne_results = tsne.fit_transform(embeddings)

    plt.figure(figsize=(16, 6))

    # PCA Plot
    plt.subplot(1, 2, 1)
    scatter = plt.scatter(pca_results[:, 0], pca_results[:, 1], c=labels,
                         cmap='viridis', alpha=0.6, s=10)
    plt.colorbar(scatter, label='Dataset label')
    plt.title('PCA Projection of Latent Space')
    plt.xlabel('PCA Component 1')
    plt.ylabel('PCA Component 2')

    # t-SNE Plot
    plt.subplot(1, 2, 2)
    scatter = plt.scatter(tsne_results[:, 0], tsne_results[:, 1], c=labels,
                         cmap='viridis', alpha=0.6, s=10)
    plt.colorbar(scatter, label='Dataset label')
    plt.title('t-SNE Projection of Latent Space')
    plt.xlabel('t-SNE Component 1')
    plt.ylabel('t-SNE Component 2')

    plt.tight_layout()
    plt.show()

In [None]:
# PCA / t-SNE view of up to 2000 test-set embeddings from the 2-dimensional model
plot_latent_space(model, test_loader, dataset, device=cuda_device, n_samples=2000)

In [None]:
model = Conv_AutoEncoder(embedding_size=2)  # Initialize the model
# Option 1: Loading model params
# map_location lets a checkpoint written on a GPU be restored on whichever
# device this session uses (including CPU-only runtimes).
model.load_state_dict(torch.load('/content/conv_ae_model.pth', map_location=cuda_device))
model.to(cuda_device)  # Move the model to the selected device
model.eval()  # Set to evaluation mode

# Option 2: Loading full model
# model = torch.load('/content/conv_ae_model_complete.pth')
# model.eval()  # Set to evaluation mode

In [None]:
# Buffer for the 2-dimensional codes of up to 10,000 training images
all_embeddings = torch.zeros((10_000, 2))

num_images = 0
for images, labels in train_loader:

    # Buffer is full
    if num_images >= 10_000:
        break

    # Rows of the buffer this batch fills; the last batch may be cut short
    begin = num_images
    end = min(begin + images.size(0), 10_000)

    images = images.to(cuda_device)
    labels = labels.to(cuda_device)

    with torch.no_grad():
        embeddings = model.encoder(images).to('cpu')

    all_embeddings[begin:end] = embeddings[:end - begin]
    num_images = end

In [None]:
# One histogram per latent dimension, with shared axes for easy comparison
fig, axes = plt.subplots(nrows = 1, ncols = 2,
                         sharex = True, sharey = True,
                         figsize = (6, 4))

for dim, ax in enumerate(axes):
    ax.hist(all_embeddings[:, dim].numpy())
    ax.set_title(f'Histogram of Dim-{dim + 1} Latent Vector', fontsize = 8)
plt.show()

In [None]:
def extract_embeddings(model, dataloader, device, max_images=10_000):
    """Encode up to `max_images` examples and return their embeddings on the CPU.

    Args:
        model: Model exposing an ``encoder`` sub-module.
        dataloader: Iterable of ``(images, target)`` batches; targets are ignored.
        device: Device the images are moved to for encoding.
        max_images: Upper bound on the number of embeddings returned.

    Returns:
        Tensor of shape ``(n, embedding_dim)`` with ``n <= max_images``. Only
        embeddings that were actually computed are returned (no zero padding).
        An empty ``(0, 0)`` tensor is returned when the loader yields nothing.
    """
    model.eval()
    collected = []
    remaining = max_images

    with torch.no_grad():
        for images, _ in dataloader:
            if remaining <= 0:
                break

            # Keep only as many rows as still fit under max_images
            embeddings = model.encoder(images.to(device)).to('cpu')[:remaining]
            collected.append(embeddings)
            remaining -= embeddings.size(0)

    if not collected:
        return torch.zeros((0, 0))
    return torch.cat(collected, dim=0)


In [None]:
def decode_and_plot(model, latent_vector, device='cuda', cmap='binary'):
    """Decode a latent vector with ``model.decoder`` and display the image.

    Args:
        model: Model exposing a ``decoder`` sub-module whose output lies in
            [-1, 1] (the decoder in this notebook ends in Tanh).
        latent_vector: 1-D latent code, or a batch of codes (only the first
            decoded image is shown). May be a tensor, array or sequence.
        device: Device the latent code is moved to before decoding.
        cmap: Colormap used for single-channel images; ignored for RGB.
    """
    model.eval()
    with torch.no_grad():
        # as_tensor avoids the copy-construct warning when a tensor is passed in
        latent_tensor = torch.as_tensor(latent_vector, dtype=torch.float32).to(device)
        if latent_tensor.ndim == 1:
            latent_tensor = latent_tensor.unsqueeze(0)  # add the batch dimension

        decoded_image = model.decoder(latent_tensor)[0].cpu()  # (C, H, W)

    # Map [-1, 1] to [0, 1]; imshow clips float RGB data outside that range
    decoded_image = ((decoded_image + 1) / 2).clamp(0, 1).numpy()

    if decoded_image.shape[0] == 1:
        plt.imshow(decoded_image[0], cmap=cmap)
    else:
        # Channels last for imshow
        plt.imshow(decoded_image.transpose(1, 2, 0), cmap=cmap)
    plt.axis('off')
    plt.show()

In [None]:
import matplotlib.animation as animation
from IPython.display import HTML

def create_embedding_animation(model, test_loader, device):
    """Animate the reconstruction of one test image while its first latent
    dimension sweeps from -10 to 10 (exclusive) in steps of 0.1.

    Args:
        model: Model exposing ``encoder`` and ``decoder`` sub-modules; the
            decoder output is expected in [-1, 1] (it ends in Tanh here).
        test_loader: Iterable of ``(images, target)`` batches; the first image
            of the first batch is used.
        device: Device used for encoding and decoding.

    Returns:
        The ``matplotlib.animation.FuncAnimation`` object.
    """
    model.eval()

    def to_display(img):
        # (C, H, W) in [-1, 1] -> (H, W, C) in [0, 1], the range imshow expects
        return ((img.detach().cpu() + 1) / 2).clamp(0, 1).permute(1, 2, 0).numpy()

    # Get first test batch
    test_batch = next(iter(test_loader))[0].to(device)

    # Create figure
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))

    # Get original embedding
    with torch.no_grad():
        orig_embedding = model.encoder(test_batch[0:1])
        orig_img = model.decoder(orig_embedding)

    # Initialize plots; the left panel never changes, so it is drawn once
    im1 = ax1.imshow(to_display(test_batch[0]), cmap='gray')
    im2 = ax2.imshow(to_display(orig_img[0]), cmap='gray')
    ax1.set_title('Original Image')
    ax2.set_title('Reconstructed Image')
    plt.close()

    def update(frame):
        # Modify first embedding dimension
        modified_embedding = orig_embedding.clone()
        modified_embedding[0,0] = frame  # Vary first dimension

        # Reconstruct
        with torch.no_grad():
            recon_img = model.decoder(modified_embedding)

        # Update the reconstruction panel
        im2.set_data(to_display(recon_img[0]))
        ax2.set_title(f'Reconstructed (z1={frame:.1f})')
        return im1, im2

    # Create animation
    ani = animation.FuncAnimation(
        fig, update,
        frames=np.arange(-10, 10, 0.1),
        interval=50
    )

    return ani

In [None]:
# Encode training images with the 2-dimensional model (extract_embeddings'
# default cap of 10,000 images applies)
embeddings = extract_embeddings(model, train_loader, device=cuda_device)

# Decode the first collected embedding; train_loader shuffles, so which
# image it belongs to depends on the shuffle order
decode_and_plot(model, embeddings[0], device=cuda_device)


In [None]:
# Latent-traversal animation for the 2-dimensional model, rendered inline as HTML/JS
animation_2e = create_embedding_animation(model, test_loader, cuda_device)
HTML(animation_2e.to_jshtml())

In [None]:
# Export the animation as a GIF.
# NOTE(review): the 'imagemagick' writer presumably needs ImageMagick installed on the
# runtime; confirm, or consider the built-in 'pillow' writer.
animation_2e.save('CelebA_embedding_animation_2e.gif', writer='imagemagick')

# Part 1

# Training with a 32-Dimensional Embedding

In [None]:
# Fresh autoencoder with a 32-dimensional latent code and its own optimizer
model_32e = Conv_AutoEncoder(embedding_size=32)
model_32e = model_32e.to(cuda_device)
optimizer_32e = torch.optim.Adam(model_32e.parameters(), lr=lr_rate)

# Train model (per-epoch evaluation skipped, batch loss printed every 250 batches)
log_dict_32e = train_autoencoder_v1(
    num_epochs=num_epochs,
    model=model_32e,
    optimizer=optimizer_32e,
    device=cuda_device,
    train_loader=train_loader,
    skip_epoch_stats=True,
    logging_interval=250,
)

In [None]:
# Minibatch loss curve of the 32-dimensional model
plot_training_loss(log_dict_32e['train_loss_per_batch'], num_epochs)
plt.show()

In [None]:
# PCA / t-SNE view of up to 2000 test-set embeddings from the 32-dimensional model
plot_latent_space(model_32e, test_loader, dataset, device=cuda_device, n_samples=2000)

In [None]:
# The images were normalized to [-1, 1] (mean 0.5, std 0.5) and the decoder ends in
# Tanh, so map them back to [0, 1]; otherwise imshow clips the RGB data.
plot_generated_images(data_loader = train_loader, model = model_32e, device = cuda_device,
                      unnormalizer = transforms.Normalize(mean = [-1, -1, -1], std = [2, 2, 2]))

In [None]:
# Encode training images with the 32-dimensional model (extract_embeddings'
# default cap of 10,000 images applies)
embeddings = extract_embeddings(model_32e, train_loader, device=cuda_device)

# Decode the first collected embedding; train_loader shuffles, so which
# image it belongs to depends on the shuffle order
decode_and_plot(model_32e, embeddings[0], device=cuda_device)


In [None]:
# Latent-traversal animation for the 32-dimensional model, rendered inline as HTML/JS
animation_32e = create_embedding_animation(model_32e, test_loader, cuda_device)
HTML(animation_32e.to_jshtml())

In [None]:
# Export the animation as a GIF.
# NOTE(review): the 'imagemagick' writer presumably needs ImageMagick installed on the
# runtime; confirm, or consider the built-in 'pillow' writer.
animation_32e.save('CelebA_embedding_animation_32e.gif', writer='imagemagick')

# Training with a 64-Dimensional Embedding

In [None]:
# Fresh autoencoder with a 64-dimensional latent code and its own optimizer
model_64e = Conv_AutoEncoder(embedding_size=64)
model_64e = model_64e.to(cuda_device)
optimizer_64e = torch.optim.Adam(model_64e.parameters(), lr=lr_rate)

# Train model (per-epoch evaluation skipped, batch loss printed every 250 batches)
log_dict_64e = train_autoencoder_v1(
    num_epochs=num_epochs,
    model=model_64e,
    optimizer=optimizer_64e,
    device=cuda_device,
    train_loader=train_loader,
    skip_epoch_stats=True,
    logging_interval=250,
)

In [None]:
# Minibatch loss curve of the 64-dimensional model
plot_training_loss(log_dict_64e['train_loss_per_batch'], num_epochs)
plt.show()  # same explicit show as the 2- and 32-dimensional loss plots

In [None]:
# PCA / t-SNE view of up to 2000 test-set embeddings from the 64-dimensional model
plot_latent_space(model_64e, test_loader, dataset, device=cuda_device, n_samples=2000)

In [None]:
# The images were normalized to [-1, 1] (mean 0.5, std 0.5) and the decoder ends in
# Tanh, so map them back to [0, 1]; otherwise imshow clips the RGB data.
plot_generated_images(data_loader = train_loader, model = model_64e, device = cuda_device,
                      unnormalizer = transforms.Normalize(mean = [-1, -1, -1], std = [2, 2, 2]))

In [None]:
# Encode training images with the 64-dimensional model (extract_embeddings'
# default cap of 10,000 images applies)
embeddings = extract_embeddings(model_64e, train_loader, device=cuda_device)

# Decode the first collected embedding; train_loader shuffles, so which
# image it belongs to depends on the shuffle order
decode_and_plot(model_64e, embeddings[0], device=cuda_device)


In [None]:
# Latent-traversal animation for the 64-dimensional model, rendered inline as HTML/JS
animation_64e = create_embedding_animation(model_64e, test_loader, cuda_device)
HTML(animation_64e.to_jshtml())

In [None]:
# Export the animation as a GIF.
# NOTE(review): the 'imagemagick' writer presumably needs ImageMagick installed on the
# runtime; confirm, or consider the built-in 'pillow' writer.
animation_64e.save('CelebA_embedding_animation_64e.gif', writer='imagemagick')