## Tutorial 10, Question 3 (Deep Stacked Classifier)

In [1]:
import os
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

if not os.path.isdir('figures'):
    print('creating the figures folder')
    os.makedirs('figures')

creating the figures folder


In [2]:
noise_prob = 0.1

# Add multiplicative (masking) noise: each entry is zeroed with probability noise_prob
def add_noise(x):
    noise = torch.bernoulli(torch.full_like(x, 1 - noise_prob))  # mask: 1 = keep, 0 = drop
    noisy_x = x * noise
    return noisy_x
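
To see what `add_noise` does, the sketch below applies the same masking to a stand-in batch of ones and counts how many entries end up zeroed. The batch shape and the fixed seed are assumptions made only for this illustration; the function body mirrors the cell above.

```python
import torch

torch.manual_seed(0)  # fixed seed so the sketch is repeatable

noise_prob = 0.1  # same value as in the cell above

def add_noise(x):
    # Keep each entry with probability 1 - noise_prob, zero it otherwise.
    mask = torch.bernoulli(torch.full_like(x, 1 - noise_prob))
    return x * mask

x = torch.ones(1000, 784)  # stand-in batch; all ones makes zeroed entries easy to count
noisy_x = add_noise(x)
zero_fraction = (noisy_x == 0).float().mean().item()
print(f"fraction zeroed: {zero_fraction:.3f}")  # close to noise_prob
```

Because the mask is multiplicative, pixels that are already zero (the MNIST background) are unaffected; only non-zero pixels can be dropped.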

## Autoencoder with Tied Weights

In this section, we define a class `Autoencoder` using PyTorch's `nn.Module`. Despite its name, it implements a two-layer stacked autoencoder with tied weights plus a classifier output layer. An autoencoder is a neural network that learns a compact representation of its input, typically for dimensionality reduction or feature learning.

### Architecture Details:

- **Initialization Method:**
  The `__init__` method initializes the autoencoder with the following layers and parameters:

  - **Encoder**:
    - `W1`: The weight matrix for the first hidden layer in the encoder, with a size of `(n_hidden1, n_in)`. It is initialized with standard normal values scaled by `sqrt(1/n_in)`, a fan-in scaling in the spirit of Xavier initialization.
    - `b1`: The bias vector for the first hidden layer in the encoder, initialized to zeros.

  - **Decoder**:
    - `b1_prime`: The bias vector for reconstructing the input from the first hidden layer's representation, initialized to zeros. The decoder has no weight matrix of its own: it reuses the transpose of `W1`, which is what "tied weights" means.

  - **Second Hidden Layer**:
    - `W2`: The weight matrix for the second hidden layer, with a size of `(n_hidden2, n_hidden1)`, initialized similarly to `W1`.
    - `b2`: The bias vector for the second hidden layer, initialized to zeros.
    - `b2_prime`: The bias vector for reconstructing the first hidden layer from the second hidden layer's representation, initialized to zeros.

  - **Classifier**:
    - `W3`: The weight matrix for the output layer, with a size of `(n_out, n_hidden2)`, initialized using the same strategy as `W1` and `W2`.
    - `b3`: The bias vector for the output layer, initialized to zeros.

### Forward Pass:

The `forward` method defines the forward pass of the autoencoder:

- The input `x` is transformed through the encoder to produce the first hidden layer representation `h1`.
- The representation `h1` is then used to reconstruct the input `y1` using the transpose of `W1` (implementing tied weights).
- The hidden layer `h1` is further encoded to produce the second hidden layer representation `h2`.
- The second hidden layer `h2` is used to reconstruct `h1` in `y2` using the transpose of `W2` (again using tied weights).
- Finally, `h2` is transformed to the output `y3` using the weights `W3` and bias `b3`.

The sigmoid activation is applied in every step except the last. The output `y3` is left as raw logits, because `F.cross_entropy` applies the softmax internally.

This setup allows the autoencoder to learn to compress the input data into a more compact representation and then reconstruct the input from this representation as closely as possible, while also having the ability to produce a classification output.
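
The tied-weight idea can be sketched with a single layer. The toy sizes and variable names below are assumptions chosen for illustration; the point is only that `F.linear(h, W.t(), b_prime)` decodes with the same matrix that encoded, so the reconstruction loss produces one gradient for `W`.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)

n_in, n_hidden = 6, 4  # toy sizes, chosen only for illustration
W = torch.randn(n_hidden, n_in, requires_grad=True)
b = torch.zeros(n_hidden, requires_grad=True)
b_prime = torch.zeros(n_in, requires_grad=True)

x = torch.rand(3, n_in)                         # batch of 3 fake inputs
h = torch.sigmoid(F.linear(x, W, b))            # encode: x @ W.T + b
y = torch.sigmoid(F.linear(h, W.t(), b_prime))  # decode: h @ W + b_prime, same W

loss = ((x - y) ** 2).sum(dim=1).mean()
loss.backward()

print(h.shape, y.shape)  # torch.Size([3, 4]) torch.Size([3, 6])
print(W.grad.shape)      # torch.Size([4, 6]): one gradient covering both uses of W
```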



In [3]:
# Define the Autoencoder architecture with tied weights
class Autoencoder(nn.Module):
    def __init__(self, n_in, n_hidden1, n_hidden2, n_out):
        super().__init__()
        # Encoder
        self.W1 = nn.Parameter(torch.randn(n_hidden1, n_in) * np.sqrt(1. / n_in))
        self.b1 = nn.Parameter(torch.zeros(n_hidden1))

        # Decoder
        self.b1_prime = nn.Parameter(torch.zeros(n_in))

        # Second hidden layer
        self.W2 = nn.Parameter(torch.randn(n_hidden2, n_hidden1) * np.sqrt(1. / n_hidden1))
        self.b2 = nn.Parameter(torch.zeros(n_hidden2))
        self.b2_prime = nn.Parameter(torch.zeros(n_hidden1))

        # Classifier
        self.W3 = nn.Parameter(torch.randn(n_out, n_hidden2) * np.sqrt(1. / n_hidden2))
        self.b3 = nn.Parameter(torch.zeros(n_out))

    def forward(self, x):
        h1 = torch.sigmoid(F.linear(x, self.W1, self.b1))
        y1 = torch.sigmoid(F.linear(h1, self.W1.t(), self.b1_prime))
        h2 = torch.sigmoid(F.linear(h1, self.W2, self.b2))
        y2 = torch.sigmoid(F.linear(h2, self.W2.t(), self.b2_prime))
        y3 = F.linear(h2, self.W3, self.b3)
        return h1, y1, h2, y2, y3

In [4]:
# Loss functions
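def loss_ae(h, y, original):
    rho = 0.02  # target mean activation of each hidden unit
    # Squared reconstruction error, summed over features and averaged over the batch
    mse_loss = torch.mean(torch.sum((original - y) ** 2, dim=1))
    # KL divergence between the target sparsity rho and each unit's batch-mean activation
    rho_hat = torch.mean(h, dim=0)
    sparse_loss = torch.sum(rho * torch.log(rho / rho_hat) + (1 - rho) * torch.log((1 - rho) / (1 - rho_hat)))
    return mse_loss + 0.4 * sparse_loss  # 0.4 weights the sparsity penalty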

def loss_class(output, target):
    # Cross-Entropy loss
    return F.cross_entropy(output, target)

def accuracy(output, target):
    pred = output.argmax(dim=1, keepdim=True)
    return pred.eq(target.view_as(pred)).float().mean()
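
The sparsity term in `loss_ae` is a KL divergence between a Bernoulli distribution with mean `rho` and one whose mean is the unit's average activation. The sketch below isolates that term (the helper name `kl_sparsity` is hypothetical) and shows that it vanishes when every unit averages exactly `rho` and grows when units are more active than the target.

```python
import torch

rho = 0.02  # target mean activation, as in loss_ae

def kl_sparsity(mean_act, rho=rho):
    # KL(Bernoulli(rho) || Bernoulli(mean_act)), summed over hidden units
    return torch.sum(
        rho * torch.log(rho / mean_act)
        + (1 - rho) * torch.log((1 - rho) / (1 - mean_act))
    )

on_target = kl_sparsity(torch.full((5,), rho))   # every unit averages exactly rho
too_active = kl_sparsity(torch.full((5,), 0.5))  # every unit averages 0.5
print(on_target.item(), too_active.item())       # roughly zero vs. clearly positive
```

Note that the penalty is undefined if a mean activation reaches exactly 0 or 1; with sigmoid units this does not happen in exact arithmetic, but clamping the mean would be a reasonable safeguard.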

## Training Function for a Stepwise Autoencoder Model

The following function, `train`, is used to train an autoencoder model in a stepwise fashion. This approach can be used to train the autoencoder and classifier components separately.

### Parameters:

- `model`: This is the autoencoder model to be trained.
- `train_loader`: The data loader that provides batches of training data.
- `optimizer`: The optimization algorithm used to update the weights of the model.
- `step`: An integer (1, 2, or 3) that selects which of the training phases described below is run.
- `device`: The device (CPU or GPU) on which the model will be trained.

### Function Overview:

- The function sets the model to training mode using `model.train()`.
- It initializes `train_loss` to keep track of the cumulative loss for the epoch.
- The function then iterates over the `train_loader`, fetching batches of data and corresponding targets.
- It moves the data and targets to the specified `device`.
- Depending on the value of `step`, it executes a different part of the model:
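  - **Step 1**: Trains the first autoencoder layer. The noisy input is encoded to `h1` and decoded to `y1`, and `loss_ae` compares `y1` with the clean input.
  - **Step 2**: Trains the second autoencoder layer by optimizing the reconstruction loss between the first hidden representation `h1` and its reconstruction `y2`, again starting from the noisy input. Because `h1` is not detached and the optimizer holds all parameters, `W1` and `b1` also receive gradients in this step.
  - **Step 3**: Trains the classifier on the clean input, optimizing the classification loss between the final output `y3` and the target labels. Gradients also flow back through `W2` and `W1`, so this step fine-tunes the whole stack.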
- For each step, the function:
  - Clears the gradients of the optimizer.
  - Computes the loss using a predefined loss function specific to each step (`loss_ae` for steps 1 and 2, `loss_class` for step 3).
  - Accumulates the loss to `train_loss`.
  - Performs backpropagation using `loss.backward()`.
  - Updates the model parameters using `optimizer.step()`.

### Return Value:

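- The function returns the cumulative `train_loss` divided by the total number of items in the dataset. Because each batch loss is already a per-batch quantity rather than a sum over samples, the returned value is roughly the mean batch loss divided by the batch size. It is useful for tracking progress across epochs, not as an absolute per-sample loss.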

By dividing the training process into steps, the model can first learn to encode and decode the input before focusing on the classification task, which can sometimes lead to better generalization and easier training.
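
A stricter greedy layer-wise variant would freeze the first layer while the second one is trained. The sketch below is an assumption about how that could be done with `detach()`, not what the tutorial's `train` function does; the toy sizes and names are chosen only for illustration.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)

# Toy weights standing in for the first and second layers.
W1 = torch.randn(4, 6, requires_grad=True)
W2 = torch.randn(3, 4, requires_grad=True)

x = torch.rand(5, 6)
h1 = torch.sigmoid(F.linear(x, W1))

# Detach h1 so the second-stage loss cannot reach W1.
h1_fixed = h1.detach()
h2 = torch.sigmoid(F.linear(h1_fixed, W2))
y2 = torch.sigmoid(F.linear(h2, W2.t()))  # tied decoder for the second layer

loss = ((h1_fixed - y2) ** 2).sum(dim=1).mean()
loss.backward()

print(W1.grad is None)      # True: the first layer received no gradient
print(W2.grad is not None)  # True: only the second layer is updated
```

Whether freezing helps is an empirical question; letting all layers move, as the tutorial code does, is also a common choice.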



In [5]:
# Training function
def train(model, train_loader, optimizer, step, device):
    model.train()
    train_loss = 0
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        noisy_data = add_noise(data)
        optimizer.zero_grad()
        if step == 1:
            h1, y1, _, _, _ = model(noisy_data)
            loss = loss_ae(h1, y1, data)
            train_loss += loss.item()
            loss.backward()
            optimizer.step()
        elif step == 2:
            h1, _, h2, y2, _ = model(noisy_data)
            loss = loss_ae(h2, y2, h1)
            train_loss += loss.item()
            loss.backward()
            optimizer.step()
        elif step == 3:
            _, _, _, _, y3 = model(data)
            loss = loss_class(y3, target)
            train_loss += loss.item()
            loss.backward()
            optimizer.step()
    return train_loss / len(train_loader.dataset)

In [6]:
# Test function
def test(model, test_loader, device):
    model.eval()
    test_acc = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            _, _, _, _, output = model(data)
            test_acc += accuracy(output, target).item()
    return test_acc / len(test_loader)
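
The `test` function averages per-batch accuracies. That equals the overall accuracy only when all batches have the same size, which holds here (10,000 test images in batches of 1,000). The toy logits and labels below are made up to show how the two numbers diverge with unequal batches.

```python
import torch

def accuracy(output, target):
    pred = output.argmax(dim=1)
    return pred.eq(target).float().mean()

# Fake logits for 5 samples and 3 classes; the prediction is the argmax of each row.
logits = torch.tensor([[2.0, 0.1, 0.1],
                       [0.1, 2.0, 0.1],
                       [0.1, 0.1, 2.0],
                       [2.0, 0.1, 0.1],
                       [0.1, 2.0, 0.1]])
targets = torch.tensor([0, 1, 2, 1, 1])  # the fourth sample is misclassified

overall = accuracy(logits, targets).item()  # 4 correct out of 5

# Unequal batches of sizes 4 and 1, averaged per batch as in test()
batch_accs = [accuracy(logits[:4], targets[:4]).item(),
              accuracy(logits[4:], targets[4:]).item()]
per_batch_mean = sum(batch_accs) / len(batch_accs)

print(overall, per_batch_mean)  # 0.8 versus 0.875
```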

## MNIST Data Loaders with Flatten Transform

To prepare the data loaders for the MNIST dataset, we are using the `datasets.MNIST` class provided by PyTorch's `torchvision` module. We apply a series of transforms to the data to prepare it for input into our neural network model.

### Data Loading and Transformations:

- **Training Dataset**:
  - We specify `train=True` to indicate we want the training portion of the MNIST dataset.
  - The `download=True` parameter tells the loader to download the data if it's not present in the specified directory (`./data`).
  - We then define a composition of transforms:
    - `transforms.ToTensor()`: Converts the PIL Image or numpy.ndarray to a float tensor and scales the intensity values to the range [0., 1.].
    - `transforms.Lambda(lambda x: x.view(-1))`: Flattens the 1x28x28 image into a 1D tensor of 784 values. The `-1` in the `view` call tells PyTorch to infer the length.
  - No binarization is applied in the transform. The masking noise used for denoising is added later, inside the training loop, by `add_noise`.

- **Test Dataset**:
  - We specify `train=False` to load the test portion of the MNIST dataset.
  - The transforms are the same two as for the training set, so the model is evaluated on clean, unbinarized images:
    - `transforms.ToTensor()`: Scales and converts the image to a tensor.
    - `transforms.Lambda(lambda x: x.view(-1))`: Flattens the image into a 1D tensor for consistency with the training data format.

The resulting `train_dataset` and `test_dataset` are PyTorch Dataset objects that are ready to be wrapped by a `DataLoader` for batch processing and shuffling.
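
The shapes involved can be checked without downloading MNIST. The sketch below uses random tensors as a stand-in for the images (an assumption for illustration only) and shows the effect of flattening and of batching with a `DataLoader`.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Random stand-in for MNIST: 256 "images" of shape 1x28x28 with values in [0, 1).
images = torch.rand(256, 1, 28, 28)
labels = torch.randint(0, 10, (256,))

flat = images[0].view(-1)  # same operation as the Lambda transform
print(flat.shape)          # torch.Size([784])

# Flatten every image, then batch them the way train_loader does.
dataset = TensorDataset(images.view(len(images), -1), labels)
loader = DataLoader(dataset, batch_size=64, shuffle=True)
data, target = next(iter(loader))
print(data.shape, target.shape)  # torch.Size([64, 784]) torch.Size([64])
```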



In [7]:
# MNIST data loaders with added Flatten transform
train_dataset = datasets.MNIST('./data', train=True, download=True,
                               transform=transforms.Compose([
                                   transforms.ToTensor(),
                                   transforms.Lambda(lambda x: x.view(-1))  # Flatten the images
                               ]))

test_dataset = datasets.MNIST('./data', train=False,
                              transform=transforms.Compose([
                                  transforms.ToTensor(),
                                  transforms.Lambda(lambda x: x.view(-1))  # Flatten the images
                              ]))

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)  # evaluation does not need shuffling



In [8]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model, optimizer and steps
model = Autoencoder(784, 625, 100, 10).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4)
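
As a quick sanity check on the model size, the number of trainable parameters follows directly from the layer sizes passed to `Autoencoder(784, 625, 100, 10)`. The arithmetic below needs no PyTorch; the dictionary keys mirror the parameter names in the class.

```python
# Sizes used when constructing the model above
n_in, n_hidden1, n_hidden2, n_out = 784, 625, 100, 10

param_counts = {
    "W1": n_hidden1 * n_in,        # shared by encoder and tied decoder
    "b1": n_hidden1,
    "b1_prime": n_in,
    "W2": n_hidden2 * n_hidden1,   # shared by encoder and tied decoder
    "b2": n_hidden2,
    "b2_prime": n_hidden1,
    "W3": n_out * n_hidden2,
    "b3": n_out,
}
total = sum(param_counts.values())
print(total)  # 555644
```

Tying the weights saves a second copy of `W1` and `W2`; untied decoders would add another 552,500 weights.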

In [9]:
# Train the model
num_epochs = 10
steps = 3
for step in range(1, steps + 1):
    for epoch in range(num_epochs):
        train_loss = train(model, train_loader, optimizer, step, device)
        print(f"Step {step}, Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}")

        # With num_epochs = 10 this evaluates only after the first epoch of each step
        if epoch % 10 == 0:
            test_acc = test(model, test_loader, device)
            print(f"Test Accuracy: {test_acc:.4f}")

# Save the model checkpoint
torch.save(model.state_dict(), 'sparse_autoencoder.pth')

Step 1, Epoch [1/10], Loss: 1.7722
Test Accuracy: 0.1028
Step 1, Epoch [2/10], Loss: 0.9480
Step 1, Epoch [3/10], Loss: 0.7299
Step 1, Epoch [4/10], Loss: 0.6265
Step 1, Epoch [5/10], Loss: 0.5538
Step 1, Epoch [6/10], Loss: 0.4964
Step 1, Epoch [7/10], Loss: 0.4475
Step 1, Epoch [8/10], Loss: 0.4045
Step 1, Epoch [9/10], Loss: 0.3671
Step 1, Epoch [10/10], Loss: 0.3356
Step 2, Epoch [1/10], Loss: 0.9001
Test Accuracy: 0.1136
Step 2, Epoch [2/10], Loss: 0.3838
Step 2, Epoch [3/10], Loss: 0.1941
Step 2, Epoch [4/10], Loss: 0.1054
Step 2, Epoch [5/10], Loss: 0.0664
Step 2, Epoch [6/10], Loss: 0.0473
Step 2, Epoch [7/10], Loss: 0.0361
Step 2, Epoch [8/10], Loss: 0.0285
Step 2, Epoch [9/10], Loss: 0.0227
Step 2, Epoch [10/10], Loss: 0.0180
Step 3, Epoch [1/10], Loss: 0.0234
Test Accuracy: 0.8543
Step 3, Epoch [2/10], Loss: 0.0086
Step 3, Epoch [3/10], Loss: 0.0060
Step 3, Epoch [4/10], Loss: 0.0050
Step 3, Epoch [5/10], Loss: 0.0044
Step 3, Epoch [6/10], Loss: 0.0041
Step 3, Epoch [7/10], 

In [10]:
# Do a final round of testing to check the accuracy
test_acc = test(model, test_loader, device)
print(f"Test Accuracy: {test_acc:.4f}")

Test Accuracy: 0.9419
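
The saved `state_dict` can be reloaded later into a freshly constructed model of the same architecture. The sketch below shows the round trip with a tiny `nn.Linear` stand-in and a temporary file, both assumptions made for illustration; with the tutorial's class, the fresh model would be `Autoencoder(784, 625, 100, 10)` and the file `sparse_autoencoder.pth`.

```python
import os
import tempfile

import torch
import torch.nn as nn

# Tiny stand-in model, used only to keep the sketch fast and self-contained.
model = nn.Linear(4, 2)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "checkpoint.pth")  # hypothetical file name
    torch.save(model.state_dict(), path)

    restored = nn.Linear(4, 2)                  # must match the saved architecture
    state = torch.load(path, map_location="cpu")
    restored.load_state_dict(state)
    restored.eval()                             # switch to inference mode

same = all(torch.equal(a, b) for a, b in zip(model.state_dict().values(),
                                             restored.state_dict().values()))
print(same)  # True
```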
