In [1]:
"""
Libraries

"""

from typing import Callable

import matplotlib.pyplot as plt

import numpy as np

import torch
import torch.nn.functional as F
import torch.optim as optim
import torchvision

from torchvision import transforms
from scipy.optimize import linear_sum_assignment
from sklearn.metrics import accuracy_score, confusion_matrix
from torch.utils.data import DataLoader

  from .autonotebook import tqdm as notebook_tqdm


In [11]:
"""
Setting generic hyperparameters

"""

# Number of passes over the training data (alternative value noted by the author: 20).
num_epochs: int = 5
# Mini-batch size. A power of 2 is the stated recommendation; note that the
# value currently in use, 250, is not one.
batch_size: int = 250
# Learning rate handed to the optimizer.
lr: float = 0.002

In [9]:
"""
Data Preprocessing

"""

from torch.utils.data import Dataset
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor

class MNISTDataset(Dataset):
    """
    MNIST wrapper that can optionally serve a pre-augmented copy of every image.

    Every item is an ``(image, label)`` pair regardless of ``augment``, so the
    same training / evaluation loop can consume either variant.

    Args:
        train: Forwarded to ``MNIST`` to select the training or the test split.
        augment: When True, items carry the augmented image instead of the original one.
    """

    def __init__(self, train=True, augment=False):
        super().__init__()

        self.train = train
        self.augment = augment

        # Load the MNIST dataset (download=True fetches it into ./data when needed)
        self.mnist = MNIST(
            root='./data',
            train=self.train,
            download=True,
            transform=ToTensor())

        # Apply data augmentation if requested.
        # NOTE: the augmentation runs once, here, over the whole dataset; every
        # image therefore keeps the same augmented version for the lifetime of
        # this object (it is not re-sampled per epoch).
        if self.augment:
            self.mnist_augmented = self._augment_data(self.mnist)

    def __len__(self):
        """Return the number of samples in the wrapped MNIST split."""
        return len(self.mnist)

    def __getitem__(self, index):
        """
        Return the ``(image, label)`` pair stored at ``index``.

        With ``augment=True`` the image is the pre-computed augmented one,
        otherwise it is the original MNIST image.
        """
        if self.augment:
            # _augment_data stores (augmented_image, label) pairs. Unpack the pair
            # so the item has the same flat (image, label) layout as the
            # non-augmented branch instead of nesting the pair inside a second
            # tuple together with a duplicate of the label.
            augmented_image, label = self.mnist_augmented[index]
            return augmented_image, label

        # Return the original image and label at the given index
        return self.mnist[index]

    def _augment_data(self, dataset):
        """
        Apply a fixed chain of random transforms to every image of ``dataset``.

        Args:
            dataset: Iterable of ``(image, label)`` pairs.

        Returns:
            A list of ``(augmented_image, label)`` pairs in the iteration order of ``dataset``.
        """
        # Define the data augmentation transforms to apply
        # NOTE(review): horizontal / vertical flips can change how a digit reads
        # (e.g. 6 vs 9) - confirm they are intended for this dataset.
        augmentations = torchvision.transforms.Compose([
                        torchvision.transforms.RandomRotation(degrees=15),
                        torchvision.transforms.RandomHorizontalFlip(p=0.5),
                        torchvision.transforms.RandomVerticalFlip(p=0.5),
                        torchvision.transforms.RandomCrop((28, 28), padding=0),
                        torchvision.transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1)
        ])

        # Create a new dataset by applying the transforms to the original dataset
        mnist_augmented = [(augmentations(img), label) for img, label in dataset]

        return mnist_augmented

# Build the training and test splits; augmentation is switched off for both
# (explicitly for the training split, by default for the test split).
train_dataset = MNISTDataset(augment=False, train=True)
test_dataset = MNISTDataset(train=False)

# Batch both splits with the shared batch size; only the training loader shuffles.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)

In [372]:
"""
Data Preprocessing

"""

#TODO Preprocess the AILARON dataset to a suitable format.

# #TODO Implement custom dataset for AILARON data. Should inherit from torch.utils.data.Dataset
# class AILARONDataset(torch.utils.data.Dataset):

#     def __init__(self):
#         # Load data
#         pass

#     def __getitem__(self, index):
#         # TODO
#         pass
#     def __len__(self):
#         # TODO
#         pass 

# ailaron_train = AILARONDataset()
# dataloader = DataLoader(dataset=ailaron_train, batch_size=batch_size, shuffle=True)

# Load both MNIST splits from ./data. ToTensor converts every image to a tensor
# (per the torchvision documentation, scaled to the [0, 1] range).
# Only the training split requests a download.
mnist_train = torchvision.datasets.MNIST(
    root='./data',
    train=True,
    download=True,
    transform=torchvision.transforms.ToTensor(),
)
mnist_test = torchvision.datasets.MNIST(
    root='./data',
    train=False,
    download=False,
    transform=torchvision.transforms.ToTensor(),
)


# # Create subsets of the MNIST dataset with the first 3000 training and the first 32 test examples
# mnist_train_subset = torch.utils.data.Subset(mnist_train, range(3000))
# mnist_test_subset  = torch.utils.data.Subset(mnist_test, range(32))

# Get a random image from the dataset
# image, label = mnist_train[np.random.randint(0, len(mnist_train))]

# # Plot the image
# plt.imshow(image[0], cmap='gray')
# plt.title(f'Label: {label}')
# plt.show()

# Wrap both splits in DataLoaders; both shuffle and share the global batch size.
train_loader = DataLoader(dataset=mnist_train, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(dataset=mnist_test, shuffle=True, batch_size=batch_size)

In [373]:
"""
Preprocessing Data
"""

# Augmentation pipeline: random rotation, padded random crop and brightness
# jitter, followed by tensor conversion and normalization.
transform_aug = transforms.Compose([
    transforms.RandomRotation(20),
    transforms.RandomCrop(28, padding=4),
    transforms.ColorJitter(brightness=0.1),
    transforms.ToTensor(),
    # ToTensor yields values in [0, 1]; (x - 0.5) / 0.5 maps them to [-1, 1].
    # (A std of 1/255 would instead stretch the range to [-127.5, 127.5].)
    transforms.Normalize((0.5,), (0.5,))
])

# Pipeline for the original (non-augmented) images: tensor conversion plus the
# same normalization as the augmented pipeline, so both views share one scale.
transform_orig = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))        # Normalize to range [-1, 1]
])

# Reload the training split without a transform, so the two pipelines below
# receive the raw images.
mnist_train = torchvision.datasets.MNIST(root='./data', train=True, download=True)

# For every sample keep the plain view and one augmented view next to the label.
# The whole list is materialized here, so each augmented view is sampled once.
mnist_train_aug = [
    ((transform_orig(raw_image), transform_aug(raw_image)), target)
    for raw_image, target in mnist_train
]

# Shuffling loader over the (original, augmented) pairs
trainloader = torch.utils.data.DataLoader(
    dataset=mnist_train_aug,
    shuffle=True,
    batch_size=batch_size,
)

In [6]:
"""
Unsupervised Machine Learning Framework

"""

def train(model, train_loader: DataLoader, criterion: Callable, optimizer: optim.Optimizer, num_epochs: int) -> None:
    """
    Trains a given model using the provided training data, optimizer and loss criterion for a given number of epochs.

    The model is switched to training mode before the first epoch, so the function can be
    called again after an evaluation run that left the model in eval mode.
    After every epoch the mean batch loss and the mean per-batch unsupervised clustering
    accuracy are printed. The accuracy is computed independently for each mini-batch
    (one label matching per batch) and then averaged.

    Args:
        model: Neural network model to train. Called as ``model(inputs)`` with inputs
            flattened to ``(-1, 28*28)``.
        train_loader: PyTorch data loader yielding ``(inputs, labels)`` mini-batches.
        criterion: Loss criterion, called as ``criterion(model, inputs, outputs)`` where
            ``outputs`` are the softmax probabilities of the model.
        optimizer: Optimizer used to update the model's parameters.
        num_epochs: Number of epochs to train the model.

    Returns:
        None

    Raises:
        ValueError: If ``train_loader`` contains no batches.
    """
    num_batches = len(train_loader)
    if num_batches == 0:
        # Fail early with a clear message instead of a ZeroDivisionError when averaging.
        raise ValueError("train_loader yields no batches; nothing to train on.")

    # Make sure layers with mode-dependent behavior are in training mode
    # (an earlier evaluation may have left the model in eval mode).
    model.train()

    # Loop over the epochs
    for epoch in range(num_epochs):

        # Running sums of the loss and the accuracy over the epoch
        running_loss = 0.0
        running_acc  = 0.0

        # Loop over the mini-batches in the data loader
        for inputs, labels in train_loader:

            # Flatten every image to a vector of 28*28 values
            inputs = inputs.view(-1, 28*28)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass through the model; softmax turns the outputs into class probabilities
            outputs = F.softmax(model(inputs), dim=1)

            # Compute the loss
            loss = criterion(model, inputs, outputs)
            # Backward pass through the model and compute gradients
            loss.backward()

            # Update the weights
            optimizer.step()

            # Accumulate the loss for the mini-batch
            running_loss += loss.item()
            # Accumulate the accuracy for the mini-batch; the predictions are detached
            # because the metric does not take part in back-propagation.
            running_acc  += unsupervised_clustering_accuracy(labels, torch.argmax(outputs.detach(), dim=1))

        # Compute the average loss and accuracy for the epoch and print
        print(f"Epoch {epoch+1} loss: {running_loss/num_batches:.4f}, ACC: {running_acc/num_batches:.4f}")


def unsupervised_clustering_accuracy(y_true: torch.Tensor, y_pred: torch.Tensor) -> float:
    """
    Computes the unsupervised clustering accuracy between two clusterings.
    Uses the Hungarian algorithm to find the one-to-one matching between predicted
    clusters and true labels that maximizes the number of agreeing samples.

    The contingency matrix is indexed directly by the label values, so the result is
    also correct when some label ids do not occur in the inputs (e.g. a small batch
    that misses a class, or cluster ids that are not contiguous).

    Args:
        y_true: true cluster labels as a 1D torch.Tensor (array-likes are accepted too).
            Values must be non-negative and integer-valued; float tensors are cast.
        y_pred: predicted cluster labels, same length and conventions as ``y_true``.

    Returns:
        accuracy: unsupervised clustering accuracy as a float in [0, 1];
            0.0 when the inputs are empty.

    Raises:
        ValueError: If the inputs differ in length or contain negative labels.
    """
    def _as_label_array(labels) -> np.ndarray:
        # Tensors are detached and moved to the CPU before the NumPy conversion.
        if isinstance(labels, torch.Tensor):
            labels = labels.detach().cpu().numpy()
        return np.asarray(labels).reshape(-1).astype(np.int64)

    true_ids = _as_label_array(y_true)
    pred_ids = _as_label_array(y_pred)

    if true_ids.size != pred_ids.size:
        raise ValueError(
            f"y_true and y_pred must have the same length, got {true_ids.size} and {pred_ids.size}."
        )
    if true_ids.size == 0:
        return 0.0
    if true_ids.min() < 0 or pred_ids.min() < 0:
        raise ValueError("Cluster labels must be non-negative integers.")

    # Square contingency matrix: rows are predicted clusters, columns are true labels.
    # Sizing it by the largest label value keeps label id == matrix index.
    num_ids = int(max(true_ids.max(), pred_ids.max())) + 1
    contingency = np.zeros((num_ids, num_ids), dtype=np.int64)
    np.add.at(contingency, (pred_ids, true_ids), 1)

    # Compute best matching between predicted and true labels using the Hungarian algorithm
    # (the solver minimizes cost, hence the negated counts).
    row_ind, col_ind = linear_sum_assignment(-contingency)

    # Accuracy is the share of samples that fall on the matched (cluster, label) cells
    return float(contingency[row_ind, col_ind].sum() / true_ids.size)


def test_classifier(model, test_loader: DataLoader) -> None:
    """
    Testing a classifier given the model and a test set.

    Runs the model over every mini-batch of ``test_loader``, collects the predicted and
    the true labels, and prints the unsupervised clustering accuracy over the full set.
    Predictions are collected per batch and concatenated, so batches of unequal size
    (e.g. a shorter final batch) are handled correctly.

    The model is switched to eval mode and is left in that mode on return.

    Args:
        model: Neural network model to evaluate. Called as ``model(inputs)`` with inputs
            flattened to ``(-1, 28*28)``.
        test_loader: PyTorch data loader containing the test data.

    Returns:
        None

    Raises:
        ValueError: If ``test_loader`` yields no batches.
    """

    # Switch the model to evaluation mode
    model.eval()

    # Per-batch true and predicted labels, concatenated after the loop
    true_batches = []
    pred_batches = []

    # Disable gradient computation, as we don't need it for inference
    with torch.no_grad():
        # Iterate over the mini-batches in the data loader
        for inputs, labels_true in test_loader:
            # Flatten every image to a vector of 28*28 values
            inputs = inputs.view(-1, 28*28)

            # Forward pass through the model to get class probabilities
            labels_pred = F.softmax(model(inputs), dim=1)

            # Store predicted and true labels of this batch
            pred_batches.append(torch.argmax(labels_pred, dim=1))
            true_batches.append(labels_true)

    if not pred_batches:
        raise ValueError("test_loader yields no batches; nothing to evaluate.")

    y_true = torch.cat(true_batches)
    y_pred = torch.cat(pred_batches)

    # Compute unsupervised clustering accuracy score
    acc = unsupervised_clustering_accuracy(y_true, y_pred)

    print(f"\nThe unsupervised clustering accuracy score of the classifier is: {acc}")


In [7]:
"""
Unsupervised Machine Learning Algorithms

"""

#TODO Consider implementing models as classes.

from archt import NeuralNet

# Information Maximizing Self-Augmented Training
from IMSAT import regularized_information_maximization

# Invariant Information Clustering
from IIC import invariant_information_clustering


In [12]:
"""

"""

# Set up the network, the regularized information maximization objective
# (imported from IMSAT) and an Adam optimizer over the network's parameters.
model = NeuralNet()
criterion = regularized_information_maximization
optimizer = optim.Adam(params=model.parameters(), lr=lr)

# Fit on the training loader ...
train(
    model=model,
    train_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=num_epochs,
)
# ... then report the clustering accuracy on the test loader.
test_classifier(model=model, test_loader=test_loader)

Epoch 1 loss: -0.0745, ACC: 0.6409
Epoch 2 loss: -0.1181, ACC: 0.7172
Epoch 3 loss: -0.1312, ACC: 0.7395
Epoch 4 loss: -0.1379, ACC: 0.7471
Epoch 5 loss: -0.1437, ACC: 0.7534

The unsupervised clustering accuracy score of the classifier is: 0.7604
