In [1]:
import torch
from torch.autograd import Variable
from torch import nn
from torch import optim
from torch.nn import functional as F

import dlc_practical_prologue as prologue

In [2]:
#set to use CPU or GPU automatically based on what is available
def select_device():
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')
    
# convert decimal classes Nx2 in (0,9) to binary array Nx10 in (0,1) 
def decimal_to_binary(Array):
    classes_binary_0 = torch.zeros(1000,10) #first image
    classes_binary_1 = torch.zeros(1000,10) #second image
    for i in range(1000):
        classes_binary_0[i,int(Array[i,0].item())] = 1
        classes_binary_1[i,int(Array[i,1].item())] = 1
    return torch.cat((classes_binary_0, classes_binary_1), 1) #concatentate to Nx20

# convert output of model (Nx20) to decimal class label (Nx2)
def output_to_pred_classes(output):
    _, a = output.data[:,:10].max(1)
    _, b = output.data[:,10:].max(1)
    a = torch.reshape(a,(output.shape[0],1))
    b = torch.reshape(b,(output.shape[0],1))
    return torch.cat((a,b),1)

In [3]:
(train_input, train_target, train_classes, test_input, test_target, test_classes) = prologue.generate_pair_sets(1000)
device = select_device()
print('Device is',device)

train_input, train_target = train_input.to(device), train_target.to(device)
test_input, test_target = test_input.to(device), test_target.to(device)
train_classes, test_classes = train_classes.to(device), test_classes.to(device)

train_classes_binary, test_classes_binary = decimal_to_binary(train_classes), decimal_to_binary(test_classes)
train_classes_binary, test_classes_binary = train_classes_binary.to(device), test_classes_binary.to(device)

Device is cuda


In [4]:
print('train input size:',train_input.size())
print('train target size:',train_target.size())
print('train classes size:',train_classes.size())
print('test input size:',test_input.size())
print('test target size:',test_target.size())
print('test classes size:',test_classes.size())

train input size: torch.Size([1000, 2, 14, 14])
train target size: torch.Size([1000])
train classes size: torch.Size([1000, 2])
test input size: torch.Size([1000, 2, 14, 14])
test target size: torch.Size([1000])
test classes size: torch.Size([1000, 2])


In [5]:
# The idea here is that the weights of the recognition of the digits are shared.
class Model(nn.Module):
    def __init__(self,n_hidden):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5)
        self.conv2 = nn.Conv2d(20, 50, 5)
        self.fc1 = nn.Linear(6*6*50, n_hidden) #this size depends on output of second layer
        self.fc2 = nn.Linear(n_hidden, 10)

    def forward(self, x):
      # We take the first channel the second channel separately and we compute
      # the forward pass thought the model (basically both images in pair separately)
        
        # first layer
        _x = torch.reshape(x[:,0,:,:], (-1, 1, 14, 14))
        _x1 = F.relu(self.conv1(_x))
        _x = torch.reshape(x[:,1,:,:], (-1, 1, 14, 14))
        _x2 = F.relu(self.conv1(_x))
        
        #second layer
        _x1 = F.relu(self.conv2(_x1))
        _x2 = F.relu(self.conv2(_x2)) #_x1 and _x2 shape = (N,50,6,6) so use this in view below
        
        #third layer
        _x1 = _x1.view(-1, 6*6*50) #unpack the image from pixel grid to array to feed it to FC
        _x2 = _x2.view(-1, 6*6*50)
        _x1 = F.relu(self.fc1(_x1))
        _x2 = F.relu(self.fc1(_x2))
        
        #fourth layer
        _x1 = self.fc2(_x1)
        _x2 = self.fc2(_x2)
        _x1 = F.log_softmax(_x1, dim=1)
        _x2 = F.log_softmax(_x2, dim=1)
        
        #we concatenate the result
        return torch.cat((_x1, _x2), 1)

In [6]:
def train_model(model, criterion, optimizer, train_input, train_target, mini_batch_size):

    for e in range(0, 25):
        sum_loss = 0
        nb_errors = 0
        # We do this with mini-batches
        for b in range(0, train_input.size(0), mini_batch_size):

            mini_batch_input = train_input.narrow(0, b, mini_batch_size)
            mini_batch_target = train_target.narrow(0, b, mini_batch_size)

            output = model(mini_batch_input)
                        
            loss = criterion(output, mini_batch_target)

            sum_loss = sum_loss + loss.item()

            model.zero_grad()

            loss.backward()
            optimizer.step()


In [7]:
def compute_nb_errors(model, inputt, target, mini_batch_size):
    nb_errors = 0

    for b in range(0, inputt.size(0), mini_batch_size):
        output = model(inputt.narrow(0, b, mini_batch_size))
        predicted_classes = output_to_pred_classes(output) #convert output to predicted classes
        
        for k in range(mini_batch_size):
            # check if predicted pair equals true pair
            if torch.all(torch.eq(predicted_classes[k], target[b+k])).item() == 0:
                nb_errors = nb_errors + 1

    return nb_errors

In [13]:
eta, mini_batch_size = 1e-1, 100
h_size = [10, 50, 200, 500, 1000]

for h in h_size:
    model, criterion = Model(h), nn.MSELoss()
    model, criterion = model.to(device), criterion.to(device)
    optimizer = optim.SGD(model.parameters(), lr = eta)  
    train_model(model.train(), criterion, optimizer, train_input, train_classes_binary, mini_batch_size)

    print("Size of hidden FC layer:",h)
    print('Train error:',100* compute_nb_errors(model.train(), train_input, train_classes, mini_batch_size)/test_input.size(0), 'percent')
    print('Test error:',100* compute_nb_errors(model, test_input, test_classes, mini_batch_size)/test_input.size(0), 'percent')
    print('\n')   

Size of hidden FC layer: 10
Train error: 99.2 percent
Test error: 98.9 percent


Size of hidden FC layer: 50
Train error: 99.6 percent
Test error: 98.8 percent


Size of hidden FC layer: 200
Train error: 99.6 percent
Test error: 98.8 percent


Size of hidden FC layer: 500
Train error: 99.6 percent
Test error: 98.8 percent


Size of hidden FC layer: 1000
Train error: 99.6 percent
Test error: 98.8 percent




In [9]:
#output = model(train_input)
#criterion(output,train_classes)

In [10]:
#model_train = model.train()
#output = model_train(train_input)

In [11]:
train_classes.shape[0]

1000

In [12]:
#train_target