## RLP vs Cross Entropy on MNIST

In [None]:
# Step 1: Import necessary libraries
import numpy as np
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
from sklearn.metrics import recall_score, f1_score
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Subset

# Seed PyTorch's global RNG (weight initialisation, DataLoader shuffling).
# NOTE: NumPy's global RNG, used later through np.random, is not seeded here.
torch.manual_seed(42)


# Check if CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# `device` is a torch.device object, not a string: comparing it with "cuda"
# is always False, so the device kind has to be read from `.type`.
if device.type == "cuda":
    print("Running on GPU")
else:
    print("Running on CPU")


# Step 2: Data Preparation
transform = transforms.Compose([
    transforms.Pad(2),  # Pad images by 2 pixels per side to achieve 32x32 size
    transforms.ToTensor(),
])

# download=False: the MNIST files must already be present under ./data.
train_dataset = MNIST(root='./data', train=True, transform=transform, download=False)
test_dataset = MNIST(root='./data', train=False, transform=transform, download=False)

# Keep only the first 100 training samples and the first 1000 test samples
train_dataset = Subset(train_dataset, indices=range(100))
test_dataset = Subset(test_dataset, indices=range(1000))


train_loader = DataLoader(dataset=train_dataset, batch_size=100, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=100, shuffle=False)

import torch.nn.functional as F

class LeNet5(nn.Module):
    """LeNet-5 style CNN that returns raw class scores (logits).

    Two 5x5 convolutions, each followed by tanh and 2x2 average pooling,
    feed three fully connected layers. The flatten step expects 16 feature
    maps of size 5x5, which is what a 1x32x32 input produces.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        # Fully connected classifier head
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Return a tensor with 10 unnormalised scores per flattened row."""
        feats = F.avg_pool2d(torch.tanh(self.conv1(x)), 2, stride=2)
        feats = F.avg_pool2d(torch.tanh(self.conv2(feats)), 2, stride=2)
        flat = feats.view(-1, 16 * 5 * 5)  # one row of 400 features per image
        hidden = torch.tanh(self.fc2(torch.tanh(self.fc1(flat))))
        return self.fc3(hidden)

# Updated LeNet5_RLP model with softmax
class LeNet5_RLP(nn.Module):
    """LeNet-5 style CNN whose output rows are softmax probabilities.

    Same layer stack as a plain LeNet-5 (two 5x5 convolutions with tanh and
    2x2 average pooling, then three fully connected layers); the only
    difference is the softmax applied to the final 10 scores.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        # Fully connected classifier head
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Return 10 class probabilities (summing to 1) per flattened row."""
        feats = F.avg_pool2d(torch.tanh(self.conv1(x)), 2, stride=2)
        feats = F.avg_pool2d(torch.tanh(self.conv2(feats)), 2, stride=2)
        flat = feats.view(-1, 16 * 5 * 5)  # one row of 400 features per image
        hidden = torch.tanh(self.fc2(torch.tanh(self.fc1(flat))))
        scores = self.fc3(hidden)
        return F.softmax(scores, dim=1)

def balanced_batch_generator_mnist(data, labels, M, K):
    """Yield K distinct random batches of exactly M samples each.

    The sample indices are repeatedly shuffled and cut into consecutive
    chunks of size M; a trailing chunk shorter than M is never used. A chunk
    is kept only if the same set of indices has not been selected before.
    Despite the name, the labels are not inspected, so no class balancing
    takes place.

    Args:
        data: Samples; must support ``len`` and indexing with an integer
            NumPy array.
        labels: Labels aligned with ``data``; indexed the same way.
        M: Number of samples per batch.
        K: Number of distinct batches to produce. Nothing is yielded when
            ``K <= 0``.

    Yields:
        ``(data[idx], labels[idx])`` pairs, where ``idx`` is the sorted index
        array of one batch, in the order in which the batches were drawn.

    Raises:
        ValueError: If ``M`` is not in ``1..len(data)`` or fewer than ``K``
            distinct batches of size ``M`` exist. Without these checks the
            sampling loop below would never terminate.
    """
    import math  # local import: only needed for the feasibility check

    if K <= 0:
        return

    num_samples = len(data)
    if not 1 <= M <= num_samples:
        raise ValueError(
            f"batch size M={M} must be between 1 and the number of samples ({num_samples})"
        )
    if K > math.comb(num_samples, M):
        raise ValueError(
            f"cannot draw K={K} distinct batches of size {M} from {num_samples} samples"
        )

    selected_batches = []  # kept batches, in draw order
    seen = set()           # same batches, for O(1) duplicate detection
    last_full_start = num_samples - M  # chunks starting later would be short

    # Sample until we obtain K unique batches
    while len(selected_batches) < K:
        order = np.random.permutation(num_samples)

        # Cut the shuffled indices into full chunks of size M
        for start in range(0, last_full_start + 1, M):
            batch_indices = tuple(np.sort(order[start:start + M]).tolist())
            if batch_indices in seen:
                continue
            seen.add(batch_indices)
            selected_batches.append(batch_indices)

            if len(selected_batches) >= K:
                break

    # Yield data batches with their labels
    for batch_indices in selected_batches:
        idx = np.array(batch_indices)
        yield data[idx], labels[idx]

# Experiment configuration
iterations = 5      # number of independent training runs
num_epochs = 50
batch_size = 95     # samples per RLP batch (passed as M to the batch generator)
num_batches = 1000  # number of distinct RLP batches to generate

# Per-epoch metrics, one column per run, for both loss types.
# Each name gets its own freshly allocated array.
accuracy_bce_array, recall_bce_array, accuracy_rlp_array, recall_rlp_array = (
    np.zeros((num_epochs, iterations)) for _ in range(4)
)

# Materialise each DataLoader into one data tensor and one label tensor.
# Every loader is iterated exactly once so that data and labels stay paired
# even though the training loader shuffles.
X_train_list, y_train_list = map(list, zip(*train_loader))
X_train = torch.cat(X_train_list, dim=0)
y_train = torch.cat(y_train_list, dim=0)

X_test_list, y_test_list = map(list, zip(*test_loader))
X_test = torch.cat(X_test_list, dim=0)
y_test = torch.cat(y_test_list, dim=0)

# One-hot targets for the RLP (MSE-based) objective
y_train_rlp = F.one_hot(y_train.long(), num_classes=10).float().to(device)
y_test_rlp = F.one_hot(y_test.long(), num_classes=10).float().to(device)

# The batches are drawn once here and reused by every epoch of every run.
unique_batches = list(
    balanced_batch_generator_mnist(X_train, y_train_rlp, batch_size, num_batches)
)
print("All the unique batches have been generated")

# Move the full training set to the device only after the batches were cut.
X_train = X_train.to(device)
y_train = y_train.to(device)


# Labels and test tensors are identical for every run and epoch, so convert
# them and move them to the device once, before the experiment loop.
y_train_bce = y_train.long().to(device)
y_train_rlp = F.one_hot(y_train.long(), num_classes=10).float().to(device)
X_test = X_test.to(device)
y_test = y_test.to(device)
y_test_bce = y_test.long()
y_test_rlp = y_test.long()  # class indices (not one-hot): compared with argmax predictions

# The RLP batches are fixed, so the least-squares projection
# X @ pinv(X^T X) @ X^T of each flattened batch X, and the projected targets,
# never change. Computing them once avoids one large pseudo-inverse per batch
# in every epoch of every run.
rlp_batches = []
with torch.no_grad():
    for batch_X, batch_y in unique_batches:
        batch_y = batch_y.to(device)
        # Flatten every image of the batch into one row
        batch_X_reshaped = batch_X.to(device).view(batch_X.size(0), -1)
        reg_matrix = torch.linalg.pinv(batch_X_reshaped.transpose(0, 1) @ batch_X_reshaped) @ batch_X_reshaped.transpose(0, 1)
        projection = batch_X_reshaped @ reg_matrix
        target_rlp = batch_X_reshaped @ (reg_matrix @ batch_y)
        # The images stay where they are; only the small matrices are cached
        # on the device.
        rlp_batches.append((batch_X, projection, target_rlp))

for i in range(iterations):
    print("This is iterations number: ",i)
    # Fresh models and optimizers for every run
    model_bce = LeNet5().to(device)
    model_rlp = LeNet5_RLP().to(device)

    optimizer_bce = optim.AdamW(model_bce.parameters(), lr=2e-3)
    optimizer_rlp = optim.AdamW(model_rlp.parameters(), lr=2e-3)
    criterion_bce = nn.CrossEntropyLoss()
    criterion_rlp = nn.MSELoss()

    for epoch in range(num_epochs):
        model_bce.train()
        model_rlp.train()

        # Cross-entropy model: a single full-batch step per epoch
        optimizer_bce.zero_grad()
        outputs_bce = model_bce(X_train)
        loss_bce = criterion_bce(outputs_bce, y_train_bce)
        loss_bce.backward()
        optimizer_bce.step()

        # RLP model: one step per pre-generated batch. The loss is the MSE
        # between the projected predictions and the projected targets.
        for batch_X, projection, target_rlp in rlp_batches:
            optimizer_rlp.zero_grad()
            outputs_rlp = model_rlp(batch_X.to(device))
            loss_rlp = criterion_rlp(projection @ outputs_rlp, target_rlp)
            loss_rlp.backward()
            optimizer_rlp.step()

        # Evaluation on the held-out test subset.
        # NOTE: the values stored in recall_*_array (and printed as "Recall")
        # are macro-averaged F1 scores, as computed by f1_score.
        model_bce.eval()
        model_rlp.eval()
        with torch.no_grad():
            outputs_bce_test = model_bce(X_test)
            _, predicted_bce = torch.max(outputs_bce_test, 1)
            accuracy_bce_array[epoch,i] = (predicted_bce == y_test_bce).sum().item() / len(y_test) * 100
            recall_bce_array[epoch,i] = f1_score(y_test_bce.cpu().numpy(), predicted_bce.cpu().numpy(), average='macro')

            outputs_rlp_test = model_rlp(X_test)
            _, predicted_rlp = torch.max(outputs_rlp_test, 1)
            accuracy_rlp_array[epoch, i] = (predicted_rlp == y_test_rlp).sum().item() / len(y_test) * 100
            recall_rlp_array[epoch,i] = f1_score(y_test_rlp.cpu().numpy(), predicted_rlp.cpu().numpy(), average='macro')

        print(f'Epoch: {epoch}, BCE Accuracy: {accuracy_bce_array[epoch,i]}, BCE Recall: {recall_bce_array[epoch,i]}')
        print(f'Epoch: {epoch}, RLP Accuracy: {accuracy_rlp_array[epoch,i]}, RLP Recall: {recall_rlp_array[epoch,i]}')


## RLP vs Cross Entropy on top of MIXUP data augmentation

In [None]:
# Step 1: Import necessary libraries
import numpy as np
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
from sklearn.metrics import recall_score, f1_score
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Subset

# Seed PyTorch's global RNG (weight initialisation, DataLoader shuffling).
# NOTE: NumPy's global RNG, used later through np.random, is not seeded here.
torch.manual_seed(42)


# Check if CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# `device` is a torch.device object, not a string: comparing it with "cuda"
# is always False, so the device kind has to be read from `.type`.
if device.type == "cuda":
    print("Running on GPU")
else:
    print("Running on CPU")


# Step 2: Data Preparation
transform = transforms.Compose([
    transforms.Pad(2),  # Pad images by 2 pixels per side to achieve 32x32 size
    transforms.ToTensor(),
])

# download=False: the MNIST files must already be present under ./data.
train_dataset = MNIST(root='./data', train=True, transform=transform, download=False)
test_dataset = MNIST(root='./data', train=False, transform=transform, download=False)

# Keep only the first 5000 samples of each split
train_dataset = Subset(train_dataset, indices=range(5000))
test_dataset = Subset(test_dataset, indices=range(5000))


train_loader = DataLoader(dataset=train_dataset, batch_size=100, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=100, shuffle=False)

import torch.nn.functional as F

class LeNet5(nn.Module):
    """LeNet-5 style CNN that returns raw class scores (logits).

    Two 5x5 convolutions, each followed by tanh and 2x2 average pooling,
    feed three fully connected layers. The flatten step expects 16 feature
    maps of size 5x5, which is what a 1x32x32 input produces.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        # Fully connected classifier head
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Return a tensor with 10 unnormalised scores per flattened row."""
        feats = F.avg_pool2d(torch.tanh(self.conv1(x)), 2, stride=2)
        feats = F.avg_pool2d(torch.tanh(self.conv2(feats)), 2, stride=2)
        flat = feats.view(-1, 16 * 5 * 5)  # one row of 400 features per image
        hidden = torch.tanh(self.fc2(torch.tanh(self.fc1(flat))))
        return self.fc3(hidden)

# Updated LeNet5_RLP model with softmax
class LeNet5_RLP(nn.Module):
    """LeNet-5 style CNN whose output rows are softmax probabilities.

    Same layer stack as a plain LeNet-5 (two 5x5 convolutions with tanh and
    2x2 average pooling, then three fully connected layers); the only
    difference is the softmax applied to the final 10 scores.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        # Fully connected classifier head
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Return 10 class probabilities (summing to 1) per flattened row."""
        feats = F.avg_pool2d(torch.tanh(self.conv1(x)), 2, stride=2)
        feats = F.avg_pool2d(torch.tanh(self.conv2(feats)), 2, stride=2)
        flat = feats.view(-1, 16 * 5 * 5)  # one row of 400 features per image
        hidden = torch.tanh(self.fc2(torch.tanh(self.fc1(flat))))
        scores = self.fc3(hidden)
        return F.softmax(scores, dim=1)

def balanced_batch_generator_mnist(data, labels, M, K):
    """Yield K distinct random batches of exactly M samples each.

    The sample indices are repeatedly shuffled and cut into consecutive
    chunks of size M; a trailing chunk shorter than M is never used. A chunk
    is kept only if the same set of indices has not been selected before.
    Despite the name, the labels are not inspected, so no class balancing
    takes place.

    Args:
        data: Samples; must support ``len`` and indexing with an integer
            NumPy array.
        labels: Labels aligned with ``data``; indexed the same way.
        M: Number of samples per batch.
        K: Number of distinct batches to produce. Nothing is yielded when
            ``K <= 0``.

    Yields:
        ``(data[idx], labels[idx])`` pairs, where ``idx`` is the sorted index
        array of one batch, in the order in which the batches were drawn.

    Raises:
        ValueError: If ``M`` is not in ``1..len(data)`` or fewer than ``K``
            distinct batches of size ``M`` exist. Without these checks the
            sampling loop below would never terminate.
    """
    import math  # local import: only needed for the feasibility check

    if K <= 0:
        return

    num_samples = len(data)
    if not 1 <= M <= num_samples:
        raise ValueError(
            f"batch size M={M} must be between 1 and the number of samples ({num_samples})"
        )
    if K > math.comb(num_samples, M):
        raise ValueError(
            f"cannot draw K={K} distinct batches of size {M} from {num_samples} samples"
        )

    selected_batches = []  # kept batches, in draw order
    seen = set()           # same batches, for O(1) duplicate detection
    last_full_start = num_samples - M  # chunks starting later would be short

    # Sample until we obtain K unique batches
    while len(selected_batches) < K:
        order = np.random.permutation(num_samples)

        # Cut the shuffled indices into full chunks of size M
        for start in range(0, last_full_start + 1, M):
            batch_indices = tuple(np.sort(order[start:start + M]).tolist())
            if batch_indices in seen:
                continue
            seen.add(batch_indices)
            selected_batches.append(batch_indices)

            if len(selected_batches) >= K:
                break

    # Yield data batches with their labels
    for batch_indices in selected_batches:
        idx = np.array(batch_indices)
        yield data[idx], labels[idx]

# Experiment configuration
iterations = 5      # number of independent training runs
num_epochs = 50
# 785 samples per RLP batch: the pixel count of a 28x28 image plus one.
# NOTE(review): presumably chosen so that a flattened batch has more rows than
# non-padding pixel columns - confirm.
batch_size = 28*28+1
num_batches = 1000  # number of distinct RLP batches to generate

# Per-epoch metrics, one column per run, for both loss types
accuracy_bce_array = np.zeros((num_epochs, iterations))
recall_bce_array = np.zeros((num_epochs, iterations))
accuracy_rlp_array = np.zeros((num_epochs, iterations))
recall_rlp_array = np.zeros((num_epochs, iterations))

# Extract data and labels from DataLoader
X_train_list, y_train_list = [], []
for data, labels in train_loader:
    X_train_list.append(data)
    y_train_list.append(labels)
X_train = torch.cat(X_train_list, dim=0)
y_train = torch.cat(y_train_list, dim=0)

X_test_list, y_test_list = [], []
for data, labels in test_loader:
    X_test_list.append(data)
    y_test_list.append(labels)
X_test = torch.cat(X_test_list, dim=0)
y_test = torch.cat(y_test_list, dim=0)

# One-hot targets for the RLP (MSE-based) objective
y_train_rlp = F.one_hot(y_train.long(), num_classes=10).float().to(device)
y_test_rlp = F.one_hot(y_test.long(), num_classes=10).float().to(device)

unique_batches = list(balanced_batch_generator_mnist(X_train, y_train_rlp, batch_size, num_batches))
np.random.shuffle(unique_batches)
# Mixup partners: the same batches in an independently drawn order. A plain
# copy of the list would keep the order identical, so zipping the two lists
# would pair every batch with itself and the mixup would change nothing.
partner_order = np.random.permutation(len(unique_batches))
unique_batches2 = [unique_batches[j] for j in partner_order]
print("All the unique batches have been generated")

# Move the full training set to the device only after the batches were cut.
X_train = X_train.to(device)
y_train = y_train.to(device)


# Labels and test tensors are identical for every run and epoch, so convert
# them and move them to the device once, before the experiment loop.
y_train_bce = y_train.long().to(device)
y_train_rlp = F.one_hot(y_train.long(), num_classes=10).float().to(device)
X_test = X_test.to(device)
y_test = y_test.to(device)
y_test_bce = y_test.long()
y_test_rlp = y_test.long()  # class indices (not one-hot): compared with argmax predictions

# Two independently shuffled loaders over the same training tensors supply
# the two partners of every mixup pair for the cross-entropy model.
batch_size_bce = 500
train_dataset = TensorDataset(X_train, y_train_bce)
train_dataloader = DataLoader(train_dataset, batch_size_bce, shuffle=True)
train_dataloader_2 = DataLoader(train_dataset, batch_size_bce, shuffle=True)

alpha = 0.15  # Beta(alpha, alpha) parameter for the mixup coefficient

for i in range(iterations):
    print("This is iterations number: ",i)
    # Fresh models and optimizers for every run
    model_bce = LeNet5().to(device)
    model_rlp = LeNet5_RLP().to(device)

    optimizer_bce = optim.AdamW(model_bce.parameters(), lr=2e-3)
    optimizer_rlp = optim.AdamW(model_rlp.parameters(), lr=2e-3)
    criterion_bce = nn.CrossEntropyLoss()
    criterion_rlp = nn.MSELoss()

    for epoch in range(num_epochs):
        # Training loop for BCE (cross-entropy with mixup)
        model_bce.train()
        for (x1, y1), (x2, y2) in zip(train_dataloader, train_dataloader_2):
            lam = float(np.random.beta(alpha, alpha))
            x = (lam * x1 + (1. - lam) * x2).to(device)
            y1, y2 = y1.to(device), y2.to(device)

            optimizer_bce.zero_grad()
            outputs = model_bce(x)
            # Class indices cannot be interpolated: lam * y1 + (1 - lam) * y2
            # truncated to an integer names an unrelated class. Mix the two
            # losses with the same coefficient as the inputs instead.
            loss_bce = lam * criterion_bce(outputs, y1) + (1. - lam) * criterion_bce(outputs, y2)
            loss_bce.backward()
            optimizer_bce.step()

        # Training loop for RLP (mixup on inputs and one-hot targets)
        model_rlp.train()
        for (batch_X, batch_y), (batch_X2, batch_y2) in zip(unique_batches, unique_batches2):
            lam = float(np.random.beta(alpha, alpha))
            x = (lam * batch_X + (1. - lam) * batch_X2).to(device)
            y = (lam * batch_y + (1. - lam) * batch_y2).to(device)
            optimizer_rlp.zero_grad()
            outputs_rlp = model_rlp(x)

            # Flatten every mixed image into one row for the matrix operations
            x = x.view(batch_X.size(0), -1)

            # pinv(x^T x) @ x^T maps targets to least-squares coefficients.
            # It depends on the mixed inputs, so it is recomputed every step.
            reg_matrix = torch.linalg.pinv(x.transpose(0, 1) @ x) @ x.transpose(0, 1)
            c = reg_matrix @ y
            c_pred = reg_matrix @ outputs_rlp

            # MSE between the linear fits of the predictions and of the targets
            loss_rlp = criterion_rlp(x @ c_pred , x @ c )
            loss_rlp.backward()
            optimizer_rlp.step()

        # Evaluation on the held-out test subset.
        # NOTE: the values stored in recall_*_array (and printed as "Recall")
        # are macro-averaged F1 scores, as computed by f1_score.
        model_bce.eval()
        model_rlp.eval()
        with torch.no_grad():
            outputs_bce_test = model_bce(X_test)
            _, predicted_bce = torch.max(outputs_bce_test, 1)
            accuracy_bce_array[epoch,i] = (predicted_bce == y_test_bce).sum().item() / len(y_test) * 100
            recall_bce_array[epoch,i] = f1_score(y_test_bce.cpu().numpy(), predicted_bce.cpu().numpy(), average='macro')

            outputs_rlp_test = model_rlp(X_test)
            _, predicted_rlp = torch.max(outputs_rlp_test, 1)
            accuracy_rlp_array[epoch, i] = (predicted_rlp == y_test_rlp).sum().item() / len(y_test) * 100
            recall_rlp_array[epoch,i] = f1_score(y_test_rlp.cpu().numpy(), predicted_rlp.cpu().numpy(), average='macro')

        print(f'Epoch: {epoch}, BCE Accuracy: {accuracy_bce_array[epoch,i]}, BCE Recall: {recall_bce_array[epoch,i]}')
        print(f'Epoch: {epoch}, RLP Accuracy: {accuracy_rlp_array[epoch,i]}, RLP Recall: {recall_rlp_array[epoch,i]}')

