In [None]:
#If you do not have the necessary packages or libraries installed, execute these commands.
#!pip install torchvision
#!conda install pytorch -c pytorch

In [None]:
# Importing necessary libraries or packages
from math import floor
import numpy as np
from torchvision import datasets
from torchvision.transforms import ToTensor
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
# Build the MNIST training split first, then the testing split.
# Both are stored under ./data with download enabled, and each gets its own
# ToTensor() transform.
train_data, test_data = (
    datasets.MNIST(root='data', train=is_train, download=True, transform=ToTensor())
    for is_train in (True, False)
)

In [None]:
# Show, one per line: the container type of the training images, the shape of
# the image tensor, and the shape of the label tensor.
print(
    type(train_data.data),
    train_data.data.shape,
    train_data.targets.shape,
    sep='\n',
)

In [None]:
# define the network structure (3 layers fully connected network)
class Network(nn.Module):
    """Fully connected classifier built from three linear layers.

    Layer widths are input_shape -> 500 -> 300 -> 2. ReLU is applied after the
    first two layers; the last layer returns raw scores (no softmax is applied
    here).
    """

    def __init__(self, input_shape):
        """Create the three linear layers.

        input_shape: number of input features per sample.
        """
        super().__init__()
        self.fc1 = nn.Linear(in_features=input_shape, out_features=500)  # input -> 500 units
        self.fc2 = nn.Linear(in_features=500, out_features=300)  # 500 -> 300 units
        self.output = nn.Linear(in_features=300, out_features=2)  # 300 -> 2 output scores

    def forward(self, x):
        """Run x through both ReLU-activated layers and return the 2 output scores per row."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        return self.output(hidden)

In [None]:
# Each MNIST image is 28 x 28 pixels, so a flattened image has 784 features
input_shape = 28 * 28
# Training hyperparameters
n_epoch = 2            # number of passes over a task's training samples
learning_rate = 1e-3   # learning rate
minibatch_sz = 64      # number of samples per mini-batch

In [None]:
# The loss has no dependency on the model, so it is defined first
criterion = nn.CrossEntropyLoss()
# Build the model, then an SGD optimizer over that model's parameters
network = Network(input_shape)
optimizer = optim.SGD(params=network.parameters(), lr=learning_rate)

##### We will perform task-wise training. A single task consists of two classes from the MNIST dataset.

In [None]:
# Split the ten digits into five two-class tasks:
# task1 = [0, 1], task2 = [2, 3], task3 = [4, 5], task4 = [6, 7], task5 = [8, 9]
task1, task2, task3, task4, task5 = ([digit, digit + 1] for digit in range(0, 10, 2))

##### Separate the training and testing samples for each task. This makes the data easier to work with.

In [None]:
# For each task, collect the positions in the training set whose label equals
# either of the task's two digits (task1 -> '0'/'1', ..., task5 -> '8'/'9').
(task1_tr_samples,
 task2_tr_samples,
 task3_tr_samples,
 task4_tr_samples,
 task5_tr_samples) = (
    torch.where((train_data.targets == task[0]) | (train_data.targets == task[1]))[0]
    for task in (task1, task2, task3, task4, task5)
)

In [None]:
# For each task, collect the positions in the testing set whose label equals
# either of the task's two digits (task1 -> '0'/'1', ..., task5 -> '8'/'9').
(task1_ts_samples,
 task2_ts_samples,
 task3_ts_samples,
 task4_ts_samples,
 task5_ts_samples) = (
    torch.where((test_data.targets == task[0]) | (test_data.targets == task[1]))[0]
    for task in (task1, task2, task3, task4, task5)
)

### **Question 1**: The purpose of this question is to demonstrate the problem of catastrophic forgetting. To do so, we will train a single network on two different tasks in sequence. After training, evaluate the performance of the trained network on both tasks. What do you observe?

In [None]:
# train on task 1
# Number of full mini-batches (a trailing partial batch is skipped). It does not
# change between epochs, so it is computed once before the loops.
n_batch = task1_tr_samples.shape[0] // minibatch_sz
for e in range(n_epoch):
    for b in range(n_batch):
        # dataset indexes of the samples that make up this mini-batch
        batch_idx = task1_tr_samples[(b*minibatch_sz):((b+1)*minibatch_sz)]
        x_batch = train_data.data[batch_idx] #get training images
        y_batch = train_data.targets[batch_idx] #get training labels
        # flatten image before presenting to the network and normalize intensities to the range [0, 1]
        x_batch = torch.flatten(x_batch / 255, start_dim=1)
        # convert label to one hot; num_classes is fixed to the 2 network outputs so the
        # target width does not depend on which labels happen to be in the batch
        y_batch = F.one_hot(y_batch, num_classes=2).float()
        # clear the gradients of the previous step: backward() adds to .grad,
        # so without this every update would use the sum of all past gradients
        optimizer.zero_grad()
        # get the prediction
        y_hat_batch = network(x_batch)
        # calculate the loss
        loss = criterion(y_hat_batch, y_batch)
        # execute backpropagation
        loss.backward()
        # update the model
        optimizer.step()
        print(f'Epoch {e}: {loss.item()}')

In [None]:
# test on Task 1
n_samples = task1_ts_samples.shape[0] # total number of test samples for this task
n_correct = 0
# evaluation only: no gradients are needed, so do not build the autograd graph
with torch.no_grad():
    # walk over every test sample, including the last (possibly smaller) batch,
    # so the numerator covers exactly the samples counted in the denominator
    for start in range(0, n_samples, minibatch_sz):
        batch_idx = task1_ts_samples[start:start + minibatch_sz]
        x_batch = test_data.data[batch_idx] #get testing images
        y_batch = test_data.targets[batch_idx] #get testing labels
        # flatten image before presenting to the network and normalize intensities to the range [0, 1]
        x_batch = torch.flatten(x_batch / 255, start_dim=1)
        # get the prediction
        y_hat_batch = network(x_batch)
        # get the indices of the maximum values along dimension 1
        _, prediction = torch.max(y_hat_batch, 1)
        # count the number of correct predictions
        n_correct += (prediction == y_batch).sum().item()
print(f'Accuracy = {(n_correct * 100) / n_samples}')

In [None]:
# train on task 2
# Number of full mini-batches (a trailing partial batch is skipped). It does not
# change between epochs, so it is computed once before the loops.
n_batch = task2_tr_samples.shape[0] // minibatch_sz
for e in range(n_epoch):
    for b in range(n_batch):
        # dataset indexes of the samples that make up this mini-batch
        batch_idx = task2_tr_samples[(b*minibatch_sz):((b+1)*minibatch_sz)]
        x_batch = train_data.data[batch_idx] #get training images
        y_batch = train_data.targets[batch_idx] #get training labels
        # flatten image before presenting to the network and normalize intensities to the range [0, 1]
        x_batch = torch.flatten(x_batch / 255, start_dim=1)
        # even digits [0,2,4,6,8] are labeled 0, odd digits [1,3,5,7,9] are labeled 1
        y_batch = y_batch % 2
        # convert label to one hot; num_classes is fixed to the 2 network outputs so the
        # target width does not depend on which labels happen to be in the batch
        y_batch = F.one_hot(y_batch, num_classes=2).float()
        # clear the gradients of the previous step: backward() adds to .grad,
        # so without this every update would use the sum of all past gradients
        optimizer.zero_grad()
        # get the prediction
        y_hat_batch = network(x_batch)
        # calculate the loss
        loss = criterion(y_hat_batch, y_batch)
        # execute backpropagation
        loss.backward()
        # update the model
        optimizer.step()
        print(f'Epoch {e}: {loss.item()}')

In [None]:
# test on Task 2
n_samples = task2_ts_samples.shape[0] # total number of test samples for this task
n_correct = 0
# evaluation only: no gradients are needed, so do not build the autograd graph
with torch.no_grad():
    # walk over every test sample, including the last (possibly smaller) batch,
    # so the numerator covers exactly the samples counted in the denominator
    for start in range(0, n_samples, minibatch_sz):
        batch_idx = task2_ts_samples[start:start + minibatch_sz]
        x_batch = test_data.data[batch_idx] #get testing images
        y_batch = test_data.targets[batch_idx] #get testing labels
        # flatten image before presenting to the network and normalize intensities to the range [0, 1]
        x_batch = torch.flatten(x_batch / 255, start_dim=1)
        # get the prediction
        y_hat_batch = network(x_batch)
        # get the indices of the maximum values along dimension 1
        _, prediction = torch.max(y_hat_batch, 1)
        # count the number of correct predictions (even digit -> 0, odd digit -> 1)
        n_correct += (prediction == (y_batch % 2)).sum().item()
# divide by the task 2 sample count (not task 1's) to get this task's accuracy
print(f'Accuracy = {(n_correct * 100) / n_samples}')

In [None]:
# test on Task 1 (again, now that the network has also been trained on task 2)
n_samples = task1_ts_samples.shape[0] # total number of test samples for this task
n_correct = 0
# evaluation only: no gradients are needed, so do not build the autograd graph
with torch.no_grad():
    # walk over every test sample, including the last (possibly smaller) batch,
    # so the numerator covers exactly the samples counted in the denominator
    for start in range(0, n_samples, minibatch_sz):
        batch_idx = task1_ts_samples[start:start + minibatch_sz]
        x_batch = test_data.data[batch_idx] #get testing images
        y_batch = test_data.targets[batch_idx] #get testing labels
        # flatten image before presenting to the network and normalize intensities to the range [0, 1]
        x_batch = torch.flatten(x_batch / 255, start_dim=1)
        # get the prediction
        y_hat_batch = network(x_batch)
        # get the indices of the maximum values along dimension 1
        _, prediction = torch.max(y_hat_batch, 1)
        # count the number of correct predictions
        n_correct += (prediction == y_batch).sum().item()
print(f'Accuracy = {(n_correct * 100) / n_samples}')

### **Question 2**: The purpose of this question is to study the effect of replay on catastrophic forgetting. In this question, we will again train the network on two tasks in sequence. When we train the network on the second task, we will also use some samples from the first task for replay. To keep things simple, select a random proportion (say 50%) of samples from the first task for replay. After training, evaluate the performance of the trained network on both tasks. What do you observe?

In [None]:
# Fraction of a finished task's training samples that is kept for replay
prop_saved = 0.5 # 0.5 keeps half of the samples; used when drawing the task 1 replay set

In [None]:
# Start from a freshly initialised model, and rebuild the SGD optimizer so that
# it updates the new model's parameters.
network = Network(input_shape)
optimizer = optim.SGD(params=network.parameters(), lr=learning_rate)

In [None]:
# train on task 1
# Number of full mini-batches (a trailing partial batch is skipped). It does not
# change between epochs, so it is computed once before the loops.
n_batch = task1_tr_samples.shape[0] // minibatch_sz
for e in range(n_epoch):
    for b in range(n_batch):
        # dataset indexes of the samples that make up this mini-batch
        batch_idx = task1_tr_samples[(b*minibatch_sz):((b+1)*minibatch_sz)]
        x_batch = train_data.data[batch_idx] #get training images
        y_batch = train_data.targets[batch_idx] #get training labels
        # flatten image before presenting to the network and normalize intensities to the range [0, 1]
        x_batch = torch.flatten(x_batch / 255, start_dim=1)
        # convert label to one hot; num_classes is fixed to the 2 network outputs so the
        # target width does not depend on which labels happen to be in the batch
        y_batch = F.one_hot(y_batch, num_classes=2).float()
        # clear the gradients of the previous step: backward() adds to .grad,
        # so without this every update would use the sum of all past gradients
        optimizer.zero_grad()
        # get the prediction
        y_hat_batch = network(x_batch)
        # calculate the loss
        loss = criterion(y_hat_batch, y_batch)
        # execute backpropagation
        loss.backward()
        # update the model
        optimizer.step()
        print(f'Epoch {e}: {loss.item()}')

In [None]:
#randomly choose indexes from task 1 training data
# number of task 1 samples to keep for replay
n_replay = int(prop_saved * task1_tr_samples.shape[0])
# replace=False draws every index at most once. With the default (replace=True) the
# same sample can be picked repeatedly, so the replay set would contain duplicates and
# hold fewer distinct samples than prop_saved states.
# Note: this raises ValueError if prop_saved > 1 (more samples requested than exist).
task1_replay = np.random.choice(task1_tr_samples.numpy(), n_replay, replace=False)
task1_replay_samples = torch.Tensor(task1_replay).int() #numpy convert to tensor

In [None]:
# train on task 2 with replay

# concatenate samples from task 2 and replay samples from task 1
# (.long() gives both index tensors the same integer dtype before joining them)
tr_samples = torch.concatenate([task2_tr_samples, task1_replay_samples.long()], dim=0)
# randomize the array to mix samples from task 2 and replay; .numpy() shares memory
# with the tensor, so shuffling the array in place reorders tr_samples itself
np.random.shuffle(tr_samples.numpy())
# Number of full mini-batches (a trailing partial batch is skipped)
n_batch = tr_samples.shape[0] // minibatch_sz
for e in range(n_epoch):
    for b in range(n_batch):
        # dataset indexes of the samples that make up this mini-batch
        batch_idx = tr_samples[(b*minibatch_sz):((b+1)*minibatch_sz)]
        x_batch = train_data.data[batch_idx] #get training images
        y_batch = train_data.targets[batch_idx] #get training labels
        # flatten image before presenting to the network and normalize intensities to the range [0, 1]
        x_batch = torch.flatten(x_batch / 255, start_dim=1)
        # even digits [0,2,4,6,8] are labeled 0, odd digits [1,3,5,7,9] are labeled 1
        y_batch = y_batch % 2
        # convert label to one hot; num_classes is fixed to the 2 network outputs so the
        # target width does not depend on which labels happen to be in the batch
        y_batch = F.one_hot(y_batch, num_classes=2).float()
        # clear the gradients of the previous step: backward() adds to .grad,
        # so without this every update would use the sum of all past gradients
        optimizer.zero_grad()
        # get the prediction
        y_hat_batch = network(x_batch)
        # calculate the loss
        loss = criterion(y_hat_batch, y_batch)
        # execute backpropagation
        loss.backward()
        # update the model
        optimizer.step()
        print(f'Epoch {e}: {loss.item()}')

In [None]:
# test on Task 1
n_samples = task1_ts_samples.shape[0] # total number of test samples for this task
n_correct = 0
# evaluation only: no gradients are needed, so do not build the autograd graph
with torch.no_grad():
    # walk over every test sample, including the last (possibly smaller) batch,
    # so the numerator covers exactly the samples counted in the denominator
    for start in range(0, n_samples, minibatch_sz):
        batch_idx = task1_ts_samples[start:start + minibatch_sz]
        x_batch = test_data.data[batch_idx] #get testing images
        y_batch = test_data.targets[batch_idx] #get testing labels
        # flatten image before presenting to the network and normalize intensities to the range [0, 1]
        x_batch = torch.flatten(x_batch / 255, start_dim=1)
        # get the prediction
        y_hat_batch = network(x_batch)
        # get the indices of the maximum values along dimension 1
        _, prediction = torch.max(y_hat_batch, 1)
        # count the number of correct predictions
        n_correct += (prediction == y_batch).sum().item()
print(f'Accuracy = {(n_correct * 100) / n_samples}')

In [None]:
# test on Task 2
n_samples = task2_ts_samples.shape[0] # total number of test samples for this task
n_correct = 0
# evaluation only: no gradients are needed, so do not build the autograd graph
with torch.no_grad():
    # walk over every test sample, including the last (possibly smaller) batch,
    # so the numerator covers exactly the samples counted in the denominator
    for start in range(0, n_samples, minibatch_sz):
        batch_idx = task2_ts_samples[start:start + minibatch_sz]
        x_batch = test_data.data[batch_idx] #get testing images
        y_batch = test_data.targets[batch_idx] #get testing labels
        # flatten image before presenting to the network and normalize intensities to the range [0, 1]
        x_batch = torch.flatten(x_batch / 255, start_dim=1)
        # get the prediction
        y_hat_batch = network(x_batch)
        # get the indices of the maximum values along dimension 1
        _, prediction = torch.max(y_hat_batch, 1)
        # count the number of correct predictions (even digit -> 0, odd digit -> 1)
        n_correct += (prediction == (y_batch % 2)).sum().item()
# divide by the task 2 sample count (not task 1's) to get this task's accuracy
print(f'Accuracy = {(n_correct * 100) / n_samples}')

# **Directions for further exploration**
We will not share solutions for these questions.

**Q1**: How does the proportion of samples saved for replay affect the model's performance?

**Q2**: Use replay to train the network on more than two tasks. What is the impact of replay on the memory used by your models? Note that a replay-based approach requires that you save the replay samples from previous tasks forever. This implies that the memory required to store samples contributes to your model's memory footprint.

**Q3**: Can we choose replay samples more smartly so that we generate maximal impact while using minimal memory? For instance, can you use the network's predictions on a given task to decide which samples to store for replay?
