In [1]:
# import libraries
import random
import numpy as np
import torch
import torch.nn as nn
import torch.utils.data
import torch.optim as optim
from data_preprocessing import import_data
from data_preprocessing import split_data
from data_preprocessing import CreateDataset
from sklearn.metrics import confusion_matrix

In [2]:
class CasperNet(nn.Module):
    """Cascade network that starts minimal and grows one hidden neuron at a time.

    The network begins as a single linear layer from the inputs to the outputs.
    Each call to ``add_neuron`` appends a one-unit hidden layer that sees the
    original inputs plus the outputs of all previously added hidden units, and
    widens the output layer by one input column.

    Args:
        input_dim: number of input features.
        output_dim: number of output units.
    """

    def __init__(self, input_dim, output_dim):
        super(CasperNet, self).__init__()
        self.total_neurons = input_dim + output_dim
        self.output_dim = output_dim
        self.hidden_dim = input_dim # dimension of the hidden layer (dimension of the cascaded input)
        self.output_layer = nn.Linear(input_dim, output_dim) # start with minimal network
        self.hidden_layers = nn.ModuleList() # maintain a list of hidden layers
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Run the cascade and return sigmoid-activated outputs.

        ``x`` is concatenated along ``dim=1`` with each hidden unit's output,
        so it must be 2-D with the feature axis last.
        """
        # loop through all hidden layers, cascading each output onto the inputs
        for layer in self.hidden_layers:
            hidden_output = self.relu(layer(x)) # output of a hidden unit
            x = torch.cat((x, hidden_output), dim=1)  # cascade the output to the previous inputs

        # NOTE(review): train_network feeds this into nn.CrossEntropyLoss, which
        # applies log-softmax itself; confirm the sigmoid here is intended.
        return self.sigmoid(self.output_layer(x))

    def add_neuron(self):
        """Append one hidden neuron and widen the output layer accordingly.

        The existing output weights and bias are carried over unchanged; only
        the new last weight column (fed by the new neuron) is freshly
        initialised. New layers are created on the same device and with the
        same dtype as the current output layer so a model that was moved or
        cast keeps working after it grows.
        """
        old_output = self.output_layer
        device = old_output.weight.device
        dtype = old_output.weight.dtype

        # New hidden unit sees the current cascaded input.
        new_neuron = nn.Linear(self.hidden_dim, 1).to(device=device, dtype=dtype)
        self.hidden_dim += 1 # update hidden layer dimension
        self.total_neurons += 1
        self.hidden_layers.append(new_neuron)

        # Replacement output layer with one extra input column.
        new_output = nn.Linear(self.hidden_dim, self.output_dim).to(device=device, dtype=dtype)
        with torch.no_grad():
            # Preserve what the network has already learned.
            new_output.weight[:, :-1].copy_(old_output.weight)
            new_output.bias.copy_(old_output.bias)
            # Initialise only the new column (shape: output_dim x 1), in place.
            nn.init.xavier_uniform_(new_output.weight[:, -1:])
        self.output_layer = new_output

In [3]:
def train_network(model, train_data, optimiser, min_epoch, threshold, P):
    """Train ``model`` full-batch until the loss has plateaued often enough.

    Every epoch performs one optimiser step on the whole of ``train_data``.
    An epoch counts as "stalled" when the relative change of the loss with
    respect to the previous epoch is below ``threshold``. Training stops once
    ``15 + model.total_neurons * P`` stalled epochs have been seen (cumulative,
    not necessarily consecutive), or as soon as the loss becomes non-finite.

    Args:
        model: network exposing ``total_neurons``; called as ``model(inputs)``.
        train_data: pandas DataFrame; column 0 holds the class labels, the
            remaining columns hold the input features.
        optimiser: optimiser already bound to the model's parameters.
        min_epoch: currently unused; kept so existing callers keep working.
        threshold: relative loss change below which an epoch counts as stalled.
        P: per-neuron patience multiplier in the stopping rule.

    Returns:
        The loss (Python float) of the last evaluated epoch.
    """
    criterion = nn.CrossEntropyLoss()
    epoch = 1
    prev_train_loss = float('inf')
    train_loss = 0

    threshold_P = 0 # how many times the relative improvement fell below the threshold
    threshold_P_max = 15 + model.total_neurons * P # stop training once threshold_P reaches this value

    # Full-batch training: the data never changes between epochs, so convert once.
    inputs = torch.tensor(train_data.iloc[:, 1:].values, dtype=torch.float32)
    labels = torch.tensor(train_data.iloc[:, 0].values, dtype=torch.long)
    num_samples = len(train_data)

    model.train()

    while True:
        optimiser.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # A NaN/inf loss can never satisfy the plateau test below, which would
        # loop forever; stop before the bad gradients reach the weights.
        if not torch.isfinite(loss):
            train_loss = loss.item()
            print(f'Non-finite loss ({train_loss}) at epoch {epoch}; aborting training')
            break

        loss.backward()
        optimiser.step()
        train_loss = loss.item()

        # Relative change of the loss compared with the previous epoch.
        if prev_train_loss == float('inf'):
            percent = 1
        elif prev_train_loss == 0:
            # Avoid dividing by zero: no change at all means fully stalled.
            percent = 0.0 if train_loss == 0 else float('inf')
        else:
            percent = abs((prev_train_loss - train_loss) / prev_train_loss)

        # print loss and accuracy
        if (epoch % 50) == 0 or epoch - 1 == 0:
            _, predicted = torch.max(outputs, 1)
            correct = (predicted == labels).sum().item()
            accuracy = correct / num_samples * 100
            print(f'Epoch {epoch}, Loss: {train_loss:.4f}, Accuracy: {accuracy:.4f} %')

        if percent < threshold:
            threshold_P += 1

        if threshold_P >= threshold_P_max:
            break

        prev_train_loss = train_loss
        epoch += 1

    print(f'Training stop at epoch {epoch} with loss = {train_loss:.4f}')
    return train_loss


def print_params(model):
    """Print each parameter of ``model`` with its frozen/trainable status, values and shape."""
    for param in model.parameters():
        if param.requires_grad:
            status = "Trainable "
        else:
            status = "Frozen "
        values = param.data
        print(f'{status} {values} {values.shape}')
        print()

In [4]:
def train(model, train_loader, update_optimiser, min_iter, min_epoch, threshold = 0.0001, P = 1):
    """Alternate between training the network and growing it by one neuron.

    The model is trained, then a neuron is added and the model is trained
    again, until ``min_iter`` training rounds have run. At least one round
    always runs, so ``max(min_iter, 1) - 1`` neurons are added in total.

    Args:
        model: network exposing ``add_neuron()``.
        train_loader: training data, forwarded unchanged to ``train_network``.
        update_optimiser: callable ``model -> optimiser``; called before every
            round because adding a neuron changes the model's parameters.
        min_iter: number of training rounds after which the loop stops.
        min_epoch: forwarded to ``train_network``.
        threshold: forwarded to ``train_network``.
        P: forwarded to ``train_network``.
    """
    iteration = 1

    # keep adding neurons to the network
    while True:
        # The parameter set changed (or is new), so build a fresh optimiser.
        optimiser = update_optimiser(model)
        # Use the data passed in by the caller rather than a notebook global.
        train_network(model, train_loader, optimiser, min_epoch, threshold, P)

        if iteration >= min_iter:
            break
        iteration += 1

        # add a neuron
        model.add_neuron()

In [5]:
# test the model (modified from lab 2)
def test(model, train_data, test_data):
    """Print confusion matrices for both data sets and the test accuracy.

    Args:
        model: trained network; switched to eval mode by this function.
        train_data: pandas DataFrame; column 0 holds labels, the rest features.
        test_data: pandas DataFrame with the same layout as ``train_data``.
    """
    model.eval()

    def _predict(frame):
        """Return ``(targets, predicted)`` CPU long tensors for one DataFrame."""
        inputs = torch.Tensor(frame.iloc[:, 1:].values).float()
        targets = torch.Tensor(frame.iloc[:, 0].values).long()
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        return targets, predicted.cpu().long()

    # test on train set
    targets, predicted = _predict(train_data)
    print("Confusion matrix for training:")
    print(confusion_matrix(targets.numpy(), predicted.numpy()))

    # test on test set
    targets, predicted = _predict(test_data)
    print("Confusion matrix for testing:")
    print(confusion_matrix(targets.numpy(), predicted.numpy()))

    # print test accuracy
    total = predicted.size(0)
    correct = (predicted == targets).sum().item()
    print('Testing Accuracy: %.2f %%' % (100 * correct / total))

In [6]:
class CustomRprop(optim.Rprop):
    """Rprop with an extra, exponentially fading weight-decay term on the gradient.

    Before each parent Rprop update, every parameter's gradient is shifted by

        -D * sign(w) * w**2 * 2**(-T * H_epoch)

    where ``H_epoch`` is the number of ``step`` calls made on this optimiser
    instance so far (including the current one).

    Args:
        params, lr, etas, step_sizes: forwarded to ``torch.optim.Rprop``.
        D: magnitude of the decay term.
        T: rate at which the decay term fades with ``H_epoch``.
        *args, **kwargs: forwarded to ``torch.optim.Rprop``.
    """

    def __init__(self, params, lr=1e-2, etas=(0.5, 1.2), step_sizes=(1e-6, 50), D=0.01, T=1, *args, **kwargs):
        super(CustomRprop, self).__init__(params, lr, etas, step_sizes, *args, **kwargs)

        # Additional parameters for weight decay
        self.D = D
        self.T = T
        self.H_epoch = 0  # number of step() calls so far

    def step(self, closure=None):
        """Add the decay term to the gradients, then perform one Rprop update.

        Args:
            closure: optional callable that re-evaluates the model and returns
                the loss. It is evaluated exactly once per call.

        Returns:
            The value returned by ``closure``, or ``None`` if none was given.
        """
        self.H_epoch += 1

        loss = None
        if closure is not None:
            with torch.enable_grad():
                loss = closure()

        # Same for every parameter in this step, so compute it once.
        decay_factor = 2 ** (-self.T * self.H_epoch)

        with torch.no_grad():
            for group in self.param_groups:
                for p in group['params']:
                    if p.grad is None:
                        continue
                    weight_decay_term = -self.D * torch.sign(p) * p ** 2 * decay_factor
                    p.grad.add_(weight_decay_term)

        # The closure has already run above; passing it on again would
        # re-evaluate the model and overwrite the gradients adjusted here.
        super(CustomRprop, self).step()

        return loss

In [7]:
def get_optimiser_parameters(model, lr_l1, lr_l2, lr_l3):
    """Split the model's parameters into three learning-rate groups.

    Group L1 (``lr_l1``): weights of the most recently added hidden neuron.
    Group L2 (``lr_l2``): weights of the output layer.
    Group L3 (``lr_l3``): the output-layer bias, every parameter of the older
    hidden neurons, and the bias of the most recent hidden neuron.

    Returns:
        A list of three ``{"params": [...], "lr": ...}`` dicts in the order
        L1, L2, L3. The L1 group is empty while the model has no hidden layers.
    """
    output_params = list(model.output_layer.parameters())
    newest_weights = []
    output_weights = [output_params[0]]
    slow_params = [output_params[1]]  # bias of the output neurons is L3

    if len(model.hidden_layers) > 0:
        *older_layers, newest_layer = model.hidden_layers
        newest_params = list(newest_layer.parameters())

        newest_weights = [newest_params[0]]  # weights of the new hidden neuron are L1
        for layer in older_layers:
            slow_params.extend(layer.parameters())  # older hidden neurons are L3
        slow_params.append(newest_params[1])  # bias of the new hidden neuron is L3

    # Create parameter groups
    learning_rates = (lr_l1, lr_l2, lr_l3)
    param_lists = (newest_weights, output_weights, slow_params)
    return [
        {"params": params, "lr": rate}
        for params, rate in zip(param_lists, learning_rates)
    ]

In [10]:
# make results deterministic
seed = 4660
if seed is not None:
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# Define hyperparameters
input_size = None   # filled in by import_data() below
num_classes = None  # filled in by import_data() below
num_epochs = 500    # passed to train() as min_epoch
batch_size = 10
num_candidates = 5
max_iter = 4        # number of training rounds; max_iter - 1 neurons are added

# learning rates of the three parameter groups built by get_optimiser_parameters
lr_1 = 0.2
lr_2 = 0.005
lr_3 = 0.001

# import data
data, num_classes, input_size = import_data()

# randomly split data into training set (80%) and testing set (20%)
# NOTE(review): seed=None is passed although `seed` is set above; whether the
# split is reproducible depends on how split_data draws its randomness - confirm.
train_data, test_data = split_data(data, seed = None)

# initialise network
casper_net = CasperNet(input_size, num_classes)


# initialise optimiser (since network structure is changing, we need to update our optimiser frequently)
def update_optimiser(model):
    """Build a fresh CustomRprop over the model's current parameter groups."""
    optimiser_parameters = get_optimiser_parameters(model, lr_1, lr_2, lr_3)
    return CustomRprop(optimiser_parameters)


# train the model
train(casper_net, train_data, update_optimiser, max_iter, num_epochs)

# test the model
test(casper_net, train_data, test_data)

Epoch 1, Loss: 1.1047, Accuracy: 33.6538 %
Epoch 50, Loss: 0.8593, Accuracy: 58.1731 %
Epoch 100, Loss: 0.8327, Accuracy: 58.6538 %
Epoch 150, Loss: 0.8244, Accuracy: 57.6923 %
Epoch 200, Loss: 0.8197, Accuracy: 55.7692 %
Training stop at epoch 227 with loss = 0.8181
Epoch 1, Loss: 0.8180, Accuracy: 55.7692 %
Epoch 50, Loss: 0.8142, Accuracy: 57.2115 %
Training stop at epoch 73 with loss = 0.8131
Epoch 1, Loss: 0.8132, Accuracy: 56.2500 %
Epoch 50, Loss: 0.8092, Accuracy: 56.7308 %
Training stop at epoch 71 with loss = 0.8082
Epoch 1, Loss: 0.8081, Accuracy: 56.2500 %
Epoch 50, Loss: 0.8040, Accuracy: 55.7692 %
Training stop at epoch 81 with loss = 0.8019
Confusion matrix for training:
[[47 13  5]
 [24 38 12]
 [ 9 28 32]]
Confusion matrix for testing:
[[12  2  3]
 [ 1  9  8]
 [ 2 10  6]]
Testing Accuracy: 50.94 %
