In [1]:

import torch
import torch.nn as nn
import torchvision
from torchvision import transforms

from torch.nn import functional as F

import time
import math


In [2]:
# Select the compute device once for the whole notebook: the first CUDA GPU
# when PyTorch can see one, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
device

device(type='cuda', index=0)

In [3]:
class _DeviceBatches:
  """Iterable view over a DataLoader that moves each batch to a device lazily.

  Batches are transferred one at a time while iterating, so only the batch
  currently in use occupies device memory, and the wrapped loader's shuffling
  is re-applied every time the object is iterated (i.e. every epoch).
  """

  def __init__(self, loader, target_device):
    self.loader = loader
    self.target_device = target_device

  def __iter__(self):
    for inputs, labels in self.loader:
      yield inputs.to(self.target_device), labels.to(self.target_device)

  def __len__(self):
    # Number of batches, same as the wrapped DataLoader.
    return len(self.loader)


def create_data_loader(batch_size):
  """Build the CIFAR-10 train / validation / test batch iterables.

  The provided 50,000-image training set is split deterministically: the
  first 40,000 images form the train set and the remaining 10,000 the
  validation set. The provided 10,000-image test set is used unchanged.

  Args:
    batch_size: number of images per batch for all three splits.

  Returns:
    (train_loader, val_loader, test_loader). Each is an iterable (with
    `len()`) yielding `(inputs, labels)` pairs that already live on the
    notebook-level `device`.
  """
  n_train = 40000

  transform = transforms.Compose([
    transforms.Resize((227, 227)),  # Resize the images to a larger size
    transforms.ToTensor(),
    # Per-channel mean / std used for normalisation
    # (presumably the CIFAR-10 training-set statistics -- confirm).
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2470, 0.2435, 0.2616)),
  ])

  # load cifar
  full_train_set = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
  test_set = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

  # Deterministic split: first 40,000 images -> train, the rest -> val.
  train_set = torch.utils.data.Subset(full_train_set, range(n_train))
  val_set = torch.utils.data.Subset(full_train_set, range(n_train, len(full_train_set)))

  # Only the training data needs shuffling; evaluation metrics do not depend on order.
  train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=4)
  val_loader = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=4)
  test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=4)

  # Move batches to the device on demand instead of materialising every batch
  # up front: one 3x227x227 float32 image is ~0.6 MB, so holding all 60,000
  # images on the device at once would need tens of GB, and a pre-built list
  # would also replay the exact same batch order every epoch.
  return (
    _DeviceBatches(train_loader, device),
    _DeviceBatches(val_loader, device),
    _DeviceBatches(test_loader, device),
  )

In [4]:

class QuasiPolySynapse(nn.Module):
    """A single learnable connection computing a quasi-polynomial of its input.

    The output is ``main[0] * x ** main[1]`` plus, for every extra term ``k``
    (0-based) in ``self.terms``, ``terms[k] * x ** (k + 1)``.

    ``main[0]`` is the main coefficient and ``main[1]`` the main exponent;
    both are trainable parameters. Extra integer-power terms are appended
    lazily in ``forward`` once the main exponent grows to 2 or beyond.

    NOTE(review): with a fractional exponent, negative inputs produce NaN
    under ``x ** p`` -- callers presumably feed non-negative activations;
    confirm.
    """

    def __init__(self):
        super().__init__()
        # The exponent starts at 1.0 or 1.5 with equal probability.
        power_initial_value = 1.0 if torch.rand(1) < 0.5 else 1.5
        # Both entries are registered as explicit Parameters. nn.ParameterList
        # wraps plain tensors in nn.Parameter anyway (which makes them
        # trainable regardless of the tensor's own requires_grad flag), so
        # spelling it out avoids suggesting the exponent is frozen.
        self.main = nn.ParameterList([
            nn.Parameter(torch.randn(1)),
            nn.Parameter(torch.tensor([power_initial_value])),
        ])
        self.terms = nn.ParameterList()

    def forward(self, x):
        """Evaluate the quasi-polynomial element-wise on tensor ``x``.

        At most one new term is added per call, when the number of extra
        terms is below ``floor(exponent) - 1``.
        """
        if len(self.terms) < math.floor(self.main[1].item()) - 1:
            self.add_term()
        y = self.main[0] * x ** self.main[1]
        for idx, coeff in enumerate(self.terms):
            y = y + coeff * x ** (idx + 1)
        return y

    def add_term(self):
        """Append one extra term with coefficient 1.

        Its (fixed) exponent is implied by its position in ``self.terms``.
        The coefficient is created on the same device and with the same dtype
        as the main coefficient; a CPU-created parameter would otherwise
        clash with a model that has already been moved to the GPU.

        NOTE: an optimizer constructed before this call does not track the
        new parameter; register it (e.g. ``optimizer.add_param_group``) if it
        is meant to be trained.
        """
        print("Adding term")
        anchor = self.main[0]
        self.terms.append(nn.Parameter(torch.ones(1, dtype=anchor.dtype, device=anchor.device)))

    def __str__(self):
        """Human-readable listing of the main term followed by the extra terms."""
        s = ""
        s += f"Main: {self.main[0]}x^{self.main[1]} + "
        for idx,term in enumerate(self.terms):
            s += f"{term}x^{idx+1} + "
        return s[:-3]



class QuasiPolyLayer(nn.Module):
  """Fully connected layer whose every connection is a QuasiPolySynapse.

  For each output unit ``i`` the contributions ``synapses[i][j](x[:, j])`` of
  all input features ``j`` are combined -- summed by default, multiplied when
  ``product=True`` -- and a per-output bias is added.

  Args:
    in_features: number of input features (columns of ``x``).
    out_features: number of output units.
    product: combine synapse outputs by multiplication instead of addition.
  """

  def __init__(self, in_features, out_features, product=False):
    super().__init__()
    self.out_features = out_features
    self.in_features = in_features
    # 2-D grid of synapses indexed as synapses[output_index][input_index].
    self.synapses = nn.ModuleList([
      nn.ModuleList([QuasiPolySynapse() for _ in range(self.in_features)])
      for _ in range(self.out_features)
    ])
    self.bias = nn.Parameter(torch.randn(out_features))
    self.product = product

  def forward(self, x):
    """Apply the layer to a batch.

    Args:
      x: 2-D tensor indexed as ``x[sample, feature]``.

    Returns:
      Tensor of shape ``(x.shape[0], out_features)``.
    """
    batch = x.shape[0]
    # Neutral element of the combining operation.
    identity = 1.0 if self.product else 0.0

    columns = []
    for i in range(self.out_features):
      # The accumulator lives on the input's device (not a notebook global)
      # and is updated out of place, so autograd never has to track writes
      # into slices of a shared output buffer.
      acc = torch.full((batch,), identity, dtype=self.bias.dtype, device=x.device)
      for j in range(self.in_features):
        contribution = self.synapses[i][j](x[:, j])
        acc = acc * contribution if self.product else acc + contribution
      columns.append(acc)

    if columns:
      y = torch.stack(columns, dim=1)
    else:
      # Degenerate layer without outputs: keep the (batch, 0) shape.
      y = torch.zeros(batch, 0, dtype=self.bias.dtype, device=x.device)

    # add bias
    return y + self.bias
  


class AddEpsilon(nn.Module):
    """Module that shifts its input by a constant offset ``epsilon``.

    Args:
        epsilon: value added to the input (default 1e-10).

    NOTE(review): 1e-10 is not the smallest representable float; it is far
    below float32 machine epsilon (~1.19e-7), so adding it to float32 values
    of ordinary magnitude leaves them unchanged. It only makes a difference
    for inputs that are exactly (or very nearly) zero.
    """

    def __init__(self, epsilon=1e-10):
        super().__init__()
        self.epsilon = epsilon

    def forward(self, x):
        """Return ``x`` with the offset added."""
        shifted = x + self.epsilon
        return shifted

    




In [5]:
def init_train_var(model):
  """Create the loss function and the optimizer used to train `model`.

  Args:
    model: module whose `parameters()` are handed to the optimizer.

  Returns:
    (criterion, optimizer): an `nn.CrossEntropyLoss` instance and an SGD
    optimizer over the model's current parameters.
  """
  # momentum of 0.2 was settled on after trial and error
  sgd_settings = dict(lr=0.005, momentum=0.2, weight_decay=0.0001)
  optimizer = torch.optim.SGD(model.parameters(), **sgd_settings)
  criterion = nn.CrossEntropyLoss()
  return criterion, optimizer

In [6]:
def test(model_path, test_loader):
  # use the provided test loader
  # return the test accuracy
  model = torch.load(model_path)
  model.eval()
  with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
      outputs = model(images)
      _, predicted = torch.max(outputs, 1)
      total += labels.size(0)
      correct += (predicted == labels).sum().item()
    test_acc = correct / total
  return test_acc


In [7]:
BATCH_SIZE = 64
train_loader, val_loader, test_loader = create_data_loader(batch_size=BATCH_SIZE)

# Randomly initialised AlexNet. Calling the constructor without arguments
# means "no pretrained weights" on both old and new torchvision releases and
# avoids the deprecated `pretrained=` keyword.
model = torchvision.models.alexnet()

# Replace the final fully connected layer (4096 -> 1000 classes) with a
# quasi-polynomial layer producing the 10 CIFAR-10 class scores.
model.classifier[6] = QuasiPolyLayer(in_features=4096, out_features=10)

model = model.to(device)
# The optimizer must be created after the classifier head has been swapped so
# that it receives the new layer's parameters.
criterion, optimizer = init_train_var(model)


Files already downloaded and verified
Files already downloaded and verified




In [8]:
# Show the model architecture; as the cell's last expression the module's
# repr is rendered by the notebook, so no print() is needed.
model

AlexNet(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (avgpool): AdaptiveAvgPool2d(output_size=(6, 6))
  (classifier): Sequential(
    (0): Dropout(p=0.5, inplace=False)
    (1): Linear(in_features=9216, out_features=4096, bias=True)
 

In [9]:
# Number of epochs run by the training loop (`range(EPOCH_COUNT)`); also used in its progress messages.
EPOCH_COUNT = 100

In [10]:
%%time

for epoch in range(EPOCH_COUNT):
    model.train()
    train_loss = 0
    train_correct = 0
    # for each batch
    for i, (inputs, labels) in enumerate(train_loader):
        # count the number of weights with very small magnitute < 1e-3
        # nans_count = 0
        # for param in model.parameters():
        #     if param.grad is not None:
        #         nans_count += torch.sum(torch.isnan(param.grad)).item()
        
        # print(f"nans count: {nans_count}")
        
        # zero the parameter gradients
        optimizer.zero_grad()
        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # print all gradients of the first synapse
        # print(f"synapse grads : {model.classifier[-1].synapses[0][0].main[0].grad}")
        # print(f"weights of first layer {model.classifier[-1].synapses[0][0]}")
        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        train_correct += (predicted == labels).sum().item()


    print(f'Epoch {epoch+1}/{EPOCH_COUNT}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_correct/40000:.4f}')
    # validate
    model.eval()
    val_loss = 0
    val_correct = 0
    with torch.no_grad():
        for i, (inputs, labels) in enumerate(val_loader):
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            val_correct += (predicted == labels).sum().item()
    val_acc = val_correct/10000
    print(f'Epoch {epoch+1}/{EPOCH_COUNT}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}')



KeyboardInterrupt: 

In [36]:
# Print the parameters of every synapse in the quasi-polynomial output layer.

# `model` only has a `.module` attribute when it is wrapped (e.g. in
# nn.DataParallel); fall back to the model itself when it is not wrapped.
base_model = getattr(model, 'module', model)
quasi_poly_layer = base_model.classifier[-1]
for i in range(quasi_poly_layer.out_features):
    for j in range(quasi_poly_layer.in_features):
        synapse = quasi_poly_layer.synapses[i][j]
        print(f"Weights of synapse ({i}, {j}): {synapse}")



Weights of synapse (0, 0): Main: Parameter containing:
tensor([3.5350], requires_grad=True)x^Parameter containing:
tensor([0.9736], requires_grad=True)
Weights of synapse (0, 1): Main: Parameter containing:
tensor([-0.3921], requires_grad=True)x^Parameter containing:
tensor([0.9941], requires_grad=True)
Weights of synapse (0, 2): Main: Parameter containing:
tensor([0.7600], requires_grad=True)x^Parameter containing:
tensor([1.4931], requires_grad=True)
Weights of synapse (0, 3): Main: Parameter containing:
tensor([2.3518], requires_grad=True)x^Parameter containing:
tensor([0.9796], requires_grad=True)
Weights of synapse (0, 4): Main: Parameter containing:
tensor([1.5484], requires_grad=True)x^Parameter containing:
tensor([1.4898], requires_grad=True)
Weights of synapse (0, 5): Main: Parameter containing:
tensor([-1.9674], requires_grad=True)x^Parameter containing:
tensor([1.4968], requires_grad=True)
Weights of synapse (0, 6): Main: Parameter containing:
tensor([1.6850], requires_grad=

In [22]:
# Report the test-set accuracy of the best saved model.
# The checkpoint file 'best_model.pth' must already exist on disk; `test`
# raises FileNotFoundError when it does not.

test_acc = test(model_path='best_model.pth', test_loader=test_loader)
print(test_acc)


FileNotFoundError: [Errno 2] No such file or directory: 'best_model.pth'