In [2]:
import time
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset,DataLoader

import torchvision
import torchvision.transforms as transforms

from monotonenorm import GroupSort, direct_norm,project_norm, SigmaNet
from tqdm.notebook import tqdm

In [37]:
def time_execution(start,end):
    """Print the time elapsed between two timestamps as hours, minutes and seconds.

    Args:
        start: Start timestamp, in seconds.
        end: End timestamp, in seconds.

    Hours and minutes are truncated to integers when printed; the seconds are
    printed with whatever precision ``end - start`` carries.
    """
    elapsed = end - start
    # Split off the whole minutes first, then the whole hours out of those minutes.
    whole_minutes, seconds = divmod(elapsed, 60)
    hours, minutes = divmod(whole_minutes, 60)
    print(f"{int(hours)}h {int(minutes)} min {seconds} s")

In [2]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

Using cpu device


## Functions/classes

In [3]:
def splitting(data,label,proportion):
    """Cut a dataset in two along its first dimension, without shuffling.

    Args:
        data: Tensor of samples; ``data.size(0)`` is taken as the sample count.
        label: Targets, sliced at the same index as ``data``.
        proportion: Fraction of the samples that goes into the first part.

    Returns:
        Tuple ``(data_head, label_head, data_tail, label_tail)``.
    """
    # int() truncates, so the first part never holds more than `proportion` of the samples.
    cut = int(proportion * data.size(0))
    data_head, data_tail = data[:cut], data[cut:]
    label_head, label_tail = label[:cut], label[cut:]
    return data_head, label_head, data_tail, label_tail

In [4]:
def error_achieved(labels,outputs,error,relative):
    """Count the predictions whose absolute error reaches the tolerance.

    Despite its name, the function returns the number of *misses*: a return
    value of 0 means every prediction is strictly within the tolerance.

    Args:
        labels: Tensor of target values.
        outputs: Tensor of predictions with the same shape as ``labels``.
        error: Tolerance. Used as an absolute bound when ``relative`` is false,
            and as a fraction of each label's magnitude when it is true.
        relative: Whether ``error`` is relative to the label magnitude.

    Returns:
        int: Number of elements with ``|labels - outputs| >= tolerance``.
    """
    abs_diff = torch.abs(labels - outputs)
    if relative:
        # Use the label magnitude: with a signed label the threshold would be
        # negative for negative targets and every prediction would count as a miss.
        threshold = torch.abs(labels) * error
    else:
        threshold = error
    # Vectorised comparison instead of a Python loop over tensor elements.
    return int((abs_diff >= threshold).sum().item())

In [5]:
class myData(Dataset):
    """Map-style dataset that pairs each sample of ``x`` with the matching entry of ``y``."""

    def __init__(self,x,y):
        """Store the samples ``x`` and targets ``y`` and remember the sample count."""
        self.x, self.y = x, y
        # Despite the name, `shape` holds only the number of samples (first dimension of x).
        self.shape = x.size(0)

    def __len__(self):
        """Return the number of samples."""
        return self.shape

    def __getitem__(self,index):
        """Return the ``(sample, target)`` pair stored at ``index``."""
        sample = self.x[index]
        target = self.y[index]
        return sample, target

In [6]:
# Layer widths read by RELUNet from module scope at construction time.
# Note: the training cell reassigns dim_input and uses dim_hidden_layer as a loop
# variable, so a RELUNet built after that cell has run uses those later values.
dim_input=1
dim_hidden_layer=8
dim_output=1

class RELUNet(nn.Module):
    """Plain fully connected ReLU network used as an unconstrained baseline.

    The layer widths come from the module-level names ``dim_input``,
    ``dim_hidden_layer`` and ``dim_output``.
    """

    def __init__(self):
        super().__init__()
        self.linear1 = torch.nn.Linear(dim_input,dim_hidden_layer)
        self.activation = torch.nn.ReLU()
        self.linear2 = torch.nn.Linear(dim_hidden_layer, dim_hidden_layer)
        self.linear3 = torch.nn.Linear(dim_hidden_layer, dim_output)

    def forward(self, x):
        """Apply linear1, then linear2 three times, then linear3, with ReLU after every hidden stage."""
        hidden = self.activation(self.linear1(x))
        # The same linear2 module is applied three times in a row, so the three
        # hidden-to-hidden stages share one set of weights.
        for _ in range(3):
            hidden = self.activation(self.linear2(hidden))
        return self.linear3(hidden)
    

In [7]:
class GroupNet(nn.Module):
    """Fully connected network of norm-wrapped linear layers with GroupSort activations.

    Architecture: ``linear1 -> GroupSort``, then ``nb_hidden_layer - 1`` further
    ``Linear -> GroupSort`` stages of width ``dim_hidden_layer``, then a final
    linear read-out ``linear3``. Every linear layer is wrapped with ``direct_norm``.

    Only the stages that are actually used are created, and any depth is
    supported. The previous implementation always allocated 19 hidden layers and
    only honoured the depths 1, 2, 4, 10 and 20 (e.g. a requested depth of 25
    silently produced 10 stages).

    Args:
        dim_input: Number of input features.
        dim_hidden_layer: Width of every hidden layer.
        dim_output: Number of outputs.
        norm: ``kind`` given to ``direct_norm`` for every layer except the
            first one, which always uses ``"two-inf"``.
        nb_hidden_layer: Total number of ``Linear -> GroupSort`` stages, the
            first one included. Values below 1 behave like 1.
        grouping_size: Forwarded unchanged to ``GroupSort``.
            NOTE(review): the training cell passes ``dim_hidden_layer / 2`` here,
            which reads as a number of groups rather than a group size; confirm
            against the GroupSort signature.
    """

    def __init__(self,dim_input,dim_hidden_layer,dim_output,norm,nb_hidden_layer,grouping_size):
        super().__init__()
        self.linear1=direct_norm(torch.nn.Linear(dim_input,dim_hidden_layer),kind="two-inf")
        self.group=GroupSort(grouping_size)
        # Stages after the first one; a ModuleList registers their parameters with the module.
        nb_extra_stages=max(int(nb_hidden_layer)-1,0)
        self.hidden_layers=nn.ModuleList([
            direct_norm(torch.nn.Linear(dim_hidden_layer,dim_hidden_layer),kind=norm)
            for _ in range(nb_extra_stages)
        ])
        self.linear3=direct_norm(torch.nn.Linear(dim_hidden_layer,dim_output),kind=norm)

        self.nb_hidden_layer=nb_hidden_layer

    def forward(self, x):
        """Run ``x`` through every ``Linear -> GroupSort`` stage, then the linear read-out."""
        x=self.group(self.linear1(x))
        for layer in self.hidden_layers:
            x=self.group(layer(x))
        return self.linear3(x)
    


## Data generation

In [8]:
# --- Experiment configuration -------------------------------------------------
# Careful: the training cell derives its batch count from these three values.
SEED=0               # fixed seed so the generated data, shuffling and model init can be reproduced
data_generated=1250  # number of samples generated for every dataset
proportion=0.8       # fraction of the samples used for training
batch_size=80        # mini-batch size (int(data_generated*proportion) would give full-batch training)

# Seed every random number generator used in this notebook (Python, NumPy, PyTorch).
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# --- Noise-free datasets ------------------------------------------------------
# 1-D quadratic: y = 0.5 * x^2 with x uniform in [0, 1)
x= torch.tensor([[random.random()] for i in range(data_generated)])
y_carré=(0.5*x**2)

# 1-D logarithm: y = log(x) with x in [1, 2)
x_log=x+1
y_log=torch.log(x_log)

# 3-D quadratic: y = sum_i x_i^2 / (2*sqrt(3))
x_3dim= torch.tensor([[random.random(),random.random(),random.random()] for i in range(data_generated)])
y_3dim= torch.sum((1/(2*np.sqrt(3)))*x_3dim**2,1).view(data_generated,1)

# 10-D quadratic: y = sum_i x_i^2 / (2*sqrt(10))
x_hdim= torch.tensor(np.random.rand(data_generated,10)).float()
y_hdim= torch.sum((1/(2*np.sqrt(10)))*x_hdim**2,1).view(data_generated,1).float()

# --- Noisy variants (suffix _b): same targets plus Gaussian noise, std 0.025 ---
x_b=torch.tensor([random.random() for i in range(data_generated)])

x_carré_b=x_b.view(x.size()[0],1)
y_carré_b=(0.5*x_b**2 + torch.tensor([np.random.normal(loc=0,scale=0.025) for i in range(data_generated)])).view(x.size()[0],1)

x_log_b=(x_b+1).view(x.size()[0],1)
y_log_b=(torch.log(x_b+1)+ torch.tensor([np.random.normal(loc=0,scale=0.025) for i in range(data_generated)])).view(x.size()[0],1)

x_3dim_b2= torch.tensor([[random.random(),random.random(),random.random()] for i in range(data_generated)])
y_3dim_b2=(torch.sum((1/(2*np.sqrt(3)))*x_3dim_b2**2,1)+ torch.tensor([np.random.normal(loc=0,scale=0.025) for i in range(data_generated)])).view(data_generated,1)

x_hdim_b= torch.tensor(np.random.rand(data_generated,10)).float()
y_hdim_b= (torch.sum((1/(2*np.sqrt(10)))*x_hdim_b**2,1)+ torch.tensor([np.random.normal(loc=0,scale=0.025) for i in range(data_generated)])).view(data_generated,1).float()

# Optional visual check of a 1-D dataset:
#plt.figure(figsize=[12,8])
#plt.scatter(x_carré_b,y_carré_b)

# --- Train/test split and loaders ---------------------------------------------
# The dataset used by the rest of the notebook is selected here (noise-free 10-D quadratic).
X_train,y_train,X_test,y_test=splitting(x_hdim,y_hdim,proportion)
assert X_train.size(0)+X_test.size(0)==data_generated

trainset=myData(X_train,y_train)
testset=myData(X_test,y_test)

trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=True)


## Training

In [9]:
# --- Network hyper-parameters ---------------------------------------------------
dim_input=10
dim_output=1
norm="inf"
grouping_size=2 # group size; GroupNet is handed dim_hidden_layer/grouping_size, i.e. the number of groups

# --- Training hyper-parameters --------------------------------------------------
nb_tests=10         # independent runs per configuration
error_goal=0.0158   # tolerance handed to error_achieved
margin_error=0      # fraction of held-out samples allowed to miss the tolerance
relative=False      # whether error_goal is relative to the label
epochs=1000         # epoch budget per run
lr=0.001
val_per_batch=16    # samples taken from the front of every mini-batch for the goal check
set_nb_hidden_layer=[1,2,4,10,20]
set_dim_hidden_layer=[4,8,16,32,64,128]

# Other grids left in the notebook (their purpose is not recorded here):
#[1,2,4,10,20]
#[4,8,16,32,64]
#[40,80,160,240,400]
#[70,140,280,420,700]

# results maps "depth\width" to (mean epochs to reach the goal, per-run epochs).
results={}
# Not used by this cell any more; kept for other cells. It truncates, while the
# loader also yields a final partial batch when the division is not exact.
nb_batch=int(data_generated*proportion/batch_size)

for nb_hidden_layer in set_nb_hidden_layer:

    # Note: this loop variable overwrites the module-level dim_hidden_layer.
    for dim_hidden_layer in set_dim_hidden_layer:

        epochs_moyenne=[]   # per run: epoch at which the goal was met, or `epochs` if never
        non_convergent=0
        for w in range(nb_tests):

            # Fresh model, loss and optimiser for every run.
            net=GroupNet(dim_input,dim_hidden_layer,dim_output,norm,nb_hidden_layer,int(dim_hidden_layer/grouping_size))
            criterion =nn.MSELoss()
            optimizer = optim.Adam(net.parameters(), lr=lr)
            error_goal_achieved=False

            for epoch in range(epochs):
                nb_error=0         # held-out samples missing the tolerance this epoch
                nb_val_samples=0   # held-out samples seen this epoch

                for i, data in enumerate(trainloader, 0):

                    inputs, labels = data
                    # The first val_per_batch samples of the batch are only scored, never trained on.
                    val_inputs,val_labels=inputs[:val_per_batch],labels[:val_per_batch]
                    inputs,labels=inputs[val_per_batch:],labels[val_per_batch:]

                    # Score the held-out samples with the weights from before this
                    # step; no gradient is needed for that pass.
                    with torch.no_grad():
                        val_outputs = net(val_inputs)
                    nb_error+=error_achieved(val_labels,val_outputs,error_goal,relative)
                    nb_val_samples+=val_labels.size(0)

                    optimizer.zero_grad()
                    outputs = net(inputs)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()

                # The allowed number of misses is a fraction of the held-out samples
                # actually seen, counted above instead of hard-coded.
                if nb_error<=nb_val_samples*margin_error:
                    print(str(w)+" error goal was achieved")
                    error_goal_achieved=True
                    epochs_moyenne.append(epoch)
                    break


            if not error_goal_achieved:
                print(str(w)+" No more epochs, error goal was not achieved")
                epochs_moyenne.append(epochs)
                non_convergent+=1

                # Give up on this configuration after three failed runs.
                if non_convergent>=3:
                    print("this configuration does not seems to converge")
                    break



        print(f"\nepochs moyenne sur {w+1} tests: {sum(epochs_moyenne)/len(epochs_moyenne)}")
        print(epochs_moyenne)
        print("\n\n")
        results[f"{nb_hidden_layer}\\{dim_hidden_layer}"]=(sum(epochs_moyenne)/len(epochs_moyenne),epochs_moyenne)

KeyboardInterrupt: 

## Evaluating

In [None]:
# Evaluate `net` (the last model built by the training cell) on the test set.
total = 0     # number of test samples seen
mae = 0.0     # running sum of absolute errors; divided by `total` when printed
maxx = 0.0    # largest absolute error seen
label_batches, output_batches, input_batches = [], [], []
with torch.no_grad():
    for data in testloader:
        inputs, labels = data
        outputs = net(inputs)
        abs_diff = torch.abs(outputs-labels)
        total += labels.size(0)
        # .item() keeps the accumulators plain Python floats instead of tensors.
        mae += abs_diff.sum().item()
        maxx = max(maxx, abs_diff.max().item())
        # Flatten explicitly: a bare squeeze() would also drop the batch axis of a
        # single-sample batch and break the concatenation below.
        label_batches.append(labels.reshape(-1))
        output_batches.append(outputs.reshape(-1))
        # Drop only a trailing feature axis of size 1, so that 1-D inputs become a
        # flat vector and multi-feature inputs keep their (batch, features) shape.
        input_batches.append(inputs.squeeze(-1) if inputs.dim() > 1 else inputs)

# Concatenate once after the loop instead of growing arrays batch by batch.
label_tot = torch.cat(label_batches).cpu().numpy()
outputs_tot = torch.cat(output_batches).cpu().numpy()
inputs_tot = torch.cat(input_batches).cpu().numpy()

print("MAE: "+ str(mae/total))
print("Max: "+ str(maxx))

In [None]:
# Compare the predictions with the true values on the test set.
fig, ax = plt.subplots(figsize=(12,8))
if np.ndim(inputs_tot) == 1:
    # Single input feature: draw both curves against the input.
    ax.scatter(inputs_tot,outputs_tot,label="predicted")
    ax.scatter(inputs_tot,label_tot,label="true values")
    ax.set_xlabel("input")
    ax.set_ylabel("output")
else:
    # Several input features: there is no single x-axis (scatter would raise on
    # the mismatched sizes), so plot predictions against the true values instead.
    ax.scatter(label_tot,outputs_tot,label="predicted")
    lowest, highest = float(np.min(label_tot)), float(np.max(label_tot))
    ax.plot([lowest,highest],[lowest,highest],color="black",linestyle="--",label="perfect prediction")
    ax.set_xlabel("true value")
    ax.set_ylabel("predicted value")
ax.set_title("Predictions on the test set")
ax.legend();