In [6]:
import torch
from torch import tensor, nn
from loading_datas import  generate_pair_sets
import torch.nn.functional as F



In [7]:
train_pairs, train_target, train_classes, test_pairs, test_target, test_classes = generate_pair_sets(1000)

In [8]:
class Net1(nn.Module):
  def __init__(self):
    super(Net1,self).__init__()

    self.conv11 = nn.Conv2d(1,16,3)
    self.conv12 = nn.Conv2d(16,32,3)

    self.conv21 = nn.Conv2d(1,16,3)
    self.conv22 = nn.Conv2d(16,32,3)

    self.pool = nn.MaxPool2d(kernel_size=(2,2),stride=2)

    self.fc1 = nn.Linear(64*4*4,64)
    self.fc2 = nn.Linear(64,32)
    self.fc3 = nn.Linear(32,2)

  def forward(self,x): 
    # spliting the channels
    c1 = torch.narrow(x,1,0,1)
    c2 = torch.narrow(x,1,1,1)

    # Channel 1
    c1 = F.relu(self.conv11(c1))
    c1 = self.pool(c1)
    c1 = F.relu(self.conv12(c1))
    

    # Channel 2
    c2 = F.relu(self.conv21(c2))
    c2 = self.pool(c2)
    c2 = F.relu(self.conv22(c2))
    


    output = torch.cat((c1,c2),1)
    output = output.view(-1,64*4*4)
   
    output = F.relu(self.fc1(output))
    output = F.relu(self.fc2(output))
    output = self.fc3(output)

    return output

class Net1_aux(nn.Module):
  def __init__(self):
    super().__init__()

    self.conv11 = nn.Conv2d(1,16,3)
    self.conv12 = nn.Conv2d(16,32,3)

    self.conv21 = nn.Conv2d(1,16,3)
    self.conv22 = nn.Conv2d(16,32,3)

    self.pool = nn.MaxPool2d(kernel_size=(2,2),stride=2)

    self.fc1 = nn.Linear(64*4*4,64)
    self.fc2 = nn.Linear(64,32)
    self.fc3 = nn.Linear(32,2)

  def aux(self, img1, img2):
    img = output = torch.cat((img1, img2), 1)
    nb_el = img.size()[1] * img.size()[2] * img.size()[3]
    # print(nb_el)
    output = img.view(-1, nb_el)
    # print(output.size())
    fc1_aux = nn.Linear(nb_el, 64)
    output = F.relu(fc1_aux(output))
    output = F.relu(self.fc2(output))
    output = self.fc3(output)
    return output

  def forward(self,x): 
    # spliting the channels
    c1 = torch.narrow(x,1,0,1)
    c2 = torch.narrow(x,1,1,1)

    # Channel 1
    c1 = F.relu(self.conv11(c1))
    c2 = F.relu(self.conv21(c2))
    output_aux1 = self.aux(c1, c2)

    c1 = self.pool(c1)
    c2 = self.pool(c2)
    output_aux2 = self.aux(c1, c2)

    

    c1 = F.relu(self.conv12(c1))
    c2 = F.relu(self.conv22(c2))
    


    output = torch.cat((c1,c2),1)
    output = output.view(-1,64*4*4)
   
    output = F.relu(self.fc1(output))
    output = F.relu(self.fc2(output))
    output = self.fc3(output)

    return output, output_aux1, output_aux2

class Net2(nn.Module):
  def __init__(self):
    super(Net2,self).__init__()
    
    self.conv1 = nn.Conv2d(1,16,3)
    self.conv2 = nn.Conv2d(16,32,3)
    self.pool = nn.MaxPool2d(kernel_size=(2,2), stride=2)

    self.fc1 = nn.Linear(64*4*4,64)
    self.fc2 = nn.Linear(64,32)
    self.fc3 = nn.Linear(32,2)

  def forward(self,x): 
    # spliting the channels
    c1 = torch.narrow(x,1,0,1)
    c2 = torch.narrow(x,1,1,1)

    # Channel 1
    c1 = F.relu(self.conv1(c1))
    c1 = self.pool(c1)
    c1 = F.relu(self.conv2(c1))

    # Channel 2
    c2 = F.relu(self.conv1(c2))
    c2 = self.pool(c2)
    c2 = F.relu(self.conv2(c2))


    output = torch.cat((c1,c2),1)
    output = output.view(-1,64*4*4)
   
    output = F.relu(self.fc1(output))
    output = F.relu(self.fc2(output))
    output = self.fc3(output)

    return output

class Net2_aux(nn.Module):
  def __init__(self):
    super().__init__()
    
    self.conv1 = nn.Conv2d(1,16,3)
    self.conv2 = nn.Conv2d(16,32,3)
    self.pool = nn.MaxPool2d(kernel_size=(2,2), stride=2)

    self.fc1 = nn.Linear(64*4*4,64)
    self.fc2 = nn.Linear(64,32)
    self.fc3 = nn.Linear(32,2)


  def aux(self, img1, img2):
    img = output = torch.cat((img1, img2), 1)
    nb_el = img.size()[1] * img.size()[2] * img.size()[3]
    # print(nb_el)
    output = img.view(-1, nb_el)
    # print(output.size())
    fc1_aux = nn.Linear(nb_el, 64)
    output = F.relu(fc1_aux(output))
    output = F.relu(self.fc2(output))
    output = self.fc3(output)
    return output
    

  def forward(self,x): 
    # spliting the channels
    c1 = torch.narrow(x,1,0,1)
    c2 = torch.narrow(x,1,1,1)

    # Channel 1
    c1 = F.relu(self.conv1(c1))
    c2 = F.relu(self.conv1(c2))

    output_aux1 = self.aux(c1, c2)

    c1 = self.pool(c1)
    c2 = self.pool(c2)
    output_aux2 = self.aux(c1, c2)

    c1 = F.relu(self.conv2(c1))
    c2 = F.relu(self.conv2(c2))

    # Channel 2
    


    output = torch.cat((c1,c2),1)
    output = output.view(-1,64*4*4)
   
    output = F.relu(self.fc1(output))
    output = F.relu(self.fc2(output))
    output = self.fc3(output)

    return output, output_aux1, output_aux2


In [9]:
def compute_nb_errors(model, data_input, data_target,batch_size, aux):

    nb_data_errors = 0

    for inputs, targets in zip(data_input.split(batch_size), data_target.split(batch_size)):
        if aux:
            output = model(inputs)[0]
        else:
            output = model(inputs)
        for k in range(len(targets)):
            
            if torch.argmax(output[k]) != torch.argmax(targets[k]):
                nb_data_errors = nb_data_errors + 1
                

    return nb_data_errors

In [None]:
lr, nb_epochs, batch_size = 1e-2, 50, 100


In [10]:
model = Net1()
# model = Net2()
optimizer = torch.optim.SGD(model.parameters(), lr = lr)
criterion = nn.CrossEntropyLoss()
for e in range(nb_epochs):
    for input, targets in zip(train_pairs.split(batch_size), train_target.split(batch_size)):
        # output, output_aux1, output_aux2 = model(input)
        output = model(input)
        loss = criterion(output, targets)
        # loss_aux1 = criterion(output_aux1, targets)
        # loss_aux2 = criterion(output_aux2, targets)

        # print(loss)
        # print(loss)
        # loss_total = loss + loss_aux1 + loss_aux2
        optimizer.zero_grad()
        loss.backward()

    # nb_error = compute_nb_errors(model, train_pairs, train_target, batch_size, aux=False)
    # print(nb_error)

    train_errors = 100 * (1 - compute_nb_errors(model, train_pairs, train_target,batch_size, False)/train_pairs.size(0))
    test_errors = 100 * (1 - compute_nb_errors(model, test_pairs, test_target,batch_size, False)/test_pairs.size(0))
    print(f"Epoch # {e+1} / Train accuracy (%): {train_errors:.2f} / Test accuracy (%): {test_errors:.2f}")

        
    optimizer.step()

Epoch # 1 / Train accuracy (%): 51.40 / Test accuracy (%): 49.90
Epoch # 2 / Train accuracy (%): 48.10 / Test accuracy (%): 49.60
Epoch # 3 / Train accuracy (%): 51.80 / Test accuracy (%): 50.40
Epoch # 4 / Train accuracy (%): 49.40 / Test accuracy (%): 48.00
Epoch # 5 / Train accuracy (%): 49.90 / Test accuracy (%): 48.00
Epoch # 6 / Train accuracy (%): 51.00 / Test accuracy (%): 48.10
Epoch # 7 / Train accuracy (%): 53.20 / Test accuracy (%): 50.00
Epoch # 8 / Train accuracy (%): 54.50 / Test accuracy (%): 50.70
Epoch # 9 / Train accuracy (%): 56.10 / Test accuracy (%): 51.80
Epoch # 10 / Train accuracy (%): 56.60 / Test accuracy (%): 52.30
Epoch # 11 / Train accuracy (%): 57.90 / Test accuracy (%): 54.50
Epoch # 12 / Train accuracy (%): 59.30 / Test accuracy (%): 56.10
Epoch # 13 / Train accuracy (%): 60.90 / Test accuracy (%): 57.20
Epoch # 14 / Train accuracy (%): 63.00 / Test accuracy (%): 58.60
Epoch # 15 / Train accuracy (%): 64.40 / Test accuracy (%): 59.20
Epoch # 16 / Train 

KeyboardInterrupt: 

In [None]:
model = Net1_aux()
# model = Net2()
optimizer = torch.optim.SGD(model.parameters(), lr = lr)
criterion = nn.CrossEntropyLoss()
for e in range(nb_epochs):
    for input, targets in zip(train_pairs.split(batch_size), train_target.split(batch_size)):
        output, output_aux1, output_aux2 = model(input)
        # output = model(input)
        loss = criterion(output, targets)
        loss_aux1 = criterion(output_aux1, targets)
        loss_aux2 = criterion(output_aux2, targets)

        # print(loss)
        # print(loss)
        loss_total = loss + loss_aux1 + loss_aux2
        optimizer.zero_grad()
        loss.backward()

    nb_error = compute_nb_errors(model, train_pairs, train_target, batch_size, aux=True)
    # print(nb_error)

    train_errors = 100 * (1 - compute_nb_errors(model, train_pairs, train_target,batch_size, True)/train_pairs.size(0))
    test_errors = 100 * (1 - compute_nb_errors(model, test_pairs, test_target,batch_size, True)/test_pairs.size(0))
    print(f"Epoch # {e+1} / Train accuracy (%): {train_errors:.2f} / Test accuracy (%): {test_errors:.2f}")

        
    optimizer.step()
