
# Final layer updation of a pre-trained model, if new classes come in the target dataset



In [1]:

import numpy as np
import math
import copy as cp
from sklearn.ensemble import RandomForestClassifier

# define dot function: calculate the inner product used in recompute voting weight.
def dot(K, L):
   if len(K) != len(L):
      return 0
   return sum(i[0] * i[1] for i in zip(K, L))
0
# get the dataset in next time step
def get_next_dataset(df, batch, k, target_name):
    return X_train, y_train


In [2]:

class LearnNSE:
    '''Learn++.NSE ensemble classifier
    <Parameters>
    @base_classifier: arbitary supervised classifier (default=RandomForestClassifier)
    @slope: float (default=0.5)
    @crossing_point: float (default=10.0)
    @models: list (default=None)
    @voting_weights: list (default=None)
    @error_weights: list (default=None)
    @error_distribution: list (default=None)
    '''
    def __init__(self, base_classifier=RandomForestClassifier(n_estimators=100, random_state=10),
                 alpha=0.5, beta=10.0):
        self.base_classifier = cp.deepcopy(base_classifier) # reset a model for current dataset
        self.models = []
        self.slope = alpha
        self.crossing_point = beta
        self.voting_weights = [1.0] # default=1.0
        self.error_distribution = []
        self.bkts = [] # save beta computed from the formula based on punishment of error rate
        self.wkts = []

    def fit(self, X_train, y_train):
        '''Function fit(): training model, and ensemble them
        <Parameters>
        @X_train: Dataframe
            A multi-dimension dataset for training model
        @y_train: Dataframe {0,1,2,...,n}
            The set of labels for each sample in training data
        '''
        # Initialize error_distribution to match the size of the current batch
        self.error_distribution = [1 / len(X_train)] * len(X_train)

        clf = cp.deepcopy(self.base_classifier)
        clf.fit(X_train, y_train)
        self.models.append(clf)


    def predict(self, X_test):
        '''Function predict(): testing model, and get the result from the ensemble model
        <Parameters>
        @X_test: Dataframe
            A multi-dimension dataset for testing model

        <Returns>
        @y_pred: numpy.ndarray
            A numpy.ndarray with the label prediction for all the samples in X
        '''
        y_pred = []
        t = len(self.models)

        for idx in range(len(X_test)):
            weighted_pred = [0.0]  # Initialize the weighted predictions list
            for k in range(1, t + 1):
                clf = self.models[k - 1]
                temp_target = clf.predict(X_test[idx:idx + 1])  # Predict for a single sample

                # Expand weighted_pred if necessary to accommodate the label
                while len(weighted_pred) <= temp_target[0]:
                    weighted_pred.append(0.0)

                # Add voting weight to the corresponding label index
                weighted_pred[temp_target[0]] += self.voting_weights[k - 1]

            # Determine the prediction by taking the class with the highest weighted vote
            y_pred.append(weighted_pred.index(max(weighted_pred)))

        return np.array(y_pred)


    def score(self, X_test, y_test):
        '''Function score(): testing model, and ensemble them
        <Parameters>
        @X_test: Dataframe
            A multi-dimension dataset for testing model
        @y_test: Series or Dataframe
            The set of labels for each test sample
        @score_list: List
            Saves the prediction accuracy in binary format
        '''
        score_list = []
        y_pred = self.predict(X_test)
        for idx in range(len(X_test)):
            # Fix: Remove [0] from y_test.values[idx]
            if y_pred[idx] == y_test.values[idx]:
                score_list.append(1)
            else:
                score_list.append(0)
        return np.sum(score_list) / len(score_list)


    def redistribute_error_rate(self, X_train, y_train):
        '''Function redistribute_error_rate(): redistribute error rate for the current batch
        <Parameters>
        @X_train: DataFrame
        @y_train: Series
        '''
        # Calculate the error rate based on current batch predictions
        ErrorRate = 1.0 - self.score(X_train, y_train)
        y_pred = self.predict(X_train)

        # Update error_distribution based on correct and incorrect predictions
        for idx in range(len(X_train)):
            if y_pred[idx] == y_train.values[idx]:
                self.error_distribution[idx] = ErrorRate
            else:
                self.error_distribution[idx] = 1

    def revoting(self, X_train, y_train):
        '''Function revoting(): update the voting weight based on error rate
        <Parameters>
        @X_train: DataFrame
        @y_train: Series
        '''
        ##### Step 5-1. Compute Error-based Weight #####
        t = len(self.models)
        self.bkts.append([])
        for k in range(1, t + 1):
            clf = self.models[k - 1]
            ekt = 0
            y_pred = clf.predict(X_train)

            # Calculate error rate for each sample in the current batch
            for idx in range(len(X_train)):
                if y_pred[idx] != y_train.values[idx]:
                    ekt += self.error_distribution[idx]

            ekt /= np.sum(self.error_distribution)
            bkt = 0.5 / (1 - 0.5) if ekt > 0.5 else ekt / (1 - ekt)
            self.bkts[k - 1].append(bkt)

        ##### Step 5-2. Compute the Time-based Weight #####
        curr_wkt_list = []
        self.wkts.append([])
        for k in range(1, t + 1):
            wkt = 1.0 / (1.0 + np.exp(-self.slope * (t - k - self.crossing_point)))
            curr_wkt_list.append(wkt)

        for k in range(1, t + 1):
            wkt = curr_wkt_list[k - 1]
            wkt /= (np.sum(self.wkts[k - 1]) + wkt) if self.wkts[k - 1] else wkt
            self.wkts[k - 1].append(wkt)

        ##### Step 6. Calculate the voting weight #####
        voting_weight_list = []
        for k in range(1, t + 1):
            TimeAndErrorWeight = np.sum(dot(self.bkts[k - 1], self.wkts[k - 1])) + 5e-2
            voting_weight_list.append(np.log(1 / TimeAndErrorWeight))
        self.voting_weights = voting_weight_list


In [3]:
import pandas as pd
from tensorflow.keras.datasets import mnist

# Load MNIST dataset
(x_train_full, y_train_full), (x_test_full, y_test_full) = mnist.load_data()

# Normalize and flatten the data
x_train_full = x_train_full / 255.0
x_test_full = x_test_full / 255.0
x_train_full = x_train_full.reshape(-1, 28 * 28)
x_test_full = x_test_full.reshape(-1, 28 * 28)

# Filter for "low" classes (0-7) for initial training
low_class_indices_train = np.where(y_train_full < 8)[0]
x_train_low = x_train_full[low_class_indices_train]
y_train_low = y_train_full[low_class_indices_train]

# Convert training data to DataFrame format to match Learn++.NSE expectations
df = pd.DataFrame(x_train_low)
df['Label'] = y_train_low  # Append labels as the last column

# Define the fixed test set for "low" classes (0-7)
low_class_indices_test = np.where(y_test_full < 8)[0]
x_test_low = x_test_full[low_class_indices_test]
y_test_low = y_test_full[low_class_indices_test]

# Create DataFrames for the test set
x_test_low_df = pd.DataFrame(x_test_low)
y_test_low_df = pd.DataFrame(y_test_low, columns=["Label"])  # Ensure y_test_low_df has a column name


Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
[1m11490434/11490434[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [4]:

############# Training Learn++.NSE #############
# record
record = []

# number of run time
RunTime = 5 # default=10

# count the time length
time_length = len(df) # df is your dataframe
batch = int((1/RunTime)*time_length) # take 1/RunTime of all dataset

# set a model
LearnPPNSE = LearnNSE()


In [5]:
def get_next_dataset(df, batch, k, target_name):
    # Calculate the starting and ending indices for the batch
    start_idx = (k - 1) * batch
    end_idx = start_idx + batch if k * batch < len(df) else len(df)  # Ensure last batch uses all remaining data

    # Slice the DataFrame to get the batch
    batch_df = df.iloc[start_idx:end_idx]
    X_train = batch_df.drop(columns=[target_name])  # Features (all columns except the label)
    y_train = batch_df[target_name]  # Labels (only the label column)

    return X_train, y_train


In [None]:
for k in range(1, 2):
    # Get the dataset for the current batch
    X_train, y_train = get_next_dataset(df=df, batch=batch, k=k, target_name='Label')

    # Training the model
    if k == 1:
        # Initial training
        LearnPPNSE.fit(X_train, y_train)
        LearnPPNSE.revoting(X_train, y_train)
    else:
        # Rebuild the error distribution and train on new data batch
        LearnPPNSE.redistribute_error_rate(X_train, y_train)
        LearnPPNSE.fit(X_train, y_train)
        LearnPPNSE.revoting(X_train, y_train)

    # Evaluate and record the score on the fixed test set
    score_B = LearnPPNSE.score(x_test_low_df, y_test_low_df)
    record.append(round(score_B, 3))
    print(f"Iteration {k}, Test Accuracy on 'low' classes (0-7): {score_B * 100:.2f}%")


Iteration 1, Test Accuracy on 'low' classes (0-7): 96.63%
Iteration 2, Test Accuracy on 'low' classes (0-7): 96.66%
Iteration 3, Test Accuracy on 'low' classes (0-7): 96.93%


In [None]:
# Filter for "high" classes (8-9) for training
high_class_indices_train = np.where(y_train_full >= 8)[0]
x_train_high = x_train_full[high_class_indices_train]
y_train_high = y_train_full[high_class_indices_train]

# Convert "high" training data to DataFrame format
df_high = pd.DataFrame(x_train_high)
df_high['Label'] = y_train_high  # Append labels as the last column

# Define the fixed test set for "high" classes (8-9)
high_class_indices_test = np.where(y_test_full >= 8)[0]
x_test_high = x_test_full[high_class_indices_test]
y_test_high = y_test_full[high_class_indices_test]

# Create DataFrames for the test set of high classes
x_test_high_df = pd.DataFrame(x_test_high)
y_test_high_df = pd.DataFrame(y_test_high, columns=["Label"])  # Ensure y_test_high_df has a column name


In [None]:
num_high_samples = len(df_high)
RunTime_high = min(RunTime, num_high_samples)  # Ensure RunTime isn't larger than the number of samples
batch_high = max(1, num_high_samples // RunTime_high)  # Ensure each batch has at least one sample

In [None]:
# Training on the "high" classes (8-9) with adjusted batch size and RunTime
for k in range(1, RunTime_high + 1):
    # Get the dataset for the current batch from the high-class data
    X_train, y_train = get_next_dataset(df=df_high, batch=batch_high, k=k, target_name='Label')

    # Skip empty batches
    if X_train.empty or y_train.empty:
        print(f"Skipping empty batch at iteration {k}")
        continue

    # Train the model on "high" classes
    if k == 1:
        # Initial training on high classes
        LearnPPNSE.fit(X_train, y_train)
        LearnPPNSE.revoting(X_train, y_train)
    else:
        # Redistribute the error rate, train on new data batch, and recompute weights
        LearnPPNSE.redistribute_error_rate(X_train, y_train)
        LearnPPNSE.fit(X_train, y_train)
        LearnPPNSE.revoting(X_train, y_train)


In [None]:

# Evaluate the model on "low", "high", and combined test sets
def evaluate_model(model, x_test_df, y_test_df, description="Test"):
    accuracy = model.score(x_test_df, y_test_df)
    loss = 1.0 - accuracy  # Simplified loss as (1 - accuracy) for demonstration
    print(f"{description} Accuracy: {accuracy * 100:.2f}%, Loss: {loss:.3f}")

# Evaluate on the fixed "high" classes test set
evaluate_model(LearnPPNSE, x_test_high_df, y_test_high_df,
               description="High classes (8-9) Accuracy: 96.12")

# Evaluate on the fixed "low" classes test set
evaluate_model(LearnPPNSE, x_test_low_df, y_test_low_df, description="Low classes (0-7)")


# # Full test set for all classes 0-9
# x_test_full_df = pd.DataFrame(x_test_full.reshape(-1, 28 * 28))
# y_test_full_df = pd.DataFrame(y_test_full, columns=["Label"])
# evaluate_model(LearnPPNSE, x_test_full_df, y_ted]st_full_df, description="All classes (0-9)")