# Master's Thesis on the topic of
## "Continual learning method for image classification in computer vision"
by Taras Kreshchenko

In [None]:
import os
from PIL import Image
from datetime import datetime
from enum import Enum

import torch
from torch import Tensor, nn, optim
from torch.optim import Optimizer, SGD, lr_scheduler
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, dataloader
from torchvision import transforms as T
from torchmetrics import Metric
from torchmetrics.classification import BinaryAccuracy, BinaryAUROC

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
from typing import Tuple, List, Dict, Callable, TypeAlias, TypeVar

# Callable that turns a PIL image into a tensor; used as the type of the
# image transform pipelines in this notebook.
Transform: TypeAlias = Callable[[Image.Image], Tensor]

In [None]:
def now(format: str = "%Y-%m-%d %H:%M:%S.%f") -> str:
  """Return the current local time formatted with ``format``.

  ``%f`` expands to six microsecond digits; when the format ends with ``%f``
  the last three digits are dropped so the timestamp has millisecond precision.
  Formats that do not end with ``%f`` are returned untouched (previously their
  last three characters were cut off as well).
  """
  stamp = datetime.now().strftime(format)
  if format.endswith('%f'):
    return stamp[:-3]
  return stamp

def log(value: str, end: str = '\n') -> None:
  """Print ``value`` prefixed with the current timestamp and a `` | `` separator."""
  timestamp = now()
  print(timestamp, '|', value, end=end)

In [None]:
# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
  DEVICE = torch.device("cuda:0")
else:
  DEVICE = torch.device("cpu")
# Bare expression: shown as the cell output in the notebook.
DEVICE.type

## Downloading data & preprocessing

Download the datasets if needed

In [None]:
# Everything is downloaded into ./data; each archive is removed after extraction.
!mkdir data

# CNRPark patches (150x150) -> data/CNRPark
!curl -Lo CNRPark.zip https://github.com/fabiocarrara/deep-parking/releases/download/archive/CNRPark-Patches-150x150.zip
!mkdir data/CNRPark
!unzip CNRPark.zip -d data/CNRPark
!rm CNRPark.zip

# CNR-EXT patches (150x150) -> data/CNR-EXT
!curl -Lo CNR-EXT.zip https://github.com/fabiocarrara/deep-parking/releases/download/archive/CNR-EXT-Patches-150x150.zip
!mkdir data/CNR-EXT
!unzip CNR-EXT.zip -d data/CNR-EXT
!rm CNR-EXT.zip

# PKLot tarball, extracted directly into data/
!curl -LO https://www.inf.ufpr.br/vri/databases/PKLot.tar.gz
!tar -xf PKLot.tar.gz -C data
!rm PKLot.tar.gz

# Split list files, extracted directly into data/
!curl -LO https://github.com/fabiocarrara/deep-parking/releases/download/archive/splits.zip
!unzip splits.zip -d data
!rm splits.zip

In [None]:
class DS(Enum):
  """Identifiers of the parking datasets used in this notebook."""
  CNRPark = 0
  CNRParkExt = 1
  PKLot = 2


# Directory holding the images of each dataset.
DS_PATHS: Dict[DS, str] = {
  DS.CNRPark: 'data/CNRPark',
  DS.CNRParkExt: 'data/CNR-EXT/PATCHES',
  DS.PKLot: 'data/PKLot/PKLotSegmented',
}

# Directory holding the split list files of each dataset.
SPLIT_PATHS: Dict[DS, str] = {
  DS.CNRPark: 'data/splits/CNRParkAB',
  DS.CNRParkExt: 'data/splits/CNRPark-EXT',
  DS.PKLot: 'data/splits/PKLot',
}

In [None]:
class DatasetSource:
  """Locations of one named split of one dataset.

  Attributes set by the constructor:
    ds, ds_name  -- the dataset enum member and its name
    split_name   -- the split name as given
    ds_path      -- image directory looked up in DS_PATHS
    split_path   -- '<SPLIT_PATHS[ds]>/<split_name>.txt'
  """

  def __init__(self, ds: DS, split_name: str) -> None:
    split_dir = SPLIT_PATHS[ds]
    self.ds = ds
    self.ds_name = ds.name
    self.split_name = split_name
    self.ds_path = DS_PATHS[ds]
    self.split_path = f'{split_dir}/{split_name}.txt'

In [None]:
class ParkingDataset(Dataset[Tuple[Tensor, float]]):
  """Images listed in a split file, paired with their labels.

  Each non-blank line of the split file is read as
  ``<image path relative to the dataset directory> <label>``; the label is
  stored as a float. Items are returned as ``(transform(image), label)``.
  """

  def __init__(self, ds_source: DatasetSource, transform: Transform) -> None:
    with open(ds_source.split_path, 'r') as f:
      lines = f.readlines()

    # split files contain copies of a few images with (2) in their names,
    # possibly caused by an accidental copy; excluding these from the dataset.
    # Blank lines are skipped too, otherwise row[0] below raises IndexError.
    data = [line.split() for line in lines if line.strip() and '(2)' not in line]

    self.img_path_list = [os.path.join(ds_source.ds_path, row[0]) for row in data]
    self.label_list = [float(row[1]) for row in data]
    self.transform = transform
    self.size = len(self.label_list)

  def __getitem__(self, index: int) -> Tuple[Tensor, float]:
    # The context manager closes the image file as soon as the transform has
    # produced the tensor, instead of leaving it to garbage collection.
    with Image.open(self.img_path_list[index]) as img:
      tensor = self.transform(img)
    return tensor, self.label_list[index]

  def __len__(self) -> int:
    return self.size

In [None]:
def collate_fn(batch: List[Tensor]) -> dataloader._collate_fn_t:
  """Collate a batch with the default collate function after dropping ``None`` samples.

  NOTE(review): the return annotation names the collate-function type, while
  the value returned is the collated batch -- confirm and adjust.
  """
  kept = [sample for sample in batch if sample is not None]
  return dataloader.default_collate(kept)

D = TypeVar('D')
def create_data_loader(dataset: Dataset[D], batch_size: int = 1, shuffle: bool = False) -> DataLoader[D]:
  """Wrap ``dataset`` in a DataLoader with the given batch size and shuffle flag.

  The default collate function is used; ``collate_fn`` defined in this
  notebook is not passed to the loader.
  """
  loader = DataLoader[D](
    dataset,
    batch_size=batch_size,
    shuffle=shuffle,
  )
  return loader

In [None]:
def grayscale_to_rgb(tensor: Tensor) -> Tensor:
  """Return ``tensor`` with a single leading channel repeated three times.

  Tensors whose first dimension is not 1 are returned unchanged. Defined with
  ``def`` rather than as a lambda bound to a name, so the function has a real
  name in tracebacks and can be pickled.
  """
  if tensor.size(0) == 1:
    return tensor.repeat(3, 1, 1)
  return tensor

# Un-normalised pipeline: resize to 256x256, convert to a tensor and expand
# single-channel images to three channels. Used when computing dataset statistics.
_simple_steps = [T.Resize((256, 256)), T.ToTensor(), T.Lambda(grayscale_to_rgb)]
simple_transform = T.Compose(_simple_steps)

In [None]:
def get_mean_and_std(ds_source: DatasetSource) -> tuple[Tensor, Tensor]:
  """Compute per-channel mean and standard deviation over one dataset split.

  Images are loaded through ``simple_transform``. The first pass averages the
  per-image channel means; the second pass averages the per-image mean squared
  deviation from those means and takes the square root.

  Returns:
    ``(means, stds)``, each a tensor with three elements.
  """
  dataset = ParkingDataset(ds_source, transform=simple_transform)
  n_images = len(dataset)

  # Pass 1: channel means.
  means = Tensor([0., 0., 0.])
  for idx in range(n_images):
    image = dataset[idx][0]
    means += image.mean([1, 2])
  means /= n_images

  # Pass 2: channel variances around the means computed above.
  stds = Tensor([0., 0., 0.])
  for idx in range(n_images):
    image = dataset[idx][0]
    for channel in range(3):
      squared_dev = (image[channel, :, :] - means[channel])**2
      stds[channel] += squared_dev.sum() / (image.shape[1] * image.shape[2])
  stds = (stds / n_images).sqrt()

  return means, stds

In [None]:
def create_transform(mean: Tensor | List[float],
                     std: Tensor | List[float],
                     train: bool) -> Transform:
  """Build the image pipeline for training or evaluation.

  Training: resize to 256x256, random horizontal flip, random 224 crop.
  Evaluation: resize straight to 224x224.
  Both then convert to a tensor, expand single-channel images to three
  channels and normalise with ``mean`` and ``std``.
  """
  if train:
    spatial_steps = [
      T.Resize((256, 256)),
      T.RandomHorizontalFlip(),
      T.RandomCrop(224),
    ]
  else:
    spatial_steps = [T.Resize((224, 224))]

  tensor_steps = [
    T.ToTensor(),
    T.Lambda(grayscale_to_rgb),
    T.Normalize(mean, std),
  ]
  return T.Compose(spatial_steps + tensor_steps)

## Reproducing existing solutions

### Amato, G. et al. Deep learning for decentralized parking lot occupancy detection. 2017

In [None]:
class mAlexNet(nn.Module):
  """Small AlexNet-style CNN: three conv stages, one hidden FC layer, one output layer.

  Args:
    num_classes: number of output classes. It now sizes the final layer;
      previously the argument was stored but the layer was fixed at 2 outputs.

  ``fc1`` expects ``30*3*3`` flattened features, which is what a 224x224 input
  produces after the three conv/pool stages. ``forward`` returns softmax
  probabilities, not logits.
  """

  def __init__(self, num_classes: int = 2) -> None:
    super().__init__()
    self.input_channels = 3
    self.num_output = num_classes
    self.conv1 = nn.Sequential(
      nn.Conv2d(in_channels=self.input_channels, out_channels=16, kernel_size=11, stride=4),
      nn.ReLU(inplace=True),
      nn.LocalResponseNorm(5, k=2),
      nn.MaxPool2d(kernel_size=3, stride=2)
    )
    self.conv2 = nn.Sequential(
      nn.Conv2d(in_channels=16, out_channels=20, kernel_size=5, stride=1),
      nn.ReLU(inplace=True),
      nn.LocalResponseNorm(5, k=2),
      nn.MaxPool2d(kernel_size=3, stride=2)
    )
    self.conv3 = nn.Sequential(
      nn.Conv2d(in_channels=20, out_channels=30, kernel_size=3, stride=1),
      nn.ReLU(inplace=True),
      nn.MaxPool2d(kernel_size=3, stride=2)
    )
    self.fc1 = nn.Sequential(
      nn.Linear(30*3*3, out_features=48),
      nn.ReLU(inplace=True)
    )
    self.fc2 = nn.Linear(in_features=48, out_features=self.num_output)

    # Xavier-initialise every conv/linear weight; biases keep PyTorch's defaults.
    for block in (self.conv1, self.conv2, self.conv3, self.fc1):
      block.apply(self.__init_weights)
    self.__init_weights(self.fc2)

  def __init_weights(self, layer: nn.Module) -> None:
    """Apply Xavier-uniform initialisation to the weight of a conv or linear layer."""
    if isinstance(layer, (nn.Conv2d, nn.Linear)):
      nn.init.xavier_uniform_(layer.weight)

  def get_features(self, x: Tensor) -> Tensor:
    """Run the conv stages and ``fc1``; returns the 48-dimensional features."""
    x = self.conv1(x)
    x = self.conv2(x)
    x = self.conv3(x)
    x = x.view(x.size(0), -1)  # flatten everything except the batch dimension
    x = self.fc1(x)
    return x

  def get_classes(self, x: Tensor) -> Tensor:
    """Map features from ``get_features`` to class probabilities (softmax over dim 1)."""
    x = self.fc2(x)
    x = F.softmax(x, dim=1)
    return x

  def forward(self, x: Tensor) -> Tensor:
    x = self.get_features(x)
    x = self.get_classes(x)
    return x

In [None]:
# Per-parameter learning-rate multipliers, keyed by the parameter names of mAlexNet.
lr_mults = {
  'conv1.0.weight': 1, 'conv1.0.bias': 2,
  'conv2.0.weight': 1, 'conv2.0.bias': 2,
  'conv3.0.weight': 1, 'conv3.0.bias': 2,
  'fc1.0.weight': 1, 'fc1.0.bias': 2,
  'fc2.weight': 1, 'fc2.bias': 2,
}

# Per-parameter weight-decay multipliers (0 switches decay off for that parameter).
decay_mults = {
  'conv1.0.weight': 1, 'conv1.0.bias': 0,
  'conv2.0.weight': 1, 'conv2.0.bias': 0,
  'conv3.0.weight': 1, 'conv3.0.bias': 0,
  'fc1.0.weight': 1, 'fc1.0.bias': 1,
  'fc2.weight': 1, 'fc2.bias': 1,
}

def create_malexnet_optimiser(model: nn.Module,
                              lr: float,
                              weight_decay: float = 0.,
                              momentum: float = 0.) -> Optimizer:
  """Create an SGD optimiser with one parameter group per named parameter.

  Each group's learning rate and weight decay are the base values scaled by
  ``lr_mults`` and ``decay_mults``. A parameter name missing from either table
  raises ``KeyError``.
  """
  groups: List[Dict[str, List[nn.Parameter] | float]] = [
    {
      'params': [param],
      'lr': lr * lr_mults[param_name],
      'weight_decay': weight_decay * decay_mults[param_name],
    }
    for param_name, param in model.named_parameters()
  ]
  return SGD(groups, lr=lr, weight_decay=weight_decay, momentum=momentum)

In [None]:
def train_single_epoch(
    model: nn.Module,
    data_loader: DataLoader,
    optimiser: Optimizer,
    loss_fn: Callable[[Tensor, Tensor], Tensor],
    metric_fns: List[Metric]
) -> Tuple[float, List[float]]:
  """Train ``model`` for one pass over ``data_loader``.

  Returns:
    The loss averaged over batches and the value of each metric computed over
    the whole pass. Every metric is reset before returning.
  """
  model.train()
  total_loss = 0.

  for inputs, targets in data_loader:
    inputs = inputs.to(DEVICE)
    targets = targets.to(DEVICE)
    optimiser.zero_grad()

    # since it's binary classification, we can discard one of the probabilities
    scores: Tensor = model(inputs)[:, 0]
    batch_loss: Tensor = loss_fn(scores, targets)

    batch_loss.backward()
    optimiser.step()

    total_loss += batch_loss.item()
    for metric in metric_fns:
      metric(scores, targets)

  mean_loss = total_loss / len(data_loader)
  results = [metric.compute() for metric in metric_fns]
  for metric in metric_fns:
    metric.reset()

  return (mean_loss, results)

def evaluate(
    model: nn.Module,
    data_loader: DataLoader,
    loss_fn: Callable[[Tensor, Tensor], Tensor],
    metric_fns: List[Metric]
) -> Tuple[float, List[float]]:
  """Evaluate ``model`` on ``data_loader`` with gradients disabled.

  Returns:
    The loss averaged over batches and the value of each metric computed over
    the whole pass. Every metric is reset before returning.
  """
  model.eval()
  total_loss = 0.

  with torch.no_grad():
    for inputs, targets in data_loader:
      inputs = inputs.to(DEVICE)
      targets = targets.to(DEVICE)

      # since it's binary classification, we can discard one of the probabilities
      scores: Tensor = model(inputs)[:, 0]

      total_loss += loss_fn(scores, targets).item()
      for metric in metric_fns:
        metric(scores, targets)

  mean_loss = total_loss / len(data_loader)
  results = [metric.compute() for metric in metric_fns]
  for metric in metric_fns:
    metric.reset()

  return (mean_loss, results)

def train(model: nn.Module,
          train_data_loader: DataLoader,
          val_data_loader: DataLoader | None,
          optimiser: Optimizer,
          lr_scheduler: lr_scheduler.LRScheduler,
          loss_fn: Callable[[Tensor, Tensor], Tensor],
          metric_fns: List[Metric],
          epochs: int) -> None:
  """Train for ``epochs`` epochs, logging loss and metrics after each one.

  When ``val_data_loader`` is given the model is also evaluated on it every
  epoch. The scheduler is stepped once at the end of each epoch.
  """
  def print_metrics(metrics: List[float]) -> str:
    # '<MetricClass>: <value>' entries joined with ' | '
    parts = [f'{type(fn).__name__}: {m:.4f}' for fn, m in zip(metric_fns, metrics)]
    return ' | '.join(parts)

  for epoch in range(1, epochs + 1):
    log(f'Epoch {epoch}')

    train_loss, train_metrics = train_single_epoch(
      model, train_data_loader, optimiser, loss_fn, metric_fns)
    log(f'Train | Loss: {train_loss:.4f} | {print_metrics(train_metrics)}')

    if val_data_loader is not None:
      val_loss, val_metrics = evaluate(model, val_data_loader, loss_fn, metric_fns)
      log(f'Validation  | Loss: {val_loss:.4f} | {print_metrics(val_metrics)}')

    lr_scheduler.step()

  log('Finished training')

In [None]:
def run_experiment(train_ds_source: DatasetSource,
                   val_ds_source: DatasetSource | None,
                   test_ds_sources: List[DatasetSource] | DatasetSource | None,
                   epochs: int,
                   lr: float,
                   weight_decay: float,
                   gamma: float,
                   step_size: int,
                   mean: List[float] | None = None,
                   std: List[float] | None = None,
                   shuffle: bool = False,
                   save_path: str | None = None) -> nn.Module:
  """Train a fresh mAlexNet on one split, optionally validate, save and test it.

  Args:
    train_ds_source: split to train on.
    val_ds_source: split evaluated after every epoch, or None.
    test_ds_sources: one split or a list of splits evaluated after training, or None.
    epochs, lr, weight_decay: training length and SGD settings (momentum is fixed at 0.9).
    gamma, step_size: StepLR schedule parameters.
    mean, std: normalisation statistics; when either is None both are computed
      from the training split.
    shuffle: whether the training loader shuffles.
    save_path: file name under ``models/`` for the trained state dict, or None.

  Returns:
    The trained model.
  """
  model = mAlexNet().to(DEVICE)

  # The training loops keep only output[:, 0], a 1-D vector of per-sample
  # probabilities. Binary cross-entropy is the matching loss:
  # nn.CrossEntropyLoss on a 1-D input with a same-shape float target treats
  # the batch axis as the class axis. The labels are Python floats, which the
  # default collate function turns into float64, so they are cast to the
  # output dtype here.
  bce = nn.BCELoss().to(DEVICE)

  def loss_fn(output: Tensor, target: Tensor) -> Tensor:
    return bce(output, target.to(output.dtype))

  metric_fns = [BinaryAccuracy().to(DEVICE), BinaryAUROC().to(DEVICE)]
  optimiser = create_malexnet_optimiser(model, lr=lr, weight_decay=weight_decay, momentum=0.9)
  scheduler = lr_scheduler.StepLR(optimiser, step_size=step_size, gamma=gamma)

  mean_ = mean
  std_ = std
  if mean_ is None or std_ is None:
    log(f'Calculating mean and std for {train_ds_source.ds_name}/{train_ds_source.split_name}')
    mean_, std_ = get_mean_and_std(train_ds_source)
    log(f'Mean: {mean_.tolist()}')
    log(f'Std: {std_.tolist()}\n')
  train_transform = create_transform(mean_, std_, train=True)
  test_transform = create_transform(mean_, std_, train=False)

  train_dataset = ParkingDataset(train_ds_source, train_transform)
  train_loader = create_data_loader(train_dataset, batch_size=64, shuffle=shuffle)
  # end='' so the validation split can be appended to the same log line below
  log(f'Training on {train_ds_source.ds_name}/{train_ds_source.split_name}', end='')

  if val_ds_source is not None:
    val_dataset = ParkingDataset(val_ds_source, test_transform)
    val_loader = create_data_loader(val_dataset, batch_size=64)
    print(f', validating on {val_ds_source.ds_name}/{val_ds_source.split_name}')
  else:
    val_loader = None
    print()

  train(model, train_loader, val_loader, optimiser, scheduler, loss_fn, metric_fns, epochs)

  if save_path:
    target_path = 'models/' + save_path
    # Create the directory first so a finished training run is not lost to a
    # missing 'models' folder.
    os.makedirs(os.path.dirname(target_path), exist_ok=True)
    torch.save(model.state_dict(), target_path)

  if test_ds_sources is not None:
    if not isinstance(test_ds_sources, list):
      test_ds_sources = [test_ds_sources]
    for ds_source in test_ds_sources:
      test_dataset = ParkingDataset(ds_source, test_transform)
      test_loader = create_data_loader(test_dataset, batch_size=64)
      log(f'Testing on {ds_source.ds_name}/{ds_source.split_name}')
      loss, metrics = evaluate(model, test_loader, loss_fn, metric_fns)
      metrics_str = ' | '.join([f'{type(fn).__name__}: {m:.4f}' for fn, m in zip(metric_fns, metrics)])
      log(f'Loss: {loss:.4f} | {metrics_str}')

  return model

In [None]:
def test_model(model: nn.Module,
               ds_sources: List[DatasetSource] | DatasetSource,
               mean: List[float],
               std: List[float]) -> None:
  """Evaluate ``model`` on one or more splits and log loss, accuracy and AUROC.

  Args:
    model: the model to evaluate.
    ds_sources: a single split or a list of splits.
    mean, std: normalisation statistics for the evaluation transform.
  """
  # evaluate() keeps only output[:, 0], a 1-D vector of per-sample
  # probabilities, so binary cross-entropy is the matching loss:
  # nn.CrossEntropyLoss on such input treats the batch axis as the class axis.
  # Labels are collated as float64 and therefore cast to the output dtype.
  bce = nn.BCELoss().to(DEVICE)

  def loss_fn(output: Tensor, target: Tensor) -> Tensor:
    return bce(output, target.to(output.dtype))

  metric_fns = [BinaryAccuracy().to(DEVICE), BinaryAUROC().to(DEVICE)]
  transform = create_transform(mean, std, train=False)

  if not isinstance(ds_sources, list):
    ds_sources = [ds_sources]
  for test_ds_source in ds_sources:
    test_dataset = ParkingDataset(test_ds_source, transform)
    test_loader = create_data_loader(test_dataset, batch_size=64)
    log(f'Testing on {test_ds_source.ds_name}/{test_ds_source.split_name}')
    loss, metrics = evaluate(model, test_loader, loss_fn, metric_fns)
    metrics_str = ' | '.join([f'{type(fn).__name__}: {m:.4f}' for fn, m in zip(metric_fns, metrics)])
    log(f'Loss: {loss:.4f} | {metrics_str}')

#### Running the experiments

In [None]:
# CNRPark: train on the 'even' split, validate on 'odd', no test sets.
# mean/std are passed explicitly, so run_experiment does not recompute them.
model_cnr_even = run_experiment(
  DatasetSource(DS.CNRPark, 'even'), DatasetSource(DS.CNRPark, 'odd'), None,
  epochs=18, lr=0.0001, weight_decay=0.0005, gamma=0.5, step_size=6,
  mean=[0.4422, 0.4524, 0.3867], std=[0.1783, 0.1732, 0.1743])

In [None]:
# CNRPark: train on the 'odd' split, validate on 'even', no test sets.
# mean/std are passed explicitly, so run_experiment does not recompute them.
model_cnr_odd = run_experiment(
  DatasetSource(DS.CNRPark, 'odd'), DatasetSource(DS.CNRPark, 'even'), None,
  epochs=18, lr=0.0001, weight_decay=0.0005, gamma=0.5, step_size=6,
  mean=[0.4353, 0.445, 0.3782], std=[0.1881, 0.1817, 0.1813])

In [None]:
# Train on all of CNRPark without validation, then test on
# CNRPark-EXT 'test' and PKLot 'twodays'.
model_cnr_all = run_experiment(
  DatasetSource(DS.CNRPark, 'all'),
  None,
  [DatasetSource(DS.CNRParkExt, 'test'), DatasetSource(DS.PKLot, 'twodays')],
  epochs=6, lr=0.0008, weight_decay=0.0005, gamma=0.5, step_size=2,
  mean=[0.4387, 0.4486, 0.3823], std=[0.1834, 0.1776, 0.178])

In [None]:
# Train on PKLot 'train', validate on all of CNRPark, then test on
# CNRPark-EXT 'test' and PKLot 'test'. mean/std are omitted, so run_experiment
# computes them from the training split.
model_pklot_train = run_experiment(
  DatasetSource(DS.PKLot, 'train'),
  DatasetSource(DS.CNRPark, 'all'),
  [DatasetSource(DS.CNRParkExt, 'test'), DatasetSource(DS.PKLot, 'test')],
  epochs=6, lr=0.0008, weight_decay=0.0005, gamma=0.5, step_size=2)

In [None]:
# Train on CNRPark-EXT camera1, validate on camera5, then test on
# cameras 2-9 and on PKLot 'test'. The trained weights are saved.
train_ds_source = DatasetSource(DS.CNRParkExt, 'camera1')
val_ds_source = DatasetSource(DS.CNRParkExt, 'camera5')
test_ds_sources = [DatasetSource(DS.CNRParkExt, f'camera{n}') for n in range(2, 10)]
test_ds_sources.append(DatasetSource(DS.PKLot, 'test'))

model_cnrext_c1 = run_experiment(
  train_ds_source,
  val_ds_source,
  test_ds_sources,
  epochs=6,
  lr=0.0008,
  weight_decay=0.0005,
  gamma=0.75,
  step_size=2,
  save_path='cnrext_c1_malexnet_v2.pth',
)

In [None]:
# Load saved weights and evaluate them on every CNRPark-EXT camera except
# camera8, plus all of PKLot.
# NOTE(review): no cell shown in this notebook writes
# 'models/cnrext_c8_malexnet.pth' -- presumably it comes from an earlier run; confirm.
checkpoint = torch.load('models/cnrext_c8_malexnet.pth', map_location=DEVICE)
model_cnrext_c8 = mAlexNet().to(DEVICE)
model_cnrext_c8.load_state_dict(checkpoint)

ds_sources = [DatasetSource(DS.CNRParkExt, f'camera{n}') for n in range(1, 10) if n != 8]
ds_sources.append(DatasetSource(DS.PKLot, 'all'))

test_model(
  model_cnrext_c8,
  ds_sources,
  mean=[0.4026, 0.3916, 0.3482],
  std=[0.1811, 0.176, 0.1854],
)

## Applying Continual Learning

#### Implementing UCL-GV

In [None]:
from avalanche.benchmarks.utils.data_loader import ReplayDataLoader
from avalanche.core import SupervisedPlugin
from avalanche.training.supervised import Naive
from avalanche.training.templates import SupervisedTemplate
from avalanche.training.storage_policy import ReservoirSamplingBuffer
from avalanche.training.utils import get_last_fc_layer
from avalanche.benchmarks.utils import FilelistDataset, default_flist_reader
from avalanche.benchmarks.scenarios.dataset_scenario import benchmark_from_datasets
from torch_kmeans import KMeans

In [None]:
class UCLGVLoss(nn.Module):
  """Weighted sum of an entropy term, a cross-entropy term and a contrastive term.

  ``forward`` returns ``gamma1 * mean(l_ent) + gamma2 * l_ce + gamma3 * mean(l_pc)``.

  Args:
    gamma1: weight of the entropy term.
    gamma2: weight of the cross-entropy term.
    gamma3: weight of the ``l_pc`` term.

  NOTE(review): the code assumes ``pseudo_y`` is a float tensor with the same
  2-D shape as ``x`` (e.g. one-hot or soft cluster assignments) -- confirm
  against the caller.
  """

  def __init__(self, gamma1: float, gamma2: float, gamma3: float) -> None:
    super().__init__()
    self.l_ce = nn.CrossEntropyLoss()

    # nn.Parameter only accepts tensors, so the raw floats are wrapped first.
    # The weights are fixed hyper-parameters, hence requires_grad=False.
    self.gamma1 = self._as_weight(gamma1)
    self.gamma2 = self._as_weight(gamma2)
    self.gamma3 = self._as_weight(gamma3)

  @staticmethod
  def _as_weight(value: float) -> nn.Parameter:
    """Wrap a scalar loss weight in a frozen 0-dim parameter."""
    return nn.Parameter(torch.tensor(float(value)), requires_grad=False)

  def l_pc(self, x: Tensor, pseudo_y: Tensor) -> Tensor:
    """Element-wise contrastive term between row-normalised ``x`` and ``pseudo_y``."""
    x_norm = F.normalize(x)
    y_norm = F.normalize(pseudo_y)
    loss = - torch.log(torch.exp(x_norm) / (torch.exp(x_norm * y_norm) + torch.exp(x_norm * (1 - y_norm))))
    return loss

  def l_ent(self, x: Tensor) -> Tensor:
    """Per-row entropy ``-sum(x * log(x + 1e-5))`` over dim 1."""
    entropy = -x * torch.log(x + 1e-5)
    entropy = torch.sum(entropy, dim=1)
    return entropy

  def forward(self, x: Tensor, pseudo_y: Tensor) -> Tensor:
    # l_ent is per-row, l_ce is already a scalar and l_pc is per-element;
    # each is reduced with a mean so the result is a scalar that backward()
    # accepts (the unreduced terms do not broadcast together in general).
    return self.gamma1 * self.l_ent(x).mean() + \
      self.gamma2 * self.l_ce(x, pseudo_y) + \
      self.gamma3 * self.l_pc(x, pseudo_y).mean()

In [None]:
class UCLGVPlugin(SupervisedPlugin):
  """Avalanche plugin that hooks into the evaluation phase.

  It holds a reservoir-sampling replay buffer, a 2-cluster k-means model used
  to produce pseudo-labels, and a UCLGVLoss instance.
  """

  def __init__(self, mem_size: int, gamma1: float, gamma2: float, gamma3: float) -> None:
    """Create the buffer (capacity ``mem_size``), the k-means model and the loss.

    ``gamma1``..``gamma3`` are forwarded unchanged to UCLGVLoss.
    """
    super().__init__()
    self.buffer = ReservoirSamplingBuffer(max_size=mem_size)
    self.kmeans = KMeans(n_clusters=2)
    self.loss = UCLGVLoss(gamma1, gamma2, gamma3)

  def before_eval_exp(self, strategy: SupervisedTemplate,
                      num_workers: int = 0, shuffle: bool = False,
                      *args, **kwargs) -> None:
    """ Use a custom dataloader to combine samples from the current data and memory buffer.

    Nothing is changed while the buffer is still empty.
    """
    if len(self.buffer.buffer) > 0:
      # Batch size is taken from the strategy's *training* minibatch size.
      strategy.dataloader = ReplayDataLoader(
        strategy.adapted_dataset, # type: ignore
        self.buffer.buffer,
        num_workers=num_workers,
        batch_size=strategy.train_mb_size,
        shuffle=shuffle)

  def after_eval_forward(self, strategy: SupervisedTemplate, *args, **kwargs) -> None:
    """ Calculate loss, adapt the feature extractor.

    The model is switched to train mode with its last fully connected layer
    kept in eval mode, the loss is back-propagated, and the model is switched
    back to eval mode.
    """
    strategy.model.train()
    _, classifier = get_last_fc_layer(strategy.model)
    classifier.eval()

    assert(callable(strategy.model.get_features))
    # NOTE(review): the whole `strategy.mbatch` is passed to get_features;
    # presumably only the input tensor (e.g. strategy.mb_x) is intended -- confirm.
    features = strategy.model.get_features(strategy.mbatch)
    # NOTE(review): the object returned by calling KMeans is used directly as
    # the pseudo-label tensor -- verify its type and the input shape KMeans expects.
    pseudo_labels = self.kmeans(features)

    loss = self.loss(features, pseudo_labels)
    # NOTE(review): gradients are computed here, but no optimiser step or
    # zero_grad is visible in this plugin; backward() also needs gradients to be
    # enabled during evaluation -- confirm.
    loss.backward()

    strategy.model.eval()

  def after_eval_exp(self, strategy: SupervisedTemplate, *args, **kwargs) -> None:
    """ Update the buffer. """
    self.buffer.update(strategy, **kwargs)

In [None]:
def ordered_flist_reader(flist: str) -> List[Tuple[str, int]]:
  """Read a file list and return its entries sorted by a substring of the path.

  The sort key is characters ``[-28:-12]`` of each entry's path (presumably
  the timestamp embedded in the patch file names -- confirm against the data).

  The key must index the path, ``entry[0]``: slicing the ``(path, label)``
  tuple itself always yields ``()``, which turned the sort into a no-op.

  Args:
    flist: path of the file list, passed on to ``default_flist_reader``.

  Returns:
    The ``(path, label)`` entries in sorted order.
  """
  entries = default_flist_reader(flist)
  entries.sort(key=lambda entry: entry[0][-28:-12])
  return entries

def create_avalanche_dataset(ds_source: DatasetSource) -> FilelistDataset:
  """Build an Avalanche FilelistDataset for one dataset split.

  Entries are read with ``ordered_flist_reader``. The normalisation statistics
  are hard-coded and identical for every dataset.

  NOTE(review): the evaluation transform is passed as the fourth positional
  argument; presumably that slot is FilelistDataset's ``target_transform`` --
  confirm against the Avalanche signature.
  """
  mean = [0.4353, 0.445, 0.3782]
  std = [0.1881, 0.1817, 0.1813]
  train_transform = create_transform(mean=mean, std=std, train=True)
  eval_transform = create_transform(mean=mean, std=std, train=False)
  return FilelistDataset(
    ds_source.ds_path,
    ds_source.split_path,
    train_transform,
    eval_transform,
    ordered_flist_reader
  )

In [None]:
# Avalanche datasets for the continual-learning experiments.
cnrpark_all = create_avalanche_dataset(DatasetSource(DS.CNRPark, 'all'))

# One dataset per CNRPark-EXT camera, camera1 .. camera9.
(cnrext_c1, cnrext_c2, cnrext_c3,
 cnrext_c4, cnrext_c5, cnrext_c6,
 cnrext_c7, cnrext_c8, cnrext_c9) = [
  create_avalanche_dataset(DatasetSource(DS.CNRParkExt, f'camera{n}'))
  for n in range(1, 10)
]

cnrext_all = create_avalanche_dataset(DatasetSource(DS.CNRParkExt, 'all'))
pklot_twodays = create_avalanche_dataset(DatasetSource(DS.PKLot, 'twodays'))
pklot_nottwodays = create_avalanche_dataset(DatasetSource(DS.PKLot, 'nottwodays'))

In [None]:
def run_cl_experiment(train_ds: List[FilelistDataset] | FilelistDataset,
                      test_ds: FilelistDataset,
                      lr: float,
                      weight_decay: float,
                      uclgv_params: Tuple[float, float, float] | None = None,
                      save_path: str | None = None) -> nn.Module:
  """Train a fresh mAlexNet with Avalanche's Naive strategy and evaluate it.

  Args:
    train_ds: one dataset or a list of datasets; each becomes one training experience.
    test_ds: dataset used for the final evaluation.
    lr, weight_decay: SGD settings (momentum is fixed at 0.9).
    uclgv_params: ``(gamma1, gamma2, gamma3)`` to enable UCLGVPlugin with a
      2000-sample buffer, or None to run without the plugin.
    save_path: file name under ``models/`` for the trained state dict, or None.

  Returns:
    The trained model.
  """
  train_ds_list = train_ds if isinstance(train_ds, list) else [train_ds]
  test_ds_list = [test_ds]
  benchmark = benchmark_from_datasets(train=train_ds_list, test=test_ds_list)

  if uclgv_params is None:
    plugins = []
  else:
    gamma1, gamma2, gamma3 = uclgv_params
    plugins = [UCLGVPlugin(mem_size=2000, gamma1=gamma1, gamma2=gamma2, gamma3=gamma3)]

  model = mAlexNet().to(DEVICE)
  # NOTE(review): mAlexNet.forward already applies softmax and
  # nn.CrossEntropyLoss applies log-softmax again -- confirm this is intended.
  strategy = Naive(
    model=model,
    optimizer=create_malexnet_optimiser(model, lr=lr, weight_decay=weight_decay, momentum=0.9),
    criterion=nn.CrossEntropyLoss().to(DEVICE),
    train_mb_size=64,
    plugins=plugins)

  for experience in benchmark.train_stream:
    print('Start of experience: ', experience.current_experience)
    strategy.train(experience)
    print('Training completed.')

  if save_path:
    target_path = 'models/' + save_path
    # Create the directory first so a finished training run is not lost to a
    # missing 'models' folder.
    os.makedirs(os.path.dirname(target_path), exist_ok=True)
    torch.save(model.state_dict(), target_path)

  print('Computing accuracy on the whole test set.')
  strategy.eval(benchmark.test_stream)

  return model

#### Running the experiments

In [None]:
# With the UCL-GV plugin (gamma1=1, gamma2=0.1, gamma3=1):
# train on CNRPark-EXT 'all' as a single experience, evaluate on CNRPark 'all'.
run_cl_experiment(
  cnrext_all, cnrpark_all,
  lr=0.0008, weight_decay=0.0005, uclgv_params=(1, 0.1, 1))

In [None]:
# Baseline without the UCL-GV plugin:
# train on CNRPark-EXT 'all' as a single experience, evaluate on CNRPark 'all'.
run_cl_experiment(
  cnrext_all, cnrpark_all,
  lr=0.0008, weight_decay=0.0005)

In [None]:
# With the UCL-GV plugin (gamma1=1, gamma2=0.1, gamma3=1):
# CNRPark-EXT cameras 1-8 as eight consecutive training experiences, evaluate on camera9.
run_cl_experiment(
  [cnrext_c1, cnrext_c2, cnrext_c3, cnrext_c4, cnrext_c5, cnrext_c6, cnrext_c7, cnrext_c8], cnrext_c9,
  lr=0.0008, weight_decay=0.0005, uclgv_params=(1, 0.1, 1))

In [None]:
# Baseline without the UCL-GV plugin:
# CNRPark-EXT cameras 1-8 as eight consecutive training experiences, evaluate on camera9.
run_cl_experiment(
  [cnrext_c1, cnrext_c2, cnrext_c3, cnrext_c4, cnrext_c5, cnrext_c6, cnrext_c7, cnrext_c8], cnrext_c9,
  lr=0.0008, weight_decay=0.0005)

In [None]:
# With the UCL-GV plugin (gamma1=1, gamma2=0.1, gamma3=1):
# train on PKLot 'nottwodays' as a single experience, evaluate on PKLot 'twodays'.
run_cl_experiment(
  pklot_nottwodays, pklot_twodays,
  lr=0.0008, weight_decay=0.0005, uclgv_params=(1, 0.1, 1))

In [None]:
# Baseline without the UCL-GV plugin:
# train on PKLot 'nottwodays' as a single experience, evaluate on PKLot 'twodays'.
run_cl_experiment(
  pklot_nottwodays, pklot_twodays,
  lr=0.0008, weight_decay=0.0005)