In [1]:
%reload_ext autoreload
%autoreload 2
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms
from adic_components.prototype2 import P2Encoder

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Define a simple classifier that maps embeddings to class scores
class Classifier(nn.Module):
    def __init__(self, d_model: int, input_width: int, input_height: int, num_classes: int):
        super(Classifier, self).__init__()
        self.h = input_height // 16
        self.w = input_width // 16
        self.fchead = nn.Linear(d_model * self.h * self.w, num_classes)

    def forward(self, x):
        x = x.view(x.size(0), -1)  # Flatten the input
        return self.fchead(x)

# Parameters
input_channels = 3
input_width = 32
input_height = 32
d_model = 128
num_classes = 10


In [3]:
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torchvision

from adic_components.prototype3_classifier import P3Classifier

batch_size = 256

# Data transforms for training (with augmentation)
train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),           # Standard and effective for CIFAR
    transforms.RandomHorizontalFlip(),              # Always helpful
    transforms.ColorJitter(0.2, 0.2, 0.2, 0.05),     # Slightly toned down
    transforms.RandomAffine(10, translate=(0.05, 0.05), scale=(0.95, 1.05), shear=5),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),   # Actual CIFAR-10 mean/std
                         (0.2023, 0.1994, 0.2010)),
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.1), ratio=(0.3, 3.3), value='random')
])

# Data transforms for testing (no augmentation)
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Load CIFAR-10 datasets
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                             download=True, transform=train_transform)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                            download=True, transform=test_transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

In [4]:
# Create model instances
classifier_p3 = P3Classifier(input_channels, input_width, input_height, num_classes)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
classifier_p3.to(device)

P3Classifier(
  (p3_encoder): P3Encoder(
    (layers): ModuleList(
      (0): Sequential(
        (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
      )
      (1): Sequential(
        (0): P3EncoderBlock(
          (conv1): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
          (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv3): Conv2d(32, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (se): SqueezeExcitation(
            (avgpool): AdaptiveAvgPool2d(output_

In [5]:
import torchvision.models as models

classifier_resnet101 = models.resnet101(weights=None)
classifier_resnet101.fc = nn.Linear(classifier_resnet101.fc.in_features, 10).to(device)
classifier_resnet101.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
classifier_resnet101.maxpool = nn.Identity()
classifier_resnet101.to(device)
classifier_resnet101 = torch.compile(classifier_resnet101)
classifier_resnet101

OptimizedModule(
  (_orig_mod): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): Identity()
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential(
          (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1)

In [6]:
# Evaluation on the test set
def test_model(classifier, label: str):
    classifier.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = classifier(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    print(f'{label} Test Accuracy: {accuracy}%')
    return accuracy

def train_model(classifier, optimizer, images, labels, criterion):
    optimizer.zero_grad()
    outputs = classifier(images)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    return loss.item()

In [7]:
import torch.nn as nn
import torch.optim as optim
import tqdm

epochs = 300

p3_accuracy = []
resnet101_accuracy = []
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer_p3 = optim.SGD(classifier_p3.parameters(), lr = 0.1, momentum=0.9, weight_decay=5e-4)
optimizer_resnet101 = optim.SGD(classifier_resnet101.parameters(), lr = 0.1, momentum=0.9, weight_decay=5e-4)
#optimizer_p3 = optim.AdamW(classifier_p3.parameters(), lr=1e-3, weight_decay=5e-4)
#optimizer_resnet101 = optim.AdamW(classifier_resnet101.parameters(), lr=1e-3, weight_decay=5e-4)
scheduler_p3 = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer_p3, T_max=epochs)
scheduler_resnet101 = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer_resnet101, T_max=epochs)
# scheduler_p3 = torch.optim.lr_scheduler.StepLR(optimizer_p3, step_size=30, gamma=0.1)
# scheduler_resnet101 = torch.optim.lr_scheduler.StepLR(optimizer_resnet101, step_size=30, gamma=0.1)

# Training loop
for epoch in range(epochs):
    classifier_p3.train()
    classifier_resnet101.train()
    running_loss_p3 = 0.0
    running_loss_resnet101 = 0.0
    pbar = tqdm.tqdm(train_loader, leave=False)
    for images, labels in pbar:
        images, labels = images.to(device), labels.to(device)

        loss_p3 = train_model(classifier_p3, optimizer_p3, images, labels, criterion)
        loss_resnet101 = train_model(classifier_resnet101, optimizer_resnet101, images, labels, criterion)

        running_loss_p3 += loss_p3 * images.size(0)
        running_loss_resnet101 += loss_resnet101 * images.size(0)
        pbar.set_description(f"Epoch [{epoch+1}/{epochs}] Loss P3: {running_loss_p3 / ((pbar.n + 1) * batch_size):.4f} Loss ResNet101: {running_loss_resnet101 / ((pbar.n + 1) * batch_size):.4f}")

    epoch_loss_p3 = running_loss_p3 / len(train_loader.dataset)
    epoch_loss_resnet101 = running_loss_resnet101 / len(train_loader.dataset)
    print(f"Epoch [{epoch+1}/{epochs}] Loss P3: {epoch_loss_p3:.4f} Loss ResNet101: {epoch_loss_resnet101:.4f}")

    # Scheduler step
    scheduler_p3.step()
    scheduler_resnet101.step()

    # Evaluation
    p3_accuracy.append(test_model(classifier_p3, "Prototype3 Encoder"))
    resnet101_accuracy.append(test_model(classifier_resnet101, "ResNet101"))


                                                                                                       

KeyboardInterrupt: 

In [None]:
import re

def parse_log_line(triplet):
    """Parses a 4-line log entry into metrics."""
    print(f"Triplet: {triplet}")
    log_line = " ".join(triplet)  # Combine 3 lines into a single string
    pattern = (
        r"Epoch\s*\[(\d+)/(\d+)\]\s*"
        r"Loss P3:\s*([0-9.]+)\s*"
        r"Loss ResNet101:\s*([0-9.]+).*?"
        r"Prototype3 Encoder Test Accuracy:\s*([0-9.]+)%\s*"
        r"ResNet101 Test Accuracy:\s*([0-9.]+)%"
    )

    match = re.search(pattern, log_line)
    if match:
        epoch_current = int(match.group(1))
        epoch_total = int(match.group(2))
        loss_p3 = float(match.group(3))
        loss_resnet = float(match.group(4))
        acc_p3 = float(match.group(5))
        acc_resnet = float(match.group(6))
        return epoch_current, loss_p3, loss_resnet, acc_p3, acc_resnet
    else:
        return None

def chunk_lines(lines, chunk_size=4):
    return [lines[i:i+chunk_size] for i in range(0, len(lines), chunk_size)]

def extract_metrics_from_file(filepath):
    with open(filepath, 'r') as f:
        lines = [line.strip() for line in f.readlines()]

    print(f"Total lines in file: {len(lines)}")
    triplets = chunk_lines(lines, 4)
    print(f"Total sample points: {len(triplets)}")

    # Arrays to hold each metric
    epochs = []
    loss_p3_list = []
    loss_resnet_list = []
    acc_p3_list = []
    acc_resnet_list = []

    for triplet in triplets:
        if len(triplet) == 4:
            result = parse_log_line(triplet)
            if result:
                epoch, loss_p3, loss_resnet, acc_p3, acc_resnet = result
                epochs.append(epoch)
                loss_p3_list.append(loss_p3)
                loss_resnet_list.append(loss_resnet)
                acc_p3_list.append(acc_p3)
                acc_resnet_list.append(acc_resnet)

    return epochs, loss_p3_list, loss_resnet_list, acc_p3_list, acc_resnet_list

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import re

file_path = 'benchmark_resnet101.txt'
epochs, loss_p3, loss_resnet, acc_p3, acc_resnet = extract_metrics_from_file(file_path)

print(f"Epochs: {epochs[:5]}")
print(f"Loss P3: {loss_p3[:5]}")
print(f"Loss ResNet: {loss_resnet[:5]}")
print(f"Acc P3: {acc_p3[:5]}")
print(f"Acc ResNet: {acc_resnet[:5]}")
print(f"Total samples: {len(acc_resnet)}")
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(epochs, loss_p3, label='Loss P3', color='blue')
plt.plot(epochs, loss_resnet, label='Loss ResNet', color='orange')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss vs Epochs')
plt.legend()
plt.grid()
plt.subplot(1, 2, 2)
plt.plot(epochs, acc_p3, label='Acc P3', color='green')
plt.plot(epochs, acc_resnet, label='Acc ResNet', color='red')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Accuracy vs Epochs')
plt.legend()


FileNotFoundError: [Errno 2] No such file or directory: 'benchmark_resnet101.txt'