In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!unzip /content/drive/MyDrive/dataset.zip >> /dev/null

In [3]:
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, TensorDataset
from PIL import Image
import numpy as np

# Check for GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load pre-trained EfficientNet-B0 model and remove the classification head
efficientnet = models.efficientnet_b0(pretrained=True)
efficientnet.classifier = torch.nn.Sequential(*list(efficientnet.classifier.children())[:-1])  # Remove last layer
# Unfreeze all layers for fine-tuning
for param in efficientnet.parameters():
    param.requires_grad = True  # Unfreeze all layers

efficientnet.eval()
efficientnet.to(device)  # Move the model to GPU

# Transform CIFAR-10 images to 224x224 and normalize to EfficientNet's expected input
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Function to load dataset in mini-batches to manage memory usage
def load_train_dataset(filepath, transform, batch_size=64):
    dataset = torch.load(filepath)
    print(f"Loaded dataset from {filepath}. Keys:", dataset.keys())  # Include dataset name in the print statement

    # Check if 'data' and 'targets' exist
    if 'data' in dataset and 'targets' in dataset:
        data, targets = dataset['data'], dataset['targets']
    else:
        raise KeyError("The dataset does not contain the required keys 'data' and 'targets'.")

    # Convert numpy array to PIL Image for each image in the dataset
    data = [Image.fromarray(img) for img in data]

    # Apply transforms to the data
    data = [transform(img) for img in data]

    # Create a TensorDataset
    dataset = TensorDataset(torch.stack(data), torch.tensor(targets))

    # Create a DataLoader
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    return dataloader

# Extract features in mini-batches
def extract_features(dataloader, model):
    features = []
    with torch.no_grad():  # Disable gradient calculation to save memory
        for data, _ in dataloader:
            data = data.to(device)  # Move data to GPU
            batch_features = model(data)  # Extract features from EfficientNet
            features.append(batch_features.cpu())  # Move features back to CPU if needed
    return torch.cat(features, dim=0)

# LwP Classifier
class EuclideanLwPClassifier:
    def __init__(self, num_classes, feature_dim):
        self.num_classes = num_classes
        self.prototypes = np.zeros((num_classes, feature_dim))

    def calculate_prototypes(self, features, labels):
        for label in range(self.num_classes):
            class_features = features[labels == label]
            if len(class_features) > 0:
                self.prototypes[label] = class_features.mean(axis=0)

    def predict(self, features):
        distances = np.zeros((features.shape[0], self.num_classes))

        # Calculate Euclidean distance between each feature and the prototypes
        for label in range(self.num_classes):
            for i, feature in enumerate(features):
                distances[i, label] = np.linalg.norm(feature - self.prototypes[label])

        # Return the class with the minimum Euclidean distance
        return np.argmin(distances, axis=1)

    def update_prototypes(self, features, predicted_labels, alpha=0.7):
        for label in range(self.num_classes):
            class_features = features[predicted_labels == label]
            if len(class_features) > 0:
                # Update prototype with a mix of old and new information (alpha = 0.7)
                self.prototypes[label] = alpha * self.prototypes[label] + (1 - alpha) * class_features.mean(axis=0)

# Initialize LwP with number of classes (10) and feature dimension from EfficientNet-B0 (1280)
num_classes = 10
feature_dim = 1280  # EfficientNet-B0 feature dimension
lwp_model = EuclideanLwPClassifier(num_classes, feature_dim)

# Task 1 Evaluation and Logging
def evaluate_and_log_lower_triangle(model, eval_path_template, efficientnet_model, transform, num_datasets=10):
    # Initialize an accuracy matrix to store accuracy for each dataset
    accuracy_matrix = np.zeros((num_datasets, num_datasets))

    # Iterate through each dataset as the training set
    for i in range(1, num_datasets + 1):
        print(f"Training with Dataset D^{i} and evaluating...")

        # Update prototypes for the current training dataset
        train_dataloader = load_train_dataset(eval_path_template.format(i), transform)
        train_features = extract_features(train_dataloader, efficientnet_model).cpu().numpy()
        train_targets = [target.cpu().numpy() for _, target in train_dataloader]
        train_targets = np.concatenate(train_targets)

        # Update the prototypes of the LwP model (model F^i)
        model.calculate_prototypes(train_features, train_targets)

        # Evaluate the model on all datasets up to the current one (lower triangular)
        for j in range(1, i + 1):  # Change the range to evaluate on D^1 to D^i
            eval_dataloader = load_train_dataset(eval_path_template.format(j), transform)
            eval_features = extract_features(eval_dataloader, efficientnet_model).cpu().numpy()
            eval_targets = [target.cpu().numpy() for _, target in eval_dataloader]
            eval_targets = np.concatenate(eval_targets)

            # Predict labels and calculate accuracy
            predicted_labels = model.predict(eval_features)
            accuracy = (predicted_labels == eval_targets).mean()
            accuracy_matrix[i - 1, j - 1] = accuracy  # Fill the lower triangular matrix

            print(f"Accuracy on Dataset D^{j} after training with Model F^{i}: {accuracy:.4f}")

    # Return the final accuracy matrix
    return accuracy_matrix

# Final Evaluation of Task 1
accuracy_matrix_task1 = evaluate_and_log_lower_triangle(
    lwp_model,
    '/content/dataset/part_one_dataset/eval_data/{}_eval_data.tar.pth',
    efficientnet,
    transform,
    num_datasets=10
)

print("Lower Triangular Accuracy Matrix (F1-F10):\n", accuracy_matrix_task1)

# Save the model state after the final training step (after training with all datasets)
torch.save(efficientnet.state_dict(), 'f10_model_final.pth')
print("f10 final model saved successfully.")

# Save the prototypes to a .npy file after calculating them in Task 1
np.save('f10_prototypes.npy', lwp_model.prototypes)
print("f10 prototypes saved successfully.")


Downloading: "https://download.pytorch.org/models/efficientnet_b0_rwightman-7f5810bc.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b0_rwightman-7f5810bc.pth
100%|██████████| 20.5M/20.5M [00:00<00:00, 72.0MB/s]


Training with Dataset D^1 and evaluating...
Loaded dataset from /content/dataset/part_one_dataset/eval_data/1_eval_data.tar.pth. Keys: dict_keys(['data', 'targets'])


  dataset = torch.load(filepath)


Loaded dataset from /content/dataset/part_one_dataset/eval_data/1_eval_data.tar.pth. Keys: dict_keys(['data', 'targets'])
Accuracy on Dataset D^1 after training with Model F^1: 0.8544
Training with Dataset D^2 and evaluating...
Loaded dataset from /content/dataset/part_one_dataset/eval_data/2_eval_data.tar.pth. Keys: dict_keys(['data', 'targets'])
Loaded dataset from /content/dataset/part_one_dataset/eval_data/1_eval_data.tar.pth. Keys: dict_keys(['data', 'targets'])
Accuracy on Dataset D^1 after training with Model F^2: 0.8412
Loaded dataset from /content/dataset/part_one_dataset/eval_data/2_eval_data.tar.pth. Keys: dict_keys(['data', 'targets'])
Accuracy on Dataset D^2 after training with Model F^2: 0.8620
Training with Dataset D^3 and evaluating...
Loaded dataset from /content/dataset/part_one_dataset/eval_data/3_eval_data.tar.pth. Keys: dict_keys(['data', 'targets'])
Loaded dataset from /content/dataset/part_one_dataset/eval_data/1_eval_data.tar.pth. Keys: dict_keys(['data', 'targe