# Evaluation of the Fully Supervised with Self-Supervised Embeddings Framework

In [16]:
# All Imports
import os
import sys
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset
from torchvision.models import resnet18
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances
from sklearn.manifold import TSNE

# Get the absolute path of the parent directory (main directory),
# resolved relative to the current working directory.
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))

# Add parent directory to sys.path so modules located there can be imported.
# The membership check keeps sys.path from accumulating duplicate entries
# when this cell is executed more than once in the same kernel.
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

# Device Configuration: use CUDA when torch reports it as available,
# otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# CIFAR-10 Class Names, keyed by integer class label.
cifar10_labels = {
    0: "airplane", 1: "automobile", 2: "bird", 3: "cat", 4: "deer",
    5: "dog", 6: "frog", 7: "horse", 8: "ship", 9: "truck",
}

Using device: cuda


In [17]:
# Ensure reproducibility
def set_seed(seed=35):
    """
    Seeds every random number generator this notebook relies on.

    Covers Python's built-in `random` module (used by the random sample
    selection baseline), NumPy, and PyTorch (CPU, plus all CUDA devices when
    CUDA is available). Prints a confirmation line.

    Args:
        seed (int): Seed value applied to all generators.
    """
    # `random` must be seeded too: random.sample() drives the 'random'
    # selection method, and NumPy/torch seeding does not affect it.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    print(f"Random seed set to {seed}")

set_seed(35)

Random seed set to 35


In [None]:
# src/evaluation.py

# Import TPCRP (Typiclust) functions from your module.
# NOTE: DEVICE is imported here as well, which rebinds the DEVICE name that
# was assigned in the first cell of this notebook.
from typiclust_alg import SimCLRResNet18, compute_embeddings, typical_clustering_selection, DEVICE

# Import visualization and seed utilities from visualisation.py.
# NOTE: this set_seed replaces the set_seed function defined in the previous
# cell. Its implementation is not visible here, so which generators it seeds
# should be confirmed in visualisation.py.
from visualisation import plot_tsne, set_seed, plot_selected_images_by_label

# Set random seed for reproducibility (uses the set_seed imported just above).
set_seed(30)

# ---------------------------
# Evaluate Linear (Encoder + Linear Head)
# ---------------------------
def evaluate_linear(encoder, linear_head, dataloader, device=DEVICE):
    """
    Measures the accuracy of `linear_head` stacked on top of `encoder`.

    Both modules are put in eval mode and run without gradient tracking over
    every batch of `dataloader`. The accuracy is printed as a percentage and
    returned as a fraction.

    Args:
        encoder: Module applied to each image batch to produce features.
        linear_head: Module applied to those features to produce class scores.
        dataloader: Iterable yielding (images, labels) batches.
        device: Device each batch is moved to before the forward pass.

    Returns:
        float: Number of correct predictions divided by the number of labels.
    """
    for module in (encoder, linear_head):
        module.eval()

    n_correct, n_seen = 0, 0
    with torch.no_grad():
        for batch_images, batch_labels in dataloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            # Encoder features feed straight into the linear head.
            scores = linear_head(encoder(batch_images))
            predicted = scores.argmax(dim=1)
            n_correct += (predicted == batch_labels).sum().item()
            n_seen += batch_labels.size(0)

    acc = n_correct / n_seen
    print(f"Linear Evaluation Test Accuracy: {acc*100:.2f}%")
    return acc

# ---------------------------
# 1. DATASET PREPARATION
# ---------------------------
def get_cifar10_datasets():
    """
    Builds the CIFAR-10 training and test datasets under ./data.

    The training split is augmented with a padded random crop and a random
    horizontal flip (per Appendix F.2.1 for fully supervised training); the
    test split is only converted to a tensor. Both splits are normalized with
    the same per-channel mean and standard deviation.

    Returns:
        tuple: (train_dataset, test_dataset)
    """
    channel_mean = (0.4914, 0.4822, 0.4465)
    channel_std = (0.247, 0.243, 0.261)

    def _build(train, extra_steps):
        # Every pipeline ends with tensor conversion followed by normalization.
        pipeline = transforms.Compose([
            *extra_steps,
            transforms.ToTensor(),
            transforms.Normalize(channel_mean, channel_std),
        ])
        return torchvision.datasets.CIFAR10(
            root="./data", train=train, download=True, transform=pipeline)

    augmentation = [
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
    ]
    train_dataset = _build(True, augmentation)
    test_dataset = _build(False, [])
    return train_dataset, test_dataset

# ---------------------------
# 2. (Re)Define TPCRP Functions (if needed) 
# ---------------------------
# (These functions are defined in typiclust_alg.py and imported above, so they are not redefined here.)

# ---------------------------
# 3. TRAINING & EVALUATION FUNCTIONS
# ---------------------------


# (B) Fully Supervised with Self-Supervised Embeddings (Linear Evaluation).
def train_linear_classifier(encoder, train_dataset, selected_indices, epochs, device=DEVICE):
    """
    Fits a linear head (512 inputs, 10 outputs) on features from the frozen encoder.

    Only the samples of `train_dataset` at `selected_indices` are used, in
    shuffled batches of 32. The encoder is put in eval mode and all of its
    parameters have gradients disabled. Optimization follows Appendix F.2.2:
    SGD with lr=2.5, momentum=0.9 (Nesterov) and a cosine annealing schedule
    stepped once per epoch. The mean batch loss is printed after each epoch.

    Args:
        encoder: Pre-trained feature extractor; modified in place (frozen).
        train_dataset: Dataset yielding (image, label) pairs.
        selected_indices: Indices of the samples to train on.
        epochs: Number of passes over the selected subset.
        device: Device for the linear head and the batches.

    Returns:
        nn.Linear: The trained linear head.
    """
    loader = DataLoader(Subset(train_dataset, selected_indices), batch_size=32, shuffle=True)

    # Freeze the encoder: eval mode plus no gradients on any parameter.
    encoder.eval()
    for weight in encoder.parameters():
        weight.requires_grad = False

    linear_head = nn.Linear(512, 10).to(device)
    loss_fn = nn.CrossEntropyLoss()
    sgd = optim.SGD(linear_head.parameters(), lr=2.5, momentum=0.9, nesterov=True)
    cosine = optim.lr_scheduler.CosineAnnealingLR(sgd, T_max=epochs)

    linear_head.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for batch_images, batch_labels in loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            # Features come from the frozen encoder, so no graph is needed.
            with torch.no_grad():
                features = encoder(batch_images)
            sgd.zero_grad()
            batch_loss = loss_fn(linear_head(features), batch_labels)
            batch_loss.backward()
            sgd.step()
            running_loss += batch_loss.item()
        # The learning-rate schedule advances once per epoch.
        cosine.step()
        avg_loss = running_loss / len(loader)
        print(f"[Linear Eval] Epoch [{epoch+1}/{epochs}] Loss: {avg_loss:.4f}")
    return linear_head


# ---------------------------
# 4. SAMPLE SELECTION
# ---------------------------
def select_samples(train_dataset, budget, method='typiclust'):
    """
    Selects a subset of samples from the training dataset.

    If method == 'typiclust', uses TPCRP: the whole dataset is embedded with
    the SimCLR encoder, typical_clustering_selection picks the samples, and
    diagnostics are shown (a t-SNE plot of the embeddings and a grid of the
    selected images grouped by label).
    If method == 'random', `budget` distinct indices are drawn uniformly with
    Python's `random` module.

    Args:
        train_dataset: Sized, indexable dataset whose items are
            (image, label) pairs.
        budget (int): Number of samples to select; also passed to the
            clustering step as its budget.
        method (str): 'typiclust' or 'random'.

    Returns:
        The selected indices: whatever typical_clustering_selection returns
        for 'typiclust', or a list of ints for 'random'.

    Raises:
        ValueError: If `method` is neither 'typiclust' nor 'random'
            (random.sample also raises ValueError when `budget` exceeds the
            dataset size).
    """
    if method == 'random':
        total_samples = len(train_dataset)
        return random.sample(range(total_samples), budget)
    if method != 'typiclust':
        raise ValueError("Method must be 'typiclust' or 'random'.")

    encoder = SimCLRResNet18(feature_dim=128).to(DEVICE)
    checkpoint_path = 'model/simclr_cifar_10.pth.tar'
    if os.path.exists(checkpoint_path):
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # that come from a trusted source.
        checkpoint = torch.load(checkpoint_path, map_location=DEVICE)
        # Accept both a wrapped checkpoint ({'state_dict': ...}) and a bare
        # state dict.
        state_dict = checkpoint.get('state_dict', checkpoint)
        encoder.load_state_dict(state_dict, strict=False)
        print("Loaded pretrained SimCLR model for TPCRP selection.")
    else:
        print("Pretrained checkpoint not found; TPCRP selection may be affected.")
    encoder.eval()

    # 1) Compute embeddings
    # NOTE(review): the dataset is embedded with whatever transform it
    # carries; if that includes random augmentation the embeddings vary
    # between runs -- confirm this is intended.
    all_embeddings, _ = compute_embeddings(encoder, train_dataset, batch_size=128, num_workers=4)

    # 2) Perform clustering to find typical samples
    selected_indices, cluster_labels = typical_clustering_selection(
        all_embeddings, budget=budget, k_nn=20, random_state=30
    )
    print("Unique final selected indices:", len(set(selected_indices)))
    print("selected_indices:", selected_indices)

    # 3) Print info
    print(f"Number of clusters (budget) = {budget}")
    print(f"Number of typical points selected = {len(selected_indices)}")

    # Report clusters that received no points. Converting to an array first
    # keeps the element-wise comparison valid even if a plain list came back.
    cluster_label_array = np.asarray(cluster_labels)
    for cluster_id in range(budget):
        if not np.any(cluster_label_array == cluster_id):
            print(f"Cluster {cluster_id} is empty.")

    # 4) Now call plot_tsne, passing cluster_labels as the "cluster_assignments"
    #    and selected_indices as 'selected_indices' so the typical points are marked.
    plot_tsne(
        embeddings=all_embeddings,
        cluster_assignments=cluster_labels,   # We color by cluster
        selected_indices=selected_indices,     # Mark typical points
        title="t-SNE of CIFAR-10 Embeddings",
        n_samples=2000
    )

    # 5) Show the actual images of the selected points in a grid, grouped by label.
    #    This needs the label of every sample: label_array[i] = dataset[i][1].
    #    When the dataset exposes its raw labels via `targets` (and applies no
    #    target_transform), read them directly; indexing every item would run
    #    the full image transform pipeline just to obtain the label.
    targets = getattr(train_dataset, "targets", None)
    has_plain_targets = (
        targets is not None
        and getattr(train_dataset, "target_transform", None) is None
        and len(targets) == len(train_dataset)
    )
    if has_plain_targets:
        label_array = np.array(targets)
    else:
        label_array = np.array([train_dataset[i][1] for i in range(len(train_dataset))])

    plot_selected_images_by_label(
        dataset=train_dataset,
        selected_indices=selected_indices,
        label_array=label_array,
    )

    return selected_indices

# ---------------------------
# 5. EVALUATION FRAMEWORKS
# ---------------------------

def evaluate_fully_supervised_self_supervised(method='typiclust', budget=100, epochs=100):
    """
    Runs the Fully Supervised with Self-Supervised Embeddings evaluation.

    Steps: load the CIFAR-10 datasets, build the SimCLR encoder (loading the
    pre-trained checkpoint when the file exists), select `budget` training
    samples with `method`, train a linear head on the frozen encoder for
    `epochs` epochs, and measure accuracy on the test set.
    Hyperparameters follow Appendix F.2.2.

    Args:
        method (str): Sample selection method passed to select_samples.
        budget (int): Number of training samples to select.
        epochs (int): Training epochs for the linear head.

    Returns:
        The test accuracy returned by evaluate_linear.
    """
    print("\n=== Fully Supervised with Self-Supervised Embeddings Evaluation ===")
    train_dataset, test_dataset = get_cifar10_datasets()

    encoder = SimCLRResNet18(feature_dim=128).to(DEVICE)
    checkpoint_path = 'model/simclr_cifar_10.pth.tar'
    if not os.path.exists(checkpoint_path):
        print("Pretrained checkpoint not found; proceeding without pretraining.")
    else:
        loaded = torch.load(checkpoint_path, map_location=DEVICE)
        # A checkpoint may wrap the weights under 'state_dict' or be the
        # state dict itself.
        weights = loaded.get('state_dict', loaded)
        encoder.load_state_dict(weights, strict=False)
        print("Loaded pretrained SimCLR model.")
    encoder.eval()

    selected_indices = select_samples(train_dataset, budget, method)
    print(f"Number of training samples selected: {len(selected_indices)}")

    linear_head = train_linear_classifier(encoder, train_dataset, selected_indices, epochs, DEVICE)

    # Score the encoder + linear head combination on the held-out test split.
    test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)
    return evaluate_linear(encoder, linear_head, test_loader, DEVICE)

# ---------------------------
# 6. PLOT ACCURACY VS. BUDGET (COMPARISON)
# ---------------------------
def plot_accuracy_vs_budget_comparison(evaluation_func, budget_list, methods=('typiclust', 'random'), epochs=100):
    """
    Evaluates each selection method at each budget and plots the results.

    For every (method, budget) pair, `evaluation_func` is called with the
    keyword arguments `method`, `budget` and `epochs`; its return value is
    multiplied by 100 and plotted as a percentage. All methods are drawn on
    the same figure, one line per method.

    Args:
        evaluation_func: Callable accepting method=, budget= and epochs=
            keyword arguments and returning an accuracy as a fraction.
        budget_list: Budgets to evaluate; used as the x-axis values.
        methods: Iterable of method names to compare. The default is a tuple
            rather than a list so the default value cannot be mutated
            between calls.
        epochs (int): Forwarded unchanged to `evaluation_func`.
    """
    method_accuracies = {}
    for method in methods:
        accuracies = []
        for budget in budget_list:
            print(f"\n--- Evaluating for Budget = {budget}, Method = {method} ---")
            acc = evaluation_func(method=method, budget=budget, epochs=epochs)
            accuracies.append(acc * 100)  # Convert to percentage
        method_accuracies[method] = accuracies

    plt.figure(figsize=(8, 6))
    for method, accuracies in method_accuracies.items():
        plt.plot(budget_list, accuracies, marker='o', linestyle='-', label=method)
    plt.xlabel("Budget (Number of Selected Samples)")
    plt.ylabel("Test Accuracy (%)")
    plt.title("Test Accuracy vs. Budget Size: Typiclust vs Random")
    plt.grid(True)
    plt.legend()
    plt.show()
# ---------------------------
# 7. MAIN FUNCTION
# ---------------------------
def main():
    """
    Entry point: compares the 'typiclust' and 'random' selection methods.

    Runs the fully supervised with self-supervised embeddings evaluation at
    budgets of 10 to 50 samples (100 epochs each) and plots accuracy vs. budget.
    """
    budget_grid = [10, 20, 30, 40, 50]
    plot_accuracy_vs_budget_comparison(
        evaluation_func=evaluate_fully_supervised_self_supervised,
        budget_list=budget_grid,
        methods=['typiclust', 'random'],
        epochs=100,
    )


if __name__ == "__main__":
    main()


Random seed set to 30

--- Evaluating for Budget = 10, Method = typiclust ---

=== Fully Supervised with Self-Supervised Embeddings Evaluation ===
Files already downloaded and verified
Files already downloaded and verified


  checkpoint = torch.load(checkpoint_path, map_location=DEVICE)


Loaded pretrained SimCLR model.


  checkpoint = torch.load(checkpoint_path, map_location=DEVICE)


Loaded pretrained SimCLR model for TPCRP selection.
