# Dataloading, Memory Buffers, and Replay

We are going to follow Avalanche's implementation of replay methods. We need to define two components:
- **Dataloaders** are used to provide balancing between groups (e.g. tasks/classes/experiences). This is especially useful when you have unbalanced data.
- **Buffers** are used to store data from the previous experiences. They are dynamic datasets with a fixed maximum size, and they can be updated with new data continuously.

A **Replay** method is a combination of a replay buffer and a custom dataloader.

First, let's install Avalanche. You can skip this step if you have installed it already.

In [None]:
!pip install avalanche-lib==0.3.1

## Dataloaders
In Avalanche (as in PyTorch), dataloaders are simple iterators. Avalanche's dataloaders are located under `avalanche.benchmarks.utils.data_loader`, and their interface is equivalent to that of PyTorch's dataloaders. For example, `GroupBalancedDataLoader` takes a sequence of datasets and iterates over them, providing balanced mini-batches in which the number of samples is split equally among groups. Internally, it instantiates a `DataLoader` for each separate group. More specialized dataloaders also exist, such as `TaskBalancedDataLoader`.

All the dataloaders accept keyword arguments (`**kwargs`) that are passed directly to the dataloaders for each group.

We are going to use Avalanche dataloaders. You can check the code if you are curious. The bulk of the code is just setting the correct dimensions for the two batch sizes over time, depending on the hyperparameters.

In [1]:
from avalanche.benchmarks import SplitMNIST
from avalanche.benchmarks.utils.data_loader import GroupBalancedDataLoader
benchmark = SplitMNIST(5, return_task_id=True)

dl = GroupBalancedDataLoader([exp.dataset for exp in benchmark.train_stream], batch_size=5)
for x, y, t in dl:
    print(t.tolist())
    break

[0, 1, 2, 3, 4]
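
To make the balancing idea concrete, here is a minimal pure-Python sketch of group-balanced batching. It is not Avalanche's implementation: the function name `group_balanced_batches` is hypothetical, plain lists stand in for datasets, and the sketch assumes that iteration stops once the smallest group is exhausted.

```python
import random


def group_balanced_batches(groups, batch_size, rng):
    """Yield mini-batches containing the same number of samples from each group.

    Hypothetical helper for illustration only (not part of Avalanche).
    """
    if batch_size < len(groups):
        raise ValueError("batch_size must be at least the number of groups")
    per_group = batch_size // len(groups)
    # Shuffle each group independently.
    shuffled = [rng.sample(group, len(group)) for group in groups]
    # Assumption: stop when the smallest group runs out of samples.
    num_batches = min(len(group) for group in shuffled) // per_group
    for b in range(num_batches):
        batch = []
        for group in shuffled:
            batch.extend(group[b * per_group:(b + 1) * per_group])
        yield batch


# Five groups (one per task), each holding (sample, task_id) pairs.
groups = [[(f"x{t}_{i}", t) for i in range(4)] for t in range(5)]
batches = list(group_balanced_batches(groups, batch_size=5, rng=random.Random(0)))
print([t for _, t in batches[0]])  # [0, 1, 2, 3, 4]
```

With `batch_size=5` and five groups, each mini-batch holds exactly one sample per task, which mirrors the output of the Avalanche cell above.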


## Memory Buffers
Memory buffers store data up to a maximum capacity, and they implement policies to select which data to store and which to remove when the buffer is full. They are available in the module `avalanche.training.storage_policy`. The base class is `ExemplarsBuffer`, which implements two methods:
- `update(strategy)` - given the strategy's state it updates the buffer (using the data in `strategy.experience.dataset`).
- `resize(strategy, new_size)` - updates the maximum size and updates the buffer accordingly.

The data can be accessed using the attribute `buffer`.
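
As a warm-up, the sketch below mirrors this interface (`update`, `resize`, and the `buffer` attribute) with a first-in-first-out policy. It is only an illustration under simplifying assumptions: `FifoBuffer` is a hypothetical class that does not subclass `ExemplarsBuffer`, and it stores plain Python lists rather than Avalanche datasets.

```python
from types import SimpleNamespace


class FifoBuffer:
    """Hypothetical FIFO buffer mirroring the update/resize/buffer interface."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = []

    def _truncate(self, items):
        # Keep only the most recent `max_size` items.
        start = max(0, len(items) - self.max_size)
        return items[start:]

    def update(self, strategy, **kwargs):
        # Only `strategy.experience.dataset` is read from the strategy's state.
        new_data = list(strategy.experience.dataset)
        self.buffer = self._truncate(self.buffer + new_data)

    def resize(self, strategy, new_size):
        self.max_size = new_size
        self.buffer = self._truncate(self.buffer)


fifo = FifoBuffer(max_size=4)
fifo.update(SimpleNamespace(experience=SimpleNamespace(dataset=[1, 2, 3])))
fifo.update(SimpleNamespace(experience=SimpleNamespace(dataset=[4, 5, 6])))
print(fifo.buffer)  # [3, 4, 5, 6]
fifo.resize(None, 2)
print(fifo.buffer)  # [5, 6]
```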

Here is a simple implementation of reservoir sampling:

In [5]:
import torch
from avalanche.benchmarks.utils import AvalancheDataset
from avalanche.training.storage_policy import ExemplarsBuffer
from types import SimpleNamespace


class ReservoirSamplingBuffer(ExemplarsBuffer):
    """Buffer updated with reservoir sampling."""

    def __init__(self, max_size: int):
        """
        :param max_size:
        """
        # The algorithm follows
        # https://en.wikipedia.org/wiki/Reservoir_sampling
        # We sample a uniform random value in [0, 1) for each sample and
        # keep the `max_size` samples with the highest values.
        # This is equivalent to a uniform random selection of `max_size`
        # samples from the entire stream.
        super().__init__(max_size)
        # INVARIANT: _buffer_weights is always sorted.
        self._buffer_weights = torch.zeros(0)

    def update(self, strategy: "SupervisedTemplate", **kwargs):
        """Update buffer."""
        self.update_from_dataset(strategy.experience.dataset)

    def update_from_dataset(self, new_data: AvalancheDataset):
        """Update the buffer using the given dataset.

        :param new_data:
        :return:
        """
        new_weights = torch.rand(len(new_data))
        
        # add new samples and sort them again
        cat_weights = torch.cat([new_weights, self._buffer_weights])
        cat_data = new_data.concat(self.buffer)
        sorted_weights, sorted_idxs = cat_weights.sort(descending=True)
        
        # keep the top-k
        buffer_idxs = sorted_idxs[: self.max_size]
        self.buffer = cat_data.subset(buffer_idxs)
        self._buffer_weights = sorted_weights[: self.max_size]

    def resize(self, strategy, new_size):
        """Update the maximum size of the buffer."""
        self.max_size = new_size
        if len(self.buffer) <= self.max_size:
            return
        # keep the first max_size samples, i.e. those with the highest weights
        self.buffer = self.buffer.subset(torch.arange(self.max_size))
        self._buffer_weights = self._buffer_weights[: self.max_size]

Let's see how to use it:

In [6]:
benchmark = SplitMNIST(5, return_task_id=False)
storage_p = ReservoirSamplingBuffer(max_size=30)

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")

Max buffer size: 30, current size: 0


At first, the buffer is empty. We can update it with data from a new experience.

Notice that we use a `SimpleNamespace` because we want to use the buffer standalone, without instantiating an Avalanche strategy. Reservoir sampling requires only the `experience` from the strategy's state.

In [7]:
for i in range(5):
    strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
    storage_p.update(strategy_state)
    targets = ', '.join([str(e) for e in storage_p.buffer.targets])
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {targets}\n")

Max buffer size: 30, current size: 30
class targets: 7, 3, 7, 3, 3, 3, 7, 3, 7, 7, 7, 7, 7, 3, 7, 3, 7, 3, 3, 7, 3, 7, 3, 3, 3, 7, 7, 7, 7, 3

Max buffer size: 30, current size: 30
class targets: 7, 3, 6, 6, 6, 6, 6, 0, 7, 6, 0, 0, 3, 0, 3, 3, 6, 7, 3, 7, 7, 6, 6, 0, 7, 7, 0, 7, 0, 3

Max buffer size: 30, current size: 30
class targets: 7, 3, 6, 6, 6, 2, 6, 6, 1, 2, 0, 7, 6, 0, 2, 0, 3, 0, 2, 3, 3, 6, 7, 3, 2, 2, 7, 2, 7, 6

Max buffer size: 30, current size: 30
class targets: 9, 7, 3, 6, 6, 9, 6, 4, 2, 6, 6, 1, 2, 0, 7, 6, 4, 0, 9, 2, 0, 4, 3, 4, 0, 2, 3, 3, 6, 9

Max buffer size: 30, current size: 30
class targets: 9, 8, 7, 3, 6, 6, 5, 9, 6, 8, 8, 8, 4, 2, 6, 6, 8, 1, 2, 0, 7, 5, 6, 4, 0, 9, 8, 2, 0, 4



Notice that after each update some samples are replaced with new data. Reservoir sampling selects these samples randomly.
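
The random-key idea behind this buffer can also be sketched without any Avalanche or PyTorch dependency. The version below is a simplified illustration, not the library's code: `reservoir_update` is a hypothetical helper, and plain `(key, item)` pairs replace the dataset and weight tensor.

```python
import random


def reservoir_update(buffer, new_items, max_size, rng):
    """Merge new items into the buffer, keeping the `max_size` largest keys.

    `buffer` is a list of (key, item) pairs sorted by descending key.
    Hypothetical helper for illustration only.
    """
    # Draw one uniform random key per incoming item.
    keyed = [(rng.random(), item) for item in new_items]
    # Sort old and new entries together and keep the top `max_size`.
    merged = sorted(buffer + keyed, key=lambda pair: pair[0], reverse=True)
    return merged[:max_size]


rng = random.Random(0)
buffer = []
for chunk in (range(0, 100), range(100, 200), range(200, 300)):
    buffer = reservoir_update(buffer, chunk, max_size=10, rng=rng)
    print(sorted(item for _, item in buffer))
```

Because every item receives an independent key and only the largest keys survive, each item seen so far has the same chance of being in the buffer, regardless of which chunk it arrived in.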

Avalanche offers many more storage policies. For example, `ParametricBuffer` is a buffer split into several groups according to the `groupby` parameter (`None`, `'class'`, `'task'`, or `'experience'`). Each group is filled according to an optional `ExemplarsSelectionStrategy` (random selection is the default choice).

In [8]:
from avalanche.training.storage_policy import ParametricBuffer, RandomExemplarsSelectionStrategy
storage_p = ParametricBuffer(
    max_size=30,
    groupby='class',
    selection_strategy=RandomExemplarsSelectionStrategy()
)

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
for i in range(5):
    strategy_state = SimpleNamespace(experience=benchmark.train_stream[i])
    storage_p.update(strategy_state)
    targets = ', '.join([str(e) for e in storage_p.buffer.targets])
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {targets}\n")

Max buffer size: 30, current size: 0
Max buffer size: 30, current size: 30
class targets: 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7

Max buffer size: 30, current size: 30
class targets: 3, 3, 3, 3, 3, 3, 3, 3, 7, 7, 7, 7, 7, 7, 7, 0, 0, 0, 0, 0, 0, 0, 0, 6, 6, 6, 6, 6, 6, 6

Max buffer size: 30, current size: 30
class targets: 3, 3, 3, 3, 3, 7, 7, 7, 7, 7, 0, 0, 0, 0, 0, 6, 6, 6, 6, 6, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2

Max buffer size: 30, current size: 30
class targets: 3, 3, 3, 3, 7, 7, 7, 0, 0, 0, 0, 6, 6, 6, 6, 1, 1, 1, 1, 2, 2, 2, 2, 4, 4, 4, 4, 9, 9, 9

Max buffer size: 30, current size: 30
class targets: 3, 3, 3, 7, 7, 7, 0, 0, 0, 6, 6, 6, 1, 1, 1, 2, 2, 2, 4, 4, 4, 9, 9, 9, 5, 5, 5, 8, 8, 8
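
The per-class counts printed above (15 each, then 8 or 7, then 5 each, then 4 or 3, and finally 3 each) are what an even split of `max_size` among the classes seen so far would give. A minimal sketch of that arithmetic follows. The helper name `group_lengths` is hypothetical, and handing the leftover slots to the first groups is an assumption of this sketch.

```python
def group_lengths(max_size, num_groups):
    """Split `max_size` slots as evenly as possible among `num_groups` groups.

    Assumption: leftover slots go to the first groups, one each.
    """
    base = max_size // num_groups
    remainder = max_size % num_groups
    return [base + 1 if i < remainder else base for i in range(num_groups)]


for num_classes in (2, 4, 6, 8, 10):
    print(num_classes, group_lengths(30, num_classes))
# 2 [15, 15]
# 4 [8, 8, 7, 7]
# 6 [5, 5, 5, 5, 5, 5]
# 8 [4, 4, 4, 4, 4, 4, 3, 3]
# 10 [3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
```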



The advantage of using grouping buffers is that you get a balanced rehearsal buffer. You can even access the groups separately with the `buffer_groups` attribute. Combined with balanced dataloaders, you can ensure that the mini-batches stay balanced during training.

In [9]:
for k, v in storage_p.buffer_groups.items():
    print(f"(group {k}) -> size {len(v.buffer)}")

(group 3) -> size 3
(group 7) -> size 3
(group 0) -> size 3
(group 6) -> size 3
(group 1) -> size 3
(group 2) -> size 3
(group 4) -> size 3
(group 9) -> size 3
(group 5) -> size 3
(group 8) -> size 3


In [10]:
datas = [v.buffer for v in storage_p.buffer_groups.values()]
dl = GroupBalancedDataLoader(datas)

for x, y, t in dl:
    print(y.tolist())
    break

[3, 3, 3, 7, 7, 7, 0, 0, 0, 6, 6, 6, 1, 1, 1, 2, 2, 2, 4, 4, 4, 9, 9, 9, 5, 5, 5, 8, 8, 8]


# Training

In [11]:
from avalanche.training.storage_policy import ParametricBuffer, RandomExemplarsSelectionStrategy
from types import SimpleNamespace
from avalanche.models import SimpleMLP
from avalanche.benchmarks.utils.data_loader import ReplayDataLoader
from torch.optim import SGD
from torch.nn import CrossEntropyLoss
from avalanche.models.utils import avalanche_model_adaptation
from avalanche.models.dynamic_optimizers import reset_optimizer
import torch.nn.functional as F


# scenario
benchmark = SplitMNIST(
    n_experiences=5,
    return_task_id=False,
    seed=1
)

# model
model = SimpleMLP()
optimizer = SGD(model.parameters(), lr=0.001, momentum=0.9)
criterion = CrossEntropyLoss()

device = 'cpu'
num_epochs = 2

# AVALANCHE: init replay buffer
storage_p = ParametricBuffer(
    max_size=30,
    groupby='class',
    selection_strategy=RandomExemplarsSelectionStrategy()
)

print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")   
for exp in benchmark.train_stream:
    print(f"Experience ({exp.current_experience})")
    model.train()
    avalanche_model_adaptation(model, exp)
    reset_optimizer(optimizer, model)
    dataset = exp.dataset
    dataset = dataset.train()
 
    for epoch in range(num_epochs):
        # AVALANCHE: ReplayDataLoader to sample jointly from buffer and current data.
        dl = ReplayDataLoader(dataset, storage_p.buffer, batch_size=128)
        for x, y, t in dl:
            x, y, t = x.to(device), y.to(device), t.to(device)

            optimizer.zero_grad()
            output = model(x)
            loss = F.cross_entropy(output, y)
            loss.backward()
            optimizer.step()
        print('Train Epoch: {} \tLoss: {:.6f}'.format(epoch, loss.item()))
    
    # AVALANCHE: you can use a SimpleNamespace if you want to use Avalanche components with your own code.
    strategy_state = SimpleNamespace(experience=exp)
    # AVALANCHE: update replay buffer
    storage_p.update(strategy_state)
    print(f"Max buffer size: {storage_p.max_size}, current size: {len(storage_p.buffer)}")
    print(f"class targets: {list(storage_p.buffer.targets)}\n")


Max buffer size: 30, current size: 0
Max buffer size: 30, current size: 0
Experience (0)
Train Epoch: 0 	Loss: 0.242850
Train Epoch: 1 	Loss: 0.244739
Max buffer size: 30, current size: 30
class targets: [5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6]

Experience (1)
Train Epoch: 0 	Loss: 0.107958
Train Epoch: 1 	Loss: 0.067566
Max buffer size: 30, current size: 30
class targets: [5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6, 6, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2]

Experience (2)
Train Epoch: 0 	Loss: 0.083866
Train Epoch: 1 	Loss: 0.050811
Max buffer size: 30, current size: 30
class targets: [5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 0, 0, 0, 0, 0, 8, 8, 8, 8, 8]

Experience (3)
Train Epoch: 0 	Loss: 0.079434
Train Epoch: 1 	Loss: 0.030323
Max buffer size: 30, current size: 30
class targets: [5, 5, 5, 5, 6, 6, 6, 6, 1, 1, 1, 1, 2, 2, 2, 2, 0, 0, 0, 0, 8, 8, 8, 9, 9, 9, 3, 3, 3, 3]

Experience (4)
Train Epoch: 0 	Loss: 0
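
The training loop above relies on `ReplayDataLoader` to mix current and buffered samples in every mini-batch. The pure-Python sketch below illustrates one way such mixing can work. It is an assumption-laden illustration rather than Avalanche's code: `replay_batches` and its `batch_size_mem` argument are names chosen for this sketch, shuffling is omitted, and the memory is simply cycled so that one epoch is a single pass over the current data.

```python
from itertools import cycle, islice


def replay_batches(current, memory, batch_size, batch_size_mem):
    """Yield mini-batches of current samples followed by memory samples.

    Hypothetical helper: the memory is cycled (oversampled) so that every
    batch can draw from it, and an empty memory contributes nothing.
    """
    mem_iter = cycle(memory)
    for start in range(0, len(current), batch_size):
        batch = list(current[start:start + batch_size])
        batch.extend(islice(mem_iter, batch_size_mem))
        yield batch


current = list(range(10))
memory = ["a", "b", "c"]
for batch in replay_batches(current, memory, batch_size=4, batch_size_mem=2):
    print(batch)
# [0, 1, 2, 3, 'a', 'b']
# [4, 5, 6, 7, 'c', 'a']
# [8, 9, 'b', 'c']
```

During the first experience the buffer is still empty, so in this sketch the batches contain current data only.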

# Exercises
- Try different memory sizes.
- Write a custom dataloader that returns two mini-batches, one from the memory and one from the current data. Add a coefficient `alpha` that controls the ratio between the memory loss and the current loss, such that `loss = curr_loss + alpha * memory_loss`.
    - Try different values of `alpha`.
    - Try a linearly growing `alpha_t = alpha_base * t`, where `t` is the number of experiences seen so far.
- Implement GDumb. You can use `ParametricBuffer` to implement the class-balanced greedy sampler.