## Deep Learning Project 1 

Comparing digits

### TODO Next : 

- Maybe we are already using Weight Sharing ? **WE DO**
- Maybe get better performance?
- Add dropout layers and similar regularization techniques
- Add auxiliary losses (also return the CNN outputs and feed them to a loss function, as is done for the comparison net) **Done**
- Benchmark **Function Done** 
- Write report **Started** 

In [1]:
import torch
import math
import dlc_practical_prologue as prologue
from torch import optim
from torch import Tensor
from torch import nn
from torch.nn import functional as F

In [2]:
# Generate the train and test sets.
# N is the size argument handed to prologue.generate_pair_sets; each split is
# returned as (input, target, classes). Presumably `target` is the comparison
# label and `classes` the two digit labels -- TODO confirm against
# dlc_practical_prologue. These names are reused as globals by the cells below.
N = 1000
train_input, train_target, train_classes, test_input, test_target, test_classes = prologue.generate_pair_sets(N)

In [3]:
# Baseline model: each digit of the pair is classified by its own CNN (no weight sharing).
class Base_Net(nn.Module):
    """Two independent digit CNNs followed by an MLP comparing their outputs.

    ``forward`` takes channel 0 and channel 1 of the input, runs ``cnn1`` on
    the first and ``cnn2`` on the second, concatenates the two 10-feature
    outputs and returns the 2-feature output of the comparison MLP.

    NOTE(review): the ``view(-1, 256)`` flattening assumes the second pooling
    stage leaves 64 * 2 * 2 values per sample (e.g. 14x14 inputs) -- this is
    not checked here.
    """

    def __init__(self):
        super().__init__()

        # Branch 1: classifier for the first image of the pair
        self.conv1_1 = nn.Conv2d(1, 32, kernel_size=3)
        self.conv2_1 = nn.Conv2d(32, 64, kernel_size=3)
        self.fc1_1 = nn.Linear(256, 200)
        self.fc2_1 = nn.Linear(200, 10)

        # Branch 2: classifier for the second image of the pair
        self.conv1_2 = nn.Conv2d(1, 32, kernel_size=3)
        self.conv2_2 = nn.Conv2d(32, 64, kernel_size=3)
        self.fc1_2 = nn.Linear(256, 200)
        self.fc2_2 = nn.Linear(200, 10)

        # Comparison head fed with the 20 concatenated branch outputs
        self.fc3 = nn.Linear(20, 300)
        self.fc4 = nn.Linear(300, 300)
        self.fc5 = nn.Linear(300, 2)

    @staticmethod
    def _digit_scores(x, conv_a, conv_b, fc_a, fc_b):
        """Run conv -> pool -> ReLU twice, flatten to 256, then two linear layers."""
        pooled = F.max_pool2d(conv_a(x), kernel_size=2)
        pooled = F.max_pool2d(conv_b(F.relu(pooled)), kernel_size=2)
        hidden = fc_a(F.relu(pooled).view(-1, 256))
        return fc_b(F.relu(hidden))

    def cnn1(self, x):
        """Return the 10 digit scores computed by the first branch."""
        return self._digit_scores(x, self.conv1_1, self.conv2_1, self.fc1_1, self.fc2_1)

    def cnn2(self, x):
        """Return the 10 digit scores computed by the second branch."""
        return self._digit_scores(x, self.conv1_2, self.conv2_2, self.fc1_2, self.fc2_2)

    def mlp(self, x):
        """Map the concatenated digit scores to the 2 comparison scores."""
        hidden = F.relu(self.fc3(x))
        hidden = F.relu(self.fc4(hidden))
        return self.fc5(hidden)

    def forward(self, x):
        """Return the comparison scores for a batch of two-channel image pairs."""
        # narrow keeps the channel axis, giving one single-channel image per branch
        first_scores = self.cnn1(x.narrow(1, 0, 1))
        second_scores = self.cnn2(x.narrow(1, 1, 1))
        return self.mlp(torch.cat((first_scores, second_scores), 1))

In [4]:
# Model definition: one CNN is applied to both digits (weight sharing).

class Weight_Sharing_Net(nn.Module):
    """A single digit CNN applied to both images, followed by a comparison MLP.

    ``forward`` runs the same ``cnn`` on channel 0 and on channel 1 of the
    input, concatenates the two 10-feature outputs and returns the
    2-feature output of ``mlp``.
    """

    def __init__(self):
        super().__init__()

        # Digit classifier shared by both images of the pair
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.fc1 = nn.Linear(256, 200)
        self.fc2 = nn.Linear(200, 10)

        # Comparison head fed with the 20 concatenated digit scores
        self.fc3 = nn.Linear(20, 300)
        self.fc4 = nn.Linear(300, 300)
        self.fc5 = nn.Linear(300, 2)

    def cnn(self, x):
        """Return the 10 digit scores for a batch of single-channel images."""
        feature_maps = F.max_pool2d(self.conv1(x), kernel_size=2)
        feature_maps = F.max_pool2d(self.conv2(F.relu(feature_maps)), kernel_size=2)
        flattened = F.relu(feature_maps).view(-1, 256)
        return self.fc2(F.relu(self.fc1(flattened)))

    def mlp(self, x):
        """Map the concatenated digit scores to the 2 comparison scores."""
        return self.fc5(F.relu(self.fc4(F.relu(self.fc3(x)))))

    def forward(self, x):
        """Return the comparison scores for a batch of two-channel image pairs."""
        # narrow keeps the channel axis, so each slice is a single-channel image batch
        digit_scores = [self.cnn(x.narrow(1, channel, 1)) for channel in (0, 1)]
        return self.mlp(torch.cat(digit_scores, 1))

In [5]:
def train_model_base_ws(model, train_input, train_target, mini_batch_size, nb_epochs = 100, use_optimizer= None, _print=False):
    """Train ``model`` on the comparison target with a cross-entropy loss.

    Args:
        model: module whose call returns the comparison scores for a batch.
        train_input: training inputs, batched along dimension 0.
        train_target: class index per sample (cast to ``long`` per batch).
        mini_batch_size: number of samples per step; the last batch may be
            smaller when the data size is not a multiple of it.
        nb_epochs: number of passes over the training data.
        use_optimizer: ``"sgd"``, ``"adam"``, or ``None`` for a hand-written
            gradient step with the same learning rate.
        _print: when true, print the epoch index and accumulated loss.

    Raises:
        ValueError: if ``use_optimizer`` is not one of the supported values.
    """
    eta = 1e-3

    # Resolve the optimizer up front so that a typo fails immediately instead
    # of surfacing as an unbound variable after the first backward pass.
    if use_optimizer is None:
        optimizer = None
    elif use_optimizer == "sgd":
        optimizer = optim.SGD(model.parameters(), lr=eta)
    elif use_optimizer == "adam":
        optimizer = optim.Adam(model.parameters(), lr=eta)
    else:
        raise ValueError(
            f"use_optimizer must be None, 'sgd' or 'adam', got {use_optimizer!r}"
        )

    criterion = nn.CrossEntropyLoss()
    nb_samples = train_input.size(0)

    for e in range(nb_epochs):
        acc_loss = 0

        for b in range(0, nb_samples, mini_batch_size):
            # Clamp the batch length: narrow() raises if it runs past the end.
            batch_size = min(mini_batch_size, nb_samples - b)
            output = model(train_input.narrow(0, b, batch_size))
            target = train_target.narrow(0, b, batch_size).long()
            loss = criterion(output, target)
            acc_loss = acc_loss + loss.item()

            model.zero_grad()
            loss.backward()

            if optimizer is not None:
                optimizer.step()
            else:
                # Plain gradient descent step, done outside of autograd.
                with torch.no_grad():
                    for p in model.parameters():
                        if p.grad is not None:
                            p -= eta * p.grad
        if _print:
            print(e, acc_loss)
        
def compute_nb_errors_base_ws(model, input, target, mini_batch_size):
    """Count the samples whose predicted class is not marked in ``target``.

    A prediction is an error when ``target[sample, predicted_class] <= 0``,
    so ``target`` is indexed as a 2-D (sample, class) tensor.

    Args:
        model: module whose call returns one score per class for a batch.
        input: evaluation inputs, batched along dimension 0 (the name shadows
            the builtin but is kept so keyword callers keep working).
        target: 2-D tensor with a positive entry at the correct class.
        mini_batch_size: number of samples per forward pass; the last batch
            may be smaller.

    Returns:
        The number of misclassified samples as a Python ``int``.
    """
    nb_samples = input.size(0)
    nb_errors = 0

    # Evaluate in eval mode without autograd, then restore the previous mode.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for b in range(0, nb_samples, mini_batch_size):
                # Clamp the batch length: narrow() raises if it runs past the end.
                batch_size = min(mini_batch_size, nb_samples - b)
                output = model(input.narrow(0, b, batch_size))
                _, predicted_classes = output.max(1)
                # Pick target[b + k, predicted_classes[k]] for the whole batch at once.
                picked = target.narrow(0, b, batch_size).gather(
                    1, predicted_classes.unsqueeze(1)
                )
                nb_errors = nb_errors + int((picked <= 0).sum().item())
    finally:
        model.train(was_training)

    return nb_errors

In [6]:
# Train the no-weight-sharing baseline with Adam: 25 epochs, mini-batches of 250.
model_total = Base_Net()

train_model_base_ws(model_total, train_input, train_target, mini_batch_size=250, nb_epochs=25, use_optimizer="adam")

# compute_nb_errors_base_ws indexes its target as target[sample, predicted_class],
# hence the conversion of the test target (presumably to one-hot rows) first.
test_target_total = prologue.convert_to_one_hot_labels(test_input, test_target)
nb_test_errors = compute_nb_errors_base_ws(model_total, test_input, test_target_total, mini_batch_size=250)
# Report the test error both as a percentage and as errors / samples.
print('test error Net {:0.2f}% {:d}/{:d}'.format((100 * nb_test_errors) / test_input.size(0),
                                                      nb_test_errors, test_input.size(0)))

test error Net 17.00% 170/1000


In [7]:
# Same experiment as the baseline cell, but with the weight-sharing network
# (Adam, 25 epochs, mini-batches of 250). Note that model_total is rebound here.
model_total = Weight_Sharing_Net()

train_model_base_ws(model_total, train_input, train_target, mini_batch_size=250, nb_epochs=25, use_optimizer="adam")

# compute_nb_errors_base_ws indexes its target as target[sample, predicted_class],
# hence the conversion of the test target (presumably to one-hot rows) first.
test_target_total = prologue.convert_to_one_hot_labels(test_input, test_target)
nb_test_errors = compute_nb_errors_base_ws(model_total, test_input, test_target_total, mini_batch_size=250)
# Report the test error both as a percentage and as errors / samples.
print('test error Net {:0.2f}% {:d}/{:d}'.format((100 * nb_test_errors) / test_input.size(0),
                                                      nb_test_errors, test_input.size(0)))

test error Net 16.00% 160/1000


In [8]:
class Auxiliary_Loss_Weight_Sharing_Net(nn.Module):
    """Weight-sharing comparison network that also exposes its digit scores.

    ``forward`` returns ``(scores_digit_1, scores_digit_2, comparison)`` so
    that a training loop can attach a loss to each digit output in addition
    to the comparison output.
    """

    def __init__(self):
        super().__init__()

        # Digit classifier shared by both images of the pair
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.fc1 = nn.Linear(256, 200)
        self.fc2 = nn.Linear(200, 10)

        # Comparison head fed with the 20 concatenated digit scores
        self.fc3 = nn.Linear(20, 300)
        self.fc4 = nn.Linear(300, 300)
        self.fc5 = nn.Linear(300, 2)

    def cnn(self, x):
        """Return the 10 digit scores for a batch of single-channel images."""
        stage_1 = F.relu(F.max_pool2d(self.conv1(x), kernel_size=2))
        stage_2 = F.relu(F.max_pool2d(self.conv2(stage_1), kernel_size=2))
        hidden = F.relu(self.fc1(stage_2.view(-1, 256)))
        return self.fc2(hidden)

    def mlp(self, x):
        """Map the concatenated digit scores to the 2 comparison scores."""
        return self.fc5(F.relu(self.fc4(F.relu(self.fc3(x)))))

    def forward(self, x):
        """Return both digit score tensors and the comparison scores."""
        # narrow keeps the channel axis, so each slice is a single-channel image batch
        scores_1 = self.cnn(x.narrow(1, 0, 1))
        scores_2 = self.cnn(x.narrow(1, 1, 1))
        return scores_1, scores_2, self.mlp(torch.cat((scores_1, scores_2), 1))

        
def train_model_auxiliary_loss(model, train_input, train_target, train_classes, mini_batch_size, nb_epochs = 100, use_optimizer= None, _print=False):
    """Train ``model`` with a comparison loss plus one auxiliary loss per digit.

    The model must return ``(digit_1, digit_2, comparison)``. Each output gets
    its own cross-entropy term and the three terms are summed with equal
    weight.

    Args:
        model: module returning the two digit score tensors and the
            comparison scores for a batch.
        train_input: training inputs, batched along dimension 0.
        train_target: comparison class index per sample (cast to ``long``).
        train_classes: 2-D tensor whose columns 0 and 1 are used as the
            targets of the first and second digit outputs.
        mini_batch_size: number of samples per step; the last batch may be
            smaller when the data size is not a multiple of it.
        nb_epochs: number of passes over the training data.
        use_optimizer: ``"sgd"``, ``"adam"``, or ``None`` for a hand-written
            gradient step with the same learning rate.
        _print: when true, print the epoch index and accumulated loss.

    Raises:
        ValueError: if ``use_optimizer`` is not one of the supported values.
    """
    eta = 1e-3

    # Resolve the optimizer up front so that a typo fails immediately instead
    # of surfacing as an unbound variable after the first backward pass.
    if use_optimizer is None:
        optimizer = None
    elif use_optimizer == "sgd":
        optimizer = optim.SGD(model.parameters(), lr=eta)
    elif use_optimizer == "adam":
        optimizer = optim.Adam(model.parameters(), lr=eta)
    else:
        raise ValueError(
            f"use_optimizer must be None, 'sgd' or 'adam', got {use_optimizer!r}"
        )

    criterion_auxilary = nn.CrossEntropyLoss()
    criterion_final = nn.CrossEntropyLoss()
    nb_samples = train_input.size(0)

    for e in range(nb_epochs):
        acc_loss = 0

        for b in range(0, nb_samples, mini_batch_size):
            # Clamp the batch length: narrow() raises if it runs past the end.
            batch_size = min(mini_batch_size, nb_samples - b)
            digit_1, digit_2, comparison = model(train_input.narrow(0, b, batch_size))

            target_comparison = train_target.narrow(0, b, batch_size).long()
            batch_classes = train_classes.narrow(0, b, batch_size)
            target_digit_1, target_digit_2 = batch_classes[:, 0], batch_classes[:, 1]

            loss1 = criterion_auxilary(digit_1, target_digit_1)
            loss2 = criterion_auxilary(digit_2, target_digit_2)
            loss3 = criterion_final(comparison, target_comparison)
            acc_loss = acc_loss + loss1.item() + loss2.item() + loss3.item()

            model.zero_grad()
            # One backward pass over the summed loss accumulates the same
            # gradients as three separate passes, without retaining the graph.
            (loss1 + loss2 + loss3).backward()

            if optimizer is not None:
                optimizer.step()
            else:
                # Plain gradient descent step, done outside of autograd.
                with torch.no_grad():
                    for p in model.parameters():
                        if p.grad is not None:
                            p -= eta * p.grad
        if _print:
            print(e, acc_loss)
            
def compute_nb_errors_auxilary_loss(model, input, target, mini_batch_size):
    """Count comparison errors for a model returning ``(digit_1, digit_2, comparison)``.

    Only the third output is scored. A prediction is an error when
    ``target[sample, predicted_class] <= 0``, so ``target`` is indexed as a
    2-D (sample, class) tensor.

    Args:
        model: module returning a 3-tuple whose last element holds one score
            per class for each sample of the batch.
        input: evaluation inputs, batched along dimension 0 (the name shadows
            the builtin but is kept so keyword callers keep working).
        target: 2-D tensor with a positive entry at the correct class.
        mini_batch_size: number of samples per forward pass; the last batch
            may be smaller.

    Returns:
        The number of misclassified samples as a Python ``int``.
    """
    nb_samples = input.size(0)
    nb_errors = 0

    # Switch to eval mode so layers such as dropout are inactive while
    # scoring, skip autograd bookkeeping, and restore the mode afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for b in range(0, nb_samples, mini_batch_size):
                # Clamp the batch length: narrow() raises if it runs past the end.
                batch_size = min(mini_batch_size, nb_samples - b)
                _, _, output = model(input.narrow(0, b, batch_size))
                _, predicted_classes = output.max(1)
                # Pick target[b + k, predicted_classes[k]] for the whole batch at once.
                picked = target.narrow(0, b, batch_size).gather(
                    1, predicted_classes.unsqueeze(1)
                )
                nb_errors = nb_errors + int((picked <= 0).sum().item())
    finally:
        model.train(was_training)

    return nb_errors

In [9]:
# Train the weight-sharing network with auxiliary digit losses
# (Adam, 25 epochs, mini-batches of 250); train_classes supplies the digit targets.
model_auxiliary = Auxiliary_Loss_Weight_Sharing_Net()


train_model_auxiliary_loss(model_auxiliary, train_input, train_target, train_classes, mini_batch_size=250, nb_epochs=25, use_optimizer="adam")


# compute_nb_errors_auxilary_loss indexes its target as target[sample, predicted_class],
# hence the conversion of the test target (presumably to one-hot rows) first.
test_target_total = prologue.convert_to_one_hot_labels(test_input, test_target)
nb_test_errors = compute_nb_errors_auxilary_loss(model_auxiliary, test_input, test_target_total, mini_batch_size=250)
# Report the test error both as a percentage and as errors / samples.
print('test error Net {:0.2f}% {:d}/{:d}'.format((100 * nb_test_errors) / test_input.size(0),
                                                      nb_test_errors, test_input.size(0)))

test error Net 12.40% 124/1000


In [12]:
def benchmark_model(model, train_function, evaluate_function, nb_trials=20, N=1000, mini_batch_size=250, nb_epochs=25, model_requires_target_and_classes=False, _print=False):
    """Train and evaluate a fresh model on fresh data ``nb_trials`` times.

    Each trial generates new data with ``prologue.generate_pair_sets(N)``,
    instantiates ``model()``, trains it with the Adam optimizer and counts
    its test errors. The mean and the sample standard deviation of the
    per-trial test error rate (in percent) are printed at the end.

    Args:
        model: zero-argument callable (e.g. a class) building the network.
        train_function: training routine; called with ``train_classes`` as an
            extra positional argument when
            ``model_requires_target_and_classes`` is true.
        evaluate_function: routine returning the number of test errors.
        nb_trials: number of independent trials.
        N: size passed to ``prologue.generate_pair_sets``; also used as the
            number of test samples when converting counts to percentages.
        mini_batch_size: batch size forwarded to training and evaluation.
        nb_epochs: number of epochs forwarded to training.
        model_requires_target_and_classes: whether the training routine also
            needs the digit classes.
        _print: forwarded to the training routine.

    Returns:
        The list of test error counts, one entry per trial.
    """
    performances = []
    for trial in range(nb_trials):

        # Generate Data
        train_input, train_target, train_classes, test_input, test_target, test_classes = prologue.generate_pair_sets(N)
        test_target_one_hot = prologue.convert_to_one_hot_labels(test_input, test_target)

        # Define the model
        model_total = model()

        # Train the model
        if model_requires_target_and_classes:
            train_function(model_total, train_input, train_target, train_classes, mini_batch_size=mini_batch_size,
                           nb_epochs=nb_epochs, use_optimizer="adam", _print=_print)
        else:
            train_function(model_total, train_input, train_target, mini_batch_size=mini_batch_size,
                           nb_epochs=nb_epochs, use_optimizer="adam", _print=_print)

        # Evaluate performances
        nb_test_errors = evaluate_function(model_total, test_input, test_target_one_hot, mini_batch_size=mini_batch_size)
        print('test error Net trial {:d} {:0.2f}% {:d}/{:d}'.format(trial, (100 * nb_test_errors) / test_input.size(0),
                                                              nb_test_errors, test_input.size(0)))
        performances.append(nb_test_errors)

    # Without any trial there is nothing to summarize (and nothing to divide by).
    if not performances:
        return performances

    # Work in percent throughout so the mean and the deviations share a unit.
    error_rates = [100 * nb_errors / N for nb_errors in performances]
    mean_perf = sum(error_rates) / len(error_rates)
    # The value is an error rate, so it is labelled as such.
    print(f"Average test error rate of this architecture {mean_perf}%")

    # Sample standard deviation: squared deviations averaged over (n - 1);
    # a single trial has no spread to estimate.
    if len(error_rates) > 1:
        variance = sum((rate - mean_perf) ** 2 for rate in error_rates) / (len(error_rates) - 1)
    else:
        variance = 0.0
    std_dev = math.sqrt(variance)
    print(f"With standard deviation of  {std_dev}")
    return performances

In [None]:
# Benchmark the baseline with benchmark_model's default settings;
# results_base holds the per-trial test error counts used for plotting later.
print("Benchmark of the model with no Weight Sharing")
results_base = benchmark_model(Base_Net, train_model_base_ws, compute_nb_errors_base_ws)

Benchmark of the model with no Weight Sharing


In [None]:
# Benchmark the weight-sharing network with benchmark_model's default settings;
# results_ws holds the per-trial test error counts used for plotting later.
print("Benchmark of the model with Weight Sharing")
results_ws = benchmark_model(Weight_Sharing_Net, train_model_base_ws, compute_nb_errors_base_ws)

In [None]:
# Benchmark the auxiliary-loss network; its training routine also needs the
# digit classes, hence model_requires_target_and_classes=True.
print("Benchmark of the model with Weight Sharing and an auxiliary loss ")
results_ws_al = benchmark_model(Auxiliary_Loss_Weight_Sharing_Net, train_model_auxiliary_loss, compute_nb_errors_auxilary_loss, model_requires_target_and_classes=True)


In [None]:
class Auxiliary_Loss_Net_Dropout(nn.Module):
    """Weight-sharing comparison network with dropout inside the digit CNN.

    Dropout with ``p=0.1`` is applied after each of the two conv/pool/ReLU
    stages and after the first fully connected layer of ``cnn``. ``forward``
    returns ``(scores_digit_1, scores_digit_2, comparison)``.
    """

    def __init__(self):
        super().__init__()

        # Digit classifier shared by both images of the pair
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.fc1 = nn.Linear(256, 200)
        self.fc2 = nn.Linear(200, 10)
        # One dropout module per stage of the digit classifier
        self.dropout_1 = nn.Dropout(p=0.1)
        self.dropout_2 = nn.Dropout(p=0.1)
        self.dropout_3 = nn.Dropout(p=0.1)

        # Comparison head fed with the 20 concatenated digit scores
        self.fc3 = nn.Linear(20, 300)
        self.fc4 = nn.Linear(300, 300)
        self.fc5 = nn.Linear(300, 2)

    def cnn(self, x):
        """Return the 10 digit scores for a batch of single-channel images."""
        stage_1 = self.dropout_1(F.relu(F.max_pool2d(self.conv1(x), kernel_size=2)))
        stage_2 = self.dropout_2(F.relu(F.max_pool2d(self.conv2(stage_1), kernel_size=2)))
        hidden = self.dropout_3(F.relu(self.fc1(stage_2.view(-1, 256))))
        return self.fc2(hidden)

    def mlp(self, x):
        """Map the concatenated digit scores to the 2 comparison scores."""
        return self.fc5(F.relu(self.fc4(F.relu(self.fc3(x)))))

    def forward(self, x):
        """Return both digit score tensors and the comparison scores."""
        # narrow keeps the channel axis, so each slice is a single-channel image batch
        scores_1 = self.cnn(x.narrow(1, 0, 1))
        scores_2 = self.cnn(x.narrow(1, 1, 1))
        return scores_1, scores_2, self.mlp(torch.cat((scores_1, scores_2), 1))
    

In [None]:
# Benchmark the dropout variant; unlike the other benchmarks it trains for 50 epochs.
results_dropout = benchmark_model(Auxiliary_Loss_Net_Dropout, train_model_auxiliary_loss, compute_nb_errors_auxilary_loss, model_requires_target_and_classes=True, nb_epochs=50)


In [None]:
import matplotlib.pyplot as plt 
def mp(ls, nb_samples=1000):
    """Convert error counts into error rates expressed in percent.

    Args:
        ls: iterable of error counts.
        nb_samples: number of samples each count was measured on. The default
            of 1000 reproduces the former fixed division by 10.

    Returns:
        A list holding one percentage per input count.
    """
    # Number of samples that correspond to one percentage point.
    samples_per_percent = nb_samples / 100
    return [x / samples_per_percent for x in ls]
def plot_results(base, ws, ws_al, dropout):
    """Plot the four benchmark result series, save the figure and show it.

    Each argument is passed through ``mp`` before plotting, and the curves
    are drawn in the order base, ws, ws_al, dropout. The figure is written
    to ``benchmark_results.png`` before being displayed.
    """
    labelled_series = (
        (base, 'base'),
        (ws, 'ws'),
        (ws_al, 'ws + al'),
        (dropout, 'Dropout'),
    )
    for series, curve_label in labelled_series:
        plt.plot(mp(series), label=curve_label)

    plt.xlabel("Trial Number")
    plt.ylabel("Test Error Rate (%)")
    plt.legend()
    plt.savefig("benchmark_results.png")
    plt.show()
# Plot the per-trial results collected by the four benchmark cells above.
plot_results(results_base, results_ws, results_ws_al, results_dropout)