## Imports

In [3]:
import torch
from torch import nn
from torch.nn import functional as F

import dlc_practical_prologue

## Load data

In [181]:
N = 1000

x_train, y_train, y_train_classes, x_test, y_test, y_test_classes = \
dlc_practical_prologue.generate_pair_sets(N)

assert x_train.shape == torch.Size([N, 2, 14, 14])
assert y_train.shape == torch.Size([N])
assert y_train_classes.shape == torch.Size([N, 2])
assert x_test.shape == torch.Size([N, 2, 14, 14])
assert y_test.shape == torch.Size([N])
assert y_test_classes.shape == torch.Size([N, 2])

## Define model

In [172]:
class model_1(nn.Module):
    def __init__(self):
        super(model_1, self).__init__()
        self.conv1 = nn.Conv2d(2, 10, kernel_size=3)        
        self.conv2 = nn.Conv2d(10, 10, kernel_size=3)

        self.dense1 = nn.Linear(40, 10)
        self.dense2 = nn.Linear(10, 1)
        
    def forward(self, x):
        x = F.max_pool2d(self.conv1(x), kernel_size=2)
        x = F.max_pool2d(self.conv2(x), kernel_size=2)
        x = F.relu(self.dense1(torch.flatten(x, 1)))
        x = F.sigmoid(self.dense2(x))
        return x

In [177]:
class model_weight_sharing(nn.Module):
    def __init__(self):
        super(model_weight_sharing, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=3)        
        self.conv2 = nn.Conv2d(10, 10, kernel_size=3)

        self.dense1 = nn.Linear(80, 10)
        self.dense2 = nn.Linear(10, 1)
        
    def forward(self, x):
        d1 = F.max_pool2d(self.conv1(x[:, 0:1, ...]), kernel_size=2)
        d1 = F.max_pool2d(self.conv2(d1), kernel_size=2)
        d1 = torch.flatten(d1, 1)
        
        d2 = F.max_pool2d(self.conv1(x[:, 1:2, ...]), kernel_size=2)
        d2 = F.max_pool2d(self.conv2(d2), kernel_size=2)
        d2 = torch.flatten(d2, 1)

        x = torch.cat((d1, d2), 1)
        x = F.relu(self.dense1(torch.flatten(x, 1)))
        x = F.sigmoid(self.dense2(x))
        return x

In [207]:
class model_auxiliary_loss(nn.Module):
    def __init__(self):
        super(model_auxiliary_loss, self).__init__()
        self.conv1 = nn.Conv2d(2, 10, kernel_size=3)   #2*14*14 => 10*12*12  (Maxpool => 10*6*6)  
        self.conv2 = nn.Conv2d(10, 10, kernel_size=3)  #10*6*6 => 10* 4 * 4 (Maxpool => 10*2*2)

        self.dense_digits1 = nn.Linear(40, 10)
        self.dense_digits2 = nn.Linear(40, 10)
        
        self.dense1 = nn.Linear(40, 10)
        self.dense2 = nn.Linear(10, 1)
        
    def forward(self, x):
        x = F.max_pool2d(self.conv1(x), kernel_size=2)
        x = F.max_pool2d(self.conv2(x), kernel_size=2)
        
        x = torch.flatten(x, 1)
        
        self.digits1 = F.softmax((self.dense_digits1(x)), -1)
        self.digits2 = F.softmax((self.dense_digits2(x)), -1)
        
        x = F.relu(self.dense1(x))
        x = torch.sigmoid(self.dense2(x))
        return x

## Training/Evaluation function

In [152]:
def train_model(model, train_input, train_target, epochs, optimizer, loss_function = nn.BCELoss(), batch_size=10):
    # Inspired by exercise corrige 
    model.train()
    
    for e in range(epochs):
        sum_loss = 0
        
        for x_batch, y_batch in zip(x_train.split(batch_size),
                                    y_train.split(batch_size)):
            
            output = model(x_batch)
            loss = loss_function(output, y_batch.float())
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            sum_loss = sum_loss + loss.item()
            
            
        losses.append(sum_loss)
        print("Epoch %i : loss %.2f" % (e, sum_loss), end = "\r")
        
    return losses

In [245]:
def train_auxiliary_model(model, train_input, train_target, digit_target,
                          epochs, optimizer, loss_function = nn.BCELoss(), batch_size=10):
    # Inspired by exercise corrige 
    model.train()
    
    digit_loss = nn.CrossEntropyLoss()
    
    for e in range(epochs):
        sum_loss = 0
        sum_loss_digit1 = 0
        sum_loss_digit2 = 0
        
        for x_batch, y_batch, y_digit_batch in zip(x_train.split(batch_size),
                                                    y_train.split(batch_size),
                                                    digit_target.split(batch_size)):
            
            output = model(x_batch)

            loss_digit1 = digit_loss(model.digits1, y_digit_batch[..., 0])
            loss_digit2 = digit_loss(model.digits2, y_digit_batch[..., 1])
            loss = loss_function(output, y_batch.float())
            
            loss_total = loss_digit1 + loss_digit2 + loss

            optimizer.zero_grad()
            loss_total.backward()  
            optimizer.step()
            
            sum_loss_digit1 = sum_loss_digit1 + loss_digit1.item()
            sum_loss_digit2 = sum_loss_digit2 + loss_digit2.item()
            sum_loss = sum_loss + loss.item()
            
            
        losses.append(sum_loss)
        print("Epoch %i : loss %.2f --- loss_digit1 %.2f --- loss_digit2 %.2f" \
              % (e,sum_loss, sum_loss_digit1, sum_loss_digit2))
        
    return losses

In [149]:
def evaluate_model(model, test_input, test_target):
    model.eval()
    criterion = nn.BCELoss()
    preds_proba = model(test_input).view(-1)
    preds = preds_proba.masked_fill((preds_proba > 0.5), 1).masked_fill((preds_proba<0.5), 0)
    
    loss = criterion(preds_proba, test_target.float()).item()
    accuracy = (preds == test_target).sum().item()/preds.size(0)
    #accuracy = sum([pred == truth for pred, truth in zip(preds, test_target)])
    return loss, accuracy

In [132]:
def mean(x):
    return sum(x)/len(x)

def var(x):
    u = mean(x)
    return sum([(v-u)**2 for v in x])/len(x)

## Experiment

In [246]:
number_training = 10
epochs = 25

losses = []
train_losses = []
accuracies = []
for i_train in range(number_training):
    
#     model = model_1()
#     model = model_weight_sharing()

    auxiliary = True
    model = model_auxiliary_loss()
    
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    indices_shuffle = torch.randperm(N)
    
    if not auxiliary :
        train_loss = train_model(model, 
                            x_train[indices_shuffle],
                            y_train[indices_shuffle],  
                            epochs = epochs,
                           optimizer=optimizer)

    else :
        train_loss = train_auxiliary_model(model, 
                            x_train[indices_shuffle],
                            y_train[indices_shuffle], 
                            y_train_classes[indices_shuffle],
                            epochs = epochs,
                           optimizer=optimizer)
    
    loss, accuracy = evaluate_model(model, x_test, y_test)
    print("Attempt", i_train, ": loss", loss, "- accuracy", accuracy)
    losses.append(loss)
    train_losses.append(train_loss)
    accuracies.append(accuracy)

print("Experiment results :")
print("Loss mean : %.2f (%.3f)" % (mean(losses), var(losses)))
print("Accuracy mean : %.2f (%.3f)" % (mean(accuracies), var(accuracies)))

RuntimeError: Trying to backward through the graph a second time, but the buffers have already been freed. Specify retain_graph=True when calling backward the first time.