In [None]:
import tensorflow as tf
from keras.datasets import cifar10
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from sklearn.decomposition import PCA
import numpy as np
from keras.utils import to_categorical
from sklearn.model_selection import train_test_split
import keras

# Load CIFAR-10 dataset
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Normalize pixel values to between 0 and 1
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0

# Convert labels to one-hot encoding
y_train = to_categorical(y_train, num_classes=10)
y_test = to_categorical(y_test, num_classes=10)

# Split the training data into training and validation sets
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=42)

# Define LeNet architecture
model = Sequential()
model.add(Conv2D(6, kernel_size=(5, 5), activation='relu', input_shape=(32, 32, 3)))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Conv2D(16, kernel_size=(5, 5), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Flatten())
model.add(Dense(120, activation='relu'))
model.add(Dense(84, activation='relu'))
model.add(Dense(10, activation='softmax'))


# Compile the model# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Function to compute the rotation matrix
def compute_rotation_matrix(weights):
    _, _, vh = np.linalg.svd(weights)
    return vh.T

# Function to rotate the weights using the rotation matrix
def rotate_weights(weights, rotation_matrix):
    return np.dot(weights, rotation_matrix)

# Function to compute important directions using PCA
def compute_important_directions(weights, k):
    pca = PCA(n_components=k)
    pca.fit(weights.T)
    return pca.components_.T

# Function to construct the rotation matrix
def construct_rotation_matrix(important_directions):
    return important_directions @ important_directions.T

# Function to compute the PCA-aligned weights
def compute_pca_aligned_weights(weights, rotation_matrix):
    return np.dot(weights, rotation_matrix)

# Function to compute the regularization term
def compute_regularization_term(important_directions_all_tasks, lambda_reg):
    return lambda_reg * np.sum(important_directions_all_tasks, axis=2)

# Function to train the network with regularization
def train_network_with_regularization(x, y, model, pca_aligned_weights, regularization_term, epochs):
    model.set_weights(pca_aligned_weights)
    # Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Algorithm implementation
def rotated_fisher_information_matrix(x_train, y_train, model, num_tasks, k, lambda_reg, learning_rate, epochs):
    weights_all_tasks = []
    important_directions_all_tasks = []

    for task in range(num_tasks):
        # Train the network on the current task
        history=model.fit(x_train,y_train, batch_size=32, epochs=5)

        # Compute the rotation matrix and rotate the weights
        weights = np.concatenate([layer.get_weights()[0].flatten() for layer in model.layers])
        rotation_matrix = compute_rotation_matrix(weights)
        rotated_weights = rotate_weights(weights, rotation_matrix)

        # Compute the important directions using PCA
        important_directions = compute_important_directions(rotated_weights, k)
        important_directions_all_tasks.append(important_directions)

        # Construct the rotation matrix and align the weights
        rotation_matrix_i = construct_rotation_matrix(important_directions)
        aligned_weights = compute_pca_aligned_weights(rotation_matrix_i)
# Function to compute the REWC loss
def compute_rewc_loss(model, fisher_diagonal, prev_weights, prev_task_loss, lambda_rewc):
    rew_losses = []
    for i, layer in enumerate(model.layers):
        weights = layer.get_weights()[0]
        rew_losses.append(lambda_rewc * np.sum(np.multiply(fisher_diagonal[i], np.square(weights - prev_weights[i]))))
    return prev_task_loss + np.sum(rew_losses)

# Train the network with REWC regularization
def train_network_with_rewc(x, y, model, fisher_diagonal, prev_weights, prev_task_loss, lambda_rewc, epochs):
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    rew_loss = compute_rewc_loss(model, fisher_diagonal, prev_weights, prev_task_loss, lambda_rewc)
    return rew_loss

# Rotated Fisher Information Matrix with Rotation and PCA Algorithm
def rotated_fisher_information_matrix(x_train, y_train, x_test, y_test, model, num_tasks, k, lambda_reg, lambda_rewc, learning_rate, epochs):
    weights_all_tasks = []
    important_directions_all_tasks = []
    fisher_diagonal_all_tasks = []
    prev_task_loss = 0

 # Set hyperparameters
num_tasks = 7
k = 10
lambda_rewc = 100
lambda_reg = 100
learning_rate = 0.001

for task in range(num_tasks):
    # Train the network on the current task
    history = model.fit(x_train, y_train, batch_size=32, epochs=10)
        # Function to compute the rotation matrix
def compute_rotation_matrix(weights):
        _, _, vh = np.linalg.svd(weights.reshape(-1, 784))
        return vh.T

        # Compute the rotation matrix and rotate the weights
        weights = np.concatenate([layer.get_weights()[0].flatten() for layer in model.layers])
        rotation_matrix = compute_rotation_matrix(weights)
        rotated_weights = rotate_weights(weights, rotation_matrix)

        # Compute the important directions using PCA
        important_directions = compute_important_directions(rotated_weights, k)
        important_directions_all_tasks.append(important_directions)

        # Construct the rotation matrix and align the weights
        rotation_matrix_i = construct_rotation_matrix(important_directions)
        aligned_weights = compute_pca_aligned_weights(rotated_weights, rotation_matrix_i)

        # Compute the Fisher diagonal for REWC
        fisher_diagonal = compute_fisher_diagonal(aligned_weights)

        # Train the network with REWC regularization
        rew_loss = train_network_with_rewc(x_train, y_train, model, fisher_diagonal, weights, prev_task_loss, lambda_rewc, epochs)
        prev_task_loss = rew_loss

        # Store the weights and Fisher diagonal
        weights_all_tasks.append(aligned_weights)
        fisher_diagonal_all_tasks.append(fisher_diagonal)



loss, accuracy = model.evaluate(x_test, y_test)
print("Test Loss:", loss)
print("Test Accuracy:", accuracy)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Test Loss: 3.7390143871307373
Test Accuracy: 0.5616999864578247


training of three tasks sequentially and observe the catastrophic forgeeting issue

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import cifar10
from sklearn.decomposition import PCA

# Constants
NUM_CLASSES = 10
IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS = 32, 32, 3
LAMBDA = 0.4  # EWC regularization strength
PCA_COMPONENTS = 100  # Number of PCA components

# Load CIFAR-10 data
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Preprocess data
x_train, x_test = x_train / 255.0, x_test / 255.0
y_train, y_test = y_train.flatten(), y_test.flatten()

# Flatten the images for PCA
x_train_flat = x_train.reshape(x_train.shape[0], -1)
x_test_flat = x_test.reshape(x_test.shape[0], -1)

# Apply PCA
pca = PCA(n_components=PCA_COMPONENTS)
x_train_pca = pca.fit_transform(x_train_flat)
x_test_pca = pca.transform(x_test_flat)

# Reshape back to image shape with reduced components
x_train_pca = x_train_pca.reshape(-1, PCA_COMPONENTS)
x_test_pca = x_test_pca.reshape(-1, PCA_COMPONENTS)

# Class Task Definitions
tasks = [(0, 1, 2), (3, 4, 5), (6, 7, 8, 9)]

class ReplayBuffer:
    def __init__(self, max_size=2000):
        self.buffer = []
        self.max_size = max_size

    def add(self, data):
        if len(self.buffer) >= self.max_size:
            self.buffer = self.buffer[len(data):] + data
        else:
            self.buffer.extend(data)

    def sample(self, batch_size):
        if len(self.buffer) == 0:
            return np.array([]), np.array([])
        indices = np.random.choice(len(self.buffer), batch_size)
        x, y = zip(*[self.buffer[i] for i in indices])
        return np.array(x), np.array(y)

def create_model():
    model = models.Sequential()
    model.add(layers.Dense(256, activation='relu', input_shape=(PCA_COMPONENTS,)))
    model.add(layers.Dense(256, activation='relu'))
    model.add(layers.Dense(NUM_CLASSES, activation='softmax'))
    return model

# Initialize the replay buffer
replay_buffer = ReplayBuffer(max_size=2000)

# Placeholder for EWC variables
prev_task_vars = []
prev_task_fisher = []

# Function to compute Fisher Information Matrix
def compute_fisher(model, x, y):
    fisher = []
    for layer in model.trainable_weights:
        fisher.append(np.zeros(layer.shape))

    # Use GradientTape for automatic differentiation
    with tf.GradientTape() as tape:
        predictions = model(x, training=False)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y, predictions)

    grads = tape.gradient(loss, model.trainable_weights)

    for i, grad in enumerate(grads):
        fisher[i] += grad.numpy() ** 2

    return fisher

# Function to compute EWC loss
def ewc_loss(model, prev_task_vars, prev_task_fisher):
    ewc_loss_value = 0
    for var, fisher, prev_var in zip(model.trainable_weights, prev_task_fisher, prev_task_vars):
        ewc_loss_value += tf.reduce_sum(fisher * (var - prev_var) ** 2)
    return ewc_loss_value

# Training loop for each task
for task_index, classes in enumerate(tasks):
    print(f"Training Task {task_index + 1} - Classes: {classes}")
    model = create_model()
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    # Filter data for the current task
    task_indices = np.isin(y_train, classes)
    x_task, y_task = x_train_pca[task_indices], y_train[task_indices]

    # Update labels to be zero-indexed
    y_task = np.array([np.where(classes == y)[0][0] for y in y_task])

    # Training the model on the current task
    for epoch in range(10):
        model.fit(x_task, y_task, epochs=1, validation_split=0.1, verbose=1)

        # Add current task data to replay buffer
        replay_buffer.add(list(zip(x_task, y_task)))

        # Sample from replay buffer and retrain
        x_replay, y_replay = replay_buffer.sample(len(x_task) // 10)
        if len(x_replay) > 0:
            model.fit(x_replay, y_replay, epochs=1, verbose=1)

    # Store variables and Fisher Information Matrix for EWC
    if task_index > 0:
        prev_task_vars.append([var.numpy() for var in model.trainable_weights])
        prev_task_fisher.append(compute_fisher(model, x_task, y_task))

    # Evaluate the model on the current task
    test_indices = np.isin(y_test, classes)
    x_test_task, y_test_task = x_test_pca[test_indices], y_test[test_indices]
    y_test_task = np.array([np.where(classes == y)[0][0] for y in y_test_task])

    test_loss, test_accuracy = model.evaluate(x_test_task, y_test_task, verbose=1)
    print(f"Task {task_index + 1} - Test Results")
    print(f"Test Loss: {test_loss}")
    print(f"Test Accuracy: {test_accuracy}")

    # Re-evaluation on previous tasks
    for prev_task_index in range(task_index):
        prev_classes = tasks[prev_task_index]
        prev_test_indices = np.isin(y_test, prev_classes)
        x_prev_test, y_prev_test = x_test_pca[prev_test_indices], y_test[prev_test_indices]
        y_prev_test = np.array([np.where(prev_classes == y)[0][0] for y in y_prev_test])

        prev_test_loss, prev_test_accuracy = model.evaluate(x_prev_test, y_prev_test, verbose=1)
        print(f"Task {task_index + 1} - Re-evaluation on Task {prev_task_index + 1}")
        print(f"Test Loss: {prev_test_loss}")
        print(f"Test Accuracy: {prev_test_accuracy}")


Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
Training Task 1 - Classes: (0, 1, 2)
Task 1 - Test Results
Test Loss: 0.7017877697944641
Test Accuracy: 0.8293333053588867
Training Task 2 - Classes: (3, 4, 5)
Task 2 - Test Results
Test Loss: 1.3413347005844116
Test Accuracy: 0.606333315372467
Task 2 - Re-evaluation on Task 1
Test Loss: 3.842710494995117
Test Accuracy: 0.2879999876022339
Training Task 3 - Classes: (6, 7, 8, 9)
Task 3 - Test Results
Test Loss: 0.757237434387207
Test Accuracy: 0.8107500076293945
Task 3 - Re-evaluation on Task 1
Test Loss: 10.624130249023438
Test Accuracy: 0.1066666692495346
Task 3 - Re-evaluation on Task 2
Test Loss: 5.865112781524658
Test Accuracy: 0.28999999165534973
