In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [25]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.metrics import accuracy_score

import warnings
warnings.filterwarnings('ignore', category=UserWarning, message=".*torch.tensor.*")
warnings.filterwarnings('ignore', category=FutureWarning, message=".*torch.load.*")

# Define a multi-layer perceptron (MLP) for feature extraction
class MLPFeatureExtractor(nn.Module):
    def __init__(self, input_size, hidden_size, feature_dim):
        super(MLPFeatureExtractor, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)  # First fully connected layer
        self.fc2 = nn.Linear(hidden_size, feature_dim)  # Second fully connected layer
        self.relu = nn.ReLU()  # Activation function

    def forward(self, x):
        x = self.relu(self.fc1(x))  # Apply ReLU to the output of the first layer
        return self.fc2(x)  # Output of the second layer

# Define the Lightweight Prototype (LwP) classifier
class LwPClassifier:
    def __init__(self, n_classes=10, feature_dim=128):
        self.n_classes = n_classes  # Number of classes
        self.prototypes = np.zeros((n_classes, feature_dim))  # Prototype for each class
        self.class_counts = np.zeros(n_classes)  # Count of samples per class

    def partial_fit(self, X, y):
        """
        Update the prototypes incrementally for each sample.
        """
        for i in range(len(X)):
            class_label = y[i]
            self.class_counts[class_label] += 1  # Increment count for the class
            if self.class_counts[class_label] == 1:
                # If first sample for the class, initialize prototype
                self.prototypes[class_label] = X[i]
            else:
                # Update prototype using the running average
                self.prototypes[class_label] = (
                    self.prototypes[class_label] * (self.class_counts[class_label] - 1) + X[i]
                ) / self.class_counts[class_label]

    def predict(self, X):
        """
        Predict the class of each sample by finding the nearest prototype.
        """
        distances = np.linalg.norm(X[:, np.newaxis] - self.prototypes, axis=2)
        return np.argmin(distances, axis=1)  # Return the class with minimum distance

# Load a dataset from the specified path
def load_dataset(path, is_train=True):
    data = torch.load(path)  # Load dataset from file

    inputs = data['data']  # Input data
    labels = data.get('targets', None)  # Labels (optional for test data)

    # Convert inputs and labels to tensors if they are not already
    if not isinstance(inputs, torch.Tensor):
        inputs = torch.tensor(inputs, dtype=torch.float32)
    if labels is not None and not isinstance(labels, torch.Tensor):
        labels = torch.tensor(labels, dtype=torch.long)

    # Flatten inputs if they have more than two dimensions
    if len(inputs.shape) > 2:
        inputs = inputs.view(inputs.shape[0], -1)

    return (inputs, labels) if is_train else (inputs, labels)

# Train the feature extractor and classifier on the initial labeled dataset
def train_initial_model(feature_extractor, classifier, dataset, labels):
    feature_extractor.eval()  # Set model to evaluation mode
    with torch.no_grad():
        features = feature_extractor(dataset).numpy()  # Extract features
    classifier.partial_fit(features, labels.numpy())  # Train the classifier

# Update the classifier with pseudo-labeled data
def pseudo_label_and_update(feature_extractor, classifier, dataset):
    feature_extractor.eval()  # Set model to evaluation mode
    with torch.no_grad():
        features = feature_extractor(dataset).numpy()  # Extract features
    pseudo_labels = classifier.predict(features)  # Generate pseudo-labels
    classifier.partial_fit(features, pseudo_labels)  # Update classifier with pseudo-labels

# Evaluate the model's accuracy on the given dataset
def evaluate_model(feature_extractor, classifier, dataset, true_labels):
    feature_extractor.eval()  # Set model to evaluation mode
    with torch.no_grad():
        features = feature_extractor(dataset).numpy()  # Extract features
    predictions = classifier.predict(features)  # Predict labels
    accuracy = accuracy_score(true_labels.numpy(), predictions)  # Compute accuracy
    return accuracy

# Perform continual learning across multiple datasets
def continual_learning(base_path_train, base_path_eval, input_size, hidden_size, feature_dim, n_classes):
    accuracies = []  # List to store accuracy results for each task

    # Initialize the feature extractor and classifier
    feature_extractor = MLPFeatureExtractor(input_size, hidden_size, feature_dim)
    classifier = LwPClassifier(n_classes=n_classes, feature_dim=feature_dim)

    # Loop through all datasets for continual learning
    for i in range(1, 11):
        # Load the training dataset for the current task
        train_dataset_path = f"{base_path_train}/{i}_train_data.tar.pth"
        train_data, train_labels = load_dataset(train_dataset_path, is_train=True)

        # Load the evaluation dataset for the current task
        eval_dataset_path = f"{base_path_eval}/{i}_eval_data.tar.pth"
        eval_data, eval_labels = load_dataset(eval_dataset_path, is_train=False)

        if i == 1:
            # Train on the first dataset with labels
            train_initial_model(feature_extractor, classifier, train_data, train_labels)
        else:
            # Pseudo-label and update for subsequent datasets
            pseudo_label_and_update(feature_extractor, classifier, train_data)

        # Evaluate the model on all tasks seen so far
        model_accuracies = []
        for j in range(1, i + 1):
            eval_dataset_path_j = f"{base_path_eval}/{j}_eval_data.tar.pth"
            eval_data_j, eval_labels_j = load_dataset(eval_dataset_path_j, is_train=False)

            eval_acc = evaluate_model(feature_extractor, classifier, eval_data_j, eval_labels_j)
            model_accuracies.append(eval_acc)

        # Fill the rest of the row with `None` for tasks not yet encountered
        while len(model_accuracies) < 10:
            model_accuracies.append(None)

        accuracies.append(model_accuracies)  # Add the accuracies for this task

    return accuracies  # Return the accuracy matrix

# Paths and model parameters
base_path_train = "dataset/part_one_dataset/train_data"  # Path to training datasets
base_path_eval = "dataset/part_one_dataset/eval_data"  # Path to evaluation datasets

input_size = 3072  # Example: CIFAR-10 image flattened size
hidden_size = 256  # Size of the hidden layer in the MLP
feature_dim = 128  # Dimensionality of extracted features
n_classes = 10  # Number of classes

# Perform continual learning and get accuracy matrix
accuracy_matrix = continual_learning(base_path_train, base_path_eval, input_size, hidden_size, feature_dim, n_classes)

# Print the accuracy matrix
print("\nAccuracy matrix:")
for row in accuracy_matrix:
    print(" | ".join(f"{acc:.4f}" if acc is not None else "  N/A " for acc in row))



Accuracy matrix:
0.2564 |   N/A  |   N/A  |   N/A  |   N/A  |   N/A  |   N/A  |   N/A  |   N/A  |   N/A 
0.2500 | 0.2436 |   N/A  |   N/A  |   N/A  |   N/A  |   N/A  |   N/A  |   N/A  |   N/A 
0.2492 | 0.2428 | 0.2296 |   N/A  |   N/A  |   N/A  |   N/A  |   N/A  |   N/A  |   N/A 
0.2440 | 0.2396 | 0.2300 | 0.2380 |   N/A  |   N/A  |   N/A  |   N/A  |   N/A  |   N/A 
0.2412 | 0.2348 | 0.2244 | 0.2356 | 0.2644 |   N/A  |   N/A  |   N/A  |   N/A  |   N/A 
0.2412 | 0.2336 | 0.2212 | 0.2348 | 0.2620 | 0.2232 |   N/A  |   N/A  |   N/A  |   N/A 
0.2396 | 0.2332 | 0.2196 | 0.2320 | 0.2592 | 0.2240 | 0.2296 |   N/A  |   N/A  |   N/A 
0.2388 | 0.2288 | 0.2180 | 0.2304 | 0.2564 | 0.2220 | 0.2276 | 0.2396 |   N/A  |   N/A 
0.2380 | 0.2284 | 0.2168 | 0.2280 | 0.2548 | 0.2188 | 0.2260 | 0.2380 | 0.2380 |   N/A 
0.2360 | 0.2288 | 0.2168 | 0.2272 | 0.2520 | 0.2164 | 0.2264 | 0.2388 | 0.2356 | 0.2320
