In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import PIL
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch.optim as optim

# from Modules import ConvBN, PoolConvBN, PoolLinearBN, SharpCosSim2d, SharpCosSimLinear, LReLU

from ConvBN import ConvBN as ConvBN_BiasTrick
from LinearBN import LinearBN

In [2]:
class LReLU(nn.Module):
    """ReLU applied to the input after scaling it by a learnable scalar ``alpha``."""

    def __init__(self):
        super().__init__()
        # Single learnable scale factor, initialised to 5.0.
        self.alpha = nn.Parameter(torch.tensor(5.0))

    def forward(self, x):
        """Return ``relu(alpha * x)`` element-wise."""
        scaled = self.alpha * x
        return torch.relu(scaled)

In [3]:
# Convert images to tensors, then shift/scale every pixel with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)) # Normalize with mean 0.5 and std 0.5
])

# Loader settings shared by every DataLoader built in this cell.
batch_size= 2000
num_workers=2
pin_memory=True

dataset = torchvision.datasets.MNIST(root='../', train=True, download=True, transform=transform)

# Hold out a fixed-size validation subset. The split is driven by a seeded
# generator so the same samples land in train/val on every run
# (Restart & Run All reproducibility).
val_size = 2000
split_generator = torch.Generator().manual_seed(0)
train_set, val_set = torch.utils.data.random_split(
    dataset, [len(dataset) - val_size, val_size], generator=split_generator
)

train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=pin_memory)
val_loader = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=pin_memory)

test_set = torchvision.datasets.MNIST(root='../', train=False, download=True, transform=transform)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=pin_memory)


In [4]:
# Choose the compute device once, then report which backend was detected.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("CUDA is available" if device.type == 'cuda' else "CUDA is not available")

CUDA is available


In [5]:
class Network(nn.Module):
    """MNIST classifier with a standard forward path and an HDC inference path.

    The network stacks two ``ConvBN_BiasTrick`` blocks and one ``LinearBN``
    block (both imported from project modules), each followed by the shared
    ``LReLU`` activation, and ends in a normalised matrix product with ``w2``.

    After ``init_hdc`` has been called, ``hdc`` produces sign-valued
    hypervectors by projecting the block-3 features through the random matrix
    ``self.g``; ``classification_layer`` multiplies such hypervectors with
    ``self.wg``.
    """

    def __init__(self):
        super(Network, self).__init__()

        self.conv1_out = 32
        self.conv1_size = 5
        self.conv1_padding = 2


        self.conv2_out = 64
        self.conv2_size = 5
        self.conv2_padding = 2

        self.fc1_out = 512
        self.fc2_out = 10

        # Small constant added to the norms in ``forward`` to avoid division by zero.
        self.q = 1e-6
        # Learnable scalar added to the block-3 features before the last layer.
        self.bias_trick_par = nn.Parameter(torch.tensor(0.00005))

        # Convolutional blocks

        self.block1 = ConvBN_BiasTrick(in_channels=1, out_channels=self.conv1_out, kernel_size=self.conv1_size, padding=self.conv1_padding, std = .05, bias_par_init=0.001)
        self.block2 = ConvBN_BiasTrick(in_channels=self.conv1_out, out_channels=self.conv2_out, kernel_size=self.conv2_size, padding=self.conv2_padding, std = .05, bias_par_init=0.01)

        # Fully connected block: input size is the flattened block-2 output after
        # two 2x2 max-pools on a 28x28 image (28 // 2 // 2 = 7 per side).

        self.block3 = LinearBN(in_features = self.conv2_out * (28//2//2) * (28//2//2), out_features=self.fc1_out, std=.3)


        # torch.manual_seed(0)
        # Classification weights, shape (fc1_out, fc2_out).
        self.w2 = nn.Parameter(torch.randn(self.fc1_out, self.fc2_out))
        nn.init.normal_(self.w2, mean=0.0, std=.6)

        self.dropout = nn.Dropout(0.5)

        # One LReLU instance (one learnable alpha) is shared by all three blocks.
        self.relu = LReLU()

    def forward(self, x):
        """Return class scores of shape (batch, fc2_out) for a batch of images.

        The final layer multiplies the L2-normalised features with the
        normalised ``w2``; no softmax is applied.
        """
        x = F.max_pool2d(self.relu(self.block1(x)), (2,2), padding=0)
        x = F.max_pool2d(self.relu(self.block2(x)), (2,2), padding=0)

        x = x.view(x.size(0), -1)

        x = self.relu(self.block3(x))
        x = self.dropout(x)

        x = x + self.bias_trick_par
        x_norm = x / (x.norm(p=2, dim=1, keepdim=True) + self.q)  # Normalize input x
        # NOTE(review): w2 is (fc1_out, fc2_out), so dim=1 normalises each row
        # (one per input feature) across the classes. A per-class cosine
        # similarity would normalise over dim=0 instead. Left unchanged because
        # previously trained checkpoints depend on this form -- confirm intent.
        w2_norm = self.w2 / (self.w2.norm(p=2, dim=1, keepdim=True) + self.q)  # Normalize weights
        x = torch.matmul(x_norm, w2_norm) # Matrix multiplication

        # Return raw logits (no softmax here, CrossEntropyLoss handles it)
        return x

    def custom_round(self, n):
        """Round ``n`` to a multiple of 1000 with a round-up threshold of 100.

        A remainder (``n % 1000``) greater than 100 rounds up to the next
        multiple of 1000; a remainder of 100 or less rounds down. Using a
        single threshold means fractional remainders between 100 and 101 are
        handled too (they round up) instead of falling through and returning
        ``None``.
        """
        remainder = n % 1000
        base = n - remainder
        if remainder > 100:
            return base + 1000
        return base

    def init_hdc(self, ratio, seed):
        """Initialise the HDC path of every block and of the classification layer.

        Args:
            ratio: tuple with 4 entries. Entries 0-2 are forwarded to
                ``block1/2/3.init_hdc``; entry 3 times ``w2.size(0)``, passed
                through ``custom_round``, is the hypervector size of the
                classification layer.
            seed: tuple with 4 entries. Entries 0-2 are forwarded to the
                blocks; entry 3 seeds the global torch RNG before the random
                projection ``self.g`` is drawn.

        Raises:
            TypeError: if ``ratio`` or ``seed`` is not a tuple.
            ValueError: if either tuple has fewer than 4 entries.
        """
        if not isinstance(ratio, (tuple)):
            raise TypeError("ratio must be a tuple of size 4")

        if not isinstance(seed, (tuple)):
            raise TypeError("seed must be a tuple of size 4")

        # Validate up front so a short tuple cannot leave the blocks
        # initialised while the classification layer is not.
        if len(ratio) < 4 or len(seed) < 4:
            raise ValueError("ratio and seed must each have 4 entries (block1, block2, block3, classification layer)")

        self.block1.init_hdc(ratio = ratio[0], seed = seed[0])
        self.block2.init_hdc(ratio = ratio[1], seed = seed[1])
        self.block3.init_hdc(ratio = ratio[2], seed = seed[2])

        n_last = self.w2.size(0)
        nHDC_last = int(self.custom_round(ratio[3] * n_last))
        # Note: this reseeds the *global* torch RNG.
        torch.manual_seed(seed[3])
        # Random projection (fc1_out, nHDC_last), stored in half precision.
        self.g = torch.randn(self.w2.size(0), nHDC_last, device=self.w2.device).to(torch.half)
        # Sign of the projected classification weights, shape (nHDC_last, fc2_out).
        # This uses the raw w2, not the normalised form used in ``forward``.
        self.wg = torch.sign(torch.matmul(self.g.t(), self.w2.to(torch.half)))

        print(f'Block1: {self.block1.nHDC}, Block2: {self.block2.nHDC}, Block3: {self.block3.nHDC}, Classification Layer: {nHDC_last}')

    def hdc(self, x):
        """Encode a batch of images as sign-valued hypervectors.

        Runs the ``hdc`` variant of each block, adds ``bias_trick_par``, then
        projects through ``self.g`` and takes the sign. ``init_hdc`` must have
        been called first, since it creates ``self.g``. No dropout is applied
        on this path.
        """
        x = F.max_pool2d(self.relu(self.block1.hdc(x)), (2,2), padding=0)
        x = F.max_pool2d(self.relu(self.block2.hdc(x)), (2,2), padding=0)

        x = x.view(x.size(0), -1)
        x = self.relu(self.block3.hdc(x))

        x = x + self.bias_trick_par
        x = torch.sign(torch.matmul(x.to(torch.half), self.g))

        return x

    def classification_layer(self, x):
        """Return class scores for hypervectors ``x`` by multiplying with ``self.wg``."""
        x = x @ self.wg
        return x


In [7]:
from tqdm import tqdm
import time
from torch.nn.parallel import data_parallel

torch.cuda.empty_cache()
model = Network().to(device)
# map_location makes a checkpoint that was saved from a GPU loadable on a
# CPU-only machine as well, matching the CPU fallback used for `device`.
model.load_state_dict(
    torch.load('MNIST_GNet_Training_99.35.pth', map_location=device, weights_only = True)
)

# The HDC path runs in half precision; the model is already on `device`.
model.to(torch.half)
model.eval()

# One ratio / seed per stage: block1, block2, block3, classification layer.
ratio = (12, 1.15/6, 3, 18)
torch.manual_seed(0)
random_seeds = tuple(torch.randint(0, 1000, (1,)).item() for _ in range(4))
model.init_hdc(ratio, random_seeds)

# NOTE(review): samples are encoded one at a time, which dominates the runtime
# of this cell -- presumably a memory constraint of the HDC blocks; confirm
# before raising it.
batch_size=1

trainset = torchvision.datasets.MNIST(root='../', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=pin_memory)

test_set = torchvision.datasets.MNIST(root='../', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=pin_memory)


def _encode_hdc(loader, desc):
    """Run ``model.hdc`` over every batch of ``loader``.

    Returns two lists of CPU tensors, one entry per batch: the HDC outputs and
    the matching labels.
    """
    encoded, targets = [], []
    with torch.no_grad():
        for images, labels in tqdm(loader, desc=desc):
            images = images.to(device).to(torch.half)
            encoded.append(model.hdc(images).cpu())
            targets.append(labels.cpu())
    return encoded, targets


train_outputs, train_labels = _encode_hdc(trainloader, 'train hdc')
test_outputs, test_labels = _encode_hdc(testloader, 'test hdc')

Block1: 10000, Block2: 10000, Block3: 10000, Classification Layer: 10000


train hdc: 100%|█████████████████████████████████████████████████████████████████| 60000/60000 [55:48<00:00, 17.92it/s]
test hdc: 100%|██████████████████████████████████████████████████████████████████| 10000/10000 [09:25<00:00, 17.69it/s]


In [8]:
from torch.utils.data import DataLoader, TensorDataset

def _concat_batches(chunks):
    """Concatenate a list of per-batch tensors along dim 0.

    An already-concatenated tensor is returned unchanged, so this cell can be
    re-run without repeating the expensive HDC encoding.
    """
    if torch.is_tensor(chunks):
        return chunks
    return torch.cat(chunks, dim=0)


# Pre-computed HDC features and labels, wrapped for classifier training.
train_outputs = _concat_batches(train_outputs)
train_labels = _concat_batches(train_labels)
trainset = TensorDataset(train_outputs, train_labels)
trainloader_hdc = DataLoader(trainset, batch_size=420, shuffle=False)

# Same wrapping for the test features.
test_outputs = _concat_batches(test_outputs)
test_labels = _concat_batches(test_labels)
testset = TensorDataset(test_outputs, test_labels)
testloader_hdc = DataLoader(testset, batch_size=100, shuffle=False)

In [9]:
class linear(nn.Module):
    """Bias-free linear classifier whose output is scaled by ``1 / sqrt(hyperdim)``."""

    def __init__(self, hyperdim, num_classes=10):
        super().__init__()
        self.hyperdim = hyperdim
        # Weight matrix of shape (hyperdim, num_classes), Xavier-normal initialised.
        weight = torch.empty(hyperdim, num_classes)
        self.w = nn.Parameter(weight)
        nn.init.xavier_normal_(self.w)

    def forward(self, x):
        """Return ``(x @ w)`` multiplied by ``1 / sqrt(hyperdim)``."""
        scores = x @ self.w
        return scores * (1 / self.hyperdim**0.5)


In [10]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Size the classifier from the HDC classification layer it will replace:
# model.wg has shape (hypervector size, number of classes).
hyperdim, num_classes = model.wg.shape
wg3 = linear(hyperdim=hyperdim, num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(wg3.parameters(), lr = 1e-4)

# Keep a handle on the bare module: DataParallel does not expose `.w`, so the
# weights must be read through the unwrapped module after training.
wg3_core = wg3
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")
    wg3 = torch.nn.DataParallel(wg3)
num_epochs = 100
# NOTE(review): anomaly detection is a debugging aid and slows training; it is
# kept here together with the NaN check below.
torch.autograd.set_detect_anomaly(True)

for epoch in tqdm(range(num_epochs)):
    wg3.train()
    for hdc_input, labels in trainloader_hdc:
        hdc_input, labels = hdc_input.to(device), labels.to(device)

        optimizer.zero_grad()
        # Stored features are half precision; train in float32.
        outputs = wg3(hdc_input.to(torch.float32))
        if torch.isnan(outputs).any():
            print('here')
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

# Binarise the trained weights and install them as the HDC classification layer.
wg3_core.w.data = torch.sign(wg3_core.w.data)
model.wg.data = wg3_core.w.data.to(torch.half)

100%|████████████████████████████████████████████████████████████████████████████████| 100/100 [02:05<00:00,  1.26s/it]


In [11]:
def test(model, test_loader):
    # model.load_state_dict(torch.load('saved_model.pth'))
    model = model.cuda()
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.cuda(non_blocking=True), labels.cuda(non_blocking=True)  # Move data to GPU
            output = model(images)
            _, predicted = torch.max(output.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    return 100 * correct / total


# Evaluate the trained classifier on the pre-computed test hypervectors.
# wg3 is converted to half precision first -- presumably to match the dtype of
# the stored HDC features, which come from the half-precision model.hdc path.
print(test(wg3.to(torch.half), testloader_hdc))

99.17
