In [3]:
import torch

# Opérations sur les tenseurs

In [4]:
#Création d'un tenseur
tenseur = torch.zeros((10,5,100)) # [10,5,100]
tenseur = torch.ones((10,5,100)) # [10,5,100]
tenseur = torch.rand((10,5,100)) # [10,5,100]

tenseur_zeros = torch.zeros_like(tenseur) # [10,5,100]
tenseur_copy = torch.clone(tenseur)

print(f'Tenseur de dimensions {tenseur.shape} et de type {tenseur.dtype}')

# Opération élémentaires
tenseur2 = torch.rand((10,5,100))  # [10,5,100]
tenseur_au_carre = tenseur**2  # [10,5,100]
tenseur_exp = torch.exp(tenseur)  # [10,5,100]
tenseur_somme = tenseur + tenseur2  # [10,5,100]
tenseur_produit = tenseur * tenseur2  # [10,5,100]

# Concaténation
tenseur_concatene_axe0 = torch.cat([tenseur, tenseur2],axis=0)  # [20,5,100]
tenseur_concatene_axe1 = torch.cat([tenseur, tenseur2],axis=1)  # [10,10,100]
tenseur_concatene_axe2 = torch.cat([tenseur, tenseur2],axis=2)  # [10,5,200]

tenseur_tile = torch.tile(tenseur,dims=(2,3,4))  # [20,15,400]

# Opérations de réduction
tenseur_reduit_axe0 = torch.mean(tenseur,axis=0)  # [5,100]
tenseur_reduit_axe1 = torch.mean(tenseur,axis=1)  # [10,100]
tenseur_reduit_axe2 = torch.mean(tenseur,axis=2)  # [10,5]

tenseur_reduit_axe0_kd = torch.mean(tenseur,axis=0,keepdim=True)  # [1,5,100]

# Manipulation sur les dimensions 

tenseur_21 = torch.transpose(tenseur,1,2) # [10,100,5]
tenseur_flat = torch.flatten(tenseur) # [5000,]
tenseur_newdim_0 = torch.unsqueeze(tenseur,dim=0) # [1,10,5,100]
tenseur_newdim_1 = torch.unsqueeze(tenseur,dim=1) # [10,1,5,100]

# Slicing 
tenseur_partie = tenseur[0,:,:] # [5,100]
tenseur_partie = tenseur[0,...] # [5,100]
tenseur_partie = tenseur[0:5,:,:] # [5,5,100]
tenseur_partie = tenseur[0,:,0::2]# [10,5,50]


Tenseur de dimensions torch.Size([10, 5, 100]) et de type torch.float32


# Optimisation d'un graphe de calculs
Exemple : régression logistique

In [5]:
a_true = 2.0
b_true = -1.0

x = torch.rand((10,)) # simulation des données
y = a_true * x + b_true

a = torch.zeros(1,requires_grad=True) # les objets que l'on va faire converger vers les valeurs recherchées
b = torch.zeros(1,requires_grad=True) # requires_grad => un champs gradient est attaché à l'objet crée

for i in range(1000):
    
    y_est = a*x + b # je connais x, j'estime y / paramètres estimés courants 
                    # cette ligne crée un graphe de calcul entre x et y_est mettant en jeu a et b 

    loss = torch.mean((y_est-y)**2) # je calcule l'erreur entre l'estimation et les valeurs observées
    
    loss.backward() # je différencie la fonction de perte
                    # cela entraine la différentiation automatique de tout le graphe de calcul
                    # le gradient est mis à jour dans toutes les variables du graphe / requires_grad = True
    
    print(f'Itération {i}:')
    print(f"a: {a.item():.4f}, b: {b.item():.4F}, loss: {loss.item():.4f},  grad a: {a.grad.item():.4f}, grad b: {b.grad.item():.4f}")
   
    with torch.no_grad(): #je vais effectuer des opérations sur des objets attachés au graphe de calcul 
                          # mais je ne veux pas que ces opérations entrent dans l'optimisation des paramètres
        
        a -=  0.1*a.grad # descente de gradient de pas 0.1
        b -=  0.1*b.grad
        
        a.grad.zero_() # je remets à 0 tous les champs gradient des objets
        b.grad.zero_()
    



Itération 0:
a: 0.0000, b: 0.0000, loss: 0.4857,  grad a: -0.3780, grad b: 0.2153
Itération 1:
a: 0.0378, b: -0.0215, loss: 0.4669,  grad a: -0.3732, grad b: 0.2060
Itération 2:
a: 0.0751, b: -0.0421, loss: 0.4490,  grad a: -0.3679, grad b: 0.1981
Itération 3:
a: 0.1119, b: -0.0619, loss: 0.4317,  grad a: -0.3622, grad b: 0.1913
Itération 4:
a: 0.1481, b: -0.0811, loss: 0.4151,  grad a: -0.3563, grad b: 0.1854
Itération 5:
a: 0.1838, b: -0.0996, loss: 0.3991,  grad a: -0.3502, grad b: 0.1801
Itération 6:
a: 0.2188, b: -0.1176, loss: 0.3837,  grad a: -0.3440, grad b: 0.1753
Itération 7:
a: 0.2532, b: -0.1352, loss: 0.3690,  grad a: -0.3378, grad b: 0.1710
Itération 8:
a: 0.2870, b: -0.1522, loss: 0.3548,  grad a: -0.3316, grad b: 0.1669
Itération 9:
a: 0.3201, b: -0.1689, loss: 0.3411,  grad a: -0.3254, grad b: 0.1631
Itération 10:
a: 0.3527, b: -0.1852, loss: 0.3280,  grad a: -0.3193, grad b: 0.1595
Itération 11:
a: 0.3846, b: -0.2012, loss: 0.3154,  grad a: -0.3133, grad b: 0.1561
Ité

# Définition d'un réseau de neurones 

## Complètement à la main

In [6]:
class MonModeleQuiTorche(torch.nn.Module):
    def __init__(self,delta_chan=4,verbose=False):
        if verbose:
            self.print = print
        else:
            self.print = lambda x:None
        self.print('Initialisation classe mère \n')
        torch.nn.Module.__init__(self) 
        
        self.print('\n Initialisation classe courante \n')
        self.delta_chan=delta_chan
        self.learnable_param = torch.nn.Parameter(torch.rand([1,delta_chan,1]))
        self.not_learnable_param = torch.rand((1,delta_chan,1))

    def __setattr__(self,name,value):
        super().__setattr__(name,value)
        self.print(f'Enregistrement de: {name} à la valeur {value}')
        
    def forward(self,x): 
        #x is [B,input_chan,T]
        # output is [B,self.output_chan,T]
        x_reduced = torch.mean(x , axis = 1 , keepdim=True)
        x_duplicated = torch.tile(x_reduced , dims = (1, self.delta_chan, 1))

        y0 = self.learnable_param *x_duplicated 
        y1 = y0+ self.not_learnable_param

        y2 = torch.abs(y1)
        
        y3 = torch.concat([x, y2], axis=1)
        return y3
    
    def __call__(self,x):
    # Défini dans la classe mère
        return self.forward(x)

In [7]:
mon_modele=MonModeleQuiTorche(delta_chan=4, verbose=True)


Enregistrement de: print à la valeur <built-in function print>
Initialisation classe mère 


 Initialisation classe courante 

Enregistrement de: delta_chan à la valeur 4
Enregistrement de: learnable_param à la valeur Parameter containing:
tensor([[[0.4109],
         [0.7455],
         [0.6436],
         [0.6213]]], requires_grad=True)
Enregistrement de: not_learnable_param à la valeur tensor([[[0.5034],
         [0.2785],
         [0.6202],
         [0.6244]]])


In [8]:
x= torch.rand(5,1,100)
y = mon_modele(x)
z = mon_modele(y)
print(f'Input shape {x.shape}, output shape {y.shape}, second output shape {z.shape}')

Input shape torch.Size([5, 1, 100]), output shape torch.Size([5, 5, 100]), second output shape torch.Size([5, 9, 100])


In [9]:
print(f'Paramètres du modèle : \n {mon_modele._parameters}')

Paramètres du modèle : 
 {'learnable_param': Parameter containing:
tensor([[[0.4109],
         [0.7455],
         [0.6436],
         [0.6213]]], requires_grad=True)}


## En enchaînant des couches

In [None]:
mon_modele_sequentiel = torch.nn.Sequential( 
  MonModeleQuiTorche(4),
  MonModeleQuiTorche(5),
  MonModeleQuiTorche(6)  )

mon_modele_sequentiel(torch.rand(5,1,100)).shape

### Quand il y en a beaucoup

In [None]:
mlp = torch.nn.Sequential(*[MonModeleQuiTorche(4+i) for i in range(10)])
print(mlp((torch.rand(5,1,100))).shape)

## Couches linéaires

In [19]:
linear = torch.nn.Linear(in_features=40,
                         out_features=100
                        )

In [29]:
x = torch.rand([ 10, 40])
print(f'Input shape is {x.shape}')
y = linear(x)
print(f'Output shape is {y.shape}')

Input shape is torch.Size([10, 40])
Output shape is torch.Size([10, 100])


In [23]:
print(f'Nombre de paramètres à apprendre :  {sum(p.numel() for p in linear.parameters() if p.requires_grad)}' )

Nombre de paramètres à apprendre :  4100


### Couches linéaires sur un signal

In [31]:
x = torch.rand([10, 4, 10]) #[B, C, T]
print(f'Input shape is {x.shape}')
x_flat = torch.flatten(x, start_dim = 1 , end_dim=2)
print(f'x flatten shape is : {x_flat.shape}')
y = linear(x_flat)
print(f'Output shape is {y.shape}')

Input shape is torch.Size([10, 4, 10])
x flatten shape is : torch.Size([10, 40])
Output shape is torch.Size([10, 100])


## Couches de convolution

In [26]:
conv = torch.nn.Conv1d(in_channels=4,  # entrée [B,1,T]
                       out_channels=10, # sortie [B,10,T']
                       kernel_size=11, # préférer les nombres impairs 
                       stride=1,       # T' = T//2
                       padding='same', # idem (kernel_size-1)//2 
                      )

In [34]:
x = torch.rand([10, 4, 100]) #[B, C, T]
print(f'Input shape is {x.shape}')

y = conv(x)
print(f'Output shape is {y.shape}')

Input shape is torch.Size([10, 4, 100])
Output shape is torch.Size([10, 10, 100])


In [None]:
print(f'Nombre de paramètres à apprendre :  {sum(p.numel() for p in conv.parameters() if p.requires_grad)}' )

## Convolutions séparables

In [None]:
depthwise = torch.nn.Conv1d(in_channels=4,  # entrée [B,1,T]
                           out_channels=4,  # sortie [B,4,T]
                           groups= 4,       # correspond à in_channels
                           kernel_size=11,  # préférer les nombres impairs 
                           stride=1,  
                           padding='same'
                          )

In [None]:
pointwise = torch.nn.Conv1d(in_channels=4,  # entrée [B,1,T]
                           out_channels=10,  # sortie [B,4,T]
                           kernel_size=1,   # préférer les nombres impairs 
                           stride=1, 
                           padding='same'
                          )

In [None]:
separable_convolution = torch.nn.Sequential(depthwise,
                                            pointwise)

In [None]:
x = torch.rand([10, 4, 100]) #[B, C, T]
print(f'Input shape is {x.shape}')

y = separable_convolution(x)
print(f'Output shape is {y.shape}')

In [None]:
print(f'Nombre de paramètres à apprendre :  {sum(p.numel() for p in separable_convolution.parameters() if p.requires_grad)}' )

## Couches récurrentes

In [None]:
recurrent = torch.nn.RNN(input_size=6, # x is [B, T , input_size]
                        hidden_size =15, # h is [B,T, hidden_size]
                        num_layers =1, # par défaut
                        batch_first=True,  # pour que la première dimension soit bien le batch
                        bidirectional= False, # par défaut
                         bias= False
                        )

In [None]:
x= torch.rand([20, 100,6])
h , h_layers_end = recurrent(x)
print(h.shape)
print(h_layers_end.shape)

In [None]:
print(f'Nombre de paramètres à apprendre :  {sum(p.numel() for p in recurrent.parameters() if p.requires_grad)}' )

###  Réseau récurrent avec deux couches

In [None]:
recurrent = torch.nn.RNN(input_size=6, # x is [B, T , input_size]
                        hidden_size =15, # h is [B,T, hidden_size]
                        num_layers =4, # h de la couche 0 devient le x de la couche 1 etc.
                        batch_first=True,  # pour que la première dimension soit bien le batch
                        bidirectional= False, # valeur par défaut
                        bias= False

                        )

In [None]:
x= torch.rand([20, 100,6])
h , h_layers_end = recurrent(x)
print(h.shape)
print(h_layers_end.shape)

In [None]:
print(f'Nombre de paramètres à apprendre :  {sum(p.numel() for p in recurrent.parameters() if p.requires_grad)}' )

###  Réseau récurrent bidirectionnel

In [None]:
recurrent = torch.nn.RNN(input_size=6, # x is [B, T , input_size]
                        hidden_size =15, # h is [B,T, hidden_size]
                        num_layers =7, # h de la couche 0 devient le x de la couche 1 etc.
                        batch_first=True,  # pour que la première dimension soit bien le batch
                        bidirectional= True, # cf Algorithme forward backward
                         bias = False
                        )

In [None]:
x= torch.rand([20, 100,6])
h , h_layers_end = recurrent(x)
print(h.shape)
print(h_layers_end.shape)

In [None]:
print(f'Nombre de paramètres à apprendre :  {sum(p.numel() for p in recurrent.parameters() if p.requires_grad)}' )

In [None]:
(6+15)*15*2 + 6*(15*2 + 15 )*(15)*2

# Pipeline d'entraînement

In [None]:
class MyDataset(torch.utils.data.Dataset):
    
    def __init__(self, path_to_data):
        ...
    def __len__(self): #returns int
        ...
    def __getitem__(self,i): #returns (data_i,label_i)
        ...

dataset = MyDataset(...)

dataloader = DataLoader(dataset, 
                        batch_size=10, 
                        shuffle=True
                       )


In [None]:
class metric_logger:
    
    def __init__(self,...):
        ...
    
    def reset_metrics(self):
        ... 
        
    def update_metrics(self, batch_x,batch_y_true,batch_y_pred):
        ...
    
        return {'metric0':...,
               'metric1':...
               }
    
    def log(self):
        ...

device = 'cpu' # set so 'cuda:xx' if you have a GPU, xx is GPU index
model = ... 
optimizer = torch.optim.Adam(model.parameters())

model.to(device)

metric_logger_train = metric_logger(...)
metric_logger_valid = metric_logger(...)

for epoch in range(n_epochs):
    
    metric_logger_train.reset()
    
    for batch_x,batch_y in dataloader_train:
        
        batch_x.to(device)
        batch_y.to(device)
        
        optimizer.zero_grad()
        
        batch_y_predicted = model(batch_x)
        
        l = loss(batch_y_predicted, batch_y)
        
        metric_logger_train.log(batch_x,batch_y,batch_y_predicted)
        
        l.backward()
        
        optimizer.step()
        
    for batch_x,batch_y in dataloader_valid:
        
        batch_x.to(device)
        batch_y.to(device)
        
        with torch.no_grad():
            batch_y_predicted = model(batch_x)  
            
        metric_logger_valid.log(batch_x,batch_y,batch_y_predicted)
