# Incremental learning on image classification
Ablation studies

## Libraries and packages


In [0]:
# Install pinned torch/torchvision releases, plus Pillow-SIMD and tqdm (the last two are unpinned)
!pip3 install 'torch==1.4.0'
!pip3 install 'torchvision==0.5.0'
!pip3 install 'Pillow-SIMD'
!pip3 install 'tqdm'

In [0]:
import os
import urllib
import logging

import numpy as np

import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
from torch.utils.data import Dataset, Subset, DataLoader, ConcatDataset
from torch.backends import cudnn

import torchvision
from torchvision import transforms
from torchvision.models import resnet34

from PIL import Image
from tqdm import tqdm

from copy import deepcopy

from sklearn.metrics import confusion_matrix

In [0]:
# GitHub credentials for cloning private repository
username = 'xolotl18'
password = ''

# Download packages from repository
password = urllib.parse.quote(password)
!git clone https://$username:$password@github.com/manuelemacchia/incremental-learning-image-classification.git
password = ''

!mv -v incremental-learning-image-classification/* .
!rm -rf incremental-learning-image-classification README.md

In [0]:
from data.cifar100 import Cifar100
from model.resnet_cifar import resnet32
from model.manager import Manager
from model.icarl import Exemplars
from model.icarl import iCaRL
from utils import plot

## Arguments

In [0]:
# --- Paths and hardware ---
DATA_DIR = 'data'                 # Download location of the dataset
DEVICE = 'cuda'

# --- Dataset and class splits ---
RANDOM_STATE = None

# One seed per run, for reproducibility of results.
# Note: different random states give very different splits
# and therefore very different results.
RANDOM_STATES = [658, 423, 422]

NUM_CLASSES = 100                 # Total number of classes
CLASS_BATCH_SIZE = 10             # Number of classes in each incremental batch
NUM_BATCHES = 10
VAL_SIZE = 0.1                    # Fraction of the training set held out for validation (between 0 and 1)

# --- Experiment repetitions ---
NUM_RUNS = 3                      # Runs per method; keep this at 3 or more for a fair benchmark

# --- Optimisation (SGD) ---
BATCH_SIZE = 64                   # Batch size (iCaRL sets this to 128)
LR = 2                            # Initial learning rate
MOMENTUM = 0.9                    # SGD momentum
WEIGHT_DECAY = 0.00001            # Weight decay from iCaRL

# --- Learning-rate schedule (MultiStepLR, step-down policy from iCaRL) ---
NUM_EPOCHS = 70                   # Total number of training epochs
MILESTONES = [49, 63]             # Epochs at which the learning rate is multiplied by GAMMA
GAMMA = 0.2                       # Gamma factor from iCaRL

## Data preparation

In [0]:
# Image pipelines handed to the incremental learners below.
# Both end with the same tensor conversion and per-channel normalization;
# only the training pipeline adds random augmentation in front of it.
_CHANNEL_MEAN = (0.5, 0.5, 0.5)
_CHANNEL_STD = (0.5, 0.5, 0.5)

train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),  # PIL Image -> torch.Tensor
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
])

test_transform = transforms.Compose([
    transforms.ToTensor(),  # PIL Image -> torch.Tensor
    transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
])

In [6]:
# For every run: one train / validation / test dataset per incremental split
train_subsets = [[] for _ in range(NUM_RUNS)]
val_subsets = [[] for _ in range(NUM_RUNS)]
test_subsets = [[] for _ in range(NUM_RUNS)]

# Each run is seeded with its own entry of RANDOM_STATES
assert len(RANDOM_STATES) >= NUM_RUNS, "RANDOM_STATES must provide one seed per run"

for run_i in range(NUM_RUNS):
    run_seed = RANDOM_STATES[run_i]

    # One iteration per batch of classes. NUM_BATCHES is the number of batches;
    # CLASS_BATCH_SIZE is the number of classes inside a batch and only happens
    # to have the same value.
    for split_i in range(NUM_BATCHES):
        # Download dataset only at first instantiation
        download = (run_i == 0 and split_i == 0)

        # Create CIFAR100 dataset
        train_dataset = Cifar100(DATA_DIR, train=True, download=download, random_state=run_seed, transform=train_transform)
        test_dataset = Cifar100(DATA_DIR, train=False, download=False, random_state=run_seed, transform=test_transform)

        # Train on the classes of the current batch only;
        # test on the batches 0..split_i, i.e. on everything seen so far
        train_dataset.set_classes_batch(train_dataset.batch_splits[split_i])
        test_dataset.set_classes_batch([test_dataset.batch_splits[i] for i in range(0, split_i+1)])

        # Define train and validation indices
        train_indices, val_indices = train_dataset.train_val_split(VAL_SIZE, run_seed)

        # Define subsets
        # NOTE(review): the validation subset wraps train_dataset, so it is read through
        # train_transform (random crop/flip) as well - confirm this is intended.
        train_subsets[run_i].append(Subset(train_dataset, train_indices))
        val_subsets[run_i].append(Subset(train_dataset, val_indices))
        test_subsets[run_i].append(test_dataset)

Downloading https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz to data/cifar-100-python.tar.gz


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

Extracting data/cifar-100-python.tar.gz to data


## Classifiers

### K-NN

In [0]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

class iCaRLwithKNN(iCaRL):
    """iCaRL whose predictions come from a k-nearest-neighbours classifier.

    The classifier is fitted on L2-normalized outputs of ``self.extract_features``
    computed for the current training data together with the stored exemplars.
    """

    def classifier_fit(self, train_dataset, **clf_args):
        """Fit classifier on the union of training dataset and exemplars.

        Args:
            train_dataset: training data of the current split
            **clf_args: keyword arguments forwarded to ``KNeighborsClassifier``
                (e.g. ``n_neighbors``)
        """

        # Union of training dataset and exemplars
        exemplars_dataset = Exemplars(self.exemplars, self.train_transform)
        train_dataset_with_exemplars = ConcatDataset([exemplars_dataset, train_dataset])

        X_features, y = self._normalized_features(train_dataset_with_exemplars)

        self.clf = KNeighborsClassifier(**clf_args)
        self.clf.fit(X_features, y)

    def classifier_predict(self, test_dataset):
        """Predict labels of test_dataset.

        Returns:
            y_test: ground-truth labels read from the dataset
            y_pred: labels predicted by the fitted classifier
        """

        X_test_features, y_test = self._normalized_features(test_dataset)
        y_pred = self.clf.predict(X_test_features)

        return y_test, y_pred

    def _normalized_features(self, dataset):
        """Return (L2-normalized features as a numpy array, labels) for dataset."""

        X, y = self.dataset_to_numpy(dataset)
        features = self.extract_features(torch.tensor(X, dtype=torch.float))

        # Per-sample Euclidean norm over all non-batch dimensions, computed in a
        # single vectorized operation instead of a Python loop over the samples.
        norms = features.reshape(features.size(0), -1).norm(p=2, dim=1)
        # Guard against division by zero for an all-zero feature vector
        norms = norms.clamp(min=1e-12)
        features = features / norms.reshape([-1] + [1] * (features.dim() - 1))

        return features.to('cpu').numpy(), y

    def dataset_to_numpy(self, dataset):
        """Copy a dataset of (image, label) pairs into numpy arrays.

        Returns:
            X: array of shape (len(dataset), 3, 32, 32) with the images
            y: integer array of shape (len(dataset),) with the labels
        """

        # Preallocate arrays
        X = np.zeros((len(dataset), 3, 32, 32))
        y = np.zeros(len(dataset), dtype=int)

        # Samples are read one at a time, in dataset order
        dataloader = DataLoader(dataset, batch_size=1)

        for idx, (image, labels) in enumerate(dataloader):
            X[idx] = image[0].numpy()
            y[idx] = labels.numpy()[0]

        return X, y

    def test_knn(self, test_dataset, train_dataset):
        """Test the model.

        Args:
            test_dataset: dataset on which to test the network
            train_dataset: training set used to train the last split
        Returns:
            accuracy (float): accuracy of the model on the test set
        """

        # Set networks to evaluation mode
        self.net.train(False)
        if self.best_net is not None: self.best_net.train(False)
        if self.old_net is not None: self.old_net.train(False)

        with torch.no_grad():
            self.classifier_fit(train_dataset, n_neighbors=3)
            y_truth, y_pred = self.classifier_predict(test_dataset)

        # Calculate accuracy
        accuracy = accuracy_score(y_truth, y_pred)

        print(f"Test accuracy (iCaRL with KNN): {accuracy} ")

        return accuracy

In [0]:
# Training settings for the k-NN experiment.
# Same values as the arguments section, except BATCH_SIZE which is raised to 128.
NUM_EPOCHS = 70
BATCH_SIZE = 128

# SGD
LR = 2
MOMENTUM = 0.9
WEIGHT_DECAY = 1e-5

# Learning-rate schedule
MILESTONES = [49, 63]
GAMMA = 0.2

In [0]:
# Per-run accuracy after every split: built-in iCaRL test vs. k-NN on the features
logs_icarl = [[] for _ in range(NUM_RUNS)]
logs_icarl_knn = [[] for _ in range(NUM_RUNS)]

for run_i in range(NUM_RUNS):
    # Fresh network and learner for every run
    net = resnet32()
    icarl_knn = iCaRLwithKNN(DEVICE, net, LR, MOMENTUM, WEIGHT_DECAY, MILESTONES, GAMMA, NUM_EPOCHS, BATCH_SIZE, train_transform, test_transform)

    # Iterate over the splits that were actually prepared for this run
    # instead of repeating their count as a literal
    for split_i in range(len(train_subsets[run_i])):
        train_split = train_subsets[run_i][split_i]
        val_split = val_subsets[run_i][split_i]
        test_split = test_subsets[run_i][split_i]

        print(f"## Split {split_i} of run {run_i} ##")

        train_logs = icarl_knn.incremental_train(split_i, train_split, val_split)

        acc, _ = icarl_knn.test(test_split, train_split)
        logs_icarl[run_i].append(acc)

        acc = icarl_knn.test_knn(test_split, train_split)
        logs_icarl_knn[run_i].append(acc)

### SVM (RBF kernel)

In [0]:
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

class iCaRLwithSVM(iCaRL):
    """iCaRL whose predictions come from a support vector classifier.

    The classifier is fitted on L2-normalized outputs of ``self.extract_features``
    computed for the current training data together with the stored exemplars.
    """

    def classifier_fit(self, train_dataset, **clf_args):
        """Fit classifier on the union of training dataset and exemplars.

        Args:
            train_dataset: training data of the current split
            **clf_args: keyword arguments forwarded to ``SVC``
                (e.g. ``C``, ``kernel``, ``gamma``)
        """

        # Union of training dataset and exemplars
        exemplars_dataset = Exemplars(self.exemplars, self.train_transform)
        train_dataset_with_exemplars = ConcatDataset([exemplars_dataset, train_dataset])

        X_features, y = self._normalized_features(train_dataset_with_exemplars)

        self.clf = SVC(**clf_args)
        self.clf.fit(X_features, y)

    def classifier_predict(self, test_dataset):
        """Predict labels of test_dataset.

        Returns:
            y_test: ground-truth labels read from the dataset
            y_pred: labels predicted by the fitted classifier
        """

        X_test_features, y_test = self._normalized_features(test_dataset)
        y_pred = self.clf.predict(X_test_features)

        return y_test, y_pred

    def _normalized_features(self, dataset):
        """Return (L2-normalized features as a numpy array, labels) for dataset."""

        X, y = self.dataset_to_numpy(dataset)
        features = self.extract_features(torch.tensor(X, dtype=torch.float))

        # Per-sample Euclidean norm over all non-batch dimensions, computed in a
        # single vectorized operation instead of a Python loop over the samples.
        norms = features.reshape(features.size(0), -1).norm(p=2, dim=1)
        # Guard against division by zero for an all-zero feature vector
        norms = norms.clamp(min=1e-12)
        features = features / norms.reshape([-1] + [1] * (features.dim() - 1))

        return features.to('cpu').numpy(), y

    def dataset_to_numpy(self, dataset):
        """Copy a dataset of (image, label) pairs into numpy arrays.

        Returns:
            X: array of shape (len(dataset), 3, 32, 32) with the images
            y: integer array of shape (len(dataset),) with the labels
        """

        # Preallocate arrays
        X = np.zeros((len(dataset), 3, 32, 32))
        y = np.zeros(len(dataset), dtype=int)

        # Samples are read one at a time, in dataset order
        dataloader = DataLoader(dataset, batch_size=1)

        for idx, (image, labels) in enumerate(dataloader):
            X[idx] = image[0].numpy()
            y[idx] = labels.numpy()[0]

        return X, y

    def test_knn(self, test_dataset, train_dataset):
        """Test the model with the SVC fitted on the extracted features.

        The method keeps the name ``test_knn`` so that existing callers continue
        to work; ``test_svm`` is available as an alias.

        Args:
            test_dataset: dataset on which to test the network
            train_dataset: training set used to train the last split
        Returns:
            accuracy (float): accuracy of the model on the test set
        """

        # Set networks to evaluation mode
        self.net.train(False)
        if self.best_net is not None: self.best_net.train(False)
        if self.old_net is not None: self.old_net.train(False)

        with torch.no_grad():
            self.classifier_fit(train_dataset, C=1, kernel="rbf", gamma="auto")
            y_truth, y_pred = self.classifier_predict(test_dataset)

        # Calculate accuracy
        accuracy = accuracy_score(y_truth, y_pred)

        print(f"Test accuracy (iCaRL with SVC): {accuracy} ")

        return accuracy

    # Descriptive alias for the method above
    test_svm = test_knn

In [0]:
# Training settings for the SVM experiment.
# Same values as the arguments section, except BATCH_SIZE which is raised to 128.
NUM_EPOCHS = 70
BATCH_SIZE = 128

# SGD
LR = 2
MOMENTUM = 0.9
WEIGHT_DECAY = 1e-5

# Learning-rate schedule
MILESTONES = [49, 63]
GAMMA = 0.2

In [0]:
# Per-run accuracy after every split: built-in iCaRL test vs. SVC on the features
logs_icarl = [[] for _ in range(NUM_RUNS)]
logs_icarl_svm = [[] for _ in range(NUM_RUNS)]

for run_i in range(NUM_RUNS):
    # Fresh network and learner for every run
    net = resnet32()
    icarl_svm = iCaRLwithSVM(DEVICE, net, LR, MOMENTUM, WEIGHT_DECAY, MILESTONES, GAMMA, NUM_EPOCHS, BATCH_SIZE, train_transform, test_transform)

    # Iterate over the splits that were actually prepared for this run
    # instead of repeating their count as a literal
    for split_i in range(len(train_subsets[run_i])):
        train_split = train_subsets[run_i][split_i]
        val_split = val_subsets[run_i][split_i]
        test_split = test_subsets[run_i][split_i]

        print(f"## Split {split_i} of run {run_i} ##")

        train_logs = icarl_svm.incremental_train(split_i, train_split, val_split)

        acc, _ = icarl_svm.test(test_split, train_split)
        logs_icarl[run_i].append(acc)

        # The SVC evaluation method of iCaRLwithSVM is named test_knn
        acc = icarl_svm.test_knn(test_split, train_split)
        logs_icarl_svm[run_i].append(acc)

In [0]:
# Accuracies collected by the SVM experiment: one list per run, one entry per split
print(logs_icarl)
print(logs_icarl_svm)

### All classifiers

In [0]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC

from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV

In [0]:
# Training settings for the experiment that compares all classifiers
# (same values as the arguments section, BATCH_SIZE back at 64).
NUM_EPOCHS = 70
BATCH_SIZE = 64

# SGD
LR = 2
MOMENTUM = 0.9
WEIGHT_DECAY = 1e-5

# Learning-rate schedule
MILESTONES = [49, 63]
GAMMA = 0.2

In [0]:
class iCaRLwithCLFS(iCaRL):
    """iCaRL evaluated with three grid-searched scikit-learn classifiers.

    A random forest, a k-NN and an SVC are each tuned with ``GridSearchCV`` on
    L2-normalized outputs of ``self.extract_features`` computed for the current
    training data together with the stored exemplars.
    """

    def classifier_fit(self, train_dataset, split):
        """Fit classifiers on the union of training dataset and exemplars.

        The best parameters found for each classifier are appended to
        ``./params.txt``.

        Args:
            train_dataset: training data of the current split
            split: index of the current split, written to the parameter report
        """

        # Union of training dataset and exemplars
        exemplars_dataset = Exemplars(self.exemplars, self.train_transform)
        train_dataset_with_exemplars = ConcatDataset([exemplars_dataset, train_dataset])

        X_features, y = self._normalized_features(train_dataset_with_exemplars)

        # Search spaces: random forest, k-NN, SVC
        param_grid1 = {
            'n_estimators': [100, 200, 500],
            'max_features': ['sqrt', 'log2'],
            'max_depth': [5, 6, 7],
            'criterion': ['gini', 'entropy']
        }
        param_grid2 = {
            'n_neighbors': [3, 5, 7, 9],
            'leaf_size': [10, 30, 100]
        }
        param_grid3 = {
            'kernel': ['linear', 'rbf', 'sigmoid'],
            'C': [0.01, 0.1, 1, 10],
            'gamma': [1, 0.1, 0.01, 0.001]
        }

        self.clf1 = GridSearchCV(estimator=RandomForestClassifier(), param_grid=param_grid1, cv=3, refit=True)
        self.clf2 = GridSearchCV(estimator=KNeighborsClassifier(), param_grid=param_grid2, cv=3, refit=True)
        self.clf3 = GridSearchCV(estimator=SVC(), param_grid=param_grid3, cv=3, refit=True)

        self.clf1.fit(X_features, y)
        self.clf2.fit(X_features, y)
        self.clf3.fit(X_features, y)

        rfc_best = self.clf1.best_params_
        knn_best = self.clf2.best_params_
        svc_best = self.clf3.best_params_  # 'C' belongs to the SVC search, not to the k-NN one

        # Append, so that the report keeps one record per split instead of only the last one
        with open('./params.txt', 'a') as writefile:
            writefile.write(f"split {split}\n")
            writefile.write(
                "rfc best parameters:\n"
                f" n_estimators : {rfc_best['n_estimators']}\n"
                f" max_features : {rfc_best['max_features']}\n"
                f" max_depth : {rfc_best['max_depth']}\n"
                f" criterion : {rfc_best['criterion']}\n"
            )
            writefile.write(
                "knn best parameters:\n"
                f" n_neighbors : {knn_best['n_neighbors']}\n"
                f" leaf_size : {knn_best['leaf_size']}\n"
            )
            writefile.write(
                "svc best parameters:\n"
                f" kernel : {svc_best['kernel']}\n"
                f" C : {svc_best['C']}\n"
                f" gamma : {svc_best['gamma']}\n\n"
            )

    def classifier_predict(self, test_dataset):
        """Predict labels of test_dataset with the three fitted classifiers.

        Returns:
            y_test: ground-truth labels read from the dataset
            y_pred1: predictions of the random forest
            y_pred2: predictions of the k-NN
            y_pred3: predictions of the SVC
        """

        X_test_features, y_test = self._normalized_features(test_dataset)

        y_pred1 = self.clf1.predict(X_test_features)
        y_pred2 = self.clf2.predict(X_test_features)
        y_pred3 = self.clf3.predict(X_test_features)

        return y_test, y_pred1, y_pred2, y_pred3

    def _normalized_features(self, dataset):
        """Return (L2-normalized features as a numpy array, labels) for dataset."""

        X, y = self.dataset_to_numpy(dataset)
        features = self.extract_features(torch.tensor(X, dtype=torch.float))

        # Per-sample Euclidean norm over all non-batch dimensions, computed in a
        # single vectorized operation instead of a Python loop over the samples.
        norms = features.reshape(features.size(0), -1).norm(p=2, dim=1)
        # Guard against division by zero for an all-zero feature vector
        norms = norms.clamp(min=1e-12)
        features = features / norms.reshape([-1] + [1] * (features.dim() - 1))

        return features.to('cpu').numpy(), y

    def dataset_to_numpy(self, dataset):
        """Copy a dataset of (image, label) pairs into numpy arrays.

        Returns:
            X: array of shape (len(dataset), 3, 32, 32) with the images
            y: integer array of shape (len(dataset),) with the labels
        """

        # Preallocate arrays
        X = np.zeros((len(dataset), 3, 32, 32))
        y = np.zeros(len(dataset), dtype=int)

        # Samples are read one at a time, in dataset order
        dataloader = DataLoader(dataset, batch_size=1)

        for idx, (image, labels) in enumerate(dataloader):
            X[idx] = image[0].numpy()
            y[idx] = labels.numpy()[0]

        return X, y

    def test_classifiers(self, test_dataset, train_dataset, split):
        """Test the model with every classifier.

        Args:
            test_dataset: dataset on which to test the network
            train_dataset: training set used to train the last split
            split: index of the current split, passed on to ``classifier_fit``
        Returns:
            tuple of three floats: test accuracy of the random forest, of the
            k-NN and of the SVC, in this order
        """

        # Set networks to evaluation mode
        self.net.train(False)
        if self.best_net is not None: self.best_net.train(False)
        if self.old_net is not None: self.old_net.train(False)

        with torch.no_grad():
            self.classifier_fit(train_dataset, split)
            y_truth, y_pred1, y_pred2, y_pred3 = self.classifier_predict(test_dataset)

        # Calculate accuracy
        accuracy1 = accuracy_score(y_truth, y_pred1)
        accuracy2 = accuracy_score(y_truth, y_pred2)
        accuracy3 = accuracy_score(y_truth, y_pred3)

        print(f"Test accuracy (iCaRL with RFC): {accuracy1} ")
        print(f"Test accuracy (iCaRL with KNN): {accuracy2} ")
        print(f"Test accuracy (iCaRL with SVC): {accuracy3} ")

        return accuracy1, accuracy2, accuracy3

In [0]:
# Per-run accuracy after every split: built-in iCaRL test, random forest, k-NN and SVC
logs_icarl = [[] for _ in range(NUM_RUNS)]
logs_icarl_rfc = [[] for _ in range(NUM_RUNS)]
logs_icarl_knn = [[] for _ in range(NUM_RUNS)]
logs_icarl_svc = [[] for _ in range(NUM_RUNS)]

# Start both report files empty so that results of an earlier execution do not linger
for report_path in ('./params.txt', './logs.txt'):
    open(report_path, 'w').close()

for run_i in range(NUM_RUNS):
    # Fresh network and learner for every run
    net = resnet32()

    icarl_CLFS = iCaRLwithCLFS(DEVICE, net, LR, MOMENTUM, WEIGHT_DECAY, MILESTONES, GAMMA, NUM_EPOCHS, BATCH_SIZE, train_transform, test_transform)

    # Iterate over the splits that were actually prepared for this run
    # instead of repeating their count as a literal
    for split_i in range(len(train_subsets[run_i])):
        train_split = train_subsets[run_i][split_i]
        val_split = val_subsets[run_i][split_i]
        test_split = test_subsets[run_i][split_i]

        print(f"## Split {split_i} of run {run_i} ##")

        train_logs = icarl_CLFS.incremental_train(split_i, train_split, val_split)

        acc, _ = icarl_CLFS.test(test_split, train_split)
        logs_icarl[run_i].append(acc)

        acc1, acc2, acc3 = icarl_CLFS.test_classifiers(test_split, train_split, split_i)
        logs_icarl_rfc[run_i].append(acc1)
        logs_icarl_knn[run_i].append(acc2)
        logs_icarl_svc[run_i].append(acc3)

        # Append one record per (run, split); the file is closed after every record,
        # so everything written so far survives an interrupted execution.
        with open('./logs.txt', 'a') as writefile:
            writefile.write(f"run : {run_i}\n")
            writefile.write(f"split : {split_i}\n")
            writefile.write(f"accuracy iCaRL : {acc}\n")
            writefile.write(f"accuracy rfc : {acc1}\n")
            writefile.write(f"accuracy knn : {acc2}\n")
            writefile.write(f"accuracy svc : {acc3}\n\n")

# Offer the two reports for download (google.colab is only importable inside Colab)
from google.colab import files
files.download('params.txt')
files.download('logs.txt')