In [19]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [219]:
import sys
import os
sys.path.append(os.path.abspath(".."))

from functools import reduce
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from customLayer import CustomLayer

In [220]:
def xor(x):
    return reduce(lambda a, b: a ^ b, x)

batch_size = 5000
in_features = 10
X = np.random.randint(0, 2, size=(batch_size, in_features))
y = np.array([xor(x) for x in X])

X = torch.tensor(X).float()
y = torch.tensor(y).long()

In [221]:
X_train, y_train = X[:4000], y[:4000]
X_val, y_val = X[4000:], y[4000:]

In [242]:
first_layer_out_features = 15
first_layer_out_features_allowed = 8
second_layer_out_features = 20
second_layer_out_features_allowed = 10

class SimpleSparseModel(nn.Module):
    def __init__(self, in_features = in_features, out_features = 2):
        super().__init__()
        self._layer1 = CustomLayer(in_features, 
                                        first_layer_out_features, 
                                        out_features_allowed=first_layer_out_features_allowed)
        self._layer2 = CustomLayer(first_layer_out_features, 
                                        second_layer_out_features, 
                                        out_features_allowed=second_layer_out_features_allowed)
        self._layer3 = CustomLayer(second_layer_out_features, out_features)
        nn.Linear(in_features, out_features).forward

    def forward(self, x, return_acts = False):
        x_l1 = self._layer1(x)
        x_l2 = self._layer2(x_l1)
        out = self._layer3(x_l2)
        if return_acts:
            return out, {"l1": x_l1, "l2": x_l2, "l3": out}
        return out
    
class SimpleDenseModel(nn.Module):
    def __init__(self, in_features = in_features, out_features = 2):
        super().__init__()
        self._layer1 = CustomLayer(in_features, first_layer_out_features)
        self._layer2 = CustomLayer(first_layer_out_features, second_layer_out_features)
        self._layer3 = CustomLayer(second_layer_out_features, out_features)
        nn.Linear(in_features, out_features).forward
        self.net = nn.Sequential(
            self._layer1,
            self._layer2,
            self._layer3
        )

    def forward(self, x):
        return self.net(x)

In [None]:

def orthogonality_loss(h):
    """
    h: (batch_size, hidden_dim) activations
    returns scalar regularization loss
    """
    h_norm = F.normalize(h, p=2, dim=1)  # shape: (batch_size, hidden_dim)

    G = torch.matmul(h_norm, h_norm.T)  # shape: (batch_size, batch_size)

    I = torch.eye(G.size(0))
    off_diag = G * (1 - I)

    loss = (off_diag**2).mean()
    return loss

def train(model, X, y, criterion, optimizer, epochs=100, print_msg=False, ortho_lambda = None):
    # Training loop
    for epoch in range(epochs):
        optimizer.zero_grad()
        if ortho_lambda:
            outputs, activations = model(X, return_acts = True)
        else:
            outputs = model(X)
        
        loss = criterion(outputs, y)
        
        if ortho_lambda:
            ortho_loss = 0
            ortho_loss += orthogonality_loss(activations["l3"])
            loss = loss + ortho_lambda * ortho_loss

        loss.backward()
        optimizer.step()

        if print_msg and (epoch+1) % 50 == 0:
            print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")

def get_accuracy(model, X, y):
    with torch.no_grad():
        op = model(X)
        accuracy = (torch.argmax(op, dim=1) == y).float().mean().item()
    return accuracy

def train_and_evaluate(model, name = "Model", epochs=500, ortho_lambda = None):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    train(model, X_train, y_train, criterion, optimizer, epochs=epochs, ortho_lambda=ortho_lambda)
    print(f"{name} Train Accuracy: {get_accuracy(model, X_train, y_train)*100:.4f}, Val Accuracy: {get_accuracy(model, X_val, y_val)*100:.4f}")

In [259]:
simpleSparseModel = SimpleSparseModel()
train_and_evaluate(simpleSparseModel, epochs=200, ortho_lambda=0.01)

Model Train Accuracy: 89.0250, Val Accuracy: 90.6000


In [260]:
simpleSparseModel = SimpleSparseModel()
train_and_evaluate(simpleSparseModel, epochs=200)

Model Train Accuracy: 94.7250, Val Accuracy: 94.4000


In [245]:
for i in range(5):
    simpleSparseModel = SimpleSparseModel()
    train_and_evaluate(simpleSparseModel, epochs=200, name="Sparse Model")

    simpleDenseModel = SimpleDenseModel()
    train_and_evaluate(simpleDenseModel, epochs=200, name="Dense Model")

Sparse Model Train Accuracy: 85.7000, Val Accuracy: 86.3000
Dense Model Train Accuracy: 91.4000, Val Accuracy: 89.3000
Sparse Model Train Accuracy: 94.0500, Val Accuracy: 91.5000
Dense Model Train Accuracy: 80.3000, Val Accuracy: 79.5000
Sparse Model Train Accuracy: 93.3500, Val Accuracy: 92.9000
Dense Model Train Accuracy: 92.3000, Val Accuracy: 89.2000
Sparse Model Train Accuracy: 98.7250, Val Accuracy: 98.1000
Dense Model Train Accuracy: 91.2750, Val Accuracy: 89.8000
Sparse Model Train Accuracy: 95.9000, Val Accuracy: 94.9000
Dense Model Train Accuracy: 95.1000, Val Accuracy: 94.6000


In [None]:
model = SimpleSparseModel()
train_and_evaluate(model)

model.eval()
random_input_index = 10
random_input = X_val[random_input_index]
random_output = y_val[random_input_index]
# Add batch dimension
input_batch = random_input.unsqueeze(0)
model_layer_1_activations = model._layer1.forward(input_batch)
model_layer_2_activations = model._layer2.forward(model_layer_1_activations)
model_layer_3_activations = model._layer3.forward(model_layer_2_activations)
print("Input: ", random_input)
print("True Output: ", random_output)
print("Model Output layer 1: ", model_layer_1_activations.squeeze(0))
print("Model Output layer 2: ", model_layer_2_activations.squeeze(0))
print("Model Output layer 3: ", model_layer_3_activations.squeeze(0))

model.train()

Input:  tensor([0., 1., 0., 1., 0., 1., 1., 0., 0., 0.])
True Output:  tensor(0)
Model Output layer 1:  tensor([1.6230, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 1.4005, 1.6835, 0.2312,
        0.0000, 0.0000, 1.3362, 0.0000, 0.0000, 0.0000],
       grad_fn=<SqueezeBackward1>)
Model Output layer 2:  tensor([0.0000, 2.5214, 0.0000, 0.0000, 2.7776, 2.7255, 3.2363, 0.0000, 0.0000,
        2.2269], grad_fn=<SqueezeBackward1>)
Model Output layer 3:  tensor([6.7198, 0.4847], grad_fn=<SqueezeBackward1>)


SimpleSparseModel(
  (_layer1): SparseCodedLayer(
    (activation): LeakyReLU(negative_slope=0.1)
  )
  (_layer2): SparseCodedLayer(
    (activation): LeakyReLU(negative_slope=0.1)
  )
  (_layer3): SparseCodedLayer(
    (activation): LeakyReLU(negative_slope=0.1)
  )
  (net): Sequential(
    (0): SparseCodedLayer(
      (activation): LeakyReLU(negative_slope=0.1)
    )
    (1): SparseCodedLayer(
      (activation): LeakyReLU(negative_slope=0.1)
    )
    (2): SparseCodedLayer(
      (activation): LeakyReLU(negative_slope=0.1)
    )
  )
)