# FeedForward Network (ángulos e incertidumbre)

Vamos a construir una red feedforward muy sencilla con dos neuronas en la salida, que se corresponden con el ángulo y con la incertidumbre asociada a la predicción hecha por la red. Este modelo se corresponde con la clase `FFN()` del archivo `Models.py`.

In [None]:
#Importamos todas las librerías que vamos a utilizar
import torch
import torch.nn as nn
from PIL import Image
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torchvision
import os
import matplotlib.pyplot as plt
import numpy as np
import time
from tqdm import tqdm
import torch.nn.functional as F
import copy
import matplotlib.pyplot as plt 
from torch.distributions import Normal 
import seaborn as sns
import Models as models
import Functions

In [None]:
# Hyper-parameters and global settings shared by the cells below
noisy = True  # True: build the targets from angles_noisy.txt; False: from angles.txt
num_pixels = 56  # image side in pixels (alternative value tried: 28)
input_size = num_pixels**2  # flattened image size; the same num_pixels is used for H and W
batch_size = 32
learning_rate = 0.001
num_im = 70000  # number of images loaded from disk
x_tensor = torch.zeros(num_im,num_pixels,num_pixels)  # buffer with one num_pixels x num_pixels slot per image
num = 0  # not referenced in the visible code
nummax_epochs = 100  # epoch limit checked inside train()
patience = 10  # early-stopping patience handed to train() (alternative value tried: 5)
periodicity = 2*np.pi  # scales the angles when encoding/decoding the cosine target
norm = True  # True: pass x_tensor through Functions.normalization after loading

In [None]:
# Instantiate the network: class FFN from Models.py (imported above as `models`).
model = models.FFN()

In [None]:
# Device configuration: select the GPU when CUDA is available, otherwise the CPU.
# NOTE(review): in the visible code `device` is only printed; neither the model nor
# the batches are moved to it - confirm whether that is intended.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
# Directory that holds the images (Imagen<i>.png) and the label files.
path='/home/benjapases/Desktop/TFM_Benja/Salva3/New_Images_56noise01'
# The working directory is switched too, so files opened later with a relative
# name are created inside this folder.
os.chdir(path)

# Built once: the transform does not depend on the image being loaded.
transform = transforms.Compose([transforms.ToTensor()])

for i in range(num_im):
    name = path + '/' + 'Imagen'+ str(i) + '.png'

    # The context manager closes the image file as soon as its pixels have been
    # copied, instead of leaving one open handle per image behind.
    with Image.open(name) as image:
        # Keep only index 0 along the first axis of the converted tensor
        # (the first channel of the image).
        x_tensor[i] = transform(image)[0,:,:]

if norm:
    x_tensor = Functions.normalization(x_tensor)

In [None]:
# Read the clean and the noisy angles stored next to the images.
true_angles = np.loadtxt(f"{path}/angles.txt")
noisy_angles = np.loadtxt(f"{path}/angles_noisy.txt")

# The regression target is cos(pi/periodicity * angle), computed from the noisy
# or from the clean angles depending on the `noisy` switch.
selected_angles = noisy_angles if noisy else true_angles
labels = np.cos(np.pi/periodicity*selected_angles)
y_tensor = torch.as_tensor(labels)

In [None]:
#Creating a custom dataset
class CustomDataset(Dataset):
    """Dataset that pairs a tensor of inputs with a tensor of labels.

    Item ``i`` is the tuple ``(x_tensor[i], y_tensor[i])``; the length of the
    dataset is the length of the input tensor.
    """

    def __init__(self, x_tensor, y_tensor):
        self.x, self.y = x_tensor, y_tensor

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        sample, target = self.x[index], self.y[index]
        return sample, target
    
# Wrap the full set of images and labels.
dataset = CustomDataset(x_tensor, y_tensor)

# Split sizes: 60% training, 20% validation, and the remainder for test.
train_len = round(0.6*num_im)
valid_len = round(0.2*num_im)
test_len = num_im - (train_len + valid_len)
train_data, valid_data, test_data = torch.utils.data.random_split(dataset, [train_len,valid_len,test_len])

# Loaders working directly on the random_split subsets.
trainloader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
validloader = torch.utils.data.DataLoader(valid_data, batch_size=batch_size)
testloader = torch.utils.data.DataLoader(test_data, batch_size=batch_size)

# Materialise every split as standalone tensors. Selecting all the rows of a split
# with one index tensor replaces the previous element-by-element copy loops; the
# row order is still the order of `<subset>.indices`.
train_idx = torch.as_tensor(train_data.indices, dtype=torch.long)
valid_idx = torch.as_tensor(valid_data.indices, dtype=torch.long)
test_idx = torch.as_tensor(test_data.indices, dtype=torch.long)

# The image tensors are cast to the default dtype, which is what the former
# torch.zeros(...) buffers used.
train_tensor = x_tensor[train_idx].to(torch.get_default_dtype())
valid_tensor = x_tensor[valid_idx].to(torch.get_default_dtype())
test_tensor = x_tensor[test_idx].to(torch.get_default_dtype())

labels_train = y_tensor[train_idx]
labels_valid = y_tensor[valid_idx]
labels_test = y_tensor[test_idx]

train_dataset_norm = CustomDataset(train_tensor,labels_train)
valid_dataset_norm = CustomDataset(valid_tensor,labels_valid)
test_dataset_norm = CustomDataset(test_tensor,labels_test)

# Loaders over the materialised tensors; only the training loader is shuffled.
trainloader_norm = torch.utils.data.DataLoader(train_dataset_norm, batch_size=batch_size, shuffle=True)
validloader_norm = torch.utils.data.DataLoader(valid_dataset_norm, batch_size=batch_size)
testloader_norm = torch.utils.data.DataLoader(test_dataset_norm, batch_size=batch_size)

In [None]:
#We define our train model
# Module-level logs: train() appends the recorded activations to activ_training /
# activ_validation and the per-epoch curves to epoca / training_loss /
# validation_loss. Every name gets its own, independent empty list.
activ_training, activ_validation, activ_trainvalid = [], [], []
epoca, training_loss, validation_loss = [], [], []

def train(model, trainloader, validloader, optimizer, patience):
    """Train a model with per-epoch validation and early stopping.

    Every epoch makes one optimisation pass over ``trainloader`` and one
    gradient-free pass over ``validloader``; both are scored with
    ``Functions.loss_gll``. The output of ``model.layers[-1]`` is captured by a
    forward hook for every batch and appended, together with the epoch number,
    to the module-level lists ``activ_training`` / ``activ_validation``.
    Per-epoch mean losses are appended to ``training_loss`` /
    ``validation_loss`` (and the epoch number to ``epoca``) and plotted.

    Training stops when the validation loss has not improved for ``patience``
    consecutive epochs (the best weights seen so far are then loaded back into
    ``model``) or when ``nummax_epochs`` epochs have been run.

    Args:
        model (torch.nn.Module): Network exposing an indexable ``layers`` attribute.
        trainloader (torch.utils.data.DataLoader): Training batches.
        validloader (torch.utils.data.DataLoader): Validation batches.
        optimizer (torch.optim.Optimizer): Optimizer that updates the parameters.
        patience (int): Number of epochs without improvement before stopping.

    Returns:
        tuple: ``(activ_trainvalid, lastepoch)`` where ``activ_trainvalid`` is
        ``activ_training + activ_validation`` and ``lastepoch`` is the number of
        the last epoch that was run.
    """

    time.sleep(0.2)  # Prevent tqdm bar to print twice

    epoch = 1
    best_loss_valid = np.inf
    best_model = None
    current_patience = patience

    # Register the forward hook once for the whole run. Registering it for every
    # batch stacks a new hook on the layer each time: they all run on every
    # forward pass and are never released.
    hook_handle = model.layers[-1].register_forward_hook(get_activation(''))

    try:
        while True:
            # ---- Training pass ----
            # Progress bar showing the current epoch and the running mean loss
            bar_train = tqdm(trainloader, total=len(trainloader),
                             desc=f'Epoch {epoch:>2} (Train)')
            total_loss_train = 0
            total_inputs_train = 0
            model.train()

            for inputs, labels in bar_train:
                # Reshape inputs (images to vector)
                inputs = inputs.view(inputs.shape[0], -1)

                # Initialize gradient
                optimizer.zero_grad()

                # Forward pass
                outputs = model(inputs)

                # Backward pass and optimize
                loss = Functions.loss_gll(labels, outputs)
                loss.mean().backward()
                optimizer.step()

                # Running mean of the per-sample loss, shown in the progress bar
                total_loss_train += loss.sum().item()
                total_inputs_train += len(labels)
                loss_train = total_loss_train/total_inputs_train
                bar_train.set_postfix_str(f'loss_train={loss_train:.4g}')

                activ_training.append((epoch, activation['']))

            # Sanity check (all training images were used)
            assert(total_inputs_train == len(trainloader.sampler))

            # ---- Validation pass ----
            bar_valid = tqdm(validloader, total=len(validloader),
                             desc=f'Epoch {epoch:>2} (Valid)')
            total_loss_valid = 0
            total_inputs_valid = 0
            model.eval()  # Test mode
            with torch.no_grad():  # Deactivates autograd to reduce memory usage
                for inputs, labels in bar_valid:
                    # Reshape inputs (images to vector)
                    inputs = inputs.view(inputs.shape[0], -1)

                    # Forward pass
                    outputs = model(inputs)

                    # Compute loss (no backprop)
                    loss = Functions.loss_gll(labels, outputs)

                    # Running mean of the per-sample loss, shown in the progress bar
                    total_loss_valid += loss.sum().item()
                    total_inputs_valid += len(labels)
                    loss_valid = total_loss_valid/total_inputs_valid
                    bar_valid.set_postfix_str(f'loss_valid={loss_valid:.4g}')

                    activ_validation.append((epoch, activation['']))

            # Sanity check (all validation images were used)
            assert(total_inputs_valid == len(validloader.sampler))

            # ---- Early stopping on the mean validation loss ----
            if loss_valid < best_loss_valid:
                best_loss_valid = loss_valid
                best_model = copy.deepcopy(model.state_dict())
                current_patience = patience
            else:
                current_patience -= 1
                if current_patience <= 0:
                    model.load_state_dict(best_model)
                    break

            # Graph the loss in training and validation
            plt.plot(epoch, loss_train, 'b.')
            plt.plot(epoch, loss_valid, 'r.')
            plt.title("Loss", fontsize = 16)
            plt.xlabel("epoch", fontsize = 13)
            plt.ylabel("loss", fontsize = 13)
            plt.legend(["Training", "Validation"])

            epoca.append(epoch)
            training_loss.append(loss_train)
            validation_loss.append(loss_valid)

            # Stop once nummax_epochs epochs have been run. The limit is tested
            # before incrementing and with >=, so exactly nummax_epochs epochs are
            # trained and a limit of 1 is honoured as well.
            # NOTE(review): on this exit the weights of the last epoch are kept;
            # the best weights are only restored on the early-stopping exit.
            if epoch >= nummax_epochs:
                break

            epoch += 1
    finally:
        # Detach the hook so it does not keep firing after training ends.
        hook_handle.remove()

    # Built once, after the loop, so it always holds every recorded batch and is
    # defined whichever way the loop ended.
    activ_trainvalid = activ_training + activ_validation

    lastepoch = epoch
    plt.show()
    return activ_trainvalid, lastepoch

In [None]:
# Adam optimizer over all the parameters of `model`, using the learning rate
# defined with the other hyper-parameters.
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Latest captured forward outputs, keyed by the name given to get_activation().
activation = {}


def get_activation(name):
    """Build a forward hook that records the hooked module's output.

    The returned callable takes the (module, inputs, output) arguments of a
    forward hook and stores the detached output in ``activation[name]``,
    overwriting whatever was stored under that name before.
    """
    def _store_output(_module, _inputs, output):
        detached = output.detach()
        activation[name] = detached

    return _store_output

In [None]:
# Run the training loop on the materialised training/validation loaders.

print("Aquí mostramos los resultados del training")

# train() returns the recorded activations and the number of the last epoch.
activ_trainvalid, lastepoch = train(model, trainloader_norm, validloader_norm, optimizer, patience)

# Save the trained weights: only the state dict of `model.layers` is written.
model_path = '/home/benjapases/Desktop/TFM_Benja/Salva3/model_trained_encoder.pth'
# Alternative kept for reference: bundle the optimizer state with the weights.
#state = {'state_dict':model.layers.state_dict(),'optimizer':optimizer.state_dict()}
torch.save(model.layers.state_dict(),model_path)
#torch.save(state,model_path)

In [None]:
#We define our test model
# Outputs of model.layers[-1] recorded by test(), one tensor per batch.
activ_test = []


def test(model, testloader):
    """Evaluate a model on ``testloader`` and record its last-layer outputs.

    Runs one gradient-free pass over ``testloader`` in eval mode, scores every
    batch with ``Functions.loss_gll`` and shows the running mean loss in a
    progress bar. The output of ``model.layers[-1]`` for each batch is appended
    to the module-level list ``activ_test``.

    Args:
        model (torch.nn.Module): Network exposing an indexable ``layers`` attribute.
        testloader (torch.utils.data.DataLoader): Test batches.
    """

    time.sleep(0.2)  # Prevent tqdm bar to print twice
    bar_test = tqdm(testloader, total=len(testloader),
                    desc=f'{model.__class__.__name__:<10} (Test)')
    total_loss_test = 0
    total_inputs_test = 0

    # Register the forward hook once. Registering it for every batch stacks a
    # new hook on the layer each time, and none of them is ever released.
    hook_handle = model.layers[-1].register_forward_hook(get_activation(''))

    model.eval()  # Test mode
    try:
        with torch.no_grad():  # Deactivates autograd to reduce memory usage
            for inputs, labels in bar_test:
                # Reshape inputs (images to vector)
                inputs = inputs.view(inputs.shape[0], -1)

                # Forward pass
                outputs = model(inputs)

                # Compute loss (no backprop)
                loss = Functions.loss_gll(labels, outputs)

                # Running mean of the per-sample loss, shown in the progress bar
                total_loss_test += loss.sum().item()
                total_inputs_test += len(labels)
                loss_test = total_loss_test/total_inputs_test
                bar_test.set_postfix_str(f'loss_test={loss_test:.4g}')

                activ_test.append(activation[''])
    finally:
        # Detach the hook so it does not keep firing after the evaluation.
        hook_handle.remove()

    # Sanity check (all test images were used)
    assert(total_inputs_test == len(testloader.sampler))

In [None]:
print("Aquí mostramos los resultados del test")

# Evaluate the trained model on the test split; test() appends the per-batch
# outputs of the last layer to activ_test.
test(model, testloader_norm)

Aplicamos las funciones de activación pertinentes a las salidas de la red y analizamos los resultados:

In [None]:
m = nn.Tanh()

# First output column (the angle neuron) of every recorded test batch.
o1 = [batch_out[:, 0] for batch_out in activ_test]

# A single concatenation replaces repeated Python list concatenation, which
# re-copies the accumulated list for every batch.
ox = torch.cat(o1).cpu()

# Decode the angle: tanh maps the raw output into (-1, 1), acos inverts the
# cosine used for the labels, and the factor undoes the pi/periodicity scaling.
ox_activ_test = torch.acos(m(ox))*periodicity/np.pi
print(ox_activ_test.min())
print(ox_activ_test.max())

# Save the predictions, one plain number per line. Writing str(tensor_element)
# would store "tensor(0.1234)": a 4-digit repr that is not a parseable number.
with open("predicted_angleffn.txt","w+") as output_angle:
    output_angle.writelines(f"{angle}\n" for angle in ox_activ_test.tolist())

In [None]:
# Clean and noisy angles of the samples that ended up in the test split, in the
# order given by test_data.indices.
test_true_angles = np.array(true_angles[test_data.indices])
test_noisy_angles = np.array(noisy_angles[test_data.indices])

# Save both series, one value per line. The context manager closes both files
# even if a write fails.
with open("trueanglestest_ffn.txt","w+") as true_anglestest, \
        open("noisyanglestest_ffn.txt","w+") as noisy_anglestest:
    true_anglestest.writelines(str(angle) + '\n' for angle in test_true_angles)
    noisy_anglestest.writelines(str(angle) + '\n' for angle in test_noisy_angles)

Graficamos los resultados obtenidos:

In [None]:
# Raw string: "\s" is not a valid escape sequence in a regular string literal
# and triggers a warning on recent Python versions.
noise_title = r'$\sigma_{noise} = 0.1$'

# (x values, y values, x label, y label) for each scatter plot, in display order.
scatter_specs = [
    (ox_activ_test, test_true_angles, "Prediction", "Real angle"),
    (ox_activ_test, test_noisy_angles, "Prediction", "Noisy label"),
    (test_noisy_angles, test_true_angles, "Noisy label", "Real angle"),
]

for x_values, y_values, x_label, y_label in scatter_specs:
    plt.figure()
    plt.plot(x_values, y_values, '.')
    plt.xlabel(x_label, fontsize = 14)
    plt.ylabel(y_label, fontsize = 14)
    plt.title(noise_title, fontsize = 14)
    plt.grid()

Calculamos el error asociado a cada salida con la función `circ_dist` del archivo `Functions.py`, que nos proporciona la distancia mínima entre el ángulo real y el predicho por el modelo. Finalmente, representamos la densidad de probabilidad de dicho error:

In [None]:
# Error between the true and the predicted test angles, from Functions.circ_dist.
# NOTE(review): the third argument is np.pi while the angles are decoded with
# periodicity = 2*np.pi - confirm this is the value circ_dist expects.
ox_activ = np.array(ox_activ_test)
error = Functions.circ_dist(torch.from_numpy(test_true_angles), torch.from_numpy(ox_activ), np.pi).numpy()
abs_error = np.abs(error)

plt.figure()
# The KDE outline is drawn with linewidth=0; the curve is only read back from
# the axes to fill the area under it.
# Alternative tried: bw_adjust=10 for a smoother estimate.
p = sns.kdeplot(data=abs_error,cut=0,common_grid=True,common_norm=True,linewidth=0)
kdeline = p.lines
x, y = kdeline[0].get_xdata(), kdeline[0].get_ydata()
p.fill_between(x, 0, y, facecolor='blue',alpha=.2)
plt.xlabel(r'Error',fontsize = 14)

# Median
median = np.median(abs_error)
plt.vlines(median, 0, 3.5,linestyles='dashdot', color='black', label='Median')
print('Median: ', median,'\n')

# Mean (printed only, not drawn)
mean = np.mean(abs_error)
print('Mean: ', mean,'\n')

# Quartiles: both lines are drawn in one call that carries the legend label, so
# the legend entry has the same colour and style as the lines on the plot.
q1, q3 = np.quantile(abs_error, [0.25, 0.75])
plt.vlines([q1, q3], 0, 3.5, linestyles='dashed', color='#5555ff', label='Quartiles')
print('Quartiles: ', q1,q3,'\n')

# Alternative tried: plt.xlim((0,3*q3))
plt.xlim((0,0.37))
plt.legend()

plt.grid()
# Raw string: "\s" is not a valid escape sequence in a regular string literal.
plt.title(r'$\sigma_{noise} = 0.1$',fontsize = 14)

Ahora representamos el error frente a la incertidumbre obtenida con la red `FFN()`, es decir, la segunda salida de la red:

In [None]:
m = nn.ELU()

# Second output column (the uncertainty neuron) of every recorded test batch.
o2 = [batch_out[:, 1] for batch_out in activ_test]

# A single concatenation replaces repeated Python list concatenation, which
# re-copies the accumulated list for every batch.
oy = torch.cat(o2).cpu()

# ELU(x) + 1 is applied to the raw output to obtain the uncertainty.
oy_activ = m(oy) + 1

# Save the uncertainties, one plain number per line. Writing str(tensor_element)
# would store "tensor(0.1234)": a 4-digit repr that is not a parseable number.
with open("incertidumbreffn.txt","w+") as output_incertidumbre:
    output_incertidumbre.writelines(f"{value}\n" for value in oy_activ.tolist())

In [None]:
# Scatter plot of the error against the predicted uncertainty.
oy_activ = np.array(oy_activ)
plt.figure()
plt.plot(oy_activ, error,'.')
plt.ylabel(r'Error',fontsize = 14)
plt.xlabel('Uncertainty',fontsize = 14)
# Raw string: "\s" is not a valid escape sequence in a regular string literal
# and triggers a warning on recent Python versions.
plt.title(r'$\sigma_{noise} = 0.1$',fontsize = 14)
plt.grid()