<a href="https://colab.research.google.com/github/MilenaOehlers/generative_models_for_deep_radar_object_detection/blob/main/Autoencoder_and_VariationalAutoencoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Following https://medium.com/@sofeikov/implementing-variational-autoencoders-from-scratch-533782d8eb95

In [65]:
import gzip
from urllib import request
import numpy as np
import torch.nn as nn
import torch

In [74]:
# Download the files
url = "http://yann.lecun.com/exdb/mnist/"
filenames = ['train-images-idx3-ubyte.gz', 'train-labels-idx1-ubyte.gz',
             't10k-images-idx3-ubyte.gz', 't10k-labels-idx1-ubyte.gz']
data = []
for filename in filenames:
    print("Downloading", filename)
    request.urlretrieve(url + filename, filename)
    with gzip.open(filename, 'rb') as f:
        if 'labels' in filename:
            # Load the labels as a one-dimensional array of integers
            data.append(np.frombuffer(f.read(), np.uint8, offset=8))
        else:
            # Load the images as a two-dimensional array of pixels
            data.append(np.frombuffer(f.read(), np.uint8, offset=16).reshape(-1,28*28))

# Split into training and testing sets
X_train, y_train, X_test, y_test = data

# Normalize the pixel values
X_train = X_train.astype(np.float32) / 255.0
X_test = X_test.astype(np.float32) / 255.0

# Convert labels to integers
y_train = y_train.astype(np.int64)
y_test = y_test.astype(np.int64)

Downloading train-images-idx3-ubyte.gz
Downloading train-labels-idx1-ubyte.gz
Downloading t10k-images-idx3-ubyte.gz
Downloading t10k-labels-idx1-ubyte.gz


In [67]:
def show_images(images, labels):
    """
    Display a set of images and their labels using matplotlib.
    The first column of `images` should contain the image indices,
    and the second column should contain the flattened image pixels
    reshaped into 28x28 arrays.
    """
    # Extract the image indices and reshaped pixels
    pixels = images.reshape(-1, 28, 28)

    # Create a figure with subplots for each image
    fig, axs = plt.subplots(
        ncols=len(images), nrows=1, figsize=(10, 3 * len(images))
    )

    # Loop over the images and display them with their labels
    for i in range(len(images)):
        # Display the image and its label
        axs[i].imshow(pixels[i], cmap="gray")
        axs[i].set_title("Label: {}".format(labels[i]))

        # Remove the tick marks and axis labels
        axs[i].set_xticks([])
        axs[i].set_yticks([])
        axs[i].set_xlabel("Index: {}".format(i))

    # Adjust the spacing between subplots
    fig.subplots_adjust(hspace=0.5)

    # Show the figure
    plt.show()

In [68]:
class AutoEncoder(nn.Module):
    def __init__(self):
        super().__init__()

        # Set the number of hidden units
        self.num_hidden = 8

        # Define the encoder part of the autoencoder
        self.encoder = nn.Sequential(
            nn.Linear(784, 256),  # input size: 784, output size: 256
            nn.ReLU(),  # apply the ReLU activation function
            nn.Linear(256, self.num_hidden),  # input size: 256, output size: num_hidden
            nn.ReLU(),  # apply the ReLU activation function
        )

        # Define the decoder part of the autoencoder
        self.decoder = nn.Sequential(
            nn.Linear(self.num_hidden, 256),  # input size: num_hidden, output size: 256
            nn.ReLU(),  # apply the ReLU activation function
            nn.Linear(256, 784),  # input size: 256, output size: 784
            nn.Sigmoid(),  # apply the sigmoid activation function to compress the output to a range of (0, 1)
        )

    def forward(self, x):
        # Pass the input through the encoder
        encoded = self.encoder(x)
        # Pass the encoded representation through the decoder
        decoded = self.decoder(encoded)
        # Return both the encoded representation and the reconstructed output
        return encoded, decoded

In [75]:
def train_ae(X_train, learning_rate=0.001,num_epochs=10,device="cpu",
             batch_size=32):
  # Convert the training data to PyTorch tensors
  X_train = torch.from_numpy(X_train)

  # Create the autoencoder model and optimizer
  model = AutoEncoder()
  optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

  # Define the loss function
  criterion = nn.MSELoss()

  # Set the device to GPU if available, otherwise use CPU
  model.to(device)

  # Create a DataLoader to handle batching of the training data
  train_loader = torch.utils.data.DataLoader(
      X_train, batch_size=batch_size, shuffle=True
  )
  # Training loop
  for epoch in range(num_epochs):
      total_loss = 0.0
      for batch_idx, data in enumerate(train_loader):
          # Get a batch of training data and move it to the device
          data = data.to(device)

          # Forward pass
          encoded, decoded = model(data)

          # Compute the loss and perform backpropagation
          loss = criterion(decoded, data)
          optimizer.zero_grad()
          loss.backward()
          optimizer.step()

          # Update the running loss
          total_loss += loss.item() * data.size(0)

      # Print the epoch loss
      epoch_loss = total_loss / len(train_loader.dataset)
      print(
          "Epoch {}/{}: loss={:.4f}".format(epoch + 1, num_epochs, epoch_loss)
      )

  return model

In [70]:
class VAE(AutoEncoder):
    def __init__(self):
        super().__init__()
        # Add mu and log_var layers for reparameterization
        self.mu = nn.Linear(self.num_hidden, self.num_hidden)
        self.log_var = nn.Linear(self.num_hidden, self.num_hidden)

    def reparameterize(self, mu, log_var):
        # Compute the standard deviation from the log variance
        std = torch.exp(0.5 * log_var)
        # Generate random noise using the same shape as std
        eps = torch.randn_like(std)
        # Return the reparameterized sample
        return mu + eps * std

    def forward(self, x):
        # Pass the input through the encoder
        encoded = self.encoder(x)
        # Compute the mean and log variance vectors
        mu = self.mu(encoded)
        log_var = self.log_var(encoded)
        # Reparameterize the latent variable
        z = self.reparameterize(mu, log_var)
        # Pass the latent variable through the decoder
        decoded = self.decoder(z)
        # Return the encoded output, decoded output, mean, and log variance
        return encoded, decoded, mu, log_var

    def sample(self, num_samples):
        with torch.no_grad():
            # Generate random noise
            z = torch.randn(num_samples, self.num_hidden).to(device)
            # Pass the noise through the decoder to generate samples
            samples = self.decoder(z)
        # Return the generated samples
        return samples

In [71]:
# Define a loss function that combines binary cross-entropy and Kullback-Leibler divergence
def loss_function(recon_x, x, mu, logvar):
    # Compute the binary cross-entropy loss between the reconstructed output and the input data
    BCE = F.binary_cross_entropy(recon_x, x.view(-1, 784), reduction="sum")
    # Compute the Kullback-Leibler divergence between the learned latent variable distribution and a standard Gaussian distribution
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    # Combine the two losses by adding them together and return the result
    return BCE + KLD

In [72]:
def train_vae(X_train, learning_rate=1e-3, num_epochs=10, batch_size=32):
    # Convert the training data to PyTorch tensors
    X_train = torch.from_numpy(X_train).to(device)

    # Create the autoencoder model and optimizer
    model = VAE()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Define the loss function
    criterion = nn.MSELoss(reduction="sum")

    # Set the device to GPU if available, otherwise use CPU
    model.to(device)

    # Create a DataLoader to handle batching of the training data
    train_loader = torch.utils.data.DataLoader(
        X_train, batch_size=batch_size, shuffle=True
    )

    # Training loop
    for epoch in range(num_epochs):
        total_loss = 0.0
        for batch_idx, data in enumerate(train_loader):
            # Get a batch of training data and move it to the device
            data = data.to(device)

            # Forward pass
            encoded, decoded, mu, log_var = model(data)

            # Compute the loss and perform backpropagation
            KLD = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
            loss = criterion(decoded, data) + 3 * KLD
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Update the running loss
            total_loss += loss.item() * data.size(0)

        # Print the epoch loss
        epoch_loss = total_loss / len(train_loader.dataset)
        print(
            "Epoch {}/{}: loss={:.4f}".format(epoch + 1, num_epochs, epoch_loss)
        )

    # Return the trained model
    return model

In [76]:
ae_model = train_ae(X_train)

Epoch 1/10: loss=0.0390
Epoch 2/10: loss=0.0308
Epoch 3/10: loss=0.0269
Epoch 4/10: loss=0.0254
Epoch 5/10: loss=0.0246
Epoch 6/10: loss=0.0241
Epoch 7/10: loss=0.0237
Epoch 8/10: loss=0.0233
Epoch 9/10: loss=0.0230
Epoch 10/10: loss=0.0228


In [77]:
vae_model = train_vae(X_train)

Epoch 1/10: loss=1623.8221
Epoch 2/10: loss=1484.7464
Epoch 3/10: loss=1462.6265
Epoch 4/10: loss=1447.3864
Epoch 5/10: loss=1436.3919
Epoch 6/10: loss=1428.9828
Epoch 7/10: loss=1423.6197
Epoch 8/10: loss=1418.4951
Epoch 9/10: loss=1415.6623
Epoch 10/10: loss=1412.5767
