# VCL

The code below is adapted from the following file and modified to run on Python 3 with TensorFlow 2, through the `tensorflow.compat.v1` API (the original code was written for Python 2 and TensorFlow 1):
https://github.com/nvcuong/variational-continual-learning/blob/master/ddm/alg/cla_models_multihead.py

In [None]:
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

import numpy as np
from copy import deepcopy

import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt


# Seed NumPy's global RNG and TensorFlow's graph-level RNG with fixed values.
np.random.seed(0)
tf.set_random_seed(0)

# variable initialization functions
def weight_variable(shape, init_weights=None):
    """Create a TF variable for weights.

    When ``init_weights`` is given it is used as the initial value (``shape``
    is then not consulted); otherwise the variable starts from a truncated
    normal with standard deviation 0.1 of the requested ``shape``.
    """
    if init_weights is None:
        start_value = tf.truncated_normal(shape, stddev=0.1)
    else:
        start_value = tf.constant(init_weights)
    return tf.Variable(start_value)

def bias_variable(shape):
    """Create a TF variable of the given shape filled with 0.1."""
    return tf.Variable(tf.constant(0.1, shape=shape))

def small_variable(shape):
    """Create a TF variable of the given shape filled with -6.0."""
    return tf.Variable(tf.constant(-6.0, shape=shape))

def zero_variable(shape):
    """Create a TF variable of the given shape filled with zeros."""
    return tf.Variable(tf.zeros(shape=shape))

def _create_weights_mf(in_dim, hidden_size, out_dim, init_weights=None, init_variances=None):
    """Create flat mean and variance variables covering every layer.

    Returns ``(no_params, m_weights, v_weights, size)`` where ``size`` is the
    list ``[in_dim, *hidden_size, out_dim]`` and ``no_params`` counts one
    weight matrix plus one bias vector per consecutive pair in ``size``.
    """
    size = [in_dim] + deepcopy(hidden_size) + [out_dim]

    # Each layer contributes fan_in * fan_out weights and fan_out biases.
    no_params = 0
    for fan_in, fan_out in zip(size[:-1], size[1:]):
        no_params += fan_in * fan_out + fan_out

    m_weights = weight_variable([no_params], init_weights)
    if init_variances is not None:
        v_weights = tf.Variable(tf.constant(init_variances, dtype=tf.float32))
    else:
        v_weights = small_variable([no_params])
    return no_params, m_weights, v_weights, size

class Cla_NN(object):
    """Shared plumbing for the classifiers: placeholders, optimizer, session, training.

    Subclasses must set ``self.cost``, ``self.pred`` and ``self.weights``
    before calling :meth:`assign_optimizer` and :meth:`assign_session`.
    """

    # Softmax op used by prediction_prob. Built lazily once per model, so that
    # repeated calls do not keep adding new nodes to the TF graph.
    _pred_prob = None

    def __init__(self, input_size, hidden_size, output_size, training_size):
        # input and output placeholders
        self.x = tf.placeholder(tf.float32, [None, input_size])
        self.y = tf.placeholder(tf.float32, [None, output_size])
        self.task_idx = tf.placeholder(tf.int32)

    def assign_optimizer(self, learning_rate=0.001):
        """Attach an Adam training op that minimises ``self.cost``."""
        self.train_step = tf.train.AdamOptimizer(learning_rate).minimize(self.cost)

    def assign_session(self):
        """Open a session and run the global variable initializer in it."""
        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        self.sess.run(init)

    def train(self, x_train, y_train, task_idx, no_epochs=1000, batch_size=100, display_epoch=5):
        """Run mini-batch training and return the average cost of every epoch.

        Args:
            x_train, y_train: arrays indexed by sample along axis 0.
            task_idx: value fed to the ``task_idx`` placeholder.
            no_epochs: number of passes over the data.
            batch_size: mini-batch size, clamped to the number of samples.
            display_epoch: a log line is printed every ``display_epoch`` epochs.

        Returns:
            List with one average cost per epoch.
        """
        N = x_train.shape[0]
        if batch_size > N:
            batch_size = N

        costs = []
        total_batch = int(np.ceil(N * 1.0 / batch_size))
        for epoch in range(no_epochs):
            # Reshuffle the data at the start of every epoch.
            perm_inds = list(range(N))
            np.random.shuffle(perm_inds)
            cur_x_train = x_train[perm_inds]
            cur_y_train = y_train[perm_inds]

            avg_cost = 0.
            for i in range(total_batch):
                start_ind = i * batch_size
                end_ind = min((i + 1) * batch_size, N)
                batch_x = cur_x_train[start_ind:end_ind, :]
                batch_y = cur_y_train[start_ind:end_ind, :]
                # Run optimization op (backprop) and cost op (to get loss value)
                _, c = self.sess.run(
                    [self.train_step, self.cost],
                    feed_dict={self.x: batch_x, self.y: batch_y, self.task_idx: task_idx})
                avg_cost += c / total_batch
            if epoch % display_epoch == 0:
                print("Epoch:", '%04d' % (epoch+1), "cost=", \
                    "{:.9f}".format(avg_cost))
            costs.append(avg_cost)
        print("Optimization Finished!")
        return costs

    def prediction(self, x_test, task_idx):
        """Evaluate ``self.pred`` on ``x_test`` for the given head."""
        return self.sess.run(self.pred, feed_dict={self.x: x_test, self.task_idx: task_idx})

    def prediction_prob(self, x_test, task_idx):
        """Evaluate ``softmax(self.pred)`` on ``x_test`` for the given head."""
        if self._pred_prob is None:
            # Create the softmax node only once; creating it per call would grow
            # the graph every time this method is used inside a loop.
            self._pred_prob = tf.nn.softmax(self.pred)
        return self.sess.run(self._pred_prob, feed_dict={self.x: x_test, self.task_idx: task_idx})

    def get_weights(self):
        """Fetch the current values of ``self.weights`` from the session."""
        return self.sess.run(self.weights)

    def close_session(self):
        """Close the TensorFlow session owned by this model."""
        self.sess.close()


""" Neural Network Model """
class Vanilla_NN(Cla_NN):
    """Deterministic multi-head network trained by maximum likelihood.

    Hidden layers are shared; ``W_last`` / ``b_last`` hold one output layer
    per task and the ``task_idx`` placeholder selects which one is used.
    """

    def __init__(self, input_size, hidden_size, output_size, training_size, prev_weights=None, learning_rate=0.001):
        super(Vanilla_NN, self).__init__(input_size, hidden_size, output_size, training_size)
        # Build (or restore) the parameters, then the prediction and loss graphs.
        created = self.create_weights(input_size, hidden_size, output_size, prev_weights)
        self.W, self.b, self.W_last, self.b_last, self.size = created
        self.no_layers = len(hidden_size) + 1
        self.pred = self._prediction(self.x, self.task_idx)
        self.cost = - self._logpred(self.x, self.y, self.task_idx)
        self.weights = [self.W, self.b, self.W_last, self.b_last]

        self.assign_optimizer(learning_rate)
        self.assign_session()

    def _prediction(self, inputs, task_idx):
        """Return the logits of head ``task_idx`` for ``inputs``."""
        hidden = inputs
        for layer_W, layer_b in zip(self.W[:self.no_layers - 1], self.b[:self.no_layers - 1]):
            hidden = tf.nn.relu(tf.add(tf.matmul(hidden, layer_W), layer_b))
        # Select the output layer that belongs to the requested task.
        head_W = tf.gather(self.W_last, task_idx)
        projected = tf.matmul(hidden, head_W)
        head_b = tf.gather(self.b_last, task_idx)
        return tf.add(projected, head_b)

    def _logpred(self, inputs, targets, task_idx):
        """Return the negated mean softmax cross-entropy of the predictions."""
        logits = self._prediction(inputs, task_idx)
        xent = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=targets)
        return - tf.reduce_mean(xent)

    def create_weights(self, in_dim, hidden_size, out_dim, prev_weights):
        """Create the shared layers and the per-task output layers.

        With ``prev_weights`` the shared layers and all previous heads start
        from those values; a freshly initialised head is always appended.

        Returns:
            ``(W, b, W_last, b_last, sizes)`` with ``sizes`` the full list of
            layer widths ``[in_dim, *hidden_size, out_dim]``.
        """
        sizes = [in_dim] + deepcopy(hidden_size) + [out_dim]
        shared_W, shared_b = [], []
        heads_W, heads_b = [], []

        # Shared hidden layers: every consecutive pair except the last one.
        for layer, (fan_in, fan_out) in enumerate(zip(sizes[:-2], sizes[1:-1])):
            if prev_weights is not None:
                W_start = tf.constant(prev_weights[0][layer])
                b_start = tf.constant(prev_weights[1][layer])
            else:
                W_start = tf.truncated_normal([fan_in, fan_out], stddev=0.1)
                b_start = tf.truncated_normal([fan_out], stddev=0.1)
            shared_W.append(tf.Variable(W_start))
            shared_b.append(tf.Variable(b_start))

        # Heads of previously seen tasks are restored as they were.
        if prev_weights is not None:
            old_heads_W, old_heads_b = prev_weights[2], prev_weights[3]
            for task in range(len(old_heads_W)):
                heads_W.append(tf.Variable(old_heads_W[task]))
                heads_b.append(tf.Variable(old_heads_b[task]))

        # One new, randomly initialised head for the current task.
        fan_in, fan_out = sizes[-2], sizes[-1]
        W_start = tf.truncated_normal([fan_in, fan_out], stddev=0.1)
        b_start = tf.truncated_normal([fan_out], stddev=0.1)
        heads_W.append(tf.Variable(W_start))
        heads_b.append(tf.Variable(b_start))

        return shared_W, shared_b, heads_W, heads_b, sizes


""" Bayesian Neural Network with Mean field VI approximation """
class MFVI_NN(Cla_NN):
    """Multi-head Bayesian network with a mean-field Gaussian posterior.

    Every weight and bias has a mean variable and a log-variance variable
    (samples are drawn as ``mean + eps * exp(0.5 * log_var)``). Hidden layers
    are shared across tasks; the ``*_last_*`` lists hold one output layer per
    task. The prior is stored as plain values: means and variances (not log
    variances), taken from the previous posterior when one is supplied.
    """

    # Gradient ops used by compute_gradients; built lazily on first use so the
    # graph is extended once per model instead of once per call.
    _grad_ops = None

    def __init__(self, input_size, hidden_size, output_size, training_size,
        no_train_samples=10, no_pred_samples=100, prev_means=None, prev_log_variances=None, learning_rate=0.001,
        prior_mean=0, prior_var=1):
        self.in_size = input_size
        self.out_size = output_size
        super(MFVI_NN, self).__init__(input_size, hidden_size, output_size, training_size)
        m, v, self.size = self.create_weights(
            input_size, hidden_size, output_size, prev_means, prev_log_variances)
        self.W_m, self.b_m, self.W_last_m, self.b_last_m = m[0], m[1], m[2], m[3]
        self.W_v, self.b_v, self.W_last_v, self.b_last_v = v[0], v[1], v[2], v[3]
        self.weights = [m, v]

        m, v = self.create_prior(input_size, hidden_size, output_size, prev_means, prev_log_variances, prior_mean, prior_var)
        self.prior_W_m, self.prior_b_m, self.prior_W_last_m, self.prior_b_last_m = m[0], m[1], m[2], m[3]
        self.prior_W_v, self.prior_b_v, self.prior_W_last_v, self.prior_b_last_v = v[0], v[1], v[2], v[3]

        self.no_layers = len(self.size) - 1
        self.no_train_samples = no_train_samples
        self.no_pred_samples = no_pred_samples
        self.pred = self._prediction(self.x, self.task_idx, self.no_pred_samples)
        # Cost = KL(posterior || prior) / training_size - expected log-likelihood.
        self.cost = tf.div(self._KL_term(), training_size) - self._logpred(self.x, self.y, self.task_idx)
        self.assign_optimizer(learning_rate)
        self.assign_session()

    def _prediction(self, inputs, task_idx, no_samples):
        """Return logits for ``no_samples`` weight samples (leading axis)."""
        return self._prediction_layer(inputs, task_idx, no_samples)

    # this samples a layer at a time
    def _prediction_layer(self, inputs, task_idx, no_samples):
        """Forward pass drawing an independent weight sample per layer.

        The inputs are tiled ``no_samples`` times along a new leading axis and
        each copy is propagated with its own sampled weights and biases.
        """
        K = no_samples
        act = tf.tile(tf.expand_dims(inputs, 0), [K, 1, 1])
        for i in range(self.no_layers-1):
            din = self.size[i]
            dout = self.size[i+1]
            eps_w = tf.random_normal((K, din, dout), 0, 1, dtype=tf.float32)
            eps_b = tf.random_normal((K, 1, dout), 0, 1, dtype=tf.float32)

            # Reparameterisation: mean + eps * std, with std = exp(0.5 * log_var).
            weights = tf.add(tf.multiply(eps_w, tf.exp(0.5*self.W_v[i])), self.W_m[i])
            biases = tf.add(tf.multiply(eps_b, tf.exp(0.5*self.b_v[i])), self.b_m[i])

            act = tf.cast(act,tf.float32)
            pre = tf.add(tf.einsum('mni,mio->mno', act, weights), biases)
            act = tf.nn.relu(pre)
        din = self.size[-2]
        dout = self.size[-1]
        eps_w = tf.random_normal((K, din, dout), 0, 1, dtype=tf.float32)
        eps_b = tf.random_normal((K, 1, dout), 0, 1, dtype=tf.float32)

        # Pick the output layer of the requested task.
        Wtask_m = tf.gather(self.W_last_m, task_idx)
        Wtask_v = tf.gather(self.W_last_v, task_idx)
        btask_m = tf.gather(self.b_last_m, task_idx)
        btask_v = tf.gather(self.b_last_v, task_idx)
        weights = tf.add(tf.multiply(eps_w, tf.exp(0.5*Wtask_v)), Wtask_m)
        biases = tf.add(tf.multiply(eps_b, tf.exp(0.5*btask_v)), btask_m)
        # Same batched matmul as the hidden layers. Broadcasting act against
        # weights and reducing would materialise a (K, N, din, dout) tensor.
        pre = tf.add(tf.einsum('mni,mio->mno', act, weights), biases)

        return pre

    def compute_gradients(self, x_batch, y_batch, task_idx):
        """Return gradients of ``KL - log-likelihood`` for one sample.

        The loss uses the un-scaled KL term and output head 0.

        NOTE: ``task_idx`` is accepted for interface compatibility but is not
        used; head 0 is always differentiated, as in the original code.

        The gradient ops are built once, on the model placeholders, and run
        in this model's own session, so gradients are taken at the model's
        current parameter values.

        Args:
            x_batch: one input sample with ``in_size`` elements.
            y_batch: one target with ``out_size`` elements.

        Returns:
            List of gradient arrays, one per trainable variable that the
            loss depends on.
        """
        if self._grad_ops is None:
            loss = self._KL_term() - self._logpred(self.x, self.y, 0)
            grads = tf.gradients(loss, tf.trainable_variables())
            # Variables that do not influence the loss yield None; drop them.
            self._grad_ops = [grad for grad in grads if grad is not None]

        feed = {self.x: x_batch.reshape(1, self.in_size),
                self.y: y_batch.reshape(1, self.out_size)}
        return self.sess.run(self._grad_ops, feed_dict=feed)

    def _logpred(self, inputs, targets, task_idx):
        """Return the negated mean cross-entropy over training weight samples."""
        pred = self._prediction(inputs, task_idx, self.no_train_samples)
        targets = tf.tile(tf.expand_dims(targets, 0), [self.no_train_samples, 1, 1])
        log_lik = - tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=targets))
        return log_lik

    @staticmethod
    def _kl_gaussian(m, v, m0, v0, no_elems):
        """KL between a diagonal Gaussian posterior and a diagonal Gaussian prior.

        ``m`` / ``v`` are the posterior mean and log-variance tensors, ``m0`` /
        ``v0`` the prior mean and variance, ``no_elems`` the parameter count.
        """
        const_term = -0.5 * no_elems
        log_std_diff = 0.5 * tf.reduce_sum(np.log(v0) - v)
        mu_diff_term = 0.5 * tf.reduce_sum((tf.exp(v) + (m0 - m)**2) / v0)
        return const_term + log_std_diff + mu_diff_term

    def _KL_term(self):
        """Sum the KL divergence over all shared layers and all task heads."""
        kl = 0
        for i in range(self.no_layers-1):
            din = self.size[i]
            dout = self.size[i+1]
            kl += self._kl_gaussian(self.W_m[i], self.W_v[i],
                                    self.prior_W_m[i], self.prior_W_v[i], dout * din)
            kl += self._kl_gaussian(self.b_m[i], self.b_v[i],
                                    self.prior_b_m[i], self.prior_b_v[i], dout)

        no_tasks = len(self.W_last_m)
        din = self.size[-2]
        dout = self.size[-1]
        for i in range(no_tasks):
            kl += self._kl_gaussian(self.W_last_m[i], self.W_last_v[i],
                                    self.prior_W_last_m[i], self.prior_W_last_v[i], dout * din)
            kl += self._kl_gaussian(self.b_last_m[i], self.b_last_v[i],
                                    self.prior_b_last_m[i], self.prior_b_last_v[i], dout)
        return kl

    def create_weights(self, in_dim, hidden_size, out_dim, prev_weights, prev_variances):
        """Create the posterior mean and log-variance variables.

        Means start from ``prev_weights`` when given, otherwise from a
        truncated normal (std 0.1). Log-variances start from
        ``prev_variances`` when given, otherwise from -6.0. A new head is
        always appended after the restored heads of previous tasks.

        Returns:
            ``([W_m, b_m, W_last_m, b_last_m], [W_v, b_v, W_last_v, b_last_v], sizes)``
            where ``sizes`` is ``[in_dim, *hidden_size, out_dim]``.
        """
        hidden_size = deepcopy(hidden_size)
        hidden_size.append(out_dim)
        hidden_size.insert(0, in_dim)
        no_layers = len(hidden_size) - 1
        W_m, b_m, W_last_m, b_last_m = [], [], [], []
        W_v, b_v, W_last_v, b_last_v = [], [], [], []
        for i in range(no_layers-1):
            din = hidden_size[i]
            dout = hidden_size[i+1]
            if prev_weights is None:
                Wi_m_val = tf.truncated_normal([din, dout], stddev=0.1)
                bi_m_val = tf.truncated_normal([dout], stddev=0.1)
                Wi_v_val = tf.constant(-6.0, shape=[din, dout])
                bi_v_val = tf.constant(-6.0, shape=[dout])
            else:
                Wi_m_val = prev_weights[0][i]
                bi_m_val = prev_weights[1][i]
                if prev_variances is None:
                    Wi_v_val = tf.constant(-6.0, shape=[din, dout])
                    bi_v_val = tf.constant(-6.0, shape=[dout])
                else:
                    Wi_v_val = prev_variances[0][i]
                    bi_v_val = prev_variances[1][i]

            Wi_m = tf.Variable(Wi_m_val)
            bi_m = tf.Variable(bi_m_val)
            Wi_v = tf.Variable(Wi_v_val)
            bi_v = tf.Variable(bi_v_val)
            W_m.append(Wi_m)
            b_m.append(bi_m)
            W_v.append(Wi_v)
            b_v.append(bi_v)

        # if there are previous tasks
        if prev_weights is not None and prev_variances is not None:
            prev_Wlast_m = prev_weights[2]
            prev_blast_m = prev_weights[3]
            prev_Wlast_v = prev_variances[2]
            prev_blast_v = prev_variances[3]
            no_prev_tasks = len(prev_Wlast_m)
            for i in range(no_prev_tasks):
                Wi_m = tf.Variable(prev_Wlast_m[i])
                bi_m = tf.Variable(prev_blast_m[i])
                Wi_v = tf.Variable(prev_Wlast_v[i])
                bi_v = tf.Variable(prev_blast_v[i])

                W_last_m.append(Wi_m)
                b_last_m.append(bi_m)
                W_last_v.append(Wi_v)
                b_last_v.append(bi_v)

        din = hidden_size[-2]
        dout = hidden_size[-1]

        # if point estimate is supplied
        if prev_weights is not None and prev_variances is None:
            Wi_m_val = prev_weights[2][0]
            bi_m_val = prev_weights[3][0]
        else:
            Wi_m_val = tf.truncated_normal([din, dout], stddev=0.1)
            bi_m_val = tf.truncated_normal([dout], stddev=0.1)
        Wi_v_val = tf.constant(-6.0, shape=[din, dout])
        bi_v_val = tf.constant(-6.0, shape=[dout])

        Wi_m = tf.Variable(Wi_m_val)
        bi_m = tf.Variable(bi_m_val)
        Wi_v = tf.Variable(Wi_v_val)
        bi_v = tf.Variable(bi_v_val)
        W_last_m.append(Wi_m)
        b_last_m.append(bi_m)
        W_last_v.append(Wi_v)
        b_last_v.append(bi_v)

        return [W_m, b_m, W_last_m, b_last_m], [W_v, b_v, W_last_v, b_last_v], hidden_size

    def create_prior(self, in_dim, hidden_size, out_dim, prev_weights, prev_variances, prior_mean, prior_var):
        """Build the prior means and variances as plain (non-TF) values.

        When both ``prev_weights`` and ``prev_variances`` are given, the prior
        of the shared layers and of the previous heads is the previous
        posterior (variances are ``exp`` of the stored log-variances).
        Otherwise, and always for the newly added head, the prior is
        ``prior_mean`` / ``prior_var``.

        Returns:
            ``([W_m, b_m, W_last_m, b_last_m], [W_v, b_v, W_last_v, b_last_v])``.
        """
        # One shared layer per hidden size; the output layer is handled as a head.
        no_shared_layers = len(hidden_size)
        has_posterior = prev_weights is not None and prev_variances is not None
        W_m, b_m, W_last_m, b_last_m = [], [], [], []
        W_v, b_v, W_last_v, b_last_v = [], [], [], []

        for i in range(no_shared_layers):
            if has_posterior:
                W_m.append(prev_weights[0][i])
                b_m.append(prev_weights[1][i])
                W_v.append(np.exp(prev_variances[0][i]))
                b_v.append(np.exp(prev_variances[1][i]))
            else:
                W_m.append(prior_mean)
                b_m.append(prior_mean)
                W_v.append(prior_var)
                b_v.append(prior_var)

        # if there are previous tasks
        if has_posterior:
            prev_Wlast_m = prev_weights[2]
            prev_blast_m = prev_weights[3]
            prev_Wlast_v = prev_variances[2]
            prev_blast_v = prev_variances[3]
            for i in range(len(prev_Wlast_m)):
                W_last_m.append(prev_Wlast_m[i])
                b_last_m.append(prev_blast_m[i])
                W_last_v.append(np.exp(prev_Wlast_v[i]))
                b_last_v.append(np.exp(prev_blast_v[i]))

        # The head of the new task always gets the default prior.
        W_last_m.append(prior_mean)
        b_last_m.append(prior_mean)
        W_last_v.append(prior_var)
        b_last_v.append(prior_var)

        return [W_m, b_m, W_last_m, b_last_m], [W_v, b_v, W_last_v, b_last_v]

# Helper functions

Same code as https://github.com/nvcuong/variational-continual-learning/blob/master/ddm/alg/utils.py, adapted for python 3

We also changed the plot function to adapt it for our project

In [None]:
def merge_coresets(x_coresets, y_coresets):
    """Stack the per-task coresets into one input array and one target array.

    Args:
        x_coresets: non-empty list of 2-D input arrays, one per task.
        y_coresets: list of 2-D target arrays, same length as ``x_coresets``.

    Returns:
        ``(merged_x, merged_y)`` with the rows of all tasks in list order.

    Raises:
        ValueError: if the lists are empty or differ in length.
    """
    if len(x_coresets) == 0 or len(x_coresets) != len(y_coresets):
        raise ValueError("x_coresets and y_coresets must be non-empty and of equal length")
    # A single vstack copies every row once; stacking pairwise inside a loop
    # re-copies the already merged rows on every iteration.
    return np.vstack(x_coresets), np.vstack(y_coresets)

def get_scores(model, x_testsets, y_testsets, x_coresets, y_coresets, hidden_size, no_epochs, single_head, batch_size=None):
    """Return the test accuracy on every task seen so far.

    When coresets are present, a fresh ``MFVI_NN`` initialised from
    ``model``'s weights is trained on coreset data before predicting: once on
    the merged coresets (single head) or once per task on that task's coreset
    (multi head). Without coresets ``model`` itself is evaluated.

    Returns:
        List with one accuracy (fraction of correct argmax predictions of the
        sample-averaged probabilities) per entry of ``x_testsets``.
    """
    mf_weights, mf_variances = model.get_weights()
    use_coresets = len(x_coresets) > 0

    def _train_on_coreset(x_core, y_core, head_idx):
        # Fresh model started from the current posterior, trained on coreset data.
        step = batch_size if batch_size is not None else x_core.shape[0]
        tuned = MFVI_NN(x_core.shape[1], hidden_size, y_core.shape[1], x_core.shape[0], prev_means=mf_weights, prev_log_variances=mf_variances)
        tuned.train(x_core, y_core, head_idx, no_epochs, step)
        return tuned

    if single_head:
        if use_coresets:
            merged_x, merged_y = merge_coresets(x_coresets, y_coresets)
            final_model = _train_on_coreset(merged_x, merged_y, 0)
        else:
            final_model = model

    acc = []
    for task in range(len(x_testsets)):
        if not single_head:
            if use_coresets:
                final_model = _train_on_coreset(x_coresets[task], y_coresets[task], task)
            else:
                final_model = model

        head = task if not single_head else 0
        probs = final_model.prediction_prob(x_testsets[task], head)
        # Average over weight samples, then compare argmax with the labels.
        predicted = np.argmax(np.mean(probs, axis=0), axis=1)
        actual = np.argmax(y_testsets[task], axis=1)
        acc.append(np.count_nonzero(predicted == actual) / actual.shape[0])

        if use_coresets and not single_head:
            final_model.close_session()

    if use_coresets and single_head:
        final_model.close_session()

    return acc

def concatenate_results(score, all_score):
    """Append ``score`` as a new row of the accuracy table ``all_score``.

    An empty table becomes a single row. Otherwise the existing rows are
    padded with one extra NaN column (``score`` is expected to be one entry
    longer than the previous rows) before the new row is stacked below.
    """
    if all_score.size == 0:
        return np.reshape(score, (1, -1))

    n_rows, n_cols = all_score.shape[0], all_score.shape[1]
    padded = np.full((n_rows, n_cols + 1), np.nan)
    padded[:, :n_cols] = all_score
    return np.vstack((padded, score))

def plot(data, n_tasks=5,
         labels=('No coreset', 'Random', 'K-center', 'Forgetting', 'Herding', 'Entropy'),
         filename="coreset3"):
    """Plot per-task accuracy curves, one subplot per task, and save the figure.

    Args:
        data: sequence of accuracy tables, one per coreset method, in the
            order of ``labels``. Column ``i`` of a table is drawn in subplot
            ``i`` against the positions ``1..n_tasks``.
        n_tasks: number of subplots and of x positions.
        labels: legend label for each table in ``data``.
        filename: path passed to ``fig.savefig``.
    """
    fig, axs = plt.subplots(1, n_tasks, figsize=(16 * n_tasks / 5, 4), squeeze=False)
    task_ticks = np.arange(1, n_tasks + 1)

    for i, ax in enumerate(axs.flat):
        for scores, label in zip(data, labels):
            ax.plot(task_ticks, scores[:, i], marker='o', label=label)
        ax.set_title(f'Task {i+1}')
        ax.set_xlabel('Task')
        ax.set_ylabel('Accuracy')
        ax.set_ylim(0.7,1.0)
        ax.set_xlim(0.5, n_tasks + 0.5)  # half a unit of margin around 1..n_tasks
        ax.set_xticks(task_ticks)  # one tick per task
        ax.set_xticklabels(task_ticks)

    # All subplots share the same series, so any axis provides the legend entries.
    handles, legend_labels = axs[0, -1].get_legend_handles_labels()
    fig.legend(handles, legend_labels, bbox_to_anchor=(0.5, 1.1),loc='upper center',fontsize='large',ncol=len(legend_labels))
    plt.tight_layout()
    plt.show()

    fig.savefig(filename,bbox_inches='tight')

# Run VCL

Taken from https://github.com/nvcuong/variational-continual-learning/blob/master/ddm/alg/vcl.py, adapted for python 3

In [None]:


def run_vcl(hidden_size, no_epochs, data_gen, coreset_method, coreset_size=0, batch_size=None, single_head=True):
    """Run the continual-learning loop over every task of ``data_gen``.

    For each task: (first task only) fit a ``Vanilla_NN`` to get initial
    means, optionally move ``coreset_size`` samples into the coreset via
    ``coreset_method``, train an ``MFVI_NN`` started from the previous
    posterior, then score all test sets seen so far with ``get_scores``.

    Returns:
        Accuracy table built with ``concatenate_results``: one row per task
        trained, one column per task evaluated.
    """
    in_dim, out_dim = data_gen.get_dims()
    x_coresets, y_coresets = [], []
    x_testsets, y_testsets = [], []
    all_acc = np.array([])

    for task_id in range(data_gen.max_iter):
        x_train, y_train, x_test, y_test = data_gen.next_task()
        x_testsets.append(x_test)
        y_testsets.append(y_test)

        # Readout head trained for this task.
        head = task_id if not single_head else 0
        bsize = batch_size if batch_size is not None else x_train.shape[0]

        if task_id == 0:
            # Maximum-likelihood fit provides the initial means for the first model.
            ml_model = Vanilla_NN(in_dim, hidden_size, out_dim, x_train.shape[0])
            ml_model.train(x_train, y_train, task_id, no_epochs, bsize)
            mf_weights = ml_model.get_weights()
            mf_variances = None
            ml_model.close_session()

        if coreset_size > 0:
            # The selector moves the chosen samples out of the training data.
            x_coresets, y_coresets, x_train, y_train = coreset_method(
                x_coresets, y_coresets, x_train, y_train, coreset_size,
                in_dim, hidden_size, out_dim, mf_weights, mf_variances, head)

        # Train on the non-coreset data, starting from the previous posterior.
        mf_model = MFVI_NN(in_dim, hidden_size, out_dim, x_train.shape[0], prev_means=mf_weights, prev_log_variances=mf_variances)
        mf_model.train(x_train, y_train, head, no_epochs, bsize)
        mf_weights, mf_variances = mf_model.get_weights()

        # Incorporate coreset data and evaluate on every task seen so far.
        task_acc = get_scores(mf_model, x_testsets, y_testsets, x_coresets, y_coresets, hidden_size, no_epochs, single_head, batch_size)
        all_acc = concatenate_results(task_acc, all_acc)

        mf_model.close_session()

    return all_acc

# Coreset selection

`entropy_selection`, `forgetting_selection`, `sampling_selection` and `herding_center` were written by me.

`rand_from_batch` and `k_center` are taken from https://github.com/nvcuong/variational-continual-learning/blob/master/ddm/alg/coreset.py.

In [None]:
import numpy as np
import math
""" Random coreset selection """
def rand_from_batch(x_coreset, y_coreset, x_train, y_train, coreset_size,in_dim,hidden_size,out_dim,mf_weights,mf_variances,head):
    """Move ``coreset_size`` uniformly sampled rows from the training set to the coreset.

    The sampled rows are appended (as one array each) to ``x_coreset`` and
    ``y_coreset``; the returned training arrays no longer contain them. The
    model-related arguments are unused and exist only so that all selectors
    share one signature.
    """
    n_samples = x_train.shape[0]
    picked = np.random.choice(n_samples, size=coreset_size, replace=False)
    x_coreset.append(x_train[picked, :])
    y_coreset.append(y_train[picked, :])

    # Keep every row that was not picked, in its original order.
    remaining = np.ones(n_samples, dtype=bool)
    remaining[picked] = False
    return x_coreset, y_coreset, x_train[remaining], y_train[remaining]

""" K-center coreset selection """
def k_center(x_coreset, y_coreset, x_train, y_train, coreset_size,in_dim,hidden_size,out_dim,mf_weights,mf_variances,head):
    """Greedy K-center selection of coreset points from the training set.

    Starts from row 0 and repeatedly picks the row whose distance to its
    nearest already chosen center is largest. The chosen rows are appended
    to the coreset lists and removed from the returned training arrays. The
    model-related arguments are unused.
    """
    nearest_center_dist = np.full(x_train.shape[0], np.inf)
    chosen = [0]
    nearest_center_dist = update_distance(nearest_center_dist, x_train, 0)

    while len(chosen) < coreset_size:
        farthest = np.argmax(nearest_center_dist)
        nearest_center_dist = update_distance(nearest_center_dist, x_train, farthest)
        chosen.append(farthest)

    x_coreset.append(x_train[chosen, :])
    y_coreset.append(y_train[chosen, :])
    remaining_x = np.delete(x_train, chosen, axis=0)
    remaining_y = np.delete(y_train, chosen, axis=0)

    return x_coreset, y_coreset, remaining_x, remaining_y

def entropy_selection(x_coreset, y_coreset, x_train, y_train, coreset_size,in_dim,hidden_size,out_dim,mf_weights,mf_variances,head):
    """Select the ``coreset_size`` training points with the highest predictive entropy.

    The training data is shuffled, a model initialised from ``mf_weights`` /
    ``mf_variances`` predicts class probabilities (averaged over weight
    samples) and the entropy of that mean prediction ranks the points. The
    selected rows are appended to the coreset lists and removed from the
    returned training arrays.
    """
    perm_indices = np.random.permutation(len(x_train))
    x_train = x_train[perm_indices]
    y_train = y_train[perm_indices]
    mf_model = MFVI_NN(in_dim, hidden_size, out_dim, x_train.shape[0], prev_means=mf_weights, prev_log_variances=mf_variances)
    batch_size = 10000
    N = x_train.shape[0]
    # Float buffer: an integer buffer would truncate every entropy to 0, 1 or 2
    # and destroy the ranking.
    entropies = np.zeros(N, dtype=float)
    try:
        for i, start_ind in enumerate(range(0, N, batch_size)):
            print(i)
            end_ind = min(start_ind + batch_size, N)
            pred = mf_model.prediction_prob(x_train[start_ind:end_ind, :], head)
            pred_mean = np.mean(pred, axis=0)
            # p * log(p) -> 0 as p -> 0; substituting 1 inside the log yields
            # exactly 0 for zero probabilities instead of NaN.
            safe_prob = np.where(pred_mean > 0, pred_mean, 1.0)
            entropies[start_ind:end_ind] = -np.sum(pred_mean * np.log(safe_prob), axis=1)
    finally:
        # The scoring model is local to this function; release its session.
        mf_model.close_session()

    inds = np.argpartition(entropies, -coreset_size)[-coreset_size:].astype(int)

    mask = np.full(len(x_train), True)  # True = stays in the training set
    mask[inds] = False
    x_coreset.append(x_train[~mask])
    y_coreset.append(y_train[~mask])
    x_train = x_train[mask]
    y_train = y_train[mask]
    return x_coreset, y_coreset, x_train, y_train

def forgetting_selection(x_coreset, y_coreset, x_train, y_train, coreset_size,in_dim,hidden_size,out_dim,mf_weights,mf_variances,head):
    """Select coreset points by how often they are forgotten during training.

    A model initialised from ``mf_weights`` / ``mf_variances`` is trained for
    20 passes. Before each training step on a batch, the batch is classified;
    a "forgetting event" is counted for every sample that was correct at the
    previous check and is wrong now. Samples that were never classified
    correctly are selected first (in shuffled order); remaining slots are
    filled with the samples that have the most forgetting events.

    The selected rows are appended to the coreset lists and removed from the
    returned training arrays.
    """
    perm_indices = np.random.permutation(len(x_train))
    x_train = x_train[perm_indices]
    y_train = y_train[perm_indices]
    mf_model = MFVI_NN(in_dim, hidden_size, out_dim, x_train.shape[0], prev_means=mf_weights, prev_log_variances=mf_variances)

    N = x_train.shape[0]
    forget_counts = np.zeros(N, dtype=int)
    prev_correct = np.zeros(N, dtype=bool)
    ever_correct = np.zeros(N, dtype=bool)
    # use a large batch size
    batch_size = 10000

    try:
        for e in range(20):
            for i, start_ind in enumerate(range(0, N, batch_size)):
                print(i)
                batch = slice(start_ind, min(start_ind + batch_size, N))
                batch_x = x_train[batch]
                batch_y = y_train[batch]

                pred = mf_model.prediction_prob(batch_x, head)
                pred_y = np.argmax(np.mean(pred, axis=0), axis=1)
                correct = pred_y == np.argmax(batch_y, axis=1)

                ever_correct[batch] |= correct
                # Forgetting event: correct at the previous check, wrong now.
                forget_counts[batch] += prev_correct[batch] & ~correct
                prev_correct[batch] = correct

                # Train the same head that is being evaluated.
                mf_model.train(batch_x, batch_y, head, 1, batch_x.shape[0])
    finally:
        # The scoring model is local to this function; release its session.
        mf_model.close_session()

    never_learned = np.flatnonzero(~ever_correct)[:coreset_size]
    rem = coreset_size - len(never_learned)
    if rem > 0:
        # Rank the rest by forgetting count; already selected samples get -1
        # so they cannot be picked twice (counts are never negative).
        ranking = forget_counts.copy()
        ranking[never_learned] = -1
        most_forgotten = np.argpartition(ranking, -rem)[-rem:]
        inds = np.concatenate((most_forgotten, never_learned)).astype(int)
    else:
        inds = never_learned
    print(inds)
    mask = np.ones(len(x_train), dtype=bool)  # True = stays in the training set
    mask[inds[:coreset_size]] = False
    x_coreset.append(x_train[~mask])
    y_coreset.append(y_train[~mask])
    x_train = x_train[mask]
    y_train = y_train[mask]
    return x_coreset, y_coreset, x_train, y_train
def compute_score(G,g):
    """Return ``1 + max cosine similarity`` between ``g`` and the rows of ``G``.

    The score lies in ``[0, 2]``; a value below 1 means that ``g`` has negative
    cosine similarity with every stored gradient. The maximum is deliberately
    not clamped at 0: clamping would force the score to be at least 1 and make
    a ``score < 1`` test on the caller's side impossible to satisfy.

    Args:
        G: sequence of stored gradient vectors (one per row).
        g: candidate gradient vector.

    Returns:
        The score, or NaN when a zero-norm vector makes a similarity undefined.
    """
    stored = np.asarray(G)
    norms = np.linalg.norm(stored, axis=1) * np.linalg.norm(g)
    cosines = np.dot(stored, g) / norms
    return np.max(cosines) + 1


'''
The loss based coreset selection method mentioned in the appendix
'''
def sampling_selection(x_coreset,y_coreset,x_train,y_train,coreset_size,in_dim,hidden_size,out_dim,mf_weights,mf_variances,head):
    """Gradient-similarity based coreset selection.

    After shuffling, the first ``3 * coreset_size + 1`` training points are
    visited. Each gets a score from ``compute_score`` (its gradient compared
    with the gradients currently stored). The buffer is first filled up to
    ``coreset_size``; afterwards a candidate may replace a stored point:
    with probability 0.3 at a random position when its score is NaN, or,
    when its score is below 1, at a position drawn proportionally to the
    stored scores and accepted with probability ``C[i] / (C[i] + c)``.

    The selected rows are appended to the coreset lists and removed from the
    returned training arrays.
    """
    perm_indices = np.random.permutation(len(x_train))
    x_train = x_train[perm_indices]
    y_train = y_train[perm_indices]
    mf_model = MFVI_NN(in_dim, hidden_size, out_dim, 1, prev_means=mf_weights, prev_log_variances=mf_variances)
    G = []  # stored gradients
    M = []  # stored sample indices
    C = []  # stored scores
    n_candidates = min(x_train.shape[0], 3 * coreset_size + 1)
    try:
        for j in range(n_candidates):
            g_i = mf_model.compute_gradients(x_train[j], y_train[j], -1)
            g_i = np.concatenate([x.flatten() for x in g_i])
            c = compute_score(G, g_i) if j > 0 else 0
            if j % 10 == 0:
                print(c)

            if len(M) < coreset_size:
                # Buffer not full yet: always store the sample.
                M.append(j)
                C.append(0 if math.isnan(c) else c)
                G.append(g_i)
            elif math.isnan(c):
                i = np.random.randint(0, len(M))
                r = np.random.uniform(0, 1)
                if r < 0.3:
                    print("Replaced")
                    M[i] = j
                    C[i] = 0
                    G[i] = g_i
            elif c < 1:  # score below 1 <=> cosine similarity below 0
                weights = np.asarray(C, dtype=float)
                total = weights.sum()
                # Fall back to a uniform draw when every stored score is 0.
                i = np.random.choice(len(M), p=weights / total if total > 0 else None)
                r = np.random.uniform(0, 1)
                if r < C[i] / (C[i] + c):
                    print("Replaced")
                    M[i] = j
                    C[i] = c
                    G[i] = g_i
    finally:
        # The scoring model is local to this function; release its session.
        mf_model.close_session()

    mask = np.ones(len(x_train), dtype=bool)  # True = stays in the training set
    mask[M[:coreset_size]] = False

    x_coreset.append(x_train[~mask])
    y_coreset.append(y_train[~mask])
    x_train = x_train[mask]
    y_train = y_train[mask]
    return x_coreset, y_coreset, x_train, y_train



""" Herding coreset selection """
def herding_center(x_coreset, y_coreset, x_train, y_train, coreset_size,in_dim,hidden_size,out_dim,mf_weights,mf_variances,head):
    """Greedy herding-style coreset selection.

    Starts from sample 0 and then repeatedly picks the not-yet-selected
    sample that is farthest (Euclidean distance) from the running mean of
    the samples selected so far.

    Args:
        x_coreset, y_coreset: lists that receive the selected inputs/targets
            (appended to in place and also returned).
        x_train: array of training inputs, one sample per row.
        y_train: 2-D array of training targets, one row per sample.
        coreset_size: number of samples to select. Values below 1 select
            nothing; values above the number of samples select everything.
        in_dim, hidden_size, out_dim, mf_weights, mf_variances, head:
            unused; kept so all selection functions share one signature.

    Returns:
        ``(x_coreset, y_coreset, x_train, y_train)`` with the selected rows
        removed from the returned training arrays.
    """
    n_samples = x_train.shape[0]
    n_select = max(0, min(int(coreset_size), n_samples))

    chosen = []
    if n_select > 0:
        chosen.append(0)
        available = np.ones(n_samples, dtype=bool)
        available[0] = False
        # Keep the running mean in a floating dtype so integer inputs
        # (e.g. uint8 images) neither overflow nor wrap around.
        float_type = np.result_type(x_train.dtype, np.float32)
        centroid = x_train[0].astype(float_type)
        dists = np.empty(n_samples)
        for k in range(1, n_select):
            dists = update_distance_herding(dists, x_train, centroid)
            candidates = np.flatnonzero(available)
            pick = candidates[np.argmax(dists[candidates])]
            # Running mean of the k samples chosen so far plus the new one.
            centroid = (k * centroid + x_train[pick]) / (k + 1)
            available[pick] = False
            chosen.append(pick)

    idx = np.asarray(chosen, dtype=np.intp)
    x_coreset.append(x_train[idx,:])
    y_coreset.append(y_train[idx,:])
    x_train = np.delete(x_train, idx, axis=0)
    y_train = np.delete(y_train, idx, axis=0)

    return x_coreset, y_coreset, x_train, y_train
def update_distance_herding(dists, x_train, current_id):
    """Overwrite ``dists`` with each sample's distance to one reference point.

    Despite its name, ``current_id`` is the reference *point* itself (for
    ``herding_center`` the current running mean), not an index.

    Args:
        dists: 1-D array with one entry per row of ``x_train``; overwritten
            in place.
        x_train: array of samples, one per row.
        current_id: point with the shape of a single sample.

    Returns:
        ``dists`` (the same object that was passed in).
    """
    # Subtract in floating point: unsigned integer inputs would wrap around.
    float_type = np.result_type(x_train.dtype, np.float32)
    diff = x_train - np.asarray(current_id, dtype=float_type)
    # Euclidean norm over everything except the sample axis.
    dists[:] = np.sqrt(np.sum(diff * diff, axis=tuple(range(1, diff.ndim))))
    return dists

def update_distance(dists, x_train, current_id):
    """Shrink ``dists`` to the distance of each sample to row ``current_id``.

    Every entry becomes the minimum of its previous value and the Euclidean
    distance between that sample and ``x_train[current_id]``.

    Args:
        dists: 1-D array with one entry per row of ``x_train``; updated in
            place.
        x_train: array of samples, one per row (at least 2-D).
        current_id: row index of the reference sample.

    Returns:
        ``dists`` (the same object that was passed in).
    """
    # Subtract in floating point: unsigned integer inputs (e.g. uint8
    # images) would otherwise wrap around before the norm is taken.
    float_type = np.result_type(x_train.dtype, np.float32)
    anchor = np.asarray(x_train[current_id, :], dtype=float_type)
    diff = x_train - anchor
    # Euclidean norm over everything except the sample axis.
    gaps = np.sqrt(np.sum(diff * diff, axis=tuple(range(1, diff.ndim))))
    dists[:] = np.minimum(gaps, dists)
    return dists

# Rotated MNIST

Code written by me

In [None]:
import gzip
import pickle
import cv2
import sys
sys.path.extend(['alg/'])
from copy import deepcopy

class RotatedMnistGenerator():
    """Task generator for Rotated MNIST.

    Every task is the full MNIST data set with all images rotated by a
    single task-specific angle. The angle is drawn right after seeding
    NumPy's global RNG with the task index, so task ``k`` always receives
    the same rotation.

    Args:
        max_iter: number of tasks that may be requested via ``next_task``.
        data_path: pickled ``(train_set, valid_set, test_set)`` MNIST tuple;
            defaults to the location this notebook has always used.
    """

    # Flattened rows are reshaped to IMAGE_SIDE x IMAGE_SIDE before rotating.
    IMAGE_SIDE = 28

    def __init__(self, max_iter=5, data_path="../input/dataset/mnist.pkl"):
        # NOTE(review): pickle.load can execute arbitrary code -- only point
        # data_path at a trusted file.
        # The with-statement closes the file; no explicit close() is needed.
        with open(data_path, 'rb') as f:
            print(f)
            train_set, valid_set, test_set = pickle.load(f,encoding='latin1')

        # The training and validation splits are merged into one training set.
        self.X_train = np.vstack((train_set[0], valid_set[0]))
        self.Y_train = np.hstack((train_set[1], valid_set[1]))
        self.X_test = test_set[0]
        self.Y_test = test_set[1]

        print(self.Y_train.shape,self.X_train.shape)
        print(self.Y_test.shape,self.X_test.shape)
        self.max_iter = max_iter
        self.cur_iter = 0

    def get_dims(self):
        # Get data input and output dimensions
        return self.X_train.shape[1], 10

    def rotate_image(self,img, angle):
        """Rotate a 2-D image by ``angle`` radians about its integer centre pixel."""
        # cv2.getRotationMatrix2D takes the angle in degrees, hence np.degrees.
        centre = (img.shape[1] // 2, img.shape[0] // 2)
        rot_mat = cv2.getRotationMatrix2D(centre, np.degrees(angle), 1.0)
        return cv2.warpAffine(img, rot_mat, (img.shape[1], img.shape[0]))

    def _rotate_rows(self, flat_images, angle):
        # Return a rotated copy of an array of flattened square images.
        side = self.IMAGE_SIDE
        rotated = flat_images.copy()
        for i in range(rotated.shape[0]):
            img = rotated[i].reshape(side, side)
            rotated[i] = self.rotate_image(img, angle).reshape(1, side * side)
        return rotated

    def next_task(self):
        """Return ``(x_train, y_train, x_test, y_test)`` for the next task.

        Targets are one-hot encoded over 10 classes.

        Raises:
            Exception: when more than ``max_iter`` tasks are requested.
        """
        if self.cur_iter >= self.max_iter:
            raise Exception('Number of tasks exceeded!')

        # NOTE: this reseeds NumPy's *global* RNG, so every later np.random
        # call in the program is affected as well. Kept as is because
        # callers' results depend on it.
        np.random.seed(self.cur_iter)
        angle = np.random.rand() * 2 * np.pi

        # Retrieve train data
        next_x_train = self._rotate_rows(self.X_train, angle)
        next_y_train = np.eye(10)[self.Y_train]

        # Retrieve test data
        next_x_test = self._rotate_rows(self.X_test, angle)
        next_y_test = np.eye(10)[self.Y_test]

        self.cur_iter += 1

        return next_x_train, next_y_train, next_x_test, next_y_test

# ---- Rotated MNIST experiment ----
# Settings handed to run_vcl below; their exact meaning is defined by run_vcl,
# which lives outside this cell.
hidden_size = [100,100]
batch_size = 256
no_epochs = 100
single_head = True


# NOTE: num_tasks is not referenced again in this cell; RotatedMnistGenerator()
# is built without arguments and therefore uses its default max_iter of 5.
num_tasks = 5
# Every run below first resets the TF graph and reseeds both the TensorFlow
# and the NumPy RNG with the same seeds.
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

# Run VCL without a coreset (coreset_size = 0)
coreset_size = 0
data_gen = RotatedMnistGenerator()
vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    rand_from_batch, coreset_size, batch_size, single_head)
print(vcl_result)

# Run random coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

# coreset_size stays at 200 for all remaining runs in this cell.
coreset_size = 200
data_gen = RotatedMnistGenerator()
rand_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    rand_from_batch, coreset_size, batch_size, single_head)
print(rand_vcl_result)

# Run k-center coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = RotatedMnistGenerator()
kcen_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    k_center, coreset_size, batch_size, single_head)
print(kcen_vcl_result)

# Run forgetting coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = RotatedMnistGenerator()
forgetting_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    forgetting_selection, coreset_size, batch_size, single_head)
print(forgetting_vcl_result)

# Run herding coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = RotatedMnistGenerator()
herding_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    herding_center, coreset_size, batch_size, single_head)
print(herding_vcl_result)


# Run entropy coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = RotatedMnistGenerator()
entropy_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    entropy_selection, coreset_size, batch_size, single_head)
print(entropy_vcl_result)


# Average accuracy: NaN-ignoring mean along axis 1 of each result matrix.
# The averages are only printed here; nothing is plotted.
# NOTE(review): presumably each row is one training stage -- confirm in run_vcl.
vcl_avg = np.nanmean(vcl_result, 1)
rand_vcl_avg = np.nanmean(rand_vcl_result, 1)
kcen_vcl_avg = np.nanmean(kcen_vcl_result, 1)
forgetting_vcl_avg = np.nanmean(forgetting_vcl_result, 1)
entropy_vcl_avg = np.nanmean(entropy_vcl_result,1)
herding_vcl_avg = np.nanmean(herding_vcl_result,1)

print(vcl_avg)
print(rand_vcl_avg)
print(kcen_vcl_avg)
print(forgetting_vcl_avg)
print(herding_vcl_avg)
print(entropy_vcl_avg)




# split notMNIST

Code written by me

In [None]:
import os
from sklearn.model_selection import train_test_split
import tarfile
from scipy import ndimage
import numpy as np
import imageio


class notMNISTSplit():
    """Task generator for split notMNIST: five binary letter-vs-letter tasks.

    Task ``i`` separates the ``i``-th letter of ``A..E`` from the ``i``-th
    letter of ``F..J``. All images are loaded, flattened and split 80/20
    into train/test once, in the constructor.

    Targets have two columns: column 0 is 1 for the second letter of the
    pair, column 1 is 1 for the first letter.

    Args:
        archive_path: the ``notMNIST_small`` tar.gz archive.
        extract_dir: directory the archive is unpacked into; the images are
            then read from ``<extract_dir>/notMNIST_small``.
    """

    # Directory holding one sub-directory of images per letter. Set per
    # instance by __init__; the class-level value is the historical default.
    image_root = '../working/notMNIST_small'

    def __init__(self, archive_path="../input/notmnist/notMNIST_small.tar.gz", extract_dir='../working/'):
        self.image_root = os.path.join(extract_dir, 'notMNIST_small')
        # NOTE(review): extractall trusts the member paths stored in the
        # archive -- only use it with an archive from a trusted source.
        # The with-statement closes the archive; no explicit close() needed.
        with tarfile.open(archive_path,"r:gz") as tar:
            tar.extractall(extract_dir)
        lst1 = ['A','B','C','D','E']
        lst2 = ['F','G','H','I','J']
        self.X_train = []
        self.X_test = []
        self.Y_train = []
        self.Y_test = []
        for first_letter, second_letter in zip(lst1, lst2):
            X,Y = self.convert_images2(first_letter, second_letter)
            X = np.array(X)
            Y = np.array(Y)
            X_split_train, X_split_test, y_split_train, y_split_test = train_test_split(X, Y, test_size=0.2, random_state=42)

            self.X_train.append(X_split_train)
            self.X_test.append(X_split_test)
            self.Y_train.append(self._two_column_labels(y_split_train))
            self.Y_test.append(self._two_column_labels(y_split_test))
        self.max_iter = len(lst1)
        self.cur_iter = 0

    @staticmethod
    def _two_column_labels(y):
        # Turn a 0/1 label vector into an (n, 2) array [y, 1 - y].
        return np.vstack((y, 1 - y)).transpose()

    def convert_images2(self,a,b):
        """Load the flattened images of letters ``a`` (label 0) and ``b`` (label 1).

        Directories and files are visited in sorted order: os.listdir gives
        no ordering guarantee, and the fixed-seed train/test split is only
        reproducible when the samples always arrive in the same order.

        Returns:
            ``(res_x, res_y)``: list of flattened images and list of labels.
        """
        res_x = []
        res_y = []
        for filename in sorted(os.listdir(self.image_root)):
            # Only the two requested letter directories are of interest.
            if(filename != a and filename != b):
                continue
            file_path = os.path.join(self.image_root, filename)
            label = 0 if filename == a else 1

            for imgpath in sorted(os.listdir(file_path)):
                image_path = os.path.join(file_path, imgpath)
                try:
                    image_data = imageio.imread(image_path).flatten()
                except Exception as e:
                    # Best effort: report the file that could not be read
                    # and carry on with the remaining images.
                    print(e)
                    continue
                res_x.append(image_data)
                res_y.append(label)
        return (res_x, res_y)

    def get_dims(self):
        # Get data input and output dimensions
        return self.X_train[0].shape[1], 2

    def next_task(self):
        """Return ``(x_train, y_train, x_test, y_test)`` of the next task.

        Raises:
            Exception: when more than ``max_iter`` tasks are requested.
        """
        if self.cur_iter >= self.max_iter:
            raise Exception('Number of tasks exceeded!')
        task = self.cur_iter
        self.cur_iter += 1
        return self.X_train[task], self.Y_train[task], self.X_test[task], self.Y_test[task]

# ---- split notMNIST experiment ----
# Settings handed to run_vcl below; their exact meaning is defined by run_vcl,
# which lives outside this cell.
hidden_size = [150, 150, 150, 150]
# NOTE(review): batch_size = None presumably means full-batch training --
# confirm in run_vcl.
batch_size = None
no_epochs = 100
single_head = False


# NOTE: num_tasks is not referenced again in this cell; notMNISTSplit fixes
# its own number of tasks.
num_tasks = 5
# Every run below first resets the TF graph and reseeds both the TensorFlow
# and the NumPy RNG with the same seeds.
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

# Run VCL without a coreset (coreset_size = 0)
coreset_size = 0
data_gen = notMNISTSplit()
vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    rand_from_batch, coreset_size, batch_size, single_head)
print(vcl_result)

# Run random coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

# coreset_size stays at 200 for all remaining runs in this cell.
coreset_size = 200
data_gen = notMNISTSplit()
rand_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    rand_from_batch, coreset_size, batch_size, single_head)
print(rand_vcl_result)

# Run k-center coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = notMNISTSplit()
kcen_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    k_center, coreset_size, batch_size, single_head)
print(kcen_vcl_result)

# Run forgetting coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = notMNISTSplit()
forgetting_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    forgetting_selection, coreset_size, batch_size, single_head)
print(forgetting_vcl_result)

# Run herding coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = notMNISTSplit()
herding_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    herding_center, coreset_size, batch_size, single_head)
print(herding_vcl_result)


# Run entropy coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = notMNISTSplit()
entropy_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    entropy_selection, coreset_size, batch_size, single_head)
print(entropy_vcl_result)


# Average accuracy: NaN-ignoring mean along axis 1 of each result matrix.
# The averages are only printed here; nothing is plotted.
# NOTE(review): presumably each row is one training stage -- confirm in run_vcl.
vcl_avg = np.nanmean(vcl_result, 1)
rand_vcl_avg = np.nanmean(rand_vcl_result, 1)
kcen_vcl_avg = np.nanmean(kcen_vcl_result, 1)
forgetting_vcl_avg = np.nanmean(forgetting_vcl_result, 1)
entropy_vcl_avg = np.nanmean(entropy_vcl_result,1)
herding_vcl_avg = np.nanmean(herding_vcl_result,1)

print(vcl_avg)
print(rand_vcl_avg)
print(kcen_vcl_avg)
print(forgetting_vcl_avg)
print(herding_vcl_avg)
print(entropy_vcl_avg)



# CIFAR-10

Code written by me

In [None]:
import pickle
import numpy as np

class Cifar10Generator():
    """Task generator for split CIFAR-10: binary tasks 0/1, 2/3, 4/5, 6/7, 8/9.

    The five pickled training batches are concatenated into one training
    set; the test batch is kept separately. Each task returns the samples
    of one even/odd class pair with two-column targets: ``[1, 0]`` for the
    even class and ``[0, 1]`` for the odd class.

    Args:
        max_iter: number of tasks that may be requested via ``next_task``
            (capped at the five available class pairs). This argument used
            to be ignored in favour of a hard-coded 5.
        data_dir: directory containing ``data_batch_1`` .. ``data_batch_5``
            and ``test_batch``.
    """

    def __init__(self, max_iter=5, data_dir="/content/drive/MyDrive/cifar-10-python/cifar-10-batches-py"):
        self.sets_0 = [0, 2, 4, 6, 8]
        self.sets_1 = [1, 3, 5, 7, 9]

        train_batches = [
            self._load_batch("{}/data_batch_{}".format(data_dir, k))
            for k in range(1, 6)
        ]
        # Stack once instead of growing the arrays batch by batch.
        self.X_train = np.vstack([batch["data"] for batch in train_batches])
        self.Y_train = np.concatenate([batch["labels"] for batch in train_batches])

        test_batch = self._load_batch("{}/test_batch".format(data_dir))
        self.X_test = test_batch["data"]
        self.Y_test = test_batch["labels"]

        # Never allow more tasks than there are class pairs.
        self.max_iter = min(max_iter, len(self.sets_0))
        self.cur_iter = 0

    @staticmethod
    def _load_batch(path):
        # Load one pickled CIFAR batch; the with-statement closes the file.
        # NOTE(review): pickle.load can execute arbitrary code -- only load
        # batches from a trusted source.
        with open(path, 'rb') as f:
            print(f)
            return pickle.load(f,encoding='latin1')

    @staticmethod
    def _binary_task(features, labels, class_0, class_1):
        # Rows of class_0 followed by rows of class_1, with [1, 0] / [0, 1] targets.
        labels = np.array(labels)
        ids_0 = np.where(labels == class_0)[0]
        ids_1 = np.where(labels == class_1)[0]
        x = np.vstack((features[ids_0], features[ids_1]))
        y = np.vstack((np.ones((ids_0.shape[0], 1)), np.zeros((ids_1.shape[0], 1))))
        y = np.hstack((y, 1-y))
        return x, y

    def get_dims(self):
        # Get data input and output dimensions
        return self.X_train.shape[1], 2

    def next_task(self):
        """Return ``(x_train, y_train, x_test, y_test)`` of the next class pair.

        Raises:
            Exception: when more than ``max_iter`` tasks are requested.
        """
        if self.cur_iter >= self.max_iter:
            raise Exception('Number of tasks exceeded!')

        class_0 = self.sets_0[self.cur_iter]
        class_1 = self.sets_1[self.cur_iter]

        # Retrieve train data
        next_x_train, next_y_train = self._binary_task(self.X_train, self.Y_train, class_0, class_1)

        # Retrieve test data
        next_x_test, next_y_test = self._binary_task(self.X_test, self.Y_test, class_0, class_1)

        self.cur_iter += 1

        return next_x_train, next_y_train, next_x_test, next_y_test

# ---- split CIFAR-10 experiment ----
# Settings handed to run_vcl below; their exact meaning is defined by run_vcl,
# which lives outside this cell.
hidden_size = [150,150]
batch_size = 256
no_epochs = 100
single_head = False


# NOTE: num_tasks is not referenced again in this cell; Cifar10Generator() is
# built without arguments.
num_tasks = 5

# Every run below first resets the TF graph and reseeds both the TensorFlow
# and the NumPy RNG with the same seeds.
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

# Run VCL without a coreset (coreset_size = 0)
coreset_size = 0
data_gen = Cifar10Generator()
vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    rand_from_batch, coreset_size, batch_size, single_head)
print(vcl_result)

# Run random coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

# coreset_size stays at 200 for all remaining runs in this cell.
coreset_size = 200
data_gen = Cifar10Generator()
rand_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    rand_from_batch, coreset_size, batch_size, single_head)
print(rand_vcl_result)

# Run k-center coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = Cifar10Generator()
kcen_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    k_center, coreset_size, batch_size, single_head)
print(kcen_vcl_result)

# Run forgetting coreset VCL

# Re-assigns the value coreset_size already has (200).
coreset_size = 200
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = Cifar10Generator()
forgetting_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    forgetting_selection, coreset_size, batch_size, single_head)
print(forgetting_vcl_result)

# Run herding coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = Cifar10Generator()
herding_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    herding_center, coreset_size, batch_size, single_head)
print(herding_vcl_result)

# Run entropy coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = Cifar10Generator()
entropy_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    entropy_selection, coreset_size, batch_size, single_head)
print(entropy_vcl_result)

# Average accuracy: NaN-ignoring mean along axis 1 of each result matrix.
# The averages are only printed here; nothing is plotted.
# NOTE(review): presumably each row is one training stage -- confirm in run_vcl.
vcl_avg = np.nanmean(vcl_result, 1)
rand_vcl_avg = np.nanmean(rand_vcl_result, 1)
kcen_vcl_avg = np.nanmean(kcen_vcl_result, 1)
forgetting_vcl_avg = np.nanmean(forgetting_vcl_result, 1)
entropy_vcl_avg = np.nanmean(entropy_vcl_result,1)
herding_vcl_avg = np.nanmean(herding_vcl_result,1)

# Printed in the order: none, random, k-center, forgetting, entropy, herding.
print(vcl_avg)
print(rand_vcl_avg)
print(kcen_vcl_avg)
print(forgetting_vcl_avg)
print(entropy_vcl_avg)
print(herding_vcl_avg)

# Permuted MNIST

Adapted from https://github.com/nvcuong/variational-continual-learning/blob/master/ddm/run_permuted.py

In [None]:
import gzip
import pickle

import sys
sys.path.extend(['alg/'])
from copy import deepcopy

class PermutedMnistGenerator():
    """Task generator for Permuted MNIST.

    Every task is the full MNIST data set with the input features (pixels)
    shuffled by one task-specific permutation. The permutation is drawn
    right after seeding NumPy's global RNG with the task index, so task
    ``k`` always receives the same permutation.

    Args:
        max_iter: number of tasks that may be requested via ``next_task``.
        data_path: pickled ``(train_set, valid_set, test_set)`` MNIST tuple;
            defaults to the location this notebook has always used.
    """

    def __init__(self, max_iter=10, data_path="../input/dataset/mnist.pkl"):
        # NOTE(review): pickle.load can execute arbitrary code -- only point
        # data_path at a trusted file.
        # The with-statement closes the file; no explicit close() is needed.
        with open(data_path, 'rb') as f:
            print(f)
            train_set, valid_set, test_set = pickle.load(f,encoding='latin1')

        # The training and validation splits are merged into one training set.
        self.X_train = np.vstack((train_set[0], valid_set[0]))
        self.Y_train = np.hstack((train_set[1], valid_set[1]))
        self.X_test = test_set[0]
        self.Y_test = test_set[1]

        print(self.Y_train.shape,self.X_train.shape)
        print(self.Y_test.shape,self.X_test.shape)
        self.max_iter = max_iter
        self.cur_iter = 0

    def get_dims(self):
        # Get data input and output dimensions
        return self.X_train.shape[1], 10

    def next_task(self):
        """Return ``(x_train, y_train, x_test, y_test)`` for the next task.

        Targets are one-hot encoded over 10 classes.

        Raises:
            Exception: when more than ``max_iter`` tasks are requested.
        """
        if self.cur_iter >= self.max_iter:
            raise Exception('Number of tasks exceeded!')

        # NOTE: this reseeds NumPy's *global* RNG, so every later np.random
        # call in the program is affected as well. Kept as is because
        # callers' results depend on it.
        np.random.seed(self.cur_iter)
        perm_inds = list(range(self.X_train.shape[1]))
        np.random.shuffle(perm_inds)

        # Indexing with a list of column indices already returns a new
        # array, so the stored data is never modified and no separate deep
        # copy of the whole data set is needed.
        # Retrieve train data
        next_x_train = self.X_train[:,perm_inds]
        next_y_train = np.eye(10)[self.Y_train]

        # Retrieve test data
        next_x_test = self.X_test[:,perm_inds]
        next_y_test = np.eye(10)[self.Y_test]

        self.cur_iter += 1

        return next_x_train, next_y_train, next_x_test, next_y_test

# ---- Permuted MNIST experiment ----
# Settings handed to run_vcl below; their exact meaning is defined by run_vcl,
# which lives outside this cell.
hidden_size = [100, 100]
batch_size = 256
no_epochs = 100
single_head = True
# num_tasks is passed to every PermutedMnistGenerator below as its max_iter.
num_tasks = 10
# Every run below first resets the TF graph and reseeds both the TensorFlow
# and the NumPy RNG with the same seeds.
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

# Run VCL without a coreset (coreset_size = 0)
coreset_size = 0
data_gen = PermutedMnistGenerator(num_tasks)
vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    rand_from_batch, coreset_size, batch_size, single_head)
print(vcl_result)

# Run random coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

# coreset_size stays at 200 for all remaining runs in this cell.
coreset_size = 200
data_gen = PermutedMnistGenerator(num_tasks)
rand_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    rand_from_batch, coreset_size, batch_size, single_head)
print(rand_vcl_result)

# Run k-center coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = PermutedMnistGenerator(num_tasks)
kcen_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    k_center, coreset_size, batch_size, single_head)
print(kcen_vcl_result)

# Run forgetting coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = PermutedMnistGenerator(num_tasks)
forgetting_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    forgetting_selection, coreset_size, batch_size, single_head)
print(forgetting_vcl_result)

# Run herding coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = PermutedMnistGenerator(num_tasks)
herding_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    herding_center, coreset_size, batch_size, single_head)
print(herding_vcl_result)


# Run entropy coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = PermutedMnistGenerator(num_tasks)
entropy_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    entropy_selection, coreset_size, batch_size, single_head)
print(entropy_vcl_result)


# Average accuracy: NaN-ignoring mean along axis 1 of each result matrix.
# The averages are only printed here; nothing is plotted.
# NOTE(review): presumably each row is one training stage -- confirm in run_vcl.
vcl_avg = np.nanmean(vcl_result, 1)
rand_vcl_avg = np.nanmean(rand_vcl_result, 1)
kcen_vcl_avg = np.nanmean(kcen_vcl_result, 1)
forgetting_vcl_avg = np.nanmean(forgetting_vcl_result, 1)
entropy_vcl_avg = np.nanmean(entropy_vcl_result,1)
herding_vcl_avg = np.nanmean(herding_vcl_result,1)

print(vcl_avg)
print(rand_vcl_avg)
print(kcen_vcl_avg)
print(forgetting_vcl_avg)
print(herding_vcl_avg)
print(entropy_vcl_avg)




# split MNIST

Adapted from https://github.com/nvcuong/variational-continual-learning/blob/master/ddm/run_split.py

In [None]:

class SplitMnistGenerator():
    """Split MNIST as five binary tasks: 0 vs 1, 2 vs 3, 4 vs 5, 6 vs 7, 8 vs 9.

    Targets have two columns: ``[1, 0]`` marks the even digit of the pair,
    ``[0, 1]`` the odd digit.
    """

    def __init__(self):
        # The with-statement takes care of closing the file.
        with open("../input/dataset/mnist.pkl", 'rb') as handle:
            train_set, valid_set, test_set = pickle.load(handle, encoding='latin1')

        train_images, train_digits = train_set[0], train_set[1]
        valid_images, valid_digits = valid_set[0], valid_set[1]

        # Training and validation splits are merged into one training set.
        self.X_train = np.vstack((train_images, valid_images))
        self.train_label = np.hstack((train_digits, valid_digits))
        self.X_test = test_set[0]
        self.test_label = test_set[1]

        self.sets_0 = [0, 2, 4, 6, 8]
        self.sets_1 = [1, 3, 5, 7, 9]
        self.cur_iter = 0
        self.max_iter = len(self.sets_0)

    def get_dims(self):
        # Input dimension is the number of features; every task is binary.
        n_features = self.X_train.shape[1]
        return n_features, 2

    def _pair_subset(self, images, digits, digit_a, digit_b):
        # Rows showing digit_a, then rows showing digit_b, plus their targets.
        rows_a = np.nonzero(digits == digit_a)[0]
        rows_b = np.nonzero(digits == digit_b)[0]
        subset = np.vstack((images[rows_a], images[rows_b]))

        is_a = np.concatenate((np.ones(rows_a.shape[0]), np.zeros(rows_b.shape[0])))
        is_a = is_a.reshape(-1, 1)
        targets = np.hstack((is_a, 1 - is_a))
        return subset, targets

    def next_task(self):
        """Return ``(x_train, y_train, x_test, y_test)`` for the next digit pair.

        Raises:
            Exception: when more tasks are requested than digit pairs exist.
        """
        if not self.cur_iter < self.max_iter:
            raise Exception('Number of tasks exceeded!')

        digit_a = self.sets_0[self.cur_iter]
        digit_b = self.sets_1[self.cur_iter]

        train_x, train_y = self._pair_subset(self.X_train, self.train_label, digit_a, digit_b)
        test_x, test_y = self._pair_subset(self.X_test, self.test_label, digit_a, digit_b)

        self.cur_iter = self.cur_iter + 1
        return train_x, train_y, test_x, test_y

# ---- split MNIST experiment ----
# Settings handed to run_vcl below; their exact meaning is defined by run_vcl,
# which lives outside this cell.
hidden_size = [256, 256]
# NOTE(review): batch_size = None presumably means full-batch training --
# confirm in run_vcl.
batch_size = None
no_epochs = 120
single_head = False

# NOTE: num_tasks is not referenced again in this cell; SplitMnistGenerator
# derives its number of tasks from its own list of digit pairs.
num_tasks = 5
# Every run below first resets the TF graph and reseeds both the TensorFlow
# and the NumPy RNG with the same seeds.
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

# Run VCL without a coreset (coreset_size = 0)
coreset_size = 0
data_gen = SplitMnistGenerator()
vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    rand_from_batch, coreset_size, batch_size, single_head)
print(vcl_result)

# Run random coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

# coreset_size stays at 200 for all remaining runs in this cell.
coreset_size = 200
data_gen = SplitMnistGenerator()
rand_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    rand_from_batch, coreset_size, batch_size, single_head)
print(rand_vcl_result)

# Run k-center coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = SplitMnistGenerator()
kcen_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    k_center, coreset_size, batch_size, single_head)
print(kcen_vcl_result)

# Run forgetting coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = SplitMnistGenerator()
forgetting_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    forgetting_selection, coreset_size, batch_size, single_head)
print(forgetting_vcl_result)

# Run herding coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = SplitMnistGenerator()
herding_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    herding_center, coreset_size, batch_size, single_head)
print(herding_vcl_result)


# Run entropy coreset VCL
tf.reset_default_graph()
tf.set_random_seed(12)
np.random.seed(1)

data_gen = SplitMnistGenerator()
entropy_vcl_result = run_vcl(hidden_size, no_epochs, data_gen,
    entropy_selection, coreset_size, batch_size, single_head)
print(entropy_vcl_result)


# Average accuracy: NaN-ignoring mean along axis 1 of each result matrix.
# The averages are only printed here; nothing is plotted.
# NOTE(review): presumably each row is one training stage -- confirm in run_vcl.
vcl_avg = np.nanmean(vcl_result, 1)
rand_vcl_avg = np.nanmean(rand_vcl_result, 1)
kcen_vcl_avg = np.nanmean(kcen_vcl_result, 1)
forgetting_vcl_avg = np.nanmean(forgetting_vcl_result, 1)
entropy_vcl_avg = np.nanmean(entropy_vcl_result,1)
herding_vcl_avg = np.nanmean(herding_vcl_result,1)

print(vcl_avg)
print(rand_vcl_avg)
print(kcen_vcl_avg)
print(forgetting_vcl_avg)
print(herding_vcl_avg)
print(entropy_vcl_avg)



