In [30]:
#!/usr/bin/env python

import torch
from torch import nn
from torch import optim
from torch import Tensor
from torch.nn import functional as F
import dlc_practical_prologue as prologue
import matplotlib.pyplot as plt
%matplotlib notebook

In [31]:
#Data generation
N=10**3
train_input,train_target,train_classes,test_input,test_target,test_classes=prologue.generate_pair_sets(N)
train_target=train_target.long()#.float for MSELoss, .long for CrossEntropy
train_input=train_input.float()
train_classes=train_classes.float()

In [32]:
print(train_classes)

tensor([[5., 7.],
        [2., 0.],
        [7., 0.],
        ...,
        [7., 6.],
        [5., 8.],
        [7., 7.]])


In [33]:
#Base functions adapted from the practicals
def train_model(model, train_input, train_target,train_classes, mini_batch_size, crit=nn.MSELoss, eta = 1e-3, nb_epochs = 200,print_=False):
    criterion = crit()
    optimizer = optim.SGD(model.parameters(), lr = eta)
    for e in range(nb_epochs):
        acc_loss = 0

        for b in range(0, train_input.size(0), mini_batch_size):
            output,aux_output = model(train_input.narrow(0, b, mini_batch_size))
            if crit==nn.MSELoss:
                loss1 = criterion(output[:,1], train_target.narrow(0, b, mini_batch_size))
                print(torch.argmax(aux_output[:,0:9],dim=1))
                print(train_classes[:,0].narrow(0, b, mini_batch_size))
                loss2 = criterion(torch.argmax(aux_output[:,0:9],dim=1), train_classes[:,0].narrow(0, b, mini_batch_size))
                loss3 = criterion(torch.argmax(aux_output[:,10:19],dim=1), train_classes[:,1].narrow(0, b, mini_batch_size))
                loss = loss1 + loss2 + loss3
                print(loss)
            elif crit==nn.CrossEntropyLoss:
                loss = criterion(output, train_target.narrow(0, b, mini_batch_size))
            else:
                print("Loss not implemented")
            acc_loss = acc_loss + loss.item()
            model.zero_grad()
            loss.backward()
            optimizer.step()
            if False:
                with torch.no_grad():
                    for p in model.parameters():
                        p -= eta * p.grad

        if(e%20==0 and print_):
            print(e, acc_loss)
            
def compute_nb_errors(model, input, target, mini_batch_size=100):
    nb_errors = 0

    for b in range(0, input.size(0), mini_batch_size):
        output , aux_output = model(input.narrow(0, b, mini_batch_size))
        _, predicted_classes = output.max(1)
        for k in range(mini_batch_size):
            if target[b + k]!=predicted_classes[k]:
                nb_errors = nb_errors + 1

    return nb_errors

def run_many_times(model,crit=nn.MSELoss,mini_batch_size=100,n=10,print_=False):
    average_error=0
    for i in range(n):
        m=model()
        train_model(m, train_input, train_target,train_classes,mini_batch_size,crit=crit)
        nb_test_errors = compute_nb_errors(m, test_input, test_target, mini_batch_size)
        print('test error Net {:0.2f}% {:d}/{:d}'.format((100 * nb_test_errors) / test_input.size(0),
                                                      nb_test_errors, test_input.size(0)))
        average_error+=(100 * nb_test_errors) / test_input.size(0)
    print("Average error: "+str(average_error/n))

In [34]:
#Is it better to use groups or not?
#Takes about 2 hours to run
#about 22.5% error average without groups if we exclude outliers that get stuck and don't move
#about 21.5% error average with groups if we exclude outliers that get stuck and don't move
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(2, 32, kernel_size=3)
        #self.conv2 = nn.Conv2d(32, 64, kernel_size=2)
        self.fc1 = nn.Linear(512, 20)
        self.fc2 = nn.Linear(20, 2)
        self.aux_linear = nn.Linear(20, 20)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), kernel_size=3, stride=3))
        #x = F.relu(F.max_pool2d(self.conv2(x), kernel_size=2, stride=2))
        aux_output = F.softmax(self.fc1(x.view(-1, 512)), dim=1)
        x = F.relu(self.fc1(x.view(-1, 512)))
        output = F.softmax(self.fc2(x), dim=1)
        aux_output = F.softmax(self.aux_linear(x), dim=1)
        #print(x)
        return output, aux_output
    def last_hiddes(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), kernel_size=3, stride=3))
        #x = F.relu(F.max_pool2d(self.conv2(x), kernel_size=2, stride=2))
        x = F.relu(self.fc1(x.view(-1, 512)))
        return x

class NetGroups(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(2, 32, kernel_size=3, groups=2)
        #self.conv2 = nn.Conv2d(32, 64, kernel_size=2)
        self.fc1 = nn.Linear(512, 20)
        self.fc2 = nn.Linear(20, 2)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), kernel_size=3, stride=3))
        #x = F.relu(F.max_pool2d(self.conv2(x), kernel_size=2, stride=2))
        x = F.relu(self.fc1(x.view(-1, 512)))
        x = F.softmax(self.fc2(x), dim=1)
        #print(x)
        return x

train_target=train_target.float()
print("Without groups:")
#run_many_times(Net)
m=Net()
train_model(m, train_input, train_target,train_classes,mini_batch_size=100,crit=nn.MSELoss)
nb_test_errors = compute_nb_errors(m, test_input, test_target, mini_batch_size)
print('test error Net {:0.2f}% {:d}/{:d}'.format((100 * nb_test_errors) / test_input.size(0),nb_test_errors, test_input.size(0)))
print("With groups:")
#run_many_times(NetGroups)

Without groups:
tensor([5, 0, 5, 0, 6, 0, 0, 6, 5, 2, 7, 5, 7, 0, 6, 7, 7, 3, 0, 0, 6, 0, 7, 1,
        2, 0, 0, 5, 7, 7, 5, 7, 3, 2, 6, 5, 5, 7, 1, 5, 0, 0, 6, 3, 6, 0, 7, 7,
        2, 3, 0, 5, 7, 5, 0, 3, 3, 6, 1, 6, 0, 0, 7, 6, 5, 7, 7, 5, 7, 5, 2, 1,
        5, 1, 7, 6, 5, 5, 7, 5, 6, 5, 1, 5, 0, 5, 1, 1, 1, 0, 5, 0, 7, 5, 7, 6,
        3, 7, 7, 0])
tensor([5., 2., 7., 3., 8., 1., 7., 4., 5., 4., 6., 7., 6., 5., 7., 3., 2., 6.,
        6., 1., 8., 3., 0., 4., 3., 5., 0., 6., 9., 5., 6., 3., 1., 2., 4., 6.,
        4., 4., 6., 5., 2., 0., 0., 1., 2., 9., 4., 5., 1., 1., 2., 4., 5., 1.,
        6., 4., 1., 7., 6., 3., 8., 1., 5., 7., 7., 3., 9., 9., 0., 3., 0., 1.,
        6., 2., 3., 0., 3., 3., 3., 7., 6., 3., 5., 2., 2., 7., 3., 2., 6., 5.,
        9., 9., 9., 2., 2., 7., 2., 9., 7., 3.])
tensor(24.2567, grad_fn=<AddBackward0>)
tensor([5, 5, 1, 0, 6, 0, 7, 7, 5, 0, 1, 6, 7, 0, 0, 3, 5, 6, 0, 0, 7, 6, 7, 2,
        7, 3, 1, 7, 5, 1, 0, 1, 0, 7, 7, 1, 7, 6, 6, 6, 5, 0, 5, 7, 7, 7, 

NameError: name 'mini_batch_size' is not defined

In [None]:
nb_test_errors = compute_nb_errors(m, test_input, test_target, mini_batch_size = 100)
print('test error Net {:0.2f}% {:d}/{:d}'.format((100 * nb_test_errors) / test_input.size(0),nb_test_errors, test_input.size(0)))
print("With groups:")

In [None]:
#Is it better with Cross entropy loss rather than MSE?
#Doesn't seem significantly better
print("With Cross Entropy Loss:")
run_many_times(NetGroups,crit=nn.CrossEntropyLoss)

In [None]:
#shuffling the dataset
permuted_index = torch.randperm(train_input.size()[0])
train_input_shuffled = train_input[permuted_index]
train_target_shuffled = train_target[permuted_index]
train_classes_shuffled = train_classes[permuted_index]


In [None]:
#visual check
index = torch.randint(1000, (1,)).item()
fig = plt.figure()
fig.add_subplot(1, 2, 1)
plt.imshow(train_input_shuffled[index][0])
fig.add_subplot(1, 2, 2)
plt.imshow(train_input_shuffled[index][1])
print('classes')
print(train_classes_shuffled[index][0])
print(train_classes_shuffled[index][1])
print('target')
print(train_target_shuffled[index])

In [None]:
#retraining net on shuffled data 
model2 = Net()
train_model(model2, train_input, train_target,mini_batch_size)
nb_test_errors = compute_nb_errors(model2, test_input, test_target, mini_batch_size)
print('test error Net {:0.2f}% {:d}/{:d}'.format((100 * nb_test_errors) / test_input.size(0),
                                                      nb_test_errors, test_input.size(0)))

In [None]:
output = model(test_input.narrow(0, 0, mini_batch_size))
_, predicted_classes = output.max(1)

In [None]:
#looking at the last hidden layer
#visual check
index = torch.randint(1000, (1,)).item()
fig = plt.figure()
fig.add_subplot(1, 2, 1)
plt.imshow(train_input_shuffled[index][0])
fig.add_subplot(1, 2, 2)
plt.imshow(train_input_shuffled[index][1])
print('classes')
print(train_classes_shuffled[index][0])
print(train_classes_shuffled[index][1])
print('target')
print(train_target_shuffled[index])
print('last hidden layer', model.last_hiddes(train_input[index].unsqueeze(0)))