# Unsupervised learning using PaCMAP
### This file shows the first "vanilla" implementation to get a baseline

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision
import numpy as np
from sklearn.model_selection import train_test_split
from pacmap import PaCMAP
from tqdm import tqdm
import matplotlib.pyplot as plt

In [3]:
def load_and_preprocess_data():
    """Fetch MNIST through torchvision and return it as NumPy arrays.

    The dataset is downloaded into ``./data`` when it is not already there.

    Returns:
        A tuple ``(x_train, y_train, x_test, y_test)``. The image arrays are
        float32 and divided by 255; the label arrays are the dataset targets.
    """
    def _as_arrays(split):
        # Scale the raw pixel values by 1/255 and pull out the labels.
        images = split.data.numpy().astype('float32') / 255.0
        labels = split.targets.numpy()
        return images, labels

    train_images, train_labels = _as_arrays(
        torchvision.datasets.MNIST(root='./data', train=True, download=True)
    )
    test_images, test_labels = _as_arrays(
        torchvision.datasets.MNIST(root='./data', train=False, download=True)
    )
    return train_images, train_labels, test_images, test_labels

In [4]:
def split_data(x_train, y_train, labeled_ratio):
    """Partition the training set into a labeled part and an unlabeled part.

    Args:
        x_train: Training samples.
        y_train: Labels for ``x_train``; also used to stratify the split.
        labeled_ratio: Fraction of ``x_train`` that keeps its labels.

    Returns:
        A tuple ``(x_labeled, x_unlabeled, y_labeled)``. The labels of the
        unlabeled part are discarded.
    """
    labeled_count = int(labeled_ratio * len(x_train))
    # Fixed random_state so the same ratio always yields the same split.
    pieces = train_test_split(
        x_train,
        y_train,
        train_size=labeled_count,
        stratify=y_train,
        random_state=42,
    )
    # pieces is [x_labeled, x_unlabeled, y_labeled, y_unlabeled].
    return pieces[0], pieces[1], pieces[2]

In [None]:
def perform_pacmap(data, n_components, n_neighbors=10, MN_ratio=0.5, FP_ratio=2.0):
    """Embed ``data`` with PaCMAP and return the embedding.

    Args:
        data: Array of samples; every axis after the first is flattened
            into a single feature axis before fitting.
        n_components: Dimensionality of the embedding.
        n_neighbors: Forwarded to ``PaCMAP``.
        MN_ratio: Forwarded to ``PaCMAP``.
        FP_ratio: Forwarded to ``PaCMAP``.

    Returns:
        The result of ``PaCMAP.fit_transform`` on the flattened data.
    """
    flattened = data.reshape(data.shape[0], -1)
    reducer = PaCMAP(
        n_components=n_components,
        n_neighbors=n_neighbors,
        MN_ratio=MN_ratio,
        FP_ratio=FP_ratio,
    )
    return reducer.fit_transform(flattened)

In [6]:
class Net(nn.Module):
    """First-stage network: flatten, then 28*28 -> 256 -> 128 -> 64.

    ReLU follows the first two linear layers; the last layer is linear only.
    """
    # TODO: train this model with L2 regularization.

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(28 * 28, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a batch of inputs to 64-dimensional outputs."""
        hidden = self.flatten(x)
        for layer in (self.fc1, self.fc2):
            hidden = self.relu(layer(hidden))
        # No activation on the output layer.
        return self.fc3(hidden)

In [7]:
class SecondNet(nn.Module):
    """Second-stage network; with the defaults, 64 -> 32 -> 32 -> 16.

    Args:
        input_dim: Size of the input features.
        hidden_dim: Width of both hidden layers.
        output_dim: Size of the output features.
    """

    def __init__(self, input_dim=64, hidden_dim=32, output_dim=16):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply two ReLU-activated layers followed by a linear output layer."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = self.relu(layer(hidden))
        return self.fc3(hidden)

In [8]:
class ThirdNet(nn.Module):
    """Third-stage classifier; with the defaults, 16 -> 16 -> 16 -> 10.

    Args:
        input_dim: Size of the input features.
        hidden_dim: Width of both hidden layers.
        num_classes: Number of output scores per sample.
    """

    def __init__(self, input_dim=16, hidden_dim=16, num_classes=10):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return unnormalized class scores (no softmax is applied here)."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = self.relu(layer(hidden))
        return self.fc3(hidden)

In [None]:
def train_model(model, train_loader, criterion, optimizer, num_epochs, device):
    """Run a plain training loop and report the mean batch loss per epoch.

    Args:
        model: Network to optimize.
        train_loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss called as ``criterion(outputs, targets)``.
        optimizer: Optimizer stepped once per batch.
        num_epochs: Number of passes over ``train_loader``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        A list with one entry per epoch: the sum of the batch losses divided
        by the number of batches.
    """
    mean_losses = []
    for _ in tqdm(range(num_epochs)):
        model.train()
        running_loss = 0
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            # Clear gradients left over from the previous step.
            optimizer.zero_grad()
            batch_loss = criterion(model(inputs), targets)
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()
        mean_losses.append(running_loss / len(train_loader))
    return mean_losses

In [None]:
def train_first_stage(x_unlabeled, device):
    """Fit the first network to reproduce a 64-dimensional PaCMAP embedding.

    Args:
        x_unlabeled: Unlabeled samples, used both as PaCMAP input and as
            network input.
        device: Device the model is trained on.

    Returns:
        A tuple ``(model1, x_train_nn)``: the trained ``Net`` and the input
        tensor (with an extra axis inserted at position 1) it was trained on.
    """
    # Regression targets: the PaCMAP embedding of the raw samples.
    embedding = perform_pacmap(x_unlabeled, n_components=64)
    inputs = torch.FloatTensor(x_unlabeled).unsqueeze(1)
    targets = torch.FloatTensor(embedding)
    loader = DataLoader(TensorDataset(inputs, targets), batch_size=32, shuffle=True)

    encoder = Net().to(device)
    train_model(
        encoder,
        loader,
        nn.MSELoss(),
        optim.Adam(encoder.parameters()),
        num_epochs=10,
        device=device,
    )

    return encoder, inputs

In [11]:
def train_second_stage(model1, x_train_nn, device):
    """Trains the second neural network.

    The unlabeled inputs are passed through the already-trained first
    network, the resulting features are reduced to 16 dimensions with PaCMAP,
    and a ``SecondNet`` is fitted to map the features onto that embedding.

    Args:
        model1: Trained first-stage network. It is switched to eval mode.
        x_train_nn: Input tensor the first stage was trained on.
        device: Device used for inference and training.

    Returns:
        The trained ``SecondNet``.
    """
    # Inference only: disabling autograd avoids building a graph over the
    # whole unlabeled set, matching how the later stages call the model.
    model1.eval()
    with torch.no_grad():
        transformed_unlabeled = model1(x_train_nn.to(device)).cpu().numpy()

    x_transformed_16 = perform_pacmap(transformed_unlabeled, n_components=16)
    x_train_2 = torch.FloatTensor(transformed_unlabeled)
    y_train_2 = torch.FloatTensor(x_transformed_16)
    train_dataset_2 = TensorDataset(x_train_2, y_train_2)
    train_loader_2 = DataLoader(train_dataset_2, batch_size=32, shuffle=True)

    model2 = SecondNet().to(device)
    criterion2 = nn.MSELoss()
    optimizer2 = optim.Adam(model2.parameters())
    train_model(model2, train_loader_2, criterion2, optimizer2, num_epochs=10, device=device)

    return model2

In [12]:
def train_third_stage(model1, model2, x_labeled, y_labeled, device):
    """Fit the classifier on labeled data encoded by the first two networks.

    Args:
        model1: Trained first-stage network (switched to eval mode).
        model2: Trained second-stage network (switched to eval mode).
        x_labeled: Labeled samples.
        y_labeled: Integer class labels for ``x_labeled``.
        device: Device used for inference and training.

    Returns:
        The trained ``ThirdNet``.
    """
    labeled_inputs = torch.FloatTensor(x_labeled).unsqueeze(1)

    # The two earlier stages are only used for inference here.
    model1.eval()
    model2.eval()
    with torch.no_grad():
        stage_one_out = model1(labeled_inputs.to(device))
        features = model2(stage_one_out).cpu().numpy()

    dataset = TensorDataset(torch.FloatTensor(features), torch.LongTensor(y_labeled))
    loader = DataLoader(dataset, batch_size=32, shuffle=True)

    classifier = ThirdNet().to(device)
    train_model(
        classifier,
        loader,
        nn.CrossEntropyLoss(),
        optim.Adam(classifier.parameters()),
        num_epochs=10,
        device=device,
    )

    return classifier

In [None]:
def process_and_classify(x_new, model1, model2, model3, device):
    """Predict class indices for new samples with the three-stage pipeline.

    Args:
        x_new: Samples to classify; converted to a float tensor and given an
            extra axis at position 1 before entering ``model1``.
        model1: First-stage network.
        model2: Second-stage network.
        model3: Classifier producing one score per class.
        device: Device the forward passes run on.

    Returns:
        NumPy array holding, for each sample, the index of the highest score.
    """
    # All three stages run in eval mode.
    for stage in (model1, model2, model3):
        stage.eval()

    batch = torch.FloatTensor(x_new).unsqueeze(1).to(device)
    with torch.no_grad():
        scores = model3(model2(model1(batch)))
        # max over dim 1 returns (values, indices); only the indices matter.
        predicted = scores.max(1)[1]
    return predicted.cpu().numpy()

In [14]:
def evaluate_models(model1, model2, model3, x_test, y_test, device):
    """Measure and print the pipeline's accuracy on the test set.

    Args:
        model1: First-stage network.
        model2: Second-stage network.
        model3: Classifier.
        x_test: Test samples.
        y_test: True labels for ``x_test``.
        device: Device used for inference.

    Returns:
        Fraction of test samples whose predicted class equals the label.
    """
    predictions = process_and_classify(x_test, model1, model2, model3, device)
    is_correct = predictions == y_test
    accuracy = np.mean(is_correct)
    print(f"Accuracy on the test set: {accuracy:.2f}")
    return accuracy

In [15]:
def train_and_evaluate(x_labeled, y_labeled, x_unlabeled, x_test, y_test):
    """Train the three stages in order and return the test accuracy.

    Args:
        x_labeled: Labeled training samples.
        y_labeled: Labels for ``x_labeled``.
        x_unlabeled: Unlabeled training samples.
        x_test: Test samples.
        y_test: Labels for ``x_test``.

    Returns:
        The accuracy reported by ``evaluate_models``.
    """
    # Use the GPU when one is available.
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")

    # Stage 1: network trained to reproduce a 64-d PaCMAP embedding of the raw data.
    encoder, unlabeled_tensor = train_first_stage(x_unlabeled, device)

    # Stage 2: network trained to reproduce a 16-d PaCMAP embedding of stage-1 outputs.
    reducer = train_second_stage(encoder, unlabeled_tensor, device)

    # Stage 3: classifier trained on labeled data pushed through stages 1 and 2.
    classifier = train_third_stage(encoder, reducer, x_labeled, y_labeled, device)

    return evaluate_models(encoder, reducer, classifier, x_test, y_test, device)

In [16]:
def train_and_evaluate_with_labeled_ratio(labeled_ratio):
    """Load MNIST, split it by ``labeled_ratio``, train, and evaluate.

    Args:
        labeled_ratio: Fraction of the training set that keeps its labels.

    Returns:
        The test accuracy returned by ``train_and_evaluate``.
    """
    train_x, train_y, test_x, test_y = load_and_preprocess_data()
    labeled_x, unlabeled_x, labeled_y = split_data(train_x, train_y, labeled_ratio)
    return train_and_evaluate(labeled_x, labeled_y, unlabeled_x, test_x, test_y)

In [None]:
# Run the full pipeline once per labeled-data fraction and record the
# resulting test accuracy, keyed by the fraction.
labeled_ratios = [0.5, 0.1, 0.05, 0.01, 0.001]
accuracies = {}

for ratio in labeled_ratios:
    print(f"\nTraining with {ratio*100}% labeled data:")
    # Each call reloads the dataset and re-splits it before training.
    accuracy = train_and_evaluate_with_labeled_ratio(ratio)
    accuracies[ratio] = accuracy
# NOTE(review): the list below repeats the one above, and accuracies_val is
# not read by any cell visible here; confirm whether these are leftovers.
labeled_ratios = [0.5, 0.1, 0.05, 0.01, 0.001]
accuracies_val = {}

In [None]:
# Load the data at module level so later cells can reuse x_train / y_train.
x_train, y_train, x_test, y_test = load_and_preprocess_data()

# One sample per class: the first training sample of each digit keeps its
# label; every other sample of that digit goes to the unlabeled pool.
x_labeled_one_per_class = []
y_labeled_one_per_class = []
x_unlabeled_one_per_class = []
for i in range(10):
    # Positions of all training samples whose label is i.
    indices = np.where(y_train == i)[0]
    x_labeled_one_per_class.append(x_train[indices[0]])
    y_labeled_one_per_class.append(i)
    x_unlabeled_one_per_class.extend(x_train[indices[1:]])

# Convert the Python lists to arrays before handing them to the pipeline.
x_labeled_one_per_class = np.array(x_labeled_one_per_class)
y_labeled_one_per_class = np.array(y_labeled_one_per_class)
x_unlabeled_one_per_class = np.array(x_unlabeled_one_per_class)

print("\nTraining with one sample per class:")
accuracy_one_per_class = train_and_evaluate(x_labeled_one_per_class, y_labeled_one_per_class, x_unlabeled_one_per_class, x_test, y_test)
# Stored under a string key next to the ratio-keyed results; requires the
# accuracies dict to have been created by an earlier cell.
accuracies['one_per_class'] = accuracy_one_per_class

In [34]:
# Split the data into labeled and unlabeled sets using different ratios.
# Relies on x_train / y_train already being defined by an earlier cell.
# Number of labeled samples for each ratio (50%, 10%, 5%, 1%, 0.1%).
num_labeled_0_5 = int(0.5 * len(x_train))
num_labeled_0_1 = int(0.1 * len(x_train))
num_labeled_0_05 = int(0.05 * len(x_train))
num_labeled_0_01 = int(0.01 * len(x_train))
num_labeled_0_001 = int(0.001 * len(x_train))

# Stratified splits with a fixed seed; the labels of the unlabeled part are
# discarded via `_`. These use the same arguments as split_data().
# NOTE(review): none of the variables created here are read by the cells
# visible in this file; confirm whether this cell is still needed.
x_labeled_0_5, x_unlabeled_0_5, y_labeled_0_5, _ = train_test_split(x_train, y_train, train_size=num_labeled_0_5, stratify=y_train, random_state=42)
x_labeled_0_1, x_unlabeled_0_1, y_labeled_0_1, _ = train_test_split(x_train, y_train, train_size=num_labeled_0_1, stratify=y_train, random_state=42)
x_labeled_0_05, x_unlabeled_0_05, y_labeled_0_05, _ = train_test_split(x_train, y_train, train_size=num_labeled_0_05, stratify=y_train, random_state=42)
x_labeled_0_01, x_unlabeled_0_01, y_labeled_0_01, _ = train_test_split(x_train, y_train, train_size=num_labeled_0_01, stratify=y_train, random_state=42)
x_labeled_0_001, x_unlabeled_0_001, y_labeled_0_001, _ = train_test_split(x_train, y_train, train_size=num_labeled_0_001, stratify=y_train, random_state=42)

In [28]:
# Create a data split where all data is unlabeled except for one sample from
# each class. Relies on x_train / y_train / x_test / y_test and the
# accuracies dict already being defined by earlier cells.
x_labeled_one_per_class = []
y_labeled_one_per_class = []
x_unlabeled_one_per_class = []
y_unlabeled_one_per_class = []

for i in range(10):
    # Positions of all training samples whose label is i.
    indices = np.where(y_train == i)[0]
    # The first sample of the class keeps its label; the rest are unlabeled.
    x_labeled_one_per_class.append(x_train[indices[0]])
    y_labeled_one_per_class.append(i)
    x_unlabeled_one_per_class.extend(x_train[indices[1:]])
    y_unlabeled_one_per_class.extend(y_train[indices[1:]])

# NOTE(review): y_unlabeled_one_per_class stays a Python list and is not
# read anywhere in the visible code.
x_labeled_one_per_class = np.array(x_labeled_one_per_class)
y_labeled_one_per_class = np.array(y_labeled_one_per_class)
x_unlabeled_one_per_class = np.array(x_unlabeled_one_per_class)

print("\nTraining with one sample per class:")
accuracy_one_per_class = train_and_evaluate(x_labeled_one_per_class, y_labeled_one_per_class, x_unlabeled_one_per_class, x_test, y_test)
# Overwrites any earlier 'one_per_class' entry in the shared results dict.
accuracies['one_per_class'] = accuracy_one_per_class

In [33]:
# Print results: one line per entry in the accuracies dict.
for ratio, accuracy in accuracies.items():
    # Keys are either a numeric labeled ratio or the string 'one_per_class'.
    if ratio == 'one_per_class':
        print(f"Accuracy with one sample per class: {accuracy}")
    else:
        print(f"Accuracy with {ratio*100}% labeled data: {accuracy}")

Accuracy with 50% labeled data: 0.9526
Accuracy with 10% labeled data: 0.9602
Accuracy with 5% labeled data: 0.9571
Accuracy with 1% labeled data: 0.9489
Accuracy with one sample per class: 0.2783
