In [2]:
### Import all the libraries used
import os
import csv
import pickle
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import time

### Set random seed
# The same seed is handed to Python's `random`, NumPy and PyTorch so that the
# random draws made through these three libraries start from a fixed state.
seed = 24
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
# NOTE(review): PYTHONHASHSEED is normally read when the interpreter starts, so
# setting it here probably only affects processes launched later — confirm.
os.environ["PYTHONHASHSEED"] = str(seed)


# record start time (wall-clock seconds since the epoch)
_START_RUNTIME = time.time()

# Define data path: root folder whose 'train' / 'val' sub-folders are read by
# get_count_metrics() and load_data() below. No weight path is defined here.
DATA_PATH = "../HW4_Autoencoder-lib/data"

## 1 Load and Visualize the Data [10 points]

The data is under `DATA_PATH`. In this part, you are required to load the data into the data loader, and calculate some statistics.

In [3]:
#input
# folder: str, 'train', 'val', or 'test'
#output
# number_normal: number of normal samples in the given folder
# number_pneumonia: number of pneumonia samples in the given folder
def get_count_metrics(folder, data_path=DATA_PATH):
    """Count the NORMAL and PNEUMONIA samples in one split of the dataset.

    Args:
        folder: split name: 'train', 'val', or 'test'.
        data_path: root directory that contains the split folders.

    Returns:
        (number_normal, number_pneumonia): number of sample files found in
        `<data_path>/<folder>/NORMAL` and `<data_path>/<folder>/PNEUMONIA`.

    Raises:
        FileNotFoundError: if either class directory does not exist.
    """
    def count_samples(class_name):
        class_dir = os.path.join(data_path, folder, class_name)
        # Count regular files only and skip hidden entries such as `.DS_Store`
        # or `.ipynb_checkpoints`; a bare os.listdir() would count them as samples.
        with os.scandir(class_dir) as entries:
            return sum(
                1
                for entry in entries
                if entry.is_file() and not entry.name.startswith('.')
            )

    number_normal = count_samples('NORMAL')
    number_pneumonia = count_samples('PNEUMONIA')

    return number_normal, number_pneumonia


#output
# train_loader: train data loader (type: torch.utils.data.DataLoader)
# val_loader: val data loader (type: torch.utils.data.DataLoader)
def load_data(data_path=DATA_PATH, batch_size=32):
    """Build the train and validation data loaders.

    Each image goes through the following transforms, in order:
        1. transforms.RandomResizedCrop(224): random crop resized to 224 x 224
        2. transforms.RandomResizedCrop(24): random crop resized to 24 x 24
        3. transforms.ToTensor: convert the image to a tensor
        4. flatten_transform: flatten the 3 x 24 x 24 tensor into one vector

    Args:
        data_path: root directory containing the 'train' and 'val' folders,
            each laid out for `torchvision.datasets.ImageFolder`.
        batch_size: number of samples per batch (default 32).

    Returns:
        (train_loader, val_loader): `torch.utils.data.DataLoader` objects;
        the train loader is shuffled, the validation loader is not.
    """
    import torchvision.datasets as datasets
    import torchvision.transforms as transforms

    # Passing torch.flatten itself (not a lambda wrapping it) flattens the
    # tensor the same way while keeping the transform picklable.
    flatten_transform = transforms.Lambda(torch.flatten)

    # The second step is RandomResizedCrop(24), as the assignment text above
    # prescribes; the previous code used Resize(24) here instead.
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomResizedCrop(24),
        transforms.ToTensor(),
        flatten_transform,
    ])

    train_data = datasets.ImageFolder(os.path.join(data_path, 'train'), transform=transform)
    val_data = datasets.ImageFolder(os.path.join(data_path, 'val'), transform=transform)

    train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_data, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader

In [6]:
# DO NOT MODIFY THIS PART


import torchvision
import matplotlib.pyplot as plt

def imshow(img, title):
    """Show one image tensor in a new 15 x 7 matplotlib figure with axes hidden.

    Args:
        img: tensor exposing `.numpy()`; its axes are reordered with
            (1, 2, 0) before plotting — assumes a channel-first (C, H, W)
            layout such as the grid built in show_batch_images; confirm for
            other callers.
        title: value passed to `plt.title`.
    """
    npimg = img.numpy()
    plt.figure(figsize=(15, 7))
    plt.axis('off')
    # move the first axis to the end before handing the array to plt.imshow
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.title(title)
    plt.show()

def show_batch_images(dataloader, k=8):
    """Plot the first `k` images of the next batch from `dataloader` as one grid.

    The batch is reshaped to (-1, 3, 24, 24) before plotting, and the figure
    title lists "NORMAL" for label 0 and "PNEUMONIA" for any other label.
    """
    # a fresh iterator is created on every call, so this takes its first batch
    images, labels = next(iter(dataloader))
    # undo the flattening: back to one 3 x 24 x 24 tensor per image
    images = images.reshape(-1, 3, 24, 24)
    images = images[:k]
    labels = labels[:k]
    img = torchvision.utils.make_grid(images, padding=3)
    imshow(img, title=["NORMAL" if x==0  else "PNEUMONIA" for x in labels])

# Build the loaders once; later cells use `train_loader` / `val_loader` as globals.
train_loader, val_loader = load_data()
# Display a grid from the first batch of a fresh iterator, two times in a row.
for i in range(2):
    show_batch_images(train_loader)

---

In [None]:
"""
TODO: Build the MLP shown above.
HINT: Consider using `nn.Linear`, `torch.relu`, and `torch.sigmoid`.
"""

class VanillaAutoencoder(nn.Module):
    """Fully connected autoencoder: 1728 -> 128 -> 16 -> 128 -> 1728.

    The 1728 inputs match the flattened 3 x 24 x 24 images produced by the
    data loaders. `encode` compresses an input to a 16-dimensional code and
    `decode` maps the code back to 1728 values in (0, 1).
    """

    def __init__(self):
        super().__init__()

        # DO NOT change the names
        self.fc1 = nn.Linear(1728, 128)
        self.fc2 = nn.Linear(128, 16)
        self.fc3 = nn.Linear(16, 128)
        self.fc4 = nn.Linear(128, 1728)

    def encode(self, x):
        """Compress `x` of shape (..., 1728) into a code of shape (..., 16).

        NOTE(review): ReLU is applied after both fc1 and fc2, mirroring the
        hidden activation used in decode(); confirm the bottleneck activation
        against the architecture figure of the assignment.
        """
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return x

    def decode(self, x):
        """Expand a code of shape (..., 16) back to shape (..., 1728).

        The final sigmoid keeps every output value in (0, 1).
        """
        x = torch.relu(self.fc3(x))
        x = torch.sigmoid(self.fc4(x))
        return x

    def forward(self, x):
        """Return the reconstruction decode(encode(x))."""
        return self.decode(self.encode(x))

# initialize the NN
model = VanillaAutoencoder()
# printing an nn.Module shows its registered sub-modules
print(model)

In [None]:
"""
TODO: Build the MLP shown above.
HINT: Consider using `nn.Linear` and `torch.sigmoid`.
"""

class SparseAutoencoder(nn.Module):
    """Skeleton of the sparse autoencoder; most of it is still to be written.

    As it stands, `__init__`, `encode` and `decode` all raise
    NotImplementedError, so the class cannot be instantiated yet. The layer
    sizes are not given in this part of the notebook.
    """
    def __init__(self):
        super(SparseAutoencoder, self).__init__()

        # DO NOT change the names
        # placeholders for the four layers; to be replaced by real layers
        self.fc1 = None
        self.fc2 = None
        self.fc3 = None
        self.fc4 = None

        """
        TODO: Initialize the model layers as shown above.
        """
        # your code here
        raise NotImplementedError

        # used in training as sparsity regularization
        # NOTE: unreachable while the `raise` above is in place; forward()
        # overwrites this value with the mean of the encoding on every call.
        self.data_rho = 0

    def encode(self, x):
        """
        TODO: Perform encoding operation with fc1, fc2, and the corresponding activation function.
        """
        # your code here
        raise NotImplementedError

    def decode(self, x):
        """
        TODO: Perform decoding operation with fc3, fc4, and the corresponding activation function.
        """
        # your code here
        raise NotImplementedError

    def forward(self, x):
        """Encode `x`, record the mean of the encoding over dim 0, then decode it."""
        x = self.encode(x)
        # stored on the instance; train_model() reads it back as `model.data_rho`
        self.data_rho = x.mean(0)
        x = self.decode(x)
        return x
    

# initialize the NN
# NOTE: SparseAutoencoder.__init__ currently raises NotImplementedError, so
# this cell fails until the class above is completed.
model = SparseAutoencoder()
print(model)

In [None]:
"""
TODO: Build the MLP shown above.
HINT: Consider using `nn.Linear`, `torch.relu`, and `torch.sigmoid`.
"""

class DenoisingAutoencoder(nn.Module):
    """Autoencoder that reconstructs its input from a noise-corrupted copy.

    `forward` adds Gaussian noise (mean 0, std 0.1) to the input before
    encoding and decoding it. The layers and `encode` are still TODO: their
    sizes are not given in this part of the notebook, so `__init__` and
    `encode` keep raising NotImplementedError until they are filled in.
    """

    def __init__(self):
        super(DenoisingAutoencoder, self).__init__()

        # DO NOT change the names
        self.fc1 = None
        self.fc2 = None
        self.fc3 = None
        self.fc4 = None

        # TODO: Initialize the model layers as shown above.
        # your code here
        raise NotImplementedError

    def encode(self, x):
        """
        TODO: Perform encoding operation with fc1, fc2, and the corresponding activation function.
        """
        # your code here
        raise NotImplementedError

    def decode(self, x):
        """Decode with fc3 + ReLU followed by fc4 + sigmoid."""
        x = torch.relu(self.fc3(x))
        x = torch.sigmoid(self.fc4(x))
        return x

    def forward(self, x):
        """Corrupt `x` with Gaussian noise and return decode(encode(x + noise))."""
        std = 0.1
        mean = 0
        # randn_like draws standard-normal samples with the same shape, dtype
        # and device as x; scaling and shifting gives N(mean, std**2) noise.
        noise = torch.randn_like(x) * std + mean
        x = x + noise
        return self.decode(self.encode(x))

# initialize the NN
# NOTE: fails with NotImplementedError for as long as
# DenoisingAutoencoder.__init__ leaves its layers unimplemented.
model = DenoisingAutoencoder()
print(model)

In [None]:
"""
TODO: Build the StackedAutoencoder using your VanillaAutoencoder architecture.
"""

class StackedAutoencoder(nn.Module):
    """Three independent VanillaAutoencoders applied one after another."""

    def __init__(self):
        super().__init__()

        # DO NOT change the names
        # three separate instances, so each stage has its own weights
        self.ae1 = VanillaAutoencoder()
        self.ae2 = VanillaAutoencoder()
        self.ae3 = VanillaAutoencoder()

    def forward(self, x):
        """Pass `x` through ae1, ae2 and ae3 in turn and return the result."""
        x = self.ae1(x)
        x = self.ae2(x)
        x = self.ae3(x)
        return x

    def encode(self, x):
        """Return the compressed representation produced by the third autoencoder.

        The input takes the same path as in forward() through ae1 and ae2
        (full encode + decode); only the encoder half of ae3 is then applied.
        """
        x = self.ae1(x)
        x = self.ae2(x)
        return self.ae3.encode(x)

# initialize the NN
model = StackedAutoencoder()
# printing an nn.Module shows its registered sub-modules
print(model)

In [None]:
"""
TODO: Define the loss (MSELoss), assign it to `criterion`.

REFERENCE: https://pytorch.org/docs/stable/generated/torch.nn.MSELoss.html#torch.nn.MSELoss
"""

# Reconstruction loss: mean squared error between the model output and the
# original input (default 'mean' reduction over all elements).
criterion = nn.MSELoss()

In [None]:
from sklearn.metrics import *

#input: X_reconstructed, X_original
#output: mean squared error, mean absolute error
def classification_metrics(X_reconstructed, X_original):
    """Compare reconstructions with the original inputs.

    Args:
        X_reconstructed: model outputs, passed to the metrics as predictions.
        X_original: original inputs, passed to the metrics as ground truth.

    Returns:
        (mse, mae): mean squared error and mean absolute error.
    """
    mse = mean_squared_error(X_original, X_reconstructed)
    mae = mean_absolute_error(X_original, X_reconstructed)
    return mse, mae



#input: model, loader
def evaluate(model, loader):
    """Reconstruct every batch of `loader` and report the reconstruction error.

    Args:
        model: autoencoder, called as `model(x)` on each batch.
        loader: iterable of `(x, label)` batches; the labels are ignored.

    Returns:
        (mse, mae) over all samples, as computed by `classification_metrics`.

    Raises:
        ValueError: if `loader` yields no batches.

    Note:
        The model is switched to eval mode and is left in that mode.
    """
    model.eval()
    originals = []
    reconstructions = []
    # Evaluation needs no gradients; skipping them saves memory and time.
    with torch.no_grad():
        for x, _ in loader:
            originals.append(x)
            reconstructions.append(model(x))

    if not originals:
        raise ValueError("loader yielded no batches to evaluate")

    # Concatenate once along the batch dimension rather than growing a
    # tensor batch by batch.
    all_X_original = torch.cat(originals)
    all_X_reconstructed = torch.cat(reconstructions)

    mse, mae = classification_metrics(
        all_X_reconstructed.detach().cpu().numpy(),
        all_X_original.detach().cpu().numpy(),
    )
    print(f"mse: {mse:.3f}, mae: {mae:.3f}")
    return mse, mae

In [None]:
print("model perfomance before training:")
# initialized the model
model = VanillaAutoencoder()
# evaluate() returns (mse, mae); index 1 keeps only the MAE of the untrained model
mae_train_init = evaluate(model, train_loader)[1]
mae_val_init = evaluate(model, val_loader)[1]

In [None]:
"""
TODO: Define the optimizer (Adam) with learning rate 0.001, assign it to `optimizer`.

REFERENCE: https://pytorch.org/docs/stable/optim.html
"""
def get_optimizer(model):
    """Create the optimizer used to train `model`.

    Args:
        model: module whose `parameters()` will be optimized.

    Returns:
        A `torch.optim.Adam` instance with learning rate 0.001.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    return optimizer

In [None]:
def train_model(model, n_epochs=10):
    """Train `model` to reconstruct its input.

    Uses the notebook-level `train_loader`, `val_loader`, `criterion`,
    `get_optimizer` and `evaluate`.

    Args:
        model: autoencoder to train; its parameters are updated in place.
        n_epochs: number of passes over `train_loader` (default 10).

    Returns:
        (model, train_loss_arr): the trained model and the mean training loss
        of every second epoch (epochs 0, 2, 4, ...).
    """
    # get the correct type of optimizer for the model
    optimizer = get_optimizer(model)

    # target mean activation of the code units (SparseAutoencoder only)
    rho = 0.1

    train_loss_arr = []
    for epoch in range(n_epochs):
        # evaluate() below leaves the model in eval mode, so training mode
        # has to be re-entered at the start of every epoch.
        model.train()

        train_loss = 0
        for x, _ in train_loader:
            # Step 1. clear gradients
            optimizer.zero_grad()
            # Step 2. forward pass
            x_reconstructed = model(x)
            # Step 3. reconstruction loss against the input itself
            loss = criterion(x_reconstructed, x)
            if isinstance(model, SparseAutoencoder):
                # mean activation of each code unit, recorded by forward()
                data_rho = model.data_rho
                # Sparsity penalty: cross-entropy between the target rho and
                # the observed mean activation, averaged over the code units.
                # NOTE(review): the assignment text writes the second term
                # with a '+', i.e. -rho*log(data_rho) + (1-rho)*log(1-data_rho).
                # Taken literally that expression is unbounded below as
                # data_rho -> 1, so the leading minus is applied to both
                # terms here; confirm against the reference solution.
                penalty = torch.mean(
                    -(rho * torch.log(data_rho) + (1 - rho) * torch.log(1 - data_rho))
                )
                loss = loss + (0.5 * penalty)
            # Step 4. backward pass
            loss.backward()
            # Step 5. optimization
            optimizer.step()
            # Step 6. record loss
            train_loss += loss.item()

        train_loss = train_loss / len(train_loader)
        if epoch % 2 == 0:
            train_loss_arr.append(np.mean(train_loss))
            print('Epoch: {} \tTraining Loss: {:.6f}'.format(epoch, train_loss))
            evaluate(model, val_loader)

    return model, train_loss_arr

In [None]:
# Train a fresh VanillaAutoencoder; keep the trained model and the training
# losses that train_model() recorded.
vanilla_model = VanillaAutoencoder()
vanilla_model, vanilla_train_loss_arr = train_model(vanilla_model)

In [None]:
# Train a fresh SparseAutoencoder; train_model() takes its SparseAutoencoder
# branch for this model and adds 0.5 * penalty to the loss.
sparse_model = SparseAutoencoder()
sparse_model, sparse_train_loss_arr = train_model(sparse_model)

In [None]:
# Train a fresh DenoisingAutoencoder; keep the trained model and the training
# losses that train_model() recorded.
denoising_model = DenoisingAutoencoder()
denoising_model, denoising_train_loss_arr = train_model(denoising_model)

In [None]:
# Train a fresh StackedAutoencoder; keep the trained model and the training
# losses that train_model() recorded.
stacked_model = StackedAutoencoder()
stacked_model, stacked_train_loss_arr = train_model(stacked_model)