### From github repo: https://github.com/donlee90/icarl/blob/master/model.py

In [1]:
import os
import numpy as np
import torch
from torchvision.io import read_image
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
import torchvision.transforms as transforms
from torchvision.transforms import Compose, Resize, ToTensor
from torch.utils.data import Dataset, DataLoader, Subset, ConcatDataset
from pathlib import Path
from torchvision.models import resnet18

class CustomDataset(Dataset):
    def __init__(self, txt_file_paths=None, transform=None, images_np=None, labels_np=None):
        """
        Initialize the dataset with either a list of .txt files containing image paths or numpy arrays of images and labels.
        
        Args:
            txt_file_paths (list of str): List of paths to .txt files, each containing image paths. Each .txt file represents a class.
            transform (callable, optional): Optional transform to be applied on a sample.
            images_np (numpy.ndarray, optional): Numpy array of images (used when not using .txt files for image paths).
            labels_np (numpy.ndarray, optional): Numpy array of labels (used when not using .txt files for image paths).
        """
        self.transform = transform
        self.images_np = images_np
        self.labels_np = labels_np
        self.image_paths = []
        self.labels = []

        if txt_file_paths is not None:
            # Load image paths and labels from .txt files
            for label, txt_path in enumerate(txt_file_paths):
                with open(txt_path, 'r') as f:
                    for line in f:
                        self.image_paths.append(line.strip())  # Remove newline characters
                        self.labels.append(label)  # The index of the .txt file is the label

        self.from_numpy = images_np is not None and labels_np is not None
        
    def __len__(self):
        if self.from_numpy:
            return len(self.images_np)
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        if self.from_numpy:
            # Handle numpy array data
            image = torch.from_numpy(self.images_np[idx])
            label = self.labels_np[idx]
            
            # # Convert numpy image to PIL Image if necessary before applying transform
            # if self.transform is not None:
            #     image = Image.fromarray)(image)
            #     image = self.transform(image)
        else:
            # Load image and label from the list populated from .txt files
            img_path = self.image_paths[idx]
            label = self.labels[idx]
            image = Image.open(img_path)  # Assuming these are paths to images
            
            if self.transform:
                image = self.transform(image)
                
        return idx, image, label




In [2]:


# Hyper Parameters
num_epochs = 5
batch_size = 32
learning_rate = 0.01

class iCaRLNet(nn.Module):
    def __init__(self, feature_size, n_classes):
        # Network architecture
        super(iCaRLNet, self).__init__()
        self.feature_extractor = resnet18(weights=True)
        self.feature_extractor.fc = nn.Linear(self.feature_extractor.fc.in_features, feature_size)
        
        self.bn = nn.BatchNorm1d(feature_size, momentum=0.01)
        self.ReLU = nn.ReLU()
        self.fc = nn.Linear(feature_size, n_classes, bias=False)

        self.n_classes = n_classes
        self.n_known = 0

        # List containing exemplar_sets
        # Each exemplar_set is a np.array of N images
        # with shape (N, C, H, W)
        self.exemplar_sets = []

        # Learning method
        self.cls_loss = nn.CrossEntropyLoss()
        self.dist_loss = nn.BCELoss()
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate,
                                    weight_decay=0.00001)
        #self.optimizer = optim.SGD(self.parameters(), lr=2.0,
        #                           weight_decay=0.00001)

        # Means of exemplars
        self.compute_means = True
        self.exemplar_means = []

    def forward(self, x):
        x = self.feature_extractor(x)
        x = self.bn(x)
        x = self.ReLU(x)
        x = self.fc(x)
        return x

    def increment_classes(self, n):
        """Add n classes in the final fc layer"""
        in_features = self.fc.in_features
        out_features = self.fc.out_features
        weight = self.fc.weight.data

        self.fc = nn.Linear(in_features, out_features+n, bias=False)
        self.fc.weight.data[:out_features] = weight
        self.n_classes += n

    def classify(self, x, transform):
        """Classify images by neares-means-of-exemplars

        Args:
            x: input image batch
        Returns:
            preds: Tensor of size (batch_size,)
        """
        batch_size = x.size(0)

        if self.compute_means:
            print ("Computing mean of exemplars...")
            exemplar_means = []
            for P_y in self.exemplar_sets:
                features = []
                # Extract feature for each exemplar in P_y
                for ex in P_y:
                    ex = Variable(torch.from_numpy(ex), volatile=True).cuda()
                    feature = self.feature_extractor(ex.unsqueeze(0))
                    feature = feature.squeeze()
                    feature.data = feature.data / feature.data.norm() # Normalize
                    features.append(feature)
                features = torch.stack(features)
                mu_y = features.mean(0).squeeze()
                mu_y.data = mu_y.data / mu_y.data.norm() # Normalize
                exemplar_means.append(mu_y)
            self.exemplar_means = exemplar_means
            self.compute_means = False
            print("Done")

        exemplar_means = self.exemplar_means
        means = torch.stack(exemplar_means) # (n_classes, feature_size)
        means = torch.stack([means] * batch_size) # (batch_size, n_classes, feature_size)
        means = means.transpose(1, 2) # (batch_size, feature_size, n_classes)

        feature = self.feature_extractor(x) # (batch_size, feature_size)
        for i in range(feature.size(0)): # Normalize
            feature.data[i] = feature.data[i] / feature.data[i].norm()
        feature = feature.unsqueeze(2) # (batch_size, feature_size, 1)
        
        feature = feature.expand_as(means) # (batch_size, feature_size, n_classes)

        dists = (feature - means).pow(2).sum(1).squeeze() #(batch_size, n_classes)
        _, preds = dists.min(1)

        return preds
        

    def construct_exemplar_set(self, dataset,  m, transform):  
        # ***************************************************************
        # Constructing exemplar set needs to be done through dataloader 
        # Passing all the images just to create 'm' exemplar set is not good.
        # Get exemplar_set and exemplar_feature only fit 'm' new exemplars so only best m fit. Sort by some similarity metric?
        # ***************************************************************
        
        """Construct an exemplar set for image set

        Args:
            dataset: This dataset will have images from which exemplar set will be created
        """
        # Compute and cache features for each example
        features = []

        features = []

        dataloader = DataLoader(dataset, batch_size=32, shuffle=False)
        
        for batch in dataloader:
            # Assuming batch[0] is a batch of images
            imgs = batch[1]
            imgs = imgs.cuda()

            # print(imgs.shape)

            with torch.no_grad():  # Use torch.no_grad() instead of volatile
                feature = self.feature_extractor(imgs).cpu().numpy()
                feature = feature / np.linalg.norm(feature, axis=1, keepdims=True)  # Normalize
                features.extend(feature)
        
        # for img in images:
        #     # x = Variable(transform(Image.fromarray(img)), volatile=True).cuda()
        #     x = Variable(transform(img), volatile=True).cuda()
        #     feature = self.feature_extractor(x.unsqueeze(0)).data.cpu().numpy()
        #     feature = feature / np.linalg.norm(feature) # Normalize
        #     features.append(feature[0])

        features = np.array(features)
        class_mean = np.mean(features, axis=0)
        class_mean = class_mean / np.linalg.norm(class_mean) # Normalize

        exemplar_set = []
        exemplar_features = [] # list of Variables of shape (feature_size,)
        for k in range(m):
            S = np.sum(exemplar_features, axis=0)
            phi = features
            mu = class_mean
            mu_p = 1.0/(k+1) * (phi + S)
            mu_p = mu_p / np.linalg.norm(mu_p)
            i = np.argmin(np.sqrt(np.sum((mu - mu_p) ** 2, axis=1)))
 
            exemplar_set.append(dataset[i][1])   # 0th index is index, 1st is image, 2nd is label
            exemplar_features.append(features[i])
            
            """
            print ("Selected example", i)
            print ("|exemplar_mean - class_mean|:")
            print (np.linalg.norm((np.mean(exemplar_features, axis=0) - class_mean))
            #features = np.delete(features, i, axis=0)
            """
        
        self.exemplar_sets.append(np.array(exemplar_set))
                

    def reduce_exemplar_sets(self, m):
        for y, P_y in enumerate(self.exemplar_sets):
            self.exemplar_sets[y] = P_y[:m]


    def combine_dataset_with_exemplars(self, dataset):
        # ***************************************************************
        # This too needs to be done with data loader.. Why load all the dataset into memory just to append?
        # Rather create a dataset with exemplar images + exemplar labels and then union them.
        # ***************************************************************

        transform = transforms.Compose([
                Resize((128, 128)),
                transforms.ToTensor(),
                transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ])
        
        for y, P_y in enumerate(self.exemplar_sets):
            exemplar_images = P_y
            exemplar_labels = [y] * len(P_y)
        
            exemplar_dataset = CustomDataset(images_np=exemplar_images, labels_np=exemplar_labels, transform=transform)

            # Create dataset with numpy arrays exemplar_images, exemplar_labels
            dataset = ConcatDataset([dataset, exemplar_dataset])

        return dataset
            


    def update_representation(self, dataset):
        # ***************************************************************
        # Pass dataloader not dataset.. Creating  new loader inside is not necessary.
        # ***************************************************************
        
        self.compute_means = True

        # Increment number of weights in final fc layer
        classes = list(set(dataset.labels))
        new_classes = [cls for cls in classes if cls > self.n_classes - 1]
        self.increment_classes(len(new_classes))
        self.cuda()
        print ("%d new classes" % (len(new_classes)))

        # Form combined training set
        dataset = self.combine_dataset_with_exemplars(dataset)

        loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                               shuffle=True, num_workers=2)

        # Store network outputs with pre-update parameters
        q = torch.zeros(len(dataset), self.n_classes).cuda()
        for indices, images, labels in loader:
            images = Variable(images).cuda()
            indices = indices.cuda()
            g = F.sigmoid(self.forward(images))
            q[indices] = g.data
        q = Variable(q).cuda()

        # Run network training
        optimizer = self.optimizer

        for epoch in range(num_epochs):
            for i, (indices, images, labels) in enumerate(loader):
                images = Variable(images).cuda()
                labels = Variable(labels).cuda()
                indices = indices.cuda()

                optimizer.zero_grad()
                g = self.forward(images)
                
                # Classification loss for new classes
                loss = self.cls_loss(g, labels)
                #loss = loss / len(range(self.n_known, self.n_classes))

                # Distilation loss for old classes
                if self.n_known > 0:
                    g = F.sigmoid(g)
                    q_i = q[indices]
                    dist_loss = sum(self.dist_loss(g[:,y], q_i[:,y])\
                            for y in range(self.n_known))
                    #dist_loss = dist_loss / self.n_known
                    loss += dist_loss

                loss.backward()
                optimizer.step()

                if (i+1) % 10 == 0:
                    print ('Epoch [%d/%d], Iter [%d/%d] Loss: %.4f' 
                           %(epoch+1, num_epochs, i+1, len(dataset)//batch_size, loss.item()))

In [3]:
transform = transforms.Compose([
        Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

# Scenario 1: Real vs GAN
txt_file_paths = ['data/real_train_filepaths.txt', 'data/gan_train_filepaths.txt']  
train_dataset = CustomDataset(txt_file_paths=txt_file_paths, transform=transform)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)

txt_file_paths = ['data/real_test_filepaths.txt', 'data/gan_test_filepaths.txt']  
test_dataset = CustomDataset(txt_file_paths=txt_file_paths, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)


In [4]:
list(set(train_dataset.labels))

[0, 1]

In [5]:
# Initialize CNN
K = 2000 # total number of exemplars
icarl = iCaRLNet(2048, 0)
icarl.cuda()

icarl.update_representation(train_dataset)



2 new classes
Epoch [1/5], Iter [10/50] Loss: 0.6627
Epoch [1/5], Iter [20/50] Loss: 0.6980
Epoch [1/5], Iter [30/50] Loss: 0.7042
Epoch [1/5], Iter [40/50] Loss: 0.7007
Epoch [1/5], Iter [50/50] Loss: 0.7115
Epoch [2/5], Iter [10/50] Loss: 0.7120
Epoch [2/5], Iter [20/50] Loss: 0.7361
Epoch [2/5], Iter [30/50] Loss: 0.6916
Epoch [2/5], Iter [40/50] Loss: 0.6848
Epoch [2/5], Iter [50/50] Loss: 0.6975
Epoch [3/5], Iter [10/50] Loss: 0.6688
Epoch [3/5], Iter [20/50] Loss: 0.6250
Epoch [3/5], Iter [30/50] Loss: 0.5741
Epoch [3/5], Iter [40/50] Loss: 0.4296
Epoch [3/5], Iter [50/50] Loss: 0.2612
Epoch [4/5], Iter [10/50] Loss: 0.4466
Epoch [4/5], Iter [20/50] Loss: 0.4430
Epoch [4/5], Iter [30/50] Loss: 0.6446
Epoch [4/5], Iter [40/50] Loss: 0.3206
Epoch [4/5], Iter [50/50] Loss: 0.4531
Epoch [5/5], Iter [10/50] Loss: 0.2847
Epoch [5/5], Iter [20/50] Loss: 0.3683
Epoch [5/5], Iter [30/50] Loss: 0.4499
Epoch [5/5], Iter [40/50] Loss: 0.4907
Epoch [5/5], Iter [50/50] Loss: 0.3636


In [6]:
m = int(K / icarl.n_classes)
icarl.reduce_exemplar_sets(m)

In [7]:
import torchvision.transforms as transforms

transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

In [8]:
# Construct exemplar sets for new classes
for y in range(icarl.n_known, icarl.n_classes):
    print("Constructing exemplar set for class-%d..." %(y))
    # images = train_dataset.get_image_class(y).transpose(0,3,2,1)
    # Need subset of train_dataset
    # Identify indices of samples with the desired label
    filtered_indices = [i for i, (idx, _, label) in enumerate(train_dataset) if label == y]
    
    subset_dataset = Subset(train_dataset, filtered_indices)
    
    icarl.construct_exemplar_set(subset_dataset, m, transform_test)
    print("Done")

Constructing exemplar set for class-0...
Done
Constructing exemplar set for class-1...
Done


In [9]:
for y, P_y in enumerate(icarl.exemplar_sets):
    print("Exemplar set for class-%d:" % (y), P_y.shape)
    #show_images(P_y[:10])

icarl.n_known = icarl.n_classes
print("iCaRL classes: %d" % icarl.n_known)

Exemplar set for class-0: (1000, 3, 128, 128)
Exemplar set for class-1: (1000, 3, 128, 128)
iCaRL classes: 2


In [10]:
total = 0.0
correct = 0.0

for idx,images, labels in train_dataloader:
    images = Variable(images).cuda()
    # print(type(images), images.shape)
    preds = icarl.classify(images, transform)
    total += labels.size(0)
    correct += (preds.data.cpu() == labels).sum()

print('Train Accuracy: %f %%' % (100 * correct / total))

Computing mean of exemplars...


  ex = Variable(torch.from_numpy(ex), volatile=True).cuda()


Done
Train Accuracy: 35.937500 %


In [11]:
preds,labels

(tensor([0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1,
         1, 0, 1, 0, 1, 1, 1, 1], device='cuda:0'),
 tensor([1, 0, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0,
         0, 1, 0, 1, 0, 1, 0, 1]))

In [12]:
total, correct

(1600.0, tensor(575.))

#### Now, create dataset for diffusion model 
and run the iCarl to see how well it does.

In [13]:
# Scenario 1: Real vs GAN
txt_file_paths = ['data/real_train_filepaths.txt', 'data/diffusion_train_filepaths.txt']  
train_dataset = CustomDataset(txt_file_paths=txt_file_paths, transform=transform)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)

txt_file_paths = ['data/real_test_filepaths.txt', 'data/diffusion_test_filepaths.txt']  
test_dataset = CustomDataset(txt_file_paths=txt_file_paths, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)

In [14]:
train_dataset[0]

(0,
 tensor([[[ 0.3430, -0.1416, -0.0447,  ...,  0.7501,  0.6531,  0.7113],
          [ 0.6725,  0.2073,  0.0910,  ...,  0.7501,  0.7501,  0.7694],
          [ 0.9439,  0.5562, -0.0447,  ...,  0.4399,  0.6531,  0.7307],
          ...,
          [-0.3743, -0.4906, -0.7426,  ..., -1.5567, -1.5567, -1.5374],
          [-0.8977, -0.7813, -0.7426,  ..., -1.5761, -1.5761, -1.5761],
          [-1.0140, -0.7426, -0.7426,  ..., -1.6149, -1.6149, -1.5955]],
 
         [[-0.4712, -0.8842, -0.4122,  ...,  0.1188,  0.2368,  0.3941],
          [-0.1762, -0.5499, -0.2942,  ...,  0.0598,  0.1384,  0.2171],
          [ 0.0401, -0.1959, -0.3336,  ..., -0.3336, -0.1762, -0.0582],
          ...,
          [-0.4516, -0.3532, -0.4712,  ..., -1.5529, -1.5529, -1.5332],
          [-1.0416, -0.7662, -0.5499,  ..., -1.5726, -1.5726, -1.5726],
          [-1.0612, -0.7072, -0.6089,  ..., -1.6119, -1.6119, -1.5922]],
 
         [[-0.4850, -1.0898, -0.6606,  ..., -0.0362,  0.1003,  0.2564],
          [-0.1338, -0.6

In [15]:
### Define dataset for diffusion model data
for indices, images, labels in train_dataloader:
    print(labels)
    break

tensor([1, 1, 1, 0, 1, 0, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1,
        1, 0, 0, 0, 0, 1, 0, 0])


In [16]:
icarl.update_representation(train_dataset)

0 new classes
Epoch [1/5], Iter [10/112] Loss: 1.9632
Epoch [1/5], Iter [20/112] Loss: 1.9495
Epoch [1/5], Iter [30/112] Loss: 1.9548
Epoch [1/5], Iter [40/112] Loss: 2.0165
Epoch [1/5], Iter [50/112] Loss: 1.8742
Epoch [1/5], Iter [60/112] Loss: 1.9543
Epoch [1/5], Iter [70/112] Loss: 1.8636
Epoch [1/5], Iter [80/112] Loss: 1.9186
Epoch [1/5], Iter [90/112] Loss: 2.0417
Epoch [1/5], Iter [100/112] Loss: 1.9725
Epoch [1/5], Iter [110/112] Loss: 2.0107
Epoch [2/5], Iter [10/112] Loss: 2.0366
Epoch [2/5], Iter [20/112] Loss: 2.0058
Epoch [2/5], Iter [30/112] Loss: 1.9491
Epoch [2/5], Iter [40/112] Loss: 2.0852
Epoch [2/5], Iter [50/112] Loss: 1.7597
Epoch [2/5], Iter [60/112] Loss: 1.8620
Epoch [2/5], Iter [70/112] Loss: 1.8633
Epoch [2/5], Iter [80/112] Loss: 1.8836
Epoch [2/5], Iter [90/112] Loss: 1.8795
Epoch [2/5], Iter [100/112] Loss: 1.8343
Epoch [2/5], Iter [110/112] Loss: 1.9384
Epoch [3/5], Iter [10/112] Loss: 1.8167
Epoch [3/5], Iter [20/112] Loss: 1.7740
Epoch [3/5], Iter [30/

In [17]:
dataset = icarl.combine_dataset_with_exemplars(train_dataset)

loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                       shuffle=True, num_workers=2)

dataset[3500][1].shape, dataset[0][1].shape

(torch.Size([3, 128, 128]), torch.Size([3, 128, 128]))

In [20]:
m = int(K / icarl.n_classes)

# Reduce exemplar sets for known classes
icarl.reduce_exemplar_sets(m)


In [21]:
# Construct exemplar sets for new classes
for y in range(icarl.n_known, icarl.n_classes):
    print ("Constructing exemplar set for class-%d..." %(y))
    images = train_set.get_image_class(y)
    icarl.construct_exemplar_set(images, m, transform_test)
    print ("Done")

for y, P_y in enumerate(icarl.exemplar_sets):
    print ("Exemplar set for class-%d:" % (y), P_y.shape)
    #show_images(P_y[:10])

icarl.n_known = icarl.n_classes
print ("iCaRL classes: %d" % icarl.n_known)

Exemplar set for class-0: (1000, 3, 128, 128)
Exemplar set for class-1: (1000, 3, 128, 128)
iCaRL classes: 2


In [23]:
total = 0.0
correct = 0.0
for indices, images, labels in train_dataloader:
    images = Variable(images).cuda()
    preds = icarl.classify(images, transform_test)
    total += labels.size(0)
    correct += (preds.data.cpu() != labels).sum()

print('Train Accuracy: %d %%' % (100 * correct / total))

Computing mean of exemplars...


  ex = Variable(torch.from_numpy(ex), volatile=True).cuda()


Done
Train Accuracy: 36 %
