In [14]:
import copy
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset

In [15]:
from torchmetrics.functional import structural_similarity_index_measure

def ssim(cam1, cam2):
    """
    Compute the SSIM (structural similarity) score between two saliency maps.

    Args:
        cam1, cam2: 2-D maps of identical shape, given as NumPy arrays or
            torch tensors. Values are expected to lie in [0, 1], which is what
            ``data_range=1.0`` below tells the metric.

    Returns:
        The SSIM score as a Python float.
    """
    def _as_batched_map(cam):
        # as_tensor accepts arrays and tensors alike (torch.tensor(tensor)
        # emits a copy-construct warning); integer maps are promoted to float
        # because the metric works on floating-point images.
        tensor = torch.as_tensor(cam)
        if not tensor.is_floating_point():
            tensor = tensor.float()
        # Add batch and channel dimensions: [H, W] -> [1, 1, H, W]
        return tensor.unsqueeze(0).unsqueeze(0)

    ssim_score = structural_similarity_index_measure(
        _as_batched_map(cam1), _as_batched_map(cam2), data_range=1.0
    )
    return ssim_score.item()

In [16]:
# Transform for CIFAR-10: convert images to tensors, then normalize each channel.
# The same mean/std values are the defaults of unnormalize_and_convert_to_numpy,
# which undoes this normalization for plotting.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2470, 0.2435, 0.2616))  # standard CIFAR-10 normalization
])

# Load the CIFAR-10 train and test splits (stored under ./data, downloaded if missing)
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_dataset  = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# Task label splits: each inner list is the group of class labels that forms one task.
# Only labels 0-8 are assigned to a task here; label 9 is not used.
tasks = [
    [0, 1, 2],   # Task 1
    [3, 4, 5],   # Task 2
    [6, 7, 8]    # Task 3
]

def filter_dataset_by_labels(dataset, labels):
    """
    Return a ``Subset`` of ``dataset`` containing only samples whose label is in ``labels``.

    Args:
        dataset: Indexable dataset yielding ``(sample, label)`` pairs.
        labels: Collection of labels to keep.

    Returns:
        ``torch.utils.data.Subset`` over the matching indices, in dataset order.
    """
    targets = getattr(dataset, "targets", None)
    # Reading the label list directly avoids loading and transforming every
    # sample just to inspect its label. This is only valid when no
    # target_transform would alter the labels returned by __getitem__.
    if targets is not None and getattr(dataset, "target_transform", None) is None:
        if hasattr(targets, "tolist"):
            # tensor / ndarray targets -> plain Python values
            targets = targets.tolist()
        indices = [i for i, label in enumerate(targets) if label in labels]
    else:
        # Generic fallback: iterate the dataset itself.
        indices = [i for i, (_, label) in enumerate(dataset) if label in labels]
    return Subset(dataset, indices)

# Create one train subset and one test subset per task.
# Both lists are ordered like `tasks`, so index k holds the data for tasks[k].
train_subsets = [filter_dataset_by_labels(train_dataset, task) for task in tasks]
test_subsets = [filter_dataset_by_labels(test_dataset,  task) for task in tasks]


Files already downloaded and verified
Files already downloaded and verified


In [17]:
class SimpleVGG(nn.Module):
    """
    Small VGG-style CNN: two convolutional stages followed by a two-layer classifier.

    Each stage is two 3x3 conv + ReLU pairs and a 2x2 max-pool. The classifier's
    input size (8 * 8 * 128) corresponds to 32x32 inputs after the two poolings.
    """

    def __init__(self, num_classes=5):
        """
        num_classes: size of the single output head. The default is 5; pass the
        total number of classes (e.g. 10) for one shared head across all tasks.
        """
        super().__init__()

        # Built in a loop, but the resulting module order (and therefore the
        # state-dict keys such as 'features.7') is:
        # conv, relu, conv, relu, pool, conv, relu, conv, relu, pool
        layers = []
        in_channels = 3
        for width in (64, 128):
            for _ in range(2):
                layers.append(nn.Conv2d(in_channels, width, kernel_size=3, padding=1))
                layers.append(nn.ReLU(True))
                in_channels = width
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.features = nn.Sequential(*layers)

        hidden_units = 256
        self.classifier = nn.Sequential(
            nn.Linear(8 * 8 * 128, hidden_units),
            nn.ReLU(True),
            nn.Linear(hidden_units, num_classes),
        )

    def forward(self, x):
        """Run the conv stages, flatten per sample, and return the class logits."""
        feature_maps = self.features(x)
        flat = feature_maps.view(feature_maps.size(0), -1)
        return self.classifier(flat)

    def save(self, path):
        """Write the model's state dict to ``path``."""
        torch.save(self.state_dict(), path)


class ReplayBuffer:
    """
    Fixed-capacity store of (sample, label) pairs used for experience replay.

    When adding samples would exceed ``buffer_size``, randomly chosen stored
    samples are evicted so the buffer never holds more than ``buffer_size`` items.
    """

    def __init__(self, buffer_size=200):
        self.buffer_size = buffer_size
        # Parallel lists: buffer_labels[i] is the label of buffer_data[i].
        self.buffer_data = []
        self.buffer_labels = []

    def add_samples(self, data, labels):
        """
        Add samples to the buffer, evicting random old samples if needed.

        Args:
            data: Sequence of samples.
            labels: Sequence of labels, aligned with ``data``.
        """
        data = list(data)
        labels = list(labels)

        # An incoming batch larger than the whole buffer cannot fit even after
        # evicting everything: keep a random subset of it instead of failing.
        if len(data) > self.buffer_size:
            keep = sorted(random.sample(range(len(data)), self.buffer_size))
            data = [data[i] for i in keep]
            labels = [labels[i] for i in keep]

        # Evict exactly as many stored samples as needed to stay within
        # capacity. This also covers the case where the buffer is not yet full
        # but the incoming batch would overflow it.
        overflow = len(self.buffer_data) + len(data) - self.buffer_size
        if overflow > 0:
            indices_to_remove = random.sample(range(len(self.buffer_data)), overflow)
            # Delete from the back so earlier indices stay valid.
            for idx in sorted(indices_to_remove, reverse=True):
                del self.buffer_data[idx]
                del self.buffer_labels[idx]

        self.buffer_data.extend(data)
        self.buffer_labels.extend(labels)

    def get_samples(self, batch_size):
        """
        Return a random batch from the replay buffer.

        Returns:
            ``(stacked_data, label_tensor)`` with at most ``batch_size`` items,
            or ``(None, None)`` if the buffer is empty.
        """
        if len(self.buffer_data) == 0:
            return None, None
        indices = random.sample(range(len(self.buffer_data)), min(batch_size, len(self.buffer_data)))
        replay_data = [self.buffer_data[i] for i in indices]
        replay_labels = [self.buffer_labels[i] for i in indices]
        return torch.stack(replay_data), torch.tensor(replay_labels)

    def __len__(self):
        return len(self.buffer_data)

In [32]:
def train_sequential(model, train_subsets, test_subsets, tasks, buffer_size=200,
                     num_epochs=5, batch_size=64, lr=0.01):
    """
    Train model sequentially on each task with a replay buffer.

    For every task, a checkpoint named after the tasks seen so far, the epoch
    count and the buffer size is looked up in the working directory. If it
    exists, it is loaded and training for that task is skipped; otherwise the
    model is trained and the checkpoint is written.

    Args:
        model: Network to train; must provide ``save(path)``.
        train_subsets / test_subsets: Per-task datasets, aligned with ``tasks``.
        tasks: List of label subsets (e.g., [[0,1,2],[3,4,5],[6,7,8]]).
        buffer_size: Capacity of the replay buffer.
        num_epochs, batch_size, lr: Training hyper-parameters (Adam optimizer).

    Returns:
        ``(model, train_acc_history, test_acc_history)`` where the histories hold
        the accuracy on each task's own data right after that task was processed.

    Raises:
        Any error from reading an existing checkpoint other than the file being
        absent (e.g. a corrupt file or mismatching architecture). Such errors
        are surfaced instead of silently retraining and overwriting the file.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    replay_buffer = ReplayBuffer(buffer_size=buffer_size)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # To record metrics across tasks
    train_acc_history = []
    test_acc_history = []

    for task_idx, (train_subset, test_subset, labels) in enumerate(zip(train_subsets, test_subsets, tasks)):
        print(f"=== Training on Task {task_idx+1} with labels {labels} ===")

        train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_subset, batch_size=batch_size, shuffle=False)

        # Single source of truth for the checkpoint name (used for load and save).
        checkpoint_path = f"task_{'>'.join(map(str, range(1, task_idx+2)))}_nr_epochs_{num_epochs}_buffersize_{buffer_size}.pth"
        try:
            # map_location lets a checkpoint saved on GPU be reused on a CPU-only machine.
            state_dict = torch.load(checkpoint_path, map_location=device, weights_only=True)
        except FileNotFoundError:
            state_dict = None
            print("No model found, training from scratch")

        if state_dict is not None:
            model.load_state_dict(state_dict)
            print("Model loaded from previous training instead of training from scratch")
        else:
            for epoch in range(num_epochs):
                model.train()
                running_loss = 0.0
                correct = 0
                total = 0

                for images, lbls in train_loader:
                    images, lbls = images.to(device), lbls.to(device)
                    optimizer.zero_grad()

                    # Forward / backward on the current-task batch
                    outputs = model(images)
                    loss = criterion(outputs, lbls)
                    loss.backward()
                    optimizer.step()

                    running_loss += loss.item()
                    _, predicted = outputs.max(1)
                    correct += predicted.eq(lbls).sum().item()
                    total += lbls.size(0)

                    # Experience Replay step: a separate optimizer step on a
                    # half-size batch drawn from previously stored samples.
                    replay_images, replay_labels = replay_buffer.get_samples(batch_size//2)
                    if replay_images is not None:
                        replay_images = replay_images.to(device)
                        replay_labels = replay_labels.to(device)

                        optimizer.zero_grad()
                        replay_outputs = model(replay_images)
                        replay_loss = criterion(replay_outputs, replay_labels)
                        replay_loss.backward()
                        optimizer.step()

                # Loss/accuracy below cover current-task batches only (replay steps excluded).
                epoch_loss = running_loss / len(train_loader)
                epoch_acc = 100. * correct / total
                print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.2f}%")
            model.save(checkpoint_path)

        # Evaluate on the current task
        task_train_acc, train_correct, train_total = evaluate_accuracy(model, train_loader, device)
        task_test_acc, test_correct, test_total = evaluate_accuracy(model, test_loader, device)

        train_acc_history.append(task_train_acc)
        test_acc_history.append(task_test_acc)

        print(f"Task {task_idx+1} Train Accuracy: {task_train_acc:.2f}% ({train_correct}/{train_total})")
        print(f"Task {task_idx+1} Test Accuracy:  {task_test_acc:.2f}% ({test_correct}/{test_total})\n")

        # Evaluate the current model on every task's test set (including tasks
        # not trained yet). Distinct names avoid shadowing the outer loop's
        # test_subset / test_loader.
        for eval_idx, eval_subset in enumerate(test_subsets):
            eval_loader = DataLoader(eval_subset, batch_size=batch_size, shuffle=False)
            eval_acc, eval_correct, eval_total = evaluate_accuracy(model, eval_loader, device)
            nr_incorrect = eval_total - eval_correct
            print(f"Task {task_idx+1} -> Test Accuracy on {eval_idx+1} : {eval_acc:.2f}% ({eval_correct}/{eval_total}), Incorrect: {nr_incorrect}")

        # Add random samples from the current task to replay buffer.
        # Each task contributes a third of the buffer capacity.
        store_in_replay_buffer(model, train_subset, replay_buffer, device, store_size=buffer_size//3)

    return model, train_acc_history, test_acc_history


def store_in_replay_buffer(model, dataset_subset, replay_buffer, device, store_size=50):
    """
    Randomly select up to store_size samples from dataset_subset and add them to the replay buffer.

    Args:
        model: Unused; kept so existing callers keep working.
        dataset_subset: Indexable dataset yielding ``(sample, label)`` pairs.
        replay_buffer: Object exposing ``add_samples(data, labels)``.
        device: Unused; kept so existing callers keep working.
        store_size: Number of distinct samples to draw. If it exceeds the
            dataset size, every sample is stored once.
    """
    # Sampling without replacement cannot draw more items than exist.
    n_store = min(store_size, len(dataset_subset))
    indices = np.random.choice(len(dataset_subset), size=n_store, replace=False)

    data = []
    labels = []
    for idx in indices:
        x, y = dataset_subset[idx]
        data.append(x)
        labels.append(y)
    replay_buffer.add_samples(data, labels)


def evaluate_accuracy(model, data_loader, device):
    """
    Compute top-1 accuracy of ``model`` over all batches of ``data_loader``.

    The model is switched to eval mode and is left in that mode.

    Returns:
        ``(accuracy_percent, num_correct, num_total)``. For a loader that
        yields no samples the result is ``(0.0, 0, 0)``.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)

    # Avoid a ZeroDivisionError when there was nothing to evaluate.
    if total == 0:
        return 0.0, 0, 0
    return 100.0 * correct / total, correct, total



import torch
import torch.nn.functional as F

class GradCAM:
    """
    Minimal Grad-CAM implementation for a SimpleVGG model.
    target_layer_name = 'features.7' by default (the last conv layer).

    Creating an instance registers a forward and a backward hook on the target
    layer of ``model``. Call ``remove_hooks()`` (or use the instance as a
    context manager) when done; otherwise the hooks stay attached to the model.
    """
    def __init__(self, model, target_layer_name='features.7'):
        self.model = model
        self.target_layer_name = target_layer_name

        # Will be set by hooks
        self.gradients = None
        self.activations = None

        # Handles returned by the hook registrations, kept so they can be removed.
        self._hook_handles = []

        # Register hooks
        self._register_hooks()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.remove_hooks()
        return False

    def _save_gradient(self, grad):
        self.gradients = grad

    def _register_hooks(self):
        """
        This finds the target layer by name and registers the forward/backward hooks
        to capture the activations and gradients.
        """
        # Build a dictionary of all modules by name
        modules_dict = dict(self.model.named_modules())

        # Confirm your layer name is in the dictionary (e.g., 'features.7')
        if self.target_layer_name not in modules_dict:
            raise ValueError(f"Layer {self.target_layer_name} not found in model. "
                             f"Available layers: {list(modules_dict.keys())}")

        target_layer = modules_dict[self.target_layer_name]

        def forward_hook(module, input, output):
            # NOTE: detach() shares storage with `output`, so an in-place op
            # applied right after the layer (SimpleVGG uses an in-place ReLU)
            # is reflected in the stored activations.
            self.activations = output.detach()

        def backward_hook(module, grad_in, grad_out):
            # grad_out is a tuple with one element for the output gradient
            self._save_gradient(grad_out[0])

        # The legacy (non-full) backward hook is kept on purpose: PyTorch's full
        # backward hooks do not allow the hooked module's output to be modified
        # in place, which the in-place ReLU following the target conv does.
        self._hook_handles = [
            target_layer.register_forward_hook(forward_hook),
            target_layer.register_backward_hook(backward_hook),
        ]

    def remove_hooks(self):
        """Detach this instance's hooks from the model. Safe to call more than once."""
        for handle in self._hook_handles:
            handle.remove()
        self._hook_handles = []

    def generate_cam(self, input_tensor, target_class=None):
        """
        input_tensor: shape [B, 3, H, W]
        target_class: class to explain. An integer is applied to every sample;
            a list/tensor gives one class index per sample. If None, uses the
            predicted class of each sample.
        Returns: A list of CAM heatmaps (one NumPy array per sample in the batch),
            each min-max normalized to [0, 1].
        """
        # Forward pass
        self.model.zero_grad()  # Clear any existing gradients
        output = self.model(input_tensor)
        batch_size = input_tensor.size(0)

        # If target_class is None, use the top predicted class for each sample
        if target_class is None:
            target_class = output.argmax(dim=1)

        # Convert tensors to plain Python values (a 0-dim tensor becomes an int)
        if isinstance(target_class, torch.Tensor):
            target_class = target_class.cpu().tolist()

        # A single integer targets the same class for every sample in the batch
        if isinstance(target_class, int):
            target_class = [target_class] * batch_size

        cams = []

        for i in range(batch_size):
            # Backprop for sample i, class target_class[i]
            self.model.zero_grad()  # zero grads for each sample
            class_idx = target_class[i]
            score = output[i, class_idx]  # scalar
            # The graph is only needed again if another sample follows.
            score.backward(retain_graph=(i < batch_size - 1))

            # Get gradients & activations for sample i
            gradients = self.gradients[i]       # shape: [C, h, w]
            activations = self.activations[i]   # shape: [C, h, w]

            # Compute channel-wise mean of gradients
            alpha = gradients.mean(dim=(1, 2), keepdim=True)  # shape: [C, 1, 1]

            # Linear combination of activations and alpha
            cam = (activations * alpha).sum(dim=0)  # shape: [h, w]
            cam = F.relu(cam)  # ReLU to keep only positive activations

            # Normalize to [0, 1]
            cam -= cam.min()
            if cam.max() != 0:
                cam /= cam.max()

            cams.append(cam.detach().cpu().numpy())

        return cams


def _compute_total_divergence(model_task1, model_task3, dataset, filter_fn=None):
    """
    Sum the SSIM between the two models' Grad-CAM maps over selected samples.

    Args:
        model_task1, model_task3: The two models to compare.
        dataset: Indexable dataset yielding ``(image, label)`` pairs.
        filter_fn: Optional predicate ``filter_fn(y_pred_task1, y_pred_task3, y_true)``.
            If passed, only samples for which it returns True are included;
            if None, every sample is included.

    Returns:
        ``(total_ssim_score, num_samples)``: the summed SSIM and the number of
        samples that contributed. Callers divide the two to get an average.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model_task1.to(device)
    model_task3.to(device)
    model_task1.eval()
    model_task3.eval()

    gradcam_task1 = GradCAM(model_task1)
    gradcam_task3 = GradCAM(model_task3)

    total_ssim_score = 0
    num_samples = 0

    for i in range(len(dataset)):
        x, y_true = dataset[i]
        x_ = x.unsqueeze(0).to(device)

        # Predictions only decide whether the sample is selected, so no
        # autograd graph is needed for this pass.
        with torch.no_grad():
            y_pred_task1 = model_task1(x_).argmax(dim=1).item()
            y_pred_task3 = model_task3(x_).argmax(dim=1).item()

        if filter_fn is None or filter_fn(y_pred_task1, y_pred_task3, y_true):
            # target_class=None: each CAM explains that model's own predicted class
            cam_task1 = gradcam_task1.generate_cam(x_, target_class=None)[0]
            cam_task3 = gradcam_task3.generate_cam(x_, target_class=None)[0]

            # Compute similarity scores
            total_ssim_score += ssim(cam_task1, cam_task3)
            num_samples += 1

    return total_ssim_score, num_samples

def compute_total_divergence_misclassified(model_task1, model_task3, dataset):
    """
    Accumulate Grad-CAM SSIM over samples that model_task1 classifies correctly
    but model_task3 gets wrong. Returns whatever _compute_total_divergence returns.
    """
    def correct_then_wrong(y_pred_task1, y_pred_task3, y_true):
        return y_pred_task1 == y_true and y_pred_task3 != y_true

    return _compute_total_divergence(model_task1, model_task3, dataset,
                                     filter_fn=correct_then_wrong)

def compute_total_divergence_correctly_classified(model_task1, model_task3, dataset):
    """
    Accumulate Grad-CAM SSIM over samples that both models classify correctly.
    Returns whatever _compute_total_divergence returns.
    """
    def correct_in_both(y_pred_task1, y_pred_task3, y_true):
        return y_pred_task1 == y_true and y_pred_task3 == y_true

    return _compute_total_divergence(model_task1, model_task3, dataset,
                                     filter_fn=correct_in_both)


def _setup_gradcam_pair(model_task1, model_task3):
    """Move both models to the available device, set eval mode, and attach a GradCAM to each."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    for net in (model_task1, model_task3):
        net.to(device)
    for net in (model_task1, model_task3):
        net.eval()
    return device, GradCAM(model_task1), GradCAM(model_task3)


def _save_gradcam_comparison(x, y_true, cam_task1, cam_task3, pred_task1, pred_task3,
                             suptitle, out_path):
    """Plot the image next to both Grad-CAM overlays and save the figure to out_path."""
    image = unnormalize_and_convert_to_numpy(x)  # computed once, reused by all three panels

    fig, axs = plt.subplots(1, 3, figsize=(12, 4))
    fig.suptitle(suptitle)

    # Original image
    axs[0].imshow(image)
    axs[0].set_title(f"Original (label={y_true})")
    axs[0].axis('off')

    # Grad-CAM overlays, one panel per model
    panels = ((axs[1], cam_task1, 1, pred_task1), (axs[2], cam_task3, 3, pred_task3))
    for ax, cam, task_no, pred in panels:
        ax.imshow(image)
        ax.imshow(cam, cmap='jet', alpha=0.5)
        ax.set_title(f"Grad-CAM Task {task_no} (pred={pred})")
        ax.axis('off')

    fig.savefig(out_path)
    plt.close(fig)  # figures are created in a loop; close to free memory


def visualize_misclassified_samples(model_task1, model_task3, dataset, num_samples=20, output_folder="deliverables"):
    """
    1. Find the first 'num_samples' samples in 'dataset' that model_task1 classified
       correctly but model_task3 misclassified.
    2. Generate Grad-CAM for both models on those samples.
    3. Plot side-by-side and save each figure as
       '<output_folder>/misclassified_samples<dataset index>.png'.

    The output folder is created if it does not exist.
    """
    device, gradcam_task1, gradcam_task3 = _setup_gradcam_pair(model_task1, model_task3)
    os.makedirs(output_folder, exist_ok=True)

    # (dataset index, model_task1 prediction, model_task3 prediction)
    forgotten = []
    with torch.no_grad():
        for i in range(len(dataset)):
            if len(forgotten) >= num_samples:
                break
            x, y_true = dataset[i]
            x_ = x.unsqueeze(0).to(device)

            # Predictions
            y_pred_task1 = model_task1(x_).argmax(dim=1).item()
            y_pred_task3 = model_task3(x_).argmax(dim=1).item()

            if y_pred_task1 == y_true and y_pred_task3 != y_true:
                forgotten.append((i, y_pred_task1, y_pred_task3))

    # Visualize
    for idx, pred_task1, pred_task3 in forgotten:
        x, y_true = dataset[idx]
        x_ = x.unsqueeze(0).to(device)

        # target_class=None: each CAM explains that model's own predicted class
        cam_task1 = gradcam_task1.generate_cam(x_, target_class=None)[0]
        cam_task3 = gradcam_task3.generate_cam(x_, target_class=None)[0]

        ssim_score = ssim(cam_task1, cam_task3)
        _save_gradcam_comparison(
            x, y_true, cam_task1, cam_task3, pred_task1, pred_task3,
            suptitle=f"SSIM Score: {ssim_score:.2f}",
            out_path=f"{output_folder}/misclassified_samples{idx}.png",
        )

import random

def visualize_correctly_classified_samples(model_task1, model_task3, dataset, num_samples=20, output_folder="deliverables"):
    """
    1. Find samples in 'dataset' that both model_task1 and model_task3 classified correctly.
    2. Randomly select 'num_samples' from these correctly classified samples.
    3. Generate Grad-CAM for both models on those samples.
    4. Plot side-by-side and save each figure as
       '<output_folder>/correctly_classified_samples<k>.png', where k counts the
       selected samples from 0.

    The output folder is created if it does not exist.
    """
    device, gradcam_task1, gradcam_task3 = _setup_gradcam_pair(model_task1, model_task3)
    os.makedirs(output_folder, exist_ok=True)

    # (dataset index, model_task1 prediction, model_task3 prediction)
    remembered = []
    with torch.no_grad():
        for i in range(len(dataset)):
            x, y_true = dataset[i]
            x_ = x.unsqueeze(0).to(device)

            # Predictions
            y_pred_task1 = model_task1(x_).argmax(dim=1).item()
            y_pred_task3 = model_task3(x_).argmax(dim=1).item()

            if y_pred_task1 == y_true and y_pred_task3 == y_true:
                remembered.append((i, y_pred_task1, y_pred_task3))

    # Randomly select 'num_samples' from correctly classified samples
    selected = random.sample(remembered, min(num_samples, len(remembered)))

    # Visualize
    for i, (idx, pred_task1, pred_task3) in enumerate(selected):
        x, y_true = dataset[idx]
        x_ = x.unsqueeze(0).to(device)

        # target_class=None: each CAM explains that model's own predicted class
        cam_task1 = gradcam_task1.generate_cam(x_, target_class=None)[0]
        cam_task3 = gradcam_task3.generate_cam(x_, target_class=None)[0]

        ssim_score = ssim(cam_task1, cam_task3)
        _save_gradcam_comparison(
            x, y_true, cam_task1, cam_task3, pred_task1, pred_task3,
            suptitle=f"SSIM: {ssim_score:.2f}",
            out_path=f"{output_folder}/correctly_classified_samples{i}.png",
        )

def unnormalize_and_convert_to_numpy(tensor, mean=(0.4914, 0.4822, 0.4465),
                                     std=(0.2470, 0.2435, 0.2616)):
    """
    Undo per-channel normalization and return an image array ready for plotting.

    Args:
        tensor: Normalized image tensor laid out as [C, H, W].
        mean, std: Per-channel statistics that were used for normalization.

    Returns:
        NumPy array laid out as [H, W, C] with values clipped to [0, 1].
    """
    channels_last = np.transpose(tensor.cpu().numpy(), (1, 2, 0))
    restored = channels_last * np.array(std) + np.array(mean)
    return restored.clip(0, 1)

In [30]:
# Seed every random number generator used by the notebook (Python, NumPy, PyTorch).
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# Data: train_subsets and test_subsets were built in an earlier cell.

# One shared 10-way output head for all tasks.
model = SimpleVGG(num_classes=10)


# Train the model sequentially on all tasks.
# train_sequential reuses a matching checkpoint file for a task if one exists.
model, train_acc, test_acc = train_sequential(
    model, train_subsets, test_subsets, tasks,
    buffer_size=3_000, num_epochs=8, batch_size=64, lr=0.001
)

# Reference model for Task 1 only: a fresh network trained on the first task alone.
# buffer_size=0 means no samples are stored for replay (store size is buffer_size // 3)
# and, because the buffer size is part of the checkpoint filename, this run uses
# its own checkpoint file rather than the one written by the run above.
model_task1 = SimpleVGG(num_classes=10)
model_task1, _, _ = train_sequential(
    model_task1, [train_subsets[0]], [test_subsets[0]], [tasks[0]],
    buffer_size=0, num_epochs=8, batch_size=64, lr=0.001
)

# Compare Grad-CAM on Task 1 test samples that the Task 1 model classifies
# correctly but the sequentially trained model misclassifies.
# Figures are saved to the function's default output folder.
visualize_misclassified_samples(
    model_task1, model, test_subsets[0], num_samples=20
)

# Per-task accuracies, each measured right after that task was processed
print("Train accuracies across tasks:", train_acc)
print("Test accuracies across tasks:", test_acc)

=== Training on Task 1 with labels [0, 1, 2] ===
Model loaded from previous training instead of training from scratch


  model.load_state_dict(torch.load(f"task_{'>'.join(map(str, range(1, task_idx+2)))}_nr_epochs_{num_epochs}_buffersize_{buffer_size}.pth"))


Task 1 Train Accuracy: 98.73% (14810/15000)
Task 1 Test Accuracy:  92.57% (2777/3000)

Task 1 -> Test Accuracy on 1 : 92.57% (2777/3000), Incorrect: 223
Task 1 -> Test Accuracy on 2 : 0.00% (0/3000), Incorrect: 3000
Task 1 -> Test Accuracy on 3 : 0.00% (0/3000), Incorrect: 3000
=== Training on Task 2 with labels [3, 4, 5] ===
Model loaded from previous training instead of training from scratch
Task 2 Train Accuracy: 97.44% (14616/15000)
Task 2 Test Accuracy:  69.97% (2099/3000)

Task 2 -> Test Accuracy on 1 : 50.23% (1507/3000), Incorrect: 1493
Task 2 -> Test Accuracy on 2 : 69.97% (2099/3000), Incorrect: 901
Task 2 -> Test Accuracy on 3 : 0.00% (0/3000), Incorrect: 3000
=== Training on Task 3 with labels [6, 7, 8] ===
Model loaded from previous training instead of training from scratch
Task 3 Train Accuracy: 97.73% (14659/15000)
Task 3 Test Accuracy:  88.50% (2655/3000)

Task 3 -> Test Accuracy on 1 : 43.30% (1299/3000), Incorrect: 1701
Task 3 -> Test Accuracy on 2 : 27.17% (815/3000)

  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)


Train accuracies across tasks: [98.73333333333333, 97.44, 97.72666666666667]
Test accuracies across tasks: [92.56666666666666, 69.96666666666667, 88.5]


In [20]:
# Save Grad-CAM comparisons for up to 50 randomly chosen samples that both models
# classify correctly, drawn from all three task test sets combined
# (the `+` on the subsets is expected to concatenate them).
visualize_correctly_classified_samples(model_task1, model, test_subsets[0] + test_subsets[1] + test_subsets[2] , num_samples=50)

In [21]:
# Save Grad-CAM comparisons for up to 50 Task 1 test samples that model_task1
# classifies correctly but `model` misclassifies.
visualize_misclassified_samples(
    model_task1, model, test_subsets[0], num_samples=50
)

In [22]:
# Summed Grad-CAM SSIM over Task 1 test samples that model_task1 gets right
# and `model` gets wrong, plus the number of such samples.
ssim_score_misclassified, num_misclassified_samples = compute_total_divergence_misclassified(
    model_task1, model, test_subsets[0]
)

# Summed Grad-CAM SSIM over samples from all three test sets that both models
# classify correctly, plus the number of such samples.
ssim_score_correct, num_correct_samples = compute_total_divergence_correctly_classified(
    model_task1, model, test_subsets[0] + test_subsets[1] + test_subsets[2]
)

# Averages are only defined when at least one sample fell into the category;
# guard the divisions so an empty category does not abort the cell.
if num_misclassified_samples:
    print(f"Average SSIM Score (Misclassified): {ssim_score_misclassified/num_misclassified_samples:.2f}")
else:
    print("Average SSIM Score (Misclassified): n/a (no samples)")
print(f"Number of Misclassified Samples: {num_misclassified_samples}")

if num_correct_samples:
    print(f"Average SSIM Score (Correctly Classified): {ssim_score_correct/num_correct_samples:.2f}")
else:
    print("Average SSIM Score (Correctly Classified): n/a (no samples)")
print(f"Number of Correctly Classified Samples: {num_correct_samples}")


Average SSIM Score (Misclassified): 0.02
Number of Misclassified Samples: 2488
Average SSIM Score (Correctly Classified): 0.01
Number of Correctly Classified Samples: 249


In [23]:
def load_model(model, task_idx, num_epochs, buffer_size):
    """
    Load into ``model`` the checkpoint saved after the task at index ``task_idx``.

    The file is looked up in the current working directory under the name
    ``task_<1>2>...>_nr_epochs_<num_epochs>_buffersize_<buffer_size>.pth``,
    where the task chain lists task numbers 1 .. task_idx + 1.

    Args:
        model: Module whose parameters are overwritten in place.
        task_idx: Zero-based index of the last task the checkpoint covers.
        num_epochs, buffer_size: Values that are part of the checkpoint filename.

    Raises:
        FileNotFoundError: If no such checkpoint file exists.
    """
    task_chain = '>'.join(map(str, range(1, task_idx+2)))
    checkpoint_path = f"task_{task_chain}_nr_epochs_{num_epochs}_buffersize_{buffer_size}.pth"
    # map_location="cpu" lets checkpoints saved on a GPU load on CPU-only
    # machines (load_state_dict copies values onto the model's own device);
    # weights_only=True restricts unpickling to tensor data.
    state_dict = torch.load(checkpoint_path, map_location="cpu", weights_only=True)
    model.load_state_dict(state_dict)


In [None]:
# Checkpoint selection: these two values are part of the checkpoint filenames.
NR_EPOCHS = 8
BUFFER_SIZE = 3000
# NOTE(review): this model_final is replaced at the top of every iteration of
# the loop below, so this load only has an effect if the checkpoint is missing
# (it then fails here, before any output is printed).
model_final = SimpleVGG(num_classes=10)
load_model(model_final, 2, NR_EPOCHS, BUFFER_SIZE)

print("In the following we will compute the SSIM score between the model after each task T and the final model after task 3. The inputs used will correspond to the test set of task T.")
print(f"Buffer Size: {BUFFER_SIZE}")

# For every later checkpoint (after task 1, 2, 3) compare it against each earlier-or-equal
# checkpoint on that earlier task's test set. Note the loop covers all three
# "later" checkpoints, not only the one after task 3 mentioned in the banner.
# NOTE(review): this rebinds the notebook-level names `model` and `model_final`.
for final_model_idx in range(3):
    model_final = SimpleVGG(num_classes=10)
    load_model(model_final, final_model_idx, NR_EPOCHS, BUFFER_SIZE)
    for task_idx in range(final_model_idx + 1):
        model = SimpleVGG(num_classes=10)
        load_model(model, task_idx, NR_EPOCHS, BUFFER_SIZE)
        # "Forgotten": correct for the earlier model, wrong for the later one.
        ssim_score, num_samples = compute_total_divergence_misclassified(model, model_final, test_subsets[task_idx])
        if num_samples == 0:
            print(f"Model after Task {task_idx+1} vs Model {final_model_idx + 1} evaluated on {task_idx + 1}: No forgotten samples")
        else:
            print(f"Model after Task {task_idx+1} vs Model {final_model_idx + 1} evaluated on {task_idx + 1}: Average SSIM Score (Forgotten): {ssim_score/num_samples:.2f}")
        # "Remembered": correct for both models.
        ssim_score, num_samples = compute_total_divergence_correctly_classified(model, model_final, test_subsets[task_idx])
        if num_samples == 0:
            print(f"Model after Task {task_idx+1} vs Model {final_model_idx + 1} evaluated on {task_idx + 1}: No correctly classified samples")
        else:
            print(f"Model after Task {task_idx+1} vs Model {final_model_idx + 1} evaluated on {task_idx + 1}: Average SSIM Score (Correctly Classified in both): {ssim_score/num_samples:.2f}")

  model.load_state_dict(torch.load(f"task_{'>'.join(map(str, range(1, task_idx+2)))}_nr_epochs_{num_epochs}_buffersize_{buffer_size}.pth"))
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)


In the following we will compute the SSIM score between the model after each task T and the final model after task 3. The inputs used will correspond to the test set of task T.
Buffer Size: 3000
Model after Task 1 vs Model 1 evaluated on 1: No forgotten samples




Model after Task 1 vs Model 1 evaluated on 1: Average SSIM Score (Correctly Classified in both): 1.00
Model after Task 1 vs Model 2 evaluated on 1: Average SSIM Score (Forgotten): 0.07
Model after Task 1 vs Model 2 evaluated on 1: Average SSIM Score (Correctly Classified in both): 0.14
Model after Task 2 vs Model 2 evaluated on 2: No forgotten samples
Model after Task 2 vs Model 2 evaluated on 2: Average SSIM Score (Correctly Classified in both): 1.00
Model after Task 1 vs Model 3 evaluated on 1: Average SSIM Score (Forgotten): 0.03
Model after Task 1 vs Model 3 evaluated on 1: Average SSIM Score (Correctly Classified in both): 0.07
Model after Task 2 vs Model 3 evaluated on 2: Average SSIM Score (Forgotten): 0.06
Model after Task 2 vs Model 3 evaluated on 2: Average SSIM Score (Correctly Classified in both): 0.08
Model after Task 3 vs Model 3 evaluated on 3: No forgotten samples
Model after Task 3 vs Model 3 evaluated on 3: Average SSIM Score (Correctly Classified in both): 1.00


In [None]:
# Same report as for the smaller buffer, but reading the checkpoints whose
# filenames carry buffer size 5000.
NR_EPOCHS = 8
BUFFER_SIZE = 5000
# NOTE(review): this model_final is replaced at the top of every iteration of
# the loop below, so this load only has an effect if the checkpoint is missing
# (it then fails here, before any output is printed).
model_final = SimpleVGG(num_classes=10)
load_model(model_final, 2, NR_EPOCHS, BUFFER_SIZE)

print("In the following we will compute the SSIM score between the model after each task T and the final model after task 3. The inputs used will correspond to the test set of task T.")
print(f"Buffer Size: {BUFFER_SIZE}")

# For every later checkpoint (after task 1, 2, 3) compare it against each earlier-or-equal
# checkpoint on that earlier task's test set.
# NOTE(review): this rebinds the notebook-level names `model` and `model_final`.
for final_model_idx in range(3):
    model_final = SimpleVGG(num_classes=10)
    load_model(model_final, final_model_idx, NR_EPOCHS, BUFFER_SIZE)
    for task_idx in range(final_model_idx + 1):
        model = SimpleVGG(num_classes=10)
        load_model(model, task_idx, NR_EPOCHS, BUFFER_SIZE)
        # "Forgotten": correct for the earlier model, wrong for the later one.
        ssim_score, num_samples = compute_total_divergence_misclassified(model, model_final, test_subsets[task_idx])
        if num_samples == 0:
            print(f"Model after Task {task_idx+1} vs Model {final_model_idx + 1} evaluated on {task_idx + 1}: No forgotten samples")
        else:
            print(f"Model after Task {task_idx+1} vs Model {final_model_idx + 1} evaluated on {task_idx + 1}: Average SSIM Score (Forgotten): {ssim_score/num_samples:.2f}")
        # "Remembered": correct for both models.
        ssim_score, num_samples = compute_total_divergence_correctly_classified(model, model_final, test_subsets[task_idx])
        if num_samples == 0:
            print(f"Model after Task {task_idx+1} vs Model {final_model_idx + 1} evaluated on {task_idx + 1}: No correctly classified samples")
        else:
            print(f"Model after Task {task_idx+1} vs Model {final_model_idx + 1} evaluated on {task_idx + 1}: Average SSIM Score (Correctly Classified in both): {ssim_score/num_samples:.2f}")

In the following we will compute the SSIM score between the model after each task T and the final model after task 3. The inputs used will correspond to the test set of task T.
Buffer Size: 5000


  model.load_state_dict(torch.load(f"task_{'>'.join(map(str, range(1, task_idx+2)))}_nr_epochs_{num_epochs}_buffersize_{buffer_size}.pth"))


Model after Task 1 vs Model 1 evaluated on 1: No forgotten samples
Model after Task 1 vs Model 1 evaluated on 1: Average SSIM Score (Correctly Classified in both): 1.00
Model after Task 1 vs Model 2 evaluated on 1: Average SSIM Score (Forgotten): 0.09
Model after Task 1 vs Model 2 evaluated on 1: Average SSIM Score (Correctly Classified in both): 0.14
Model after Task 2 vs Model 2 evaluated on 2: No forgotten samples
Model after Task 2 vs Model 2 evaluated on 2: Average SSIM Score (Correctly Classified in both): 1.00
Model after Task 1 vs Model 3 evaluated on 1: Average SSIM Score (Forgotten): 0.05
Model after Task 1 vs Model 3 evaluated on 1: Average SSIM Score (Correctly Classified in both): 0.09
Model after Task 2 vs Model 3 evaluated on 2: Average SSIM Score (Forgotten): 0.04
Model after Task 2 vs Model 3 evaluated on 2: Average SSIM Score (Correctly Classified in both): 0.07
Model after Task 3 vs Model 3 evaluated on 3: No forgotten samples
Model after Task 3 vs Model 3 evaluated 

In [33]:
import os

# Export Grad-CAM figures for every (later checkpoint, earlier-or-equal checkpoint) pair.
# NR_EPOCHS and BUFFER_SIZE are not set in this cell: it uses whatever values
# the most recently executed report cell assigned.
for final_model_idx in range(3):
    model_final = SimpleVGG(num_classes=10)
    load_model(model_final, final_model_idx, NR_EPOCHS, BUFFER_SIZE)
    for task_idx in range(final_model_idx + 1):
        model = SimpleVGG(num_classes=10)
        load_model(model, task_idx, NR_EPOCHS, BUFFER_SIZE)
        # One output folder per model pair and buffer size.
        folder_path = f"./deliverables/model_{final_model_idx+1}-->-model-task-{task_idx+1}-buffersize-{BUFFER_SIZE}"
        os.makedirs(folder_path, exist_ok=True)
        # Short description of the folder's contents; the "10 examples" in the
        # text corresponds to num_samples=10 in the calls below.
        with open(os.path.join(folder_path, "info.txt"), "w") as f:
            f.write(f"Visualizing the saliency map of 10 examples of forgotten samples (that were known in model_{task_idx+1}, but forgotten in model_{final_model_idx+1}) evaluated on task {task_idx+1}.\n")
            f.write(f"Visualizing the saliency map of 10 examples of remembered samples (that were known in both model_{task_idx+1} and model_{final_model_idx+1}) evaluated on task {task_idx+1}.\n")
        # Forgotten samples: correct for the earlier model, wrong for the later one.
        visualize_misclassified_samples(
            model, model_final, test_subsets[task_idx], num_samples=10, output_folder=folder_path
        )
        # Remembered samples: correct for both models.
        visualize_correctly_classified_samples(
            model, model_final, test_subsets[task_idx], num_samples=10, output_folder=folder_path
        )

  model.load_state_dict(torch.load(f"task_{'>'.join(map(str, range(1, task_idx+2)))}_nr_epochs_{num_epochs}_buffersize_{buffer_size}.pth"))
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
