# Autoencoders

<p>
CAS on Advanced Machine Learning <br>
Data Science Lab, University of Bern, 2024<br>
Prepared by Dr. Mykhailo Vladymyrov.

</p>

This work is licensed under a <a href="https://creativecommons.org/licenses/by-sa/4.0/">Creative Commons Attribution-ShareAlike 4.0 International License</a>.

# Libs and utils

In [None]:
# on colab:
# !pip install einops
# !pip install mlflow
# !pip install optuna


In [None]:
import os

# set env var to allow duplicated lib
os.environ['KMP_DUPLICATE_LIB_OK']='TRUE'

#This code snippet is doing the following:

#import os - Imports Python's built-in operating system module, which provides functions for interacting with the operating system.
#os.environ['KMP_DUPLICATE_LIB_OK']='TRUE' - Sets an environment variable called 'KMP_DUPLICATE_LIB_OK' to 'TRUE'.

#This specific environment variable is related to Intel's Math Kernel Library (MKL), which PyTorch often uses for performance optimization on Intel CPUs. The setting is addressing a known issue where the Intel OpenMP runtime library might be loaded multiple times, which can cause warnings or errors on some systems (particularly macOS).
#By setting this to 'TRUE', you're essentially telling the system to ignore the duplicate library loading issue. This is a common workaround when using PyTorch on macOS to prevent warnings about duplicate libraries being loaded.
#This line doesn't affect the functionality of your model, but rather helps avoid environment-related warnings or errors during execution.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import einops as eo
import pathlib as pl

import matplotlib.cm as cm
from matplotlib import collections  as mc
from matplotlib import animation
%matplotlib inline

from scipy.stats import norm
from scipy.stats import entropy

import pandas as pd
import pickle
from PIL import Image
from time import time as timer
#import umap

from IPython.display import HTML
from IPython.display import Audio
import IPython

import tqdm.auto as tqdm

import torch
from torchvision import datasets, transforms
from torch import nn
from torch import optim
import torch.nn.functional as F

from torchvision import transforms

import sys
is_colab = 'google.colab' in sys.modules

from source import image_id_converter as img_idc
from source import load_process_images as lpi

from pathlib import Path
import random

In [None]:
# get mean and std of an array with numpy:
def get_mean_std(x):
    """Return `(mean, std)` of `x`, both computed with NumPy over all elements."""
    return np.mean(x), np.std(x)

# get min and max of an array with numpy:
def get_min_max(x):
    """Return `(min, max)` of `x`, both computed with NumPy over all elements."""
    return np.min(x), np.max(x)

def is_iterable(obj):
    """Return True when `iter(obj)` succeeds and False when it raises.

    Any exception raised by `iter(obj)` is interpreted as "not iterable".
    """
    try:
        iter(obj)
        return True
    except Exception:
        return False

def type_len(obj):
    """Print the type of `obj` and, when it has one, its length (debug helper).

    For NumPy arrays the shape is appended to the printed line.
    Objects that do not support `len()` (scalars, generators, 0-d arrays, ...)
    are reported as `len: n/a` instead of raising `TypeError`.
    """
    t = type(obj)
    try:
        n = len(obj)
    except TypeError:
        # len() is not defined for this object, so only the type can be shown
        print(f'type: {t}, len: n/a')
        return

    sfx = f', shape: {obj.shape}' if isinstance(obj, np.ndarray) else ''
    print(f'type: {t}, len: {n}{sfx}')


In [None]:
def to_np_showable(pt_img):
    """Convert a PyTorch image tensor into an (h, w, c) NumPy array for plotting.

    Steps:
      * detach the tensor, move it to the CPU and convert it to NumPy;
      * a 4-D input is treated as a batch and only its first element is kept;
      * if more than 3 channels remain, only the last 3 are kept;
      * the axes are reordered from (c, h, w) to (h, w, c) with einops;
      * values are mapped with `x / 2 + 0.5` (which sends [-1, 1] to [0, 1])
        and clipped to [0, 1].
    """
    arr = pt_img.detach().cpu().numpy()
    if arr.ndim == 4:
        arr = arr[0]

    n_channels = arr.shape[0]
    if n_channels > 3:
        arr = arr[-3:]

    hwc = eo.rearrange(arr, 'c h w -> h w c')
    return (hwc / 2 + .5).clip(0., 1.)

def plot_im(im, is_torch=True):
    """Show a single image with matplotlib using the gray colormap.

    im: the image to display.
    is_torch: when True (default) `im` is first converted with
        `to_np_showable`; otherwise it is handed to `imshow` unchanged.
    """
    shown = to_np_showable(im) if is_torch else im
    plt.imshow(shown, cmap='gray')
    plt.show()
    plt.close()  # close the figure once it has been rendered

def plot_im_samples(ds, n=5, is_torch=False):
    """Show up to `n` images from `ds` side by side in a single row.

    ds: sliceable collection of images; only `ds[:n]` is displayed.
    n: number of subplot columns (default 5); the figure is 16 x n inches.
    is_torch: when True every image is converted with `to_np_showable`
        before plotting; otherwise images go to `imshow` unchanged.
    """
    # squeeze=False keeps `axs` a 2-D array even for n == 1, where
    # plt.subplots would otherwise return a bare Axes that cannot be indexed
    fig, axs = plt.subplots(1, n, figsize=(16, n), squeeze=False)
    row = axs[0]

    # hide every axis up front so columns beyond len(ds) stay blank
    for ax in row:
        ax.set_axis_off()

    for ax, image in zip(row, ds[:n]):
        ax.imshow(to_np_showable(image) if is_torch else image, cmap='gray')

    plt.show()
    plt.close()

In [None]:
# merging 2d matrix of images in 1 image
def mosaic(mtr_of_ims):
    """Tile a 2-D grid (rows x columns) of equally sized images into one image.

    mtr_of_ims: non-empty sequence of non-empty rows of images. The shape of
        the first image decides the tile size; it must be 2-D (h, w) or
        3-D (h, w, c).

    Returns a float32 array in which neighbouring tiles are separated by a
    1-pixel line; every pixel not covered by a tile has the value 0.5.
    """
    n_rows = len(mtr_of_ims)
    assert(n_rows != 0)

    n_cols = len(mtr_of_ims[0])
    assert(n_cols != 0)

    tile_shape = mtr_of_ims[0][0].shape
    assert (2 <= len(tile_shape) <= 3)

    tile_h, tile_w = tile_shape[:2]
    gap = 1  # width of the separator between neighbouring tiles

    # trailing channel axis (if any) is carried over unchanged
    canvas_shape = (
        tile_h * n_rows + gap * (n_rows - 1),
        tile_w * n_cols + gap * (n_cols - 1),
    ) + tuple(tile_shape[2:])
    canvas = np.full(canvas_shape, 0.5, dtype=np.float32)

    for row_idx, row in enumerate(mtr_of_ims):
        top = row_idx * (tile_h + gap)
        for col_idx, tile in enumerate(row):
            left = col_idx * (tile_w + gap)
            canvas[top:top + tile_h, left:left + tile_w] = tile

    return canvas

In [None]:
# Use the Apple-Silicon GPU backend ('mps') when it can actually be used,
# otherwise fall back to the CPU.
# `is_built()` only tells whether this PyTorch binary was compiled with MPS
# support; `is_available()` also checks that the current machine can run it.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

# Confirm the selected device
print("Using device:", device)

In [None]:
# Release cached MPS memory. Skipped when no usable MPS backend is present
# (CPU fallback), since there is no MPS cache to release in that case.
if torch.backends.mps.is_available():
    torch.mps.empty_cache()

## Error message when MPS is out of memory: 
MPS backend out of memory (MPS allocated: 5.65 GB, other allocations: 492.00 KB, max allowed: 5.10 GB). Tried to allocate 500.00 MB on private pool. Use PYTORCH_MPS_HIGH_WATERMARK_RATIO=0.0 to disable upper limit for memory allocations (may cause system failure).


### Solution proposed on: https://discuss.pytorch.org/t/mps-backend-out-of-memory-in-mac/198245

firstly run this command in notebook cell

torch.mps.set_per_process_memory_fraction(0.0)

### Need to confirm that this solution does not damage anything first!

In [None]:
device

# Setup

## Load dataset

Let's start with a simple, well-understood MNIST dataset.

In [None]:
# Probability with which collate_ae_dataset replaces each input element by
# noise; a value that is not greater than 0 disables the noise.
NOISE_RATE = 0.1
# NOTE(review): the three constants below are not read by any code visible
# in this part of the notebook — confirm where they are consumed.
N_SAMPLE = 32
N_VIS_SAMPLE = 2
BATCH_SIZE = 128

In [None]:
def collate_ae_dataset(samples):
    """
    Collate samples into a batch for a (denoising) autoencoder.

    samples: sequence of items where `item[0]` is the input tensor and
        `item[1]` is the label tensor. Inputs are combined with `torch.stack`
        (a new leading batch axis), labels with `torch.concat` (joined along
        their existing first axis).

    When the global NOISE_RATE is greater than 0, each input element is
    independently replaced, with probability NOISE_RATE, by a noise value
    drawn uniformly from {-0.5, 0.5}. Otherwise the noisy batch is the clean
    batch itself.

    Returns `(noisy_inputs, clean_inputs, labels)`, each moved to the global
    `device`. The noisy batch is meant as the model input and the clean batch
    as the reconstruction target.
    """
    clean = torch.stack([sample[0] for sample in samples])
    targets = torch.concat([sample[1] for sample in samples])

    if NOISE_RATE > 0.:
        shape = clean.shape
        # 1 marks elements to be replaced by noise, 0 marks elements to keep
        replace = torch.bernoulli(torch.full(shape, NOISE_RATE))
        # fair coin flip shifted by 0.5 -> values are -0.5 or 0.5
        noise_values = torch.bernoulli(torch.full(shape, 0.5)) - 0.5
        noisy = clean * (1 - replace) + noise_values * replace
    else:
        noisy = clean

    return noisy.to(device), clean.to(device), targets.to(device)

In [None]:
# # given a AE model `model`
# for img, label in valid_dataset:
#     reconstruction = model(img)
#     loss_value = loss(img, reconstruction).item()

## Helper Autoencoder Interface

We will start by implementing helper functions and a base class for the autoencoder model.

In [None]:

def eval_on_samples(ae_model, epoch, samples):
    """Run the model on a fixed set of samples (called at the end of each epoch).

    ae_model: model that is called as `ae_model(x, return_z=True)` and returns
        a sequence whose first element is the reconstruction and whose
        remaining elements are the encodings.
    epoch: epoch number, stored unchanged in the result.
    samples: dict; only `samples['images_noisy']` is read here.

    Returns a dict with
        'z': list of NumPy arrays (everything after the first model output),
        'y': NumPy array with the first model output (the reconstruction),
        'epoch': the `epoch` argument.
    """
    noisy_batch = torch.tensor(samples['images_noisy'], dtype=torch.float32).to(device)

    # gradients are not needed for evaluation
    with torch.no_grad():
        outputs = ae_model(noisy_batch, return_z=True)
        arrays = [out.detach().cpu().numpy() for out in outputs]

    return {'z': arrays[1:], 'y': arrays[0], 'epoch': epoch}

# Creates and returns a dictionary containing:

# z: The encoded representations
# y: The reconstructed images
# epoch: The current epoch number
# 

# This evaluation function captures the model's performance at each epoch, allowing for tracking reconstruction quality and analyzing the learned representations over time.


In [None]:
def plot_hist(history, logscale=True):
    """
    Plot training and validation loss against the epoch number.

    history: dict with the keys 'loss', 'val_loss' and 'epoch'.
    logscale: when True (default) the y axis is logarithmic (`plt.semilogy`),
        otherwise linear (`plt.plot`).
    """
    train_loss, valid_loss, epochs = (
        history[key] for key in ('loss', 'val_loss', 'epoch')
    )

    draw = plt.semilogy if logscale else plt.plot
    draw(epochs, train_loss, label='training')
    draw(epochs, valid_loss, label='validation')

    plt.legend()
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.show()
    plt.close()



def plot_samples(sample_history, samples, epoch_stride=5, fig_scale=1):
    """
    Plot input, noisy samples (for DAE) and reconstruction.

    One figure is drawn for every `epoch_stride`-th epoch and for the last
    epoch in `sample_history`. Each figure is a mosaic with three rows:
    the noisy inputs, the reconstructions of that epoch, and the clean images.

    sample_history: dict mapping epoch number -> result dict with key 'y'.
    samples: dict with keys 'single_el_idx', 'images_noisy' and 'images';
        the arrays are indexed as `[single_el_idx, 0]`.
    epoch_stride: plot only epochs divisible by this value (plus the last).
    fig_scale: figure size per tile.
    """
    selected = samples['single_el_idx']
    noisy = samples['images_noisy'][selected, 0]
    clean = samples['images'][selected, 0]

    final_epoch = np.max(list(sample_history.keys()))

    for epoch_idx, hist_el in sample_history.items():
        on_stride = epoch_idx % epoch_stride == 0
        if not (on_stride or epoch_idx == final_epoch):
            continue

        rows = [noisy, hist_el['y'][selected, 0], clean]
        n_rows, n_cols = len(rows), len(rows[0])

        plt.figure(figsize=(fig_scale*n_cols, fig_scale*n_rows))
        grid = mosaic(rows)

        plt.title(f'after epoch {int(epoch_idx)}')
        # fixed value range so that figures of different epochs are comparable
        plt.imshow(grid, cmap='gray', vmin=-.5, vmax=.5)

        plt.tight_layout(pad=0.1, h_pad=0, w_pad=0)
        plt.show()
        plt.close()

# This function creates a powerful visualization showing the progression of the model's reconstruction ability across epochs. Each visualization has three rows:
# 
# The noisy inputs
# The model's reconstructions
# The original clean images (targets)
# 
# This makes it easy to see how the model gradually learns to denoise and reconstruct the images over the course of training.

In [None]:
# These are utility functions for working with trained models at different stages of training. Let me break them down:

def run_on_trained(model, root_dir, run_fn, ep=None, model_filename=None):
    """
    Helper function to execute any function on the model in its state after training epoch `ep`.

    model: model instance whose parameters are overwritten in place.
    root_dir: directory with the saved checkpoints (`str` or path object);
        only used when `model_filename` is None.
    run_fn: callable invoked as `run_fn(model)` after the state is restored.
    ep: epoch whose checkpoint `model_{ep:03d}.pth` is loaded (optional).
    model_filename: explicit checkpoint file; takes precedence over `ep`.

    When neither `ep` nor `model_filename` is given, the last `*.pth` file of
    `root_dir` in sorted (alphabetical) order is used.

    Returns whatever `run_fn(model)` returns.
    Raises FileNotFoundError when a checkpoint has to be picked from
    `root_dir` but the directory contains no `*.pth` file.
    """
    if model_filename is None:
        root_dir = pl.Path(root_dir)
        if ep is not None:
            model_filename = root_dir/f'model_{ep:03d}.pth'
        else:
            checkpoints = sorted(root_dir.glob('*.pth'))
            if not checkpoints:
                raise FileNotFoundError(f'no *.pth checkpoints found in {root_dir}')
            model_filename = checkpoints[-1]  # last model state

    # weights_only=False unpickles arbitrary Python objects stored in the file.
    # NOTE(review): only load checkpoints from a trusted source.
    # map_location='cpu' lets a checkpoint saved on an accelerator be opened on
    # any machine; load_state_dict then copies the values into the model's own
    # parameters.
    model_dict = torch.load(model_filename, map_location='cpu', weights_only=False)
    model.load_state_dict(model_dict['model_state_dict'])

    return run_fn(model)

def run_on_all_training_history(model, root_dir, run_fn, n_ep=None):
    """
    Helper function to execute any function on the model state after each of the training epochs.

    model: model instance handed to `run_on_trained`.
    root_dir: directory containing the saved checkpoints.
    run_fn: function that is run on every restored model state.
    n_ep: when given, epochs 0 .. n_ep-1 are processed by epoch number;
        when None, every `*.pth` file in `root_dir` is processed in sorted
        order.

    A progress line is printed per checkpoint and 'done' at the end.
    """
    if n_ep is None:
        for checkpoint in sorted(root_dir.glob('*.pth')):
            print(f'running on checkpoint {checkpoint}...')
            run_on_trained(model, root_dir, run_fn, model_filename=checkpoint)
    else:
        for ep in range(n_ep):
            print(f'running on epoch {ep+1}/{n_ep}...')
            run_on_trained(model, root_dir, run_fn, ep=ep)

    print('done')

In [None]:
#xns.shape[1:]

## Experiment with Dias

In [None]:
# NOTE(review): absolute, machine-specific path — it has to be adapted
# before the notebook is run on another machine.
root_path = Path('/Users/stephanehess/Documents/CAS_AML/autoencoder_tutorials')

In [None]:
image_dir = root_path/"data"

In [None]:
with_without_person = pd.read_csv(image_dir/'with_without_person_mod.csv')
with_without_person

In [None]:
def match_labels_idx(label_data, image_ids):
    """Return the integer `recognisable` label for every id in `image_ids`.

    label_data: table with the columns `image_id` and `recognisable`
        (accessed as attributes). When an id occurs in several rows, the
        first row wins.
    image_ids: iterable of ids to look up; the output keeps its order.

    Raises IndexError when an id has no row in `label_data` (same exception
    type as before, but with a message that names the missing id).
    """
    # Build the id -> label lookup once instead of scanning the whole table
    # for every requested id; setdefault keeps the first occurrence.
    first_label = {}
    for known_id, label in zip(label_data.image_id, label_data.recognisable):
        first_label.setdefault(known_id, label)

    labels = []
    for image_id in image_ids:
        if image_id not in first_label:
            raise IndexError(f'no label row found for image_id {image_id!r}')
        labels.append(int(first_label[image_id]))
    return labels

In [None]:
image_dir

In [None]:
os.listdir(image_dir)

## Load image data

In [None]:
# Process images and split into training/validation sets
train_images, val_images, train_ids, val_ids = lpi.load_process_images(image_dir)

# Print shapes and identifiers
print(f"Training data shape: {train_images.shape}")
print(f"Validation data shape: {val_images.shape}")
print(f"Training identifiers: {train_ids}")
print(f"Validation identifiers: {val_ids}")

## Get smaller copy of data set: 

In [None]:
num_ims_small = 10

In [None]:
train_images_small = train_images[0:num_ims_small].copy()
#val_images_small = val_images[0:num_ims_small].copy()
val_images_small = train_images_small.copy() 
train_ids_small = train_ids[0:num_ims_small].copy()
#val_ids_small = val_ids[0:num_ims_small].copy()
val_ids_small = train_ids_small.copy()

In the small data set training and validation set are identical for the following purposes:
1. To test if the algorithm learns anything at all.
2. To test if clustering with a limited data set is possible. This means that the model will be completely overfitted and not generalizable at all. 

## Add channel dimension to image data:

In [None]:
print(train_images.shape)
print(val_images.shape)
print(train_images_small.shape)
print(val_images_small.shape)


In [None]:
train_images_small_n = train_images_small[:, np.newaxis, :, :]
val_images_small_n = val_images_small[:, np.newaxis, :, :]
print(train_images_small_n.shape)
print(val_images_small_n.shape)

In [None]:
train_images_n = train_images[:, np.newaxis, :, :]
val_images_n = val_images[:, np.newaxis, :, :]
print(train_images_n.shape)
print(val_images_n.shape)

## Load label data and reconvert image ids: 

In [None]:
labels_person = pd.read_csv(image_dir/'with_without_person_mod.csv')
labels_person.image_id = img_idc.reconvert_image_ids(labels_person.image_id)
labels_person.head()

In [None]:
labels_person.shape

## Get the corresponding label for each image in train and val data:

In [None]:
train_labels_small = match_labels_idx(labels_person, train_ids_small)
val_labels_small = match_labels_idx(labels_person, val_ids_small)
print(val_labels_small[0:3])
print(train_labels_small[0:3])

In [None]:
train_labels = match_labels_idx(labels_person, train_ids)
val_labels = match_labels_idx(labels_person, val_ids)
print(val_labels[0:3])
print(train_labels[0:3])

## Make new indices for train images: 

In [None]:
new_indices_train_small = list(range(0, len(train_images_small_n)))
new_indices_train_small[0:3]

In [None]:
new_indices_train = list(range(0, len(train_images_n)))
new_indices_train[0:3]

In [None]:
samples_img_small = {
    'images_noisy': train_images_small_n,
    'images': train_images_small_n,
    'labels': train_labels_small,
    'single_el_idx': new_indices_train_small,
    'image_ids': train_ids_small
    
}

In [None]:
samples_img = {
    'images_noisy': train_images_n,
    'images': train_images_n,
    'labels': train_labels,
    'single_el_idx': new_indices_train,
    'image_ids': train_ids
    
}

In [None]:
m, s = 0.5, 1.
# m, s = 0.5, 0.5
#m, s = 0., 1.

#Defines normalization parameters for the images: mean (m) and standard deviation (s).
#The active values are m=0.5 and s=1.0.
#The commented lines show alternative normalization parameters that were tried.

transform = transforms.Compose([
    transforms.ToTensor(),
    #transforms.Pad(2), # to make images 32x32
    transforms.Normalize((m,), (s,))
])

In [None]:
def array_to_tensor(image_array):
    """Apply the notebook-level `transform` to every image and stack the results.

    image_array: array whose first axis enumerates the images.

    Returns a single tensor produced by `torch.stack` over the transformed
    images, i.e. with a new leading axis of length `image_array.shape[0]`.
    """
    n_images = image_array.shape[0]
    transformed = [transform(image_array[i]) for i in range(n_images)]
    return torch.stack(transformed)

## Basic Autoencoder

In [None]:
train_images_tensor = array_to_tensor(train_images_small)
val_images_tensor = array_to_tensor(val_images_small)
print(type(train_images_tensor))
print(len(train_images_tensor))

In [None]:
train_images_tensor.shape

In [None]:
image_length = train_images_tensor.shape[-1]
image_length

In [None]:
image_length/(2**6)

In [None]:
hidden_l = 17
#hidden_fs = 34
my_hidden_size = (hidden_l**2)
my_hidden_size

In [None]:
class AutoEncoder(nn.Module):
    """Fully connected autoencoder.

    Encoder: Flatten -> Linear(flat, h) -> ReLU -> Linear(h, 2h/3) -> ReLU
             -> Linear(2h/3, 4h/9) -> ReLU -> Linear(4h/9, code) -> Sigmoid,
             so every code value lies in [0, 1].
    Decoder: the mirrored stack of Linear/ReLU layers, followed by Tanh and
             an Unflatten back to the sample shape.

    `h` is `hidden_size`; the fractions use integer division.
    """

    def __init__(self, input_size, code_size, hidden_size=None):
        """
        input_size: shape of one data sample, without the batch axis.
        code_size: number of values in the encoded representation (bottleneck).
        hidden_size: width of the widest hidden layer. When None (default) the
            notebook-level `my_hidden_size` is used, as before.
        """
        # initialise nn.Module first, then attach attributes and sub-modules
        super().__init__()

        self.input_size = list(input_size)  # shape of data sample
        # plain Python int: number of values in one flattened sample
        self.flat_data_size = int(np.prod(self.input_size))
        self.hidden_size = my_hidden_size if hidden_size is None else hidden_size
        self.code_size = code_size  # code size

        wide = self.hidden_size
        medium = self.hidden_size*2//3
        narrow = self.hidden_size*4//9

        self.encoder = nn.Sequential(
            nn.Flatten(),

            nn.Linear(self.flat_data_size, wide),
            nn.ReLU(),

            nn.Linear(wide, medium),
            nn.ReLU(),

            nn.Linear(medium, narrow),
            nn.ReLU(),

            nn.Linear(narrow, self.code_size),
            nn.Sigmoid(),  # keeps the code values in [0, 1]
        )

        self.decoder = nn.Sequential(
            nn.Linear(self.code_size, narrow),
            nn.ReLU(),

            nn.Linear(narrow, medium),
            nn.ReLU(),

            nn.Linear(medium, wide),
            nn.ReLU(),

            nn.Linear(wide, self.flat_data_size),
            # Think: why tanh?
            # Tanh bounds the output to [-1, 1].
            # NOTE(review): presumably chosen because the normalised inputs
            # are centred around 0 — confirm against the data range.
            nn.Tanh(),

            nn.Unflatten(1, self.input_size),
        )

    def forward(self, x, return_z=False):
        """Encode `x`, then decode it.

        Returns the reconstruction, or `(reconstruction, code)` when
        `return_z` is True.
        """
        encoded = self.encode(x)
        decoded = self.decode(encoded)
        return (decoded, encoded) if return_z else decoded

    def encode(self, x):
        """Return the code produced by the encoder for `x`."""
        return self.encoder(x)

    def decode(self, z):
        """Return the decoder output for code `z`, scaled by 1.1.

        The scaling widens the Tanh range from [-1, 1] to [-1.1, 1.1].
        """
        return self.decoder(z)*1.1

    def get_n_params(self):
        """Return the total number of trainable parameters of the model."""
        return sum(p.numel() for p in self.parameters() if p.requires_grad)


In [None]:
CODE_SIZE = 50
NOISE_RATE = 0
MODEL_NAME = 'ae_model'
in_size = train_images_tensor.shape[1:]
model = AutoEncoder(input_size=in_size, code_size=CODE_SIZE).to(device)
train_images_tensor = train_images_tensor.to(device)
val_images_tensor = val_images_tensor.to(device)


In [None]:
in_size

In [None]:
# Train the autoencoder for N_EPOCHS epochs; each epoch is one full-batch
# optimisation step on train_images_tensor.
# Recorded along the way:
#   - history: training and validation loss per epoch
#   - sample_history: model outputs on the fixed sample set per epoch
#   - one checkpoint file per epoch in the directory `model_root`

N_EPOCHS = 185
LR = 0.0004

model_root = pl.Path(MODEL_NAME)
model_root.mkdir(exist_ok=True)

optimizer = optim.Adam(model.parameters(), lr=LR)

# reconstruction loss: mean absolute error between output and input
loss = nn.L1Loss()

# train the model
history = {'loss': [], 'val_loss': [], 'epoch': []}
sample_history = {}

pbar = tqdm.tqdm(range(0, N_EPOCHS), postfix=f'epoch 0/{N_EPOCHS}')
for epoch_idx in pbar:
    # one optimisation step on the whole training tensor
    model.train()
    optimizer.zero_grad()
    output = model(train_images_tensor)
    loss_value = loss(output, train_images_tensor)
    loss_value.backward()
    optimizer.step()
    epoch_loss = loss_value.detach().cpu().item()

    history['loss'].append(epoch_loss)
    history['epoch'].append(epoch_idx)

    # evaluate on validation set
    model.eval()
    with torch.no_grad():
        output = model(val_images_tensor)
        loss_value = loss(output, val_images_tensor)
        val_loss = loss_value.detach().cpu().item()

    history['val_loss'].append(val_loss)

    # update progress bar
    pbar.set_postfix({'epoch': f'{epoch_idx+1}/{N_EPOCHS}', 'loss':f'{epoch_loss:.4f}', 'val_loss':f'{val_loss:.4f}'})

    # evaluate on samples
    # NOTE(review): samples_img_small['images_noisy'] is the raw array
    # train_images_small_n, while the model is trained on the output of
    # array_to_tensor (ToTensor + Normalize) — confirm both are on the same
    # value scale.
    sample_res = eval_on_samples(model, epoch_idx, samples=samples_img_small)
    sample_history[epoch_idx] = sample_res

    # save model weights
    # 'loss' / 'val_loss' hold this epoch's loss values; previously the
    # nn.L1Loss module object itself was stored under 'loss'.
    torch.save({
                'epoch': epoch_idx,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': epoch_loss,
                'val_loss': val_loss,
                }, model_root/f'model_{epoch_idx:03d}.pth')

In [None]:
plot_hist(history)

In [None]:
# Indices of the samples to display, as stored under 'single_el_idx'.
single_el_idx = samples_img_small['single_el_idx']
#plot_im_samples(samples_img_small['images_noisy'][single_el_idx, 0], n=2, is_torch=False)
# Row 1: the stored sample images (channel 0).
# Row 2: the 'y' entry recorded for the same samples at epoch index 174
# (presumably the model reconstructions - confirm against eval_on_samples).
plot_im_samples(samples_img_small['images'][single_el_idx, 0], n=10, is_torch=False)
plot_im_samples(sample_history[174]['y'][single_el_idx, 0], n=10, is_torch=False)

In [None]:
# List the entries available in the samples dictionary.
samples_img_small.keys()

In [None]:
# Display the 'image_ids' entry of the samples dictionary.
samples_img_small['image_ids']

In [None]:
# Inspect the container type of the 'z' entry recorded at epoch index 29.
type(sample_history[29]['z'])

In [None]:
# Inspect the type of the first element of that 'z' entry.
type(sample_history[29]['z'][0])

In [None]:
# Shape of the first 'z' element recorded at epoch index 124.
sample_history[124]['z'][0].shape

In [None]:
# Keep the first 'z' element recorded at epoch index 124 for the analysis below
# (presumably the latent codes of the tracked samples, one row per sample -
# confirm against eval_on_samples).
latent_space = sample_history[124]['z'][0]

In [None]:
from sklearn.manifold import TSNE

# Embed the latent codes in 2-D with t-SNE (PCA initialisation, fixed seed for
# reproducibility). perplexity=1 is very small; presumably chosen because only
# a handful of samples are embedded - TODO confirm.
tsne = TSNE(perplexity= 1, init="pca", learning_rate="auto", random_state=42)
X_valid_2D = tsne.fit_transform(latent_space)

In [None]:
# Scatter plot of the 2-D t-SNE embedding. No per-point values are supplied via
# `c`, so a colormap would be ignored (matplotlib only emits a warning for it);
# the points are therefore drawn in the default single colour.
plt.scatter(X_valid_2D[:, 0], X_valid_2D[:, 1], s=10)
plt.show()

In [None]:
# Display the embedded coordinates.
X_valid_2D

In [None]:
# Shape of the embedding returned by TSNE.fit_transform.
X_valid_2D.shape

In [None]:
# Type of the embedding returned by TSNE.fit_transform.
type(X_valid_2D)

In [None]:
import numpy as np
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt

# Cluster the 2-D t-SNE embedding; work on a copy so X_valid_2D stays untouched.
data = X_valid_2D.copy()

# Elbow method: fit k-means for k = 1..9 and record the inertia (within-cluster
# sum of squared distances). KMeans cannot fit more clusters than there are
# samples, so the range is capped at the number of rows in `data`.
inertia = []
k_range = range(1, min(9, len(data)) + 1)

for k in k_range:
    kmeans = KMeans(n_clusters=k, random_state=42, n_init='auto')
    kmeans.fit(data)
    inertia.append(kmeans.inertia_)

# Plot the elbow curve
plt.figure(figsize=(8, 6))
plt.plot(k_range, inertia, 'o-', color='blue')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('Inertia')
plt.title('Elbow Method for Optimal k')
plt.grid(True, alpha=0.3)
plt.show()

In [None]:


from sklearn.metrics import silhouette_score

# Number of clusters; adjust after inspecting the elbow curve.
k = 5

# Fit k-means on the 2-D embedding (fixed seed for reproducibility).
kmeans = KMeans(n_clusters=k, random_state=42, n_init='auto')
kmeans.fit(data)

# Cluster index assigned to every row of `data`, and the cluster centres.
labels = kmeans.labels_
centers = kmeans.cluster_centers_

print("Cluster assignments:", labels)
print("Cluster centers:", centers)

# Visualize the clusters, one colour per cluster index.
plt.figure(figsize=(8, 6))
scatter = plt.scatter(data[:, 0], data[:, 1], c=labels, cmap='viridis', s=100, alpha=0.7)
plt.title('K-means Clustering Results')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
# A bare plt.legend() finds no labelled artists here and draws nothing;
# legend_elements() creates one handle per distinct cluster index instead.
plt.legend(*scatter.legend_elements(), title='Cluster')
plt.grid(True, alpha=0.3)
plt.show()

# Silhouette score as a clustering-quality measure. It is only defined when the
# number of distinct labels lies between 2 and n_samples - 1 (inclusive).
if 1 < len(np.unique(labels)) < len(data):
    score = silhouette_score(data, labels)
    print(f"Silhouette Score: {score:.3f}")

In [None]:
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches


# Number of clusters to fit.
k = 5

# Fit k-means on the 2-D embedding (fixed seed for reproducibility).
kmeans = KMeans(n_clusters=k, random_state=42, n_init='auto')
kmeans.fit(data)

# Cluster index assigned to every row of `data`, and the cluster centres.
labels = kmeans.labels_
centers = kmeans.cluster_centers_

print("Cluster assignments:", labels)
print("Cluster centers:", centers)

# Visualize the clusters, one colour per cluster index.
plt.figure(figsize=(8, 6))
scatter = plt.scatter(data[:, 0], data[:, 1], c=labels, cmap='viridis', s=100, alpha=0.7)

# One legend patch per cluster index, coloured with the same colormap and
# normalisation that the scatter plot applied to `labels`.
legend_handles = [
    mpatches.Patch(color=scatter.cmap(scatter.norm(cluster)), label=f'Cluster {cluster}')
    for cluster in range(k)
]

plt.title('K-means Clustering Results')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend(handles=legend_handles)
plt.grid(True, alpha=0.3)
plt.show()

# Silhouette score as a clustering-quality measure. It is only defined when the
# number of distinct labels lies between 2 and n_samples - 1 (inclusive).
if 1 < len(np.unique(labels)) < len(data):
    score = silhouette_score(data, labels)
    print(f"Silhouette Score: {score:.3f}")

In [None]:
def plot_im_samples(ds, numbers=None, n=5, is_torch=False):
    """Show the first `n` images of `ds` side by side in a single row.

    Args:
        ds: sliceable collection of images; only ``ds[:n]`` is drawn.
        numbers: optional sequence; ``numbers[i]`` is written as the title of
            the i-th image when that index exists.
        n: number of panels (subplot columns) to create.
        is_torch: when true, every image is converted with ``to_np_showable``
            before being displayed.
    """
    # squeeze=False keeps the axes in a 2-D array even for n == 1, where
    # plt.subplots would otherwise return a bare Axes that cannot be indexed.
    fig, axs = plt.subplots(1, n, figsize=(16, n), squeeze=False)
    axs = axs[0]

    # Hide ticks and frames on every panel, including the ones that stay
    # empty when `ds` holds fewer than `n` images.
    for ax in axs:
        ax.set_axis_off()

    for i, (ax, image) in enumerate(zip(axs, ds[:n])):
        ax.imshow(to_np_showable(image) if is_torch else image, cmap='gray')

        # Add title if numbers array is provided
        if numbers is not None and i < len(numbers):
            ax.set_title(str(numbers[i]), fontsize=12)

    plt.tight_layout()
    plt.show()
    plt.close(fig)

In [None]:
# Same two rows as before (stored images, then the 'y' output recorded at epoch
# index 174), now titled with the k-means cluster index from `labels`.
# NOTE(review): assumes labels[i] (cluster of row i of the embedded latent
# codes) belongs to the i-th image selected by single_el_idx - confirm ordering.
single_el_idx = samples_img_small['single_el_idx']
#plot_im_samples(samples_img_small['images_noisy'][single_el_idx, 0], n=2, is_torch=False)
plot_im_samples(samples_img_small['images'][single_el_idx, 0], labels, n=10, is_torch=False)
plot_im_samples(sample_history[174]['y'][single_el_idx, 0], labels, n=10, is_torch=False)

## Convolutional Autoencoder:

In [None]:
# Convert the training and validation image arrays with array_to_tensor, then
# report the resulting type and the length of the first dimension
# (presumably the number of training images - confirm).
train_images_tensor = array_to_tensor(train_images)
val_images_tensor = array_to_tensor(val_images)
print(type(train_images_tensor))
print(len(train_images_tensor))

In [None]:
# Size of the last dimension of the training tensor
# (presumably the image width in pixels - confirm).
image_length = train_images_tensor.shape[-1]
image_length

In [None]:
# The convolutional encoder defined below contains six stride-2 convolutions,
# each roughly halving the spatial size; this estimates the side length of the
# final feature map.
image_length/(2**6)

In [None]:
# Geometry of the deepest feature map used by the convolutional autoencoder
# below (it reshapes to (hidden_fs, hidden_l, hidden_l)):
hidden_l = 9    # side length of the feature map
hidden_fs = 64  # number of channels of the feature map
# Number of values in that feature map once flattened.
my_hidden_size = (hidden_l**2)*hidden_fs
my_hidden_size

In [None]:
# Width of the intermediate linear layer the convolutional encoder below places
# between the flattened feature map and the latent code.
my_hidden_size//8

In [None]:
class AutoEncoder(nn.Module):
    """Fully connected autoencoder: flatten -> hidden -> code -> hidden -> data.

    Args:
        input_size: shape of one data sample without the batch dimension,
            e.g. ``[1, 28, 28]``.
        code_size: length of the latent code (the bottleneck).
        hidden_size: width of the hidden layer on each side of the bottleneck.
            Defaults to the module-level ``my_hidden_size``.
    """

    def __init__(self, input_size, code_size, hidden_size=None):
        # Initialise nn.Module first so that attribute and sub-module
        # registration is fully set up before anything is assigned to self.
        super().__init__()

        self.input_size = list(input_size)  # shape of data sample
        # Number of values in one flattened sample (plain int for nn.Linear).
        self.flat_data_size = int(np.prod(self.input_size))
        self.hidden_size = my_hidden_size if hidden_size is None else hidden_size
        self.code_size = code_size  # code size

        # Encoder: flatten the sample, one hidden ReLU layer, then a sigmoid
        # that constrains every code value to [0, 1].
        self.encoder = nn.Sequential(
            nn.Flatten(),

            nn.Linear(self.flat_data_size, self.hidden_size),
            nn.ReLU(),

            nn.Linear(self.hidden_size, self.code_size),
            nn.Sigmoid(),
        )

        # Decoder: mirror of the encoder. Tanh bounds the output to [-1, 1]
        # (scaled by 1.1 in `decode`); the result is reshaped to `input_size`.
        # Think: why tanh? Presumably because the inputs are normalised to a
        # range centred on zero - TODO confirm against the data preparation.
        self.decoder = nn.Sequential(
            nn.Linear(self.code_size, self.hidden_size),
            nn.ReLU(),

            nn.Linear(self.hidden_size, self.flat_data_size),
            nn.Tanh(),

            nn.Unflatten(1, self.input_size),
        )

    def forward(self, x, return_z=False):
        """Encode and decode `x`.

        Returns the reconstruction, or ``(reconstruction, code)`` when
        `return_z` is true.
        """
        encoded = self.encode(x)
        decoded = self.decode(encoded)
        return (decoded, encoded) if return_z else decoded

    def encode(self, x):
        """Map a batch of samples to their latent codes."""
        return self.encoder(x)

    def decode(self, z):
        """Map a batch of latent codes back to data space.

        The tanh output is multiplied by 1.1, widening the reachable range to
        [-1.1, 1.1].
        """
        return self.decoder(z)*1.1

    def get_n_params(self):
        """Return the total number of trainable parameters."""
        return sum(p.numel() for p in self.parameters() if p.requires_grad)

In [None]:
class ConvolutionalAutoEncoder(AutoEncoder):
    """Convolutional autoencoder for single-channel images.

    Inherits ``forward``, ``encode`` and ``get_n_params`` from ``AutoEncoder``
    and replaces the dense encoder/decoder with strided convolutions and
    transposed convolutions.

    Layer widths are taken from the module-level constants ``hidden_fs``
    (channels of the deepest feature map), ``hidden_l`` (its side length) and
    ``my_hidden_size``, which has to equal ``hidden_fs * hidden_l**2`` for the
    flatten / unflatten steps to fit together.

    Args:
        input_size: shape of one data sample without the batch dimension.
        code_size: length of the latent code produced by ``encode``.
    """

    def __init__(self, input_size, code_size):
        # Deliberately initialise only nn.Module. AutoEncoder.__init__ would
        # first build its dense encoder and decoder - two Linear layers of
        # flat_data_size x hidden_size weights - which are replaced right
        # below; for large images that is a huge, pointless allocation.
        nn.Module.__init__(self)

        self.input_size = list(input_size)  # shape of data sample
        self.flat_data_size = np.prod(self.input_size)
        self.hidden_size = my_hidden_size
        self.code_size = code_size  # code size

        # Channel widths: 1/8, 1/4 and 1/2 of the deepest feature map.
        f8, f4, f2 = hidden_fs // (2**3), hidden_fs // (2**2), hidden_fs // (2**1)

        def act():
            return nn.LeakyReLU(negative_slope=0.3)

        def down(c_in, c_out, stride=2):
            # 3x3 convolution with padding=1; stride=2 halves the spatial
            # size (rounding up), stride=1 keeps it.
            return nn.Conv2d(c_in, c_out, 3, padding=1, stride=stride), act()

        def up(c_in, c_out):
            # 3x3 transposed convolution; with padding=1, output_padding=1 and
            # stride=2 it exactly doubles the spatial size.
            return nn.ConvTranspose2d(c_in, c_out, 3, padding=1, output_padding=1, stride=2), act()

        # Encoder: one size-preserving convolution, six halving convolutions
        # that widen the channels from hidden_fs//8 up to hidden_fs, then two
        # linear layers down to the code. The code itself is left unbounded
        # (no final activation).
        self.encoder = nn.Sequential(
            *down(1, f8, stride=1),
            *down(f8, f8),
            *down(f8, f4),
            *down(f4, f4),
            *down(f4, f2),
            *down(f2, f2),
            *down(f2, hidden_fs),

            nn.Flatten(),

            nn.Linear(self.hidden_size, self.hidden_size//8), act(),
            nn.Linear(self.hidden_size//8, self.code_size),
        )

        # Decoder: expand the code to hidden_size values, reshape them to a
        # (hidden_fs, hidden_l, hidden_l) feature map, double the spatial size
        # six times, and finish with a size-preserving convolution to a single
        # channel. Tanh bounds the output to [-1, 1].
        self.decoder = nn.Sequential(
            nn.Linear(self.code_size, self.hidden_size), act(),

            nn.Unflatten(1, (hidden_fs, hidden_l, hidden_l)),

            *up(hidden_fs, f2),
            *up(f2, f2),
            *up(f2, f2),
            *up(f2, f2),
            *up(f2, f4),
            *up(f4, f4),

            nn.Conv2d(f4, 1, 3, padding=1, stride=1), nn.Tanh(),
        )

    def decode(self, z):
        """Map a batch of latent codes back to image space.

        The decoder output has side length ``hidden_l * 2**6``; two pixels are
        cropped from every border (presumably so the result matches the input
        size, because the encoder rounds up when halving - TODO confirm).
        Unlike the parent class, the output is not rescaled.
        """
        reconstruction = self.decoder(z)
        reconstruction = reconstruction[:, :, 2:-2, 2:-2]
        return reconstruction


In [None]:
# Instantiate the convolutional autoencoder with a 50-value latent code.
CODE_SIZE = 50
MODEL_NAME = 'cdae_model'  # also used below as the checkpoint directory name
# Shape of one sample: everything after the first (batch) dimension.
in_size = train_images_tensor.shape[1:]
model = ConvolutionalAutoEncoder(input_size=in_size, code_size=CODE_SIZE).to(device)
# Training below is full-batch, so both tensors are moved to the model's device once.
train_images_tensor = train_images_tensor.to(device)
val_images_tensor = val_images_tensor.to(device)

In [None]:
#from torchinfo import summary
#summary(model, input_size=(1, 1, 572, 572))

In [None]:
# Train the autoencoder for N_EPOCHS epochs. Each epoch is a single full-batch
# optimisation step on train_images_tensor followed by an evaluation on
# val_images_tensor. Recorded along the way:
#   * history        - training / validation loss per epoch,
#   * sample_history - output of eval_on_samples for the tracked samples,
#   * one checkpoint file per epoch under MODEL_NAME/.

N_EPOCHS = 35
LR = 0.0004

model_root = pl.Path(MODEL_NAME)
model_root.mkdir(exist_ok=True)

optimizer = optim.Adam(model.parameters(), lr=LR)

# Reconstruction criterion: mean absolute error between output and input.
loss = nn.L1Loss()

history = {'loss': [], 'val_loss': [], 'epoch': []}
sample_history = {}  # epoch index -> result of eval_on_samples

pbar = tqdm.tqdm(range(N_EPOCHS), postfix=f'epoch 0/{N_EPOCHS}')
for epoch_idx in pbar:
    # --- training step (the whole training tensor is one batch) ---
    model.train()
    optimizer.zero_grad()
    output = model(train_images_tensor)
    loss_value = loss(output, train_images_tensor)
    loss_value.backward()
    optimizer.step()
    epoch_loss = loss_value.item()

    history['loss'].append(epoch_loss)
    history['epoch'].append(epoch_idx)

    # --- validation ---
    model.eval()
    with torch.no_grad():
        output = model(val_images_tensor)
        loss_value = loss(output, val_images_tensor)
        val_loss = loss_value.item()
    history['val_loss'].append(val_loss)

    pbar.set_postfix({'epoch': f'{epoch_idx+1}/{N_EPOCHS}', 'loss':f'{epoch_loss:.4f}', 'val_loss':f'{val_loss:.4f}'})

    # --- snapshot of the model output on the tracked samples ---
    sample_res = eval_on_samples(model, epoch_idx, samples=samples_img)
    sample_history[epoch_idx] = sample_res

    # --- checkpoint ---
    # Store the scalar loss values rather than the nn.L1Loss module: the module
    # carries no information about this epoch, and plain floats keep the file
    # loadable with torch.load(..., weights_only=True).
    torch.save({
                'epoch': epoch_idx,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': epoch_loss,
                'val_loss': val_loss,
                }, model_root/f'model_{epoch_idx:03d}.pth')

In [None]:
# Pass the per-epoch training / validation losses recorded above to plot_hist.
plot_hist(history)

In [None]:
# Shape after taking the first three samples and index 0 of the second axis
# (presumably the channel axis - confirm).
samples_img['images_noisy'][0:3, 0].shape

In [None]:
# Full shape of the 'images_noisy' entry.
samples_img['images_noisy'].shape

In [None]:
# Show the selected samples: first the 'images_noisy' entry, then the 'images'
# entry (index 0 of the second axis in both cases).
single_el_idx = samples_img['single_el_idx']
plot_im_samples(samples_img['images_noisy'][single_el_idx, 0], n=20, is_torch=False)
plot_im_samples(samples_img['images'][single_el_idx, 0], n=20, is_torch=False)

### Latent space

In [None]:
%%capture
# Jupyter notebook cell magic to suppress output (useful for code that might produce verbose output)

# Pick the samples to show and take index 0 of their second axis
# (presumably the single grey channel - confirm).
single_el_idx = samples_img['single_el_idx']
images_noisy = samples_img['images_noisy'][single_el_idx, 0]
images = samples_img['images'][single_el_idx, 0]

# One animation frame per recorded epoch. Every frame consists of three rows:
# the 'images_noisy' samples, the 'y' output recorded for that epoch, and the
# 'images' samples. Only the middle row changes from frame to frame.
smpl_ims = [
    [images_noisy, epoch_res['y'][single_el_idx, 0], images]
    for epoch_res in sample_history.values()
]

# Grid size of a frame: number of rows and number of images per row.
first_frame = smpl_ims[0]
ny, nx = len(first_frame), len(first_frame[0])

# Render animations as interactive JavaScript/HTML (matplotlib 2.1 and above).
plt.rcParams["animation.html"] = "jshtml"

# Figure size proportional to the grid: `s` inches per image.
s = 1
fig = plt.figure(figsize=(s*nx, s*ny))

# Draw frame 0; mosaic() arranges the rows of a frame into a single image.
m = mosaic(first_frame)
ttl = plt.title(f'after epoch {0}')
imsh = plt.imshow(m, cmap='gray', vmin=-0.5, vmax=0.5)


def animate(i):
    """Redraw the figure for frame `i`: swap the image data and the title."""
    imsh.set_data(mosaic(smpl_ims[i]))
    ttl.set_text(f'after epoch {i}')
    return imsh


# Build the animation: one call to animate() per recorded epoch.
ani = animation.FuncAnimation(fig, animate, frames=len(smpl_ims))



In [None]:
# Display the animation built in the previous cell.
ani

In [None]:
%%capture
# Render animations as interactive JavaScript/HTML (matplotlib 2.1 and above).
plt.rcParams["animation.html"] = "jshtml"
fig = plt.figure(figsize=(8,8))

# Per-sample labels used for colouring, the recorded epochs in ascending order,
# and the first 'z' element (the latent codes) stored for each of them.
labels = samples_img['labels']
epochs = sorted(sample_history.keys())
z_res = [sample_history[ep]['z'][0] for ep in epochs]

# Initial frame: the first two latent dimensions, coloured by label.
scat = plt.scatter(z_res[0][:,0], z_res[0][:,1], c=labels, cmap=cm.rainbow)

# Fixed limits keep the view stable across frames.
plt.xlim(-6.1, 6.1)
plt.ylim(-6.1, 6.1)

ax = plt.gca()
# One legend entry per distinct label value.
legend1 = ax.legend(*scat.legend_elements(), title="digits")
ax.add_artist(legend1)
ax.set_aspect('equal')
ttl = plt.title(f'after epoch {epochs[0]}')


def animate(i):
    """Move the points to the latent codes of the i-th recorded epoch."""
    z = z_res[i]
    # The code may have more than two dimensions; pass exactly the two columns
    # that the initial scatter plot shows, since set_offsets expects (N, 2).
    scat.set_offsets(z[:, :2])
    ttl.set_text(f'after epoch {epochs[i]}')
    return scat


# One frame per recorded epoch: shows how the first two latent dimensions of
# the tracked samples move during training.
ani = animation.FuncAnimation(fig, animate, frames=len(z_res))

In [None]:
# Display the latent-space animation built in the previous cell.
ani