<a href="https://colab.research.google.com/github/WHU-Peter/COMP6200-Project/blob/main/project_demo_benchmark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch import nn
from torch import optim
from torch.optim import *
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST
import math
import matplotlib.pyplot as plt
import timeit

# fix random seed for reproducibility
seed = 7
torch.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
import numpy as np
np.random.seed(seed)

device = "cuda:0" if torch.cuda.is_available() else "cpu"

In [2]:
# flatten 28*28 images to a 784 vector for each image
transform = transforms.Compose([
    transforms.ToTensor(),  # convert to tensor
    transforms.Lambda(lambda x: x.view(-1))  # flatten into vector
])

trainset = MNIST(".", train=True, download=True, transform=transform)
testset = MNIST(".", train=False, download=True, transform=transform)

# create data loaders
trainloader = DataLoader(trainset, batch_size=128, shuffle=True)
testloader = DataLoader(testset, batch_size=128, shuffle=True)

class_counts = torch.zeros(10, dtype=torch.int32)

for (images, labels) in trainloader:
  for label in labels:
    class_counts[label] += 1

assert class_counts.sum()==60000

  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


Baseline model, two linear layers. 

In [3]:
# define baseline model
class BaselineModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(BaselineModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size) 
        self.fc2 = nn.Linear(hidden_size, num_classes)  
    
    def forward(self, x):
        out = self.fc1(x)
        out = F.relu(out)
        out = self.fc2(out)
        if not self.training:
            out = F.softmax(out, dim=1)
        return out

In [4]:
# # build the model 
# baseline_model = BaselineModel(784, 784, 10).to(device)

# # define the loss function and the optimiser
# loss_function = nn.CrossEntropyLoss()
# optimiser = optim.Adam(baseline_model.parameters())

# Loss_Baseline_Model = []
# # the epoch loop
# for epoch in range(50):
#     running_loss = 0.0
#     for data in trainloader:
#         # get the inputs
#         inputs, labels = data

#         # zero the parameter gradients
#         optimiser.zero_grad()

#         # forward + loss + backward + optimise (update weights)
#         outputs = baseline_model(inputs.to(device))
#         loss = loss_function(outputs, labels.to(device))
#         loss.backward()
#         optimiser.step()

#         # keep track of the loss this epoch
#         running_loss += loss.item()
#     print("Epoch %d, loss %4.2f" % (epoch, running_loss))
#     Loss_Baseline_Model.append(running_loss)
# print('**** Finished Training ****')

In [5]:
# build the model 
baseline_model = BaselineModel(784, 784, 10).to(device)

# define the loss function and the optimiser
loss_function = nn.CrossEntropyLoss()
optimiser = optim.Adam(baseline_model.parameters())

def baseline_model_train(inputs, labels):
  baseline_model.train()
  # zero the parameter gradients
  optimiser.zero_grad()
  # forward + loss + backward + optimise (update weights)
  outputs = baseline_model(inputs)
  loss = loss_function(outputs, labels)
  loss.backward()
  optimiser.step()

def baseline_model_inference(inputs):
  baseline_model.eval()
  outputs = baseline_model(inputs)

x = 0
y = 0
for data in testloader:
  inputs, labels = data
  x = inputs.to(device)
  y = labels.to(device)
  break

t_baseline_train = timeit.Timer(
    stmt='baseline_model_train(x, y)', 
    setup='from __main__ import baseline_model_train',
    globals={'x': x, 'y' : y}
)

t_baseline_inference = timeit.Timer(
    stmt='baseline_model_inference(x)', 
    setup='from __main__ import baseline_model_inference',
    globals={'x': x}
)

print(f'baseline_model_train(x, y):  {t_baseline_train.timeit(100) / 100 * 1e3:>5.1f} ms')
print(f'baseline_model_inference(x):  {t_baseline_inference.timeit(100) / 100 * 1e3:>5.1f} ms')

baseline_model_train(x, y):   13.0 ms
baseline_model_inference(x):    2.9 ms


learnable temperature parameter

In [6]:
class SoftMaxWithTemperature(nn.Module):
    def __init__(self):
        super(SoftMaxWithTemperature, self).__init__()
        self.temperature = nn.Parameter(torch.tensor(0.0001))
    def forward(self, x):
        # print(self.temperature)
        return F.softmax(x / self.temperature, dim=-1)

In [7]:
def softmax_temperature(logits, temperature=0.00001):
  pro = F.softmax(logits / temperature, dim=-1)
  # pro = torch.matmul(pro, torch.FloatTensor(range(0, pro.shape[1])));
  return pro;
  # return one_hot_code

In [8]:
# define LUTModel
class LUTModelWithLearnableTemperature(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(LUTModelWithLearnableTemperature, self).__init__()
        self.softmax_temperature = SoftMaxWithTemperature();
        self.emb = nn.Embedding(input_size, hidden_size) 
        self.fc2 = nn.Linear(hidden_size, num_classes)  
    
    def forward(self, x):
        # if self.training:
        #   out = self.softmax_temperature(x)
        #   out = out @ self.emb.weight
        # else:
        #   out = self.softmax_temperature(x)
        #   out = mapping_onehot_vector(out).round().long()
        #   out = self.emb(out)
        
        out = self.softmax_temperature(x)
        out = out @ self.emb.weight
        # out = self.emb(out.long())
        # out = self.emb(out)
        out = F.relu(out)
        out = self.fc2(out)
        if not self.training:
            out = F.softmax(out, dim=1)
        return out

In [9]:
# # build the model 
# learnable_lut_model = LUTModelWithLearnableTemperature(784, 784, 10).to(device)

# # define the loss function and the optimiser
# loss_function = nn.CrossEntropyLoss()
# optimiser = optim.Adam(learnable_lut_model.parameters(), lr=1e-2)

# Loss_Learnable_Model = []
# Temperature = []
# # the epoch loop
# for epoch in range(50):
#     running_loss = 0.0
#     for data in trainloader:
#         # get the inputs
#         inputs, labels = data

#         # zero the parameter gradients
#         optimiser.zero_grad()

#         # forward + loss + backward + optimise (update weights)
#         outputs = learnable_lut_model(inputs.to(device))
#         loss = loss_function(outputs, labels.to(device))
#         loss.backward()
#         optimiser.step()
#         # print(optimiser.param_groups[0]['params'][2])
#         # print(np.all(optimiser.param_groups[0]['params'][0].grad.numpy() == 0))
#         # keep track of the loss this epoch
#         running_loss += loss.item()
#         # break
#     print(optimiser.param_groups[0]['params'][0])
#     print("Epoch %d, loss %4.2f" % (epoch, running_loss))
#     Loss_Learnable_Model.append(running_loss)
#     Temperature.append(optimiser.param_groups[0]['params'][0].detach().cpu().numpy())
# print('**** Finished Training ****')

In [10]:
# # Compute the model accuracy on the test set
# class_correct = torch.zeros(10)
# class_total = torch.zeros(10)
# learnable_lut_model.eval()
# for (images, labels) in testloader:

#   for label in labels:
#     class_total[label] += 1
  
#   outputs = learnable_lut_model(images.to(device))
#   outputs = outputs.to("cpu")
#   for i in range(outputs.shape[0]):
#     _, prediction = outputs[i].max(0)
#     if prediction == labels[i]:
#       class_correct[labels[i]] += 1

# for i in range(10):
#     print('Class %d accuracy: %2.2f %%' % (i, 100.0*class_correct[i] / class_total[i]))
# print("accuracy: %2.2f %%" % (100.0*sum(class_correct)/sum(class_total)))

In [11]:
# build the model 
learnable_lut_model = LUTModelWithLearnableTemperature(784, 784, 10).to(device)

# define the loss function and the optimiser
loss_function = nn.CrossEntropyLoss()
optimiser = optim.Adam(learnable_lut_model.parameters(), lr=1e-2)

def learnable_model_train(inputs, labels):
  learnable_lut_model.train()
  # zero the parameter gradients
  optimiser.zero_grad()
  # forward + loss + backward + optimise (update weights)
  outputs = learnable_lut_model(inputs)
  loss = loss_function(outputs, labels)
  loss.backward()
  optimiser.step()

def learnable_model_inference(inputs):
  learnable_lut_model.eval()
  outputs = learnable_lut_model(inputs)

x = 0
y = 0
for data in testloader:
  inputs, labels = data
  x = inputs.to(device)
  y = labels.to(device)
  break

t_learnable_train = timeit.Timer(
    stmt='learnable_model_train(x, y)', 
    setup='from __main__ import learnable_model_train',
    globals={'x': x, 'y' : y}
)

t_learnable_inference = timeit.Timer(
    stmt='learnable_model_inference(x)', 
    setup='from __main__ import learnable_model_inference',
    globals={'x': x}
)

print(f'learnable_model_train(x, y):  {t_learnable_train.timeit(100) / 100 * 1e3:>5.1f} ms')
print(f'learnable_model_inference(x):  {t_learnable_inference.timeit(100) / 100 * 1e3:>5.1f} ms')

learnable_model_train(x, y):   29.8 ms
learnable_model_inference(x):   11.6 ms


In [12]:
class LUTModelWithAnnealTemperature(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(LUTModelWithAnnealTemperature, self).__init__()
        self.emb = nn.Embedding(input_size, hidden_size) 
        self.fc2 = nn.Linear(hidden_size, num_classes)  
    
    def forward(self, x, temperature):
        # out = self.fc1(x)
        # out = F.relu(out)
        if self.training:
          # print(x.shape)
          # print(self.emb.weight.shape)
          out = softmax_temperature(x, temperature)
          out = out @ self.emb.weight
        else:
          out = softmax_temperature(x, temperature)
          # out = mapping_onehot_vector(out).round().long()
          # out = self.emb(out)

          nozero = torch.nonzero(out);
          # print(np.array(nozero).shape[1])
          for i in range(out.shape[0]):
            idx = torch.where(nozero[:,0]==i)[0]
            rows = nozero[idx, 1].long()
            out[i] = torch.mean(self.emb(rows), axis=0)
          
        # out = self.emb(out.long())
        # out = self.emb(out)
        out = F.relu(out)
        out = self.fc2(out)
        if not self.training:
            out = F.softmax(out, dim=1)
        return out

In [13]:
# # build the model 
# Annealing_lut_model = LUTModelWithAnnealTemperature(784, 784, 10).to(device)
# Annealing_lut_model.train()
# # define the loss function and the optimiser
# loss_function = nn.CrossEntropyLoss()
# learning_rate = 1e-2  #Learning rate
# Annealing_lut_optimiser = optim.Adam(Annealing_lut_model.parameters(), lr=learning_rate)
# scheduler = lr_scheduler.MultiStepLR(Annealing_lut_optimiser,milestones=[2500],gamma = 0.2)
# Loss_Annealing_Model = []
# # the epoch loop
# i = 0;
# for epoch in range(50):
#     running_loss = 0.0
#     print("Epoch %d, lr %4.5f" % (epoch, Annealing_lut_optimiser.state_dict()['param_groups'][0]['lr']))
#     for data in trainloader:
#         # get the inputs
#         i = i+1
#         inputs, labels = data

#         # zero the parameter gradients
#         Annealing_lut_optimiser.zero_grad()
#         # forward + loss + backward + optimise (update weights)
#         outputs = Annealing_lut_model(inputs.to(device), max(0.0001, math.exp(-15 * math.pow(10, -5) * (i))))
#         loss = loss_function(outputs, labels.to(device))
#         loss.backward()
#         Annealing_lut_optimiser.step()
#         scheduler.step()
#         # print(optimiser.param_groups[0]['params'][2])
#         # print(np.all(optimiser.param_groups[0]['params'][0].grad.numpy() == 0))
#         # keep track of the loss this epoch
#         running_loss += loss.item()
#         # break
#     print("Epoch %d, loss %4.2f" % (epoch, running_loss))
#     print("Epoch %d, temperature %4.8f" % (epoch, max(0.0001, math.exp(-20 * math.pow(10, -5) * (i)))))
#     Loss_Annealing_Model.append(running_loss)
# print('**** Finished Training ****')

In [14]:
# # Compute the model accuracy on the test set
# class_correct = torch.zeros(10)
# class_total = torch.zeros(10)
# Annealing_lut_model.eval()
# for (images, labels) in testloader:

#   for label in labels:
#     class_total[label] += 1
#   outputs = Annealing_lut_model(images.to(device), 0.001)
#   outputs = outputs.to("cpu")
#   for i in range(outputs.shape[0]):
#     _, prediction = outputs[i].max(0)
#     if prediction == labels[i]:
#       class_correct[labels[i]] += 1

# for i in range(10):
#     print('Class %d accuracy: %2.2f %%' % (i, 100.0*class_correct[i] / class_total[i]))
# print("accuracy: %2.2f %%" % (100.0*sum(class_correct)/sum(class_total)))

In [15]:
# build the model 
Annealing_lut_model = LUTModelWithAnnealTemperature(784, 784, 10).to(device)

# define the loss function and the optimiser
loss_function = nn.CrossEntropyLoss()
optimiser = optim.Adam(Annealing_lut_model.parameters(), lr=1e-2)

def annealing_model_train(inputs, labels):
  Annealing_lut_model.train()
  # zero the parameter gradients
  optimiser.zero_grad()
  # forward + loss + backward + optimise (update weights)
  outputs = Annealing_lut_model(inputs, 0.001)
  loss = loss_function(outputs, labels)
  loss.backward()
  optimiser.step()

def annealing_model_inference(inputs):
  Annealing_lut_model.eval()
  outputs = Annealing_lut_model(inputs, 0.001)

x = 0
y = 0
for data in testloader:
  inputs, labels = data
  x = inputs.to(device)
  y = labels.to(device)
  break

t_annealing_train = timeit.Timer(
    stmt='annealing_model_train(x, y)', 
    setup='from __main__ import annealing_model_train',
    globals={'x': x, 'y' : y}
)

t_annealing_inference = timeit.Timer(
    stmt='annealing_model_inference(x)', 
    setup='from __main__ import annealing_model_inference',
    globals={'x': x}
)

print(f'annealing_model_train(x, y):  {t_annealing_train.timeit(100) / 100 * 1e3:>5.1f} ms')
print(f'annealing_model_inference(x):  {t_annealing_inference.timeit(100) / 100 * 1e3:>5.1f} ms')

annealing_model_train(x, y):   18.0 ms
annealing_model_inference(x):   26.5 ms
