# Formation PyTorch : les bases pour être autonome 
#### 3 novembre 2022 de 9h à 17h à l'OMP (salle Coriolis)



# Partie 3

Lien vers le GG Colab : https://colab.research.google.com/drive/17SlrTgbwtZdN9U8lJw9nNQJi5ZbM-0Nk?usp=sharing

Dans cette troisième partie, nous allons voir comment adapter des modules PyTorch pour créer ses propres couches et comment définir ses propres fonctions de régularisation.

## Définir ses propres couches de convolution 


Les noyaux de convolution de la première couche d'AlexNet (voir partie 2) ressemblent à des filtres de Gabor. On pourrait initialiser les noyaux de convolution comme des filtres de Gabor ou même optimiser des filtres de Gabor comme proposé dans [2] plutôt que des noyaux classiques.

Dans cette sous-partie, on va définir notre propre couche de filtre de Gabor. Précisémment, on va implémenter la partie réelle d'un filtre de Gabor qui est définie comme suit : 

$$g(x,y; \lambda, \psi, \sigma, \gamma, \theta) = exp(-\frac{x'^2 + \gamma^2 \cdot y'^2}{2\sigma^2}) \cdot cos(2\pi \frac{x'}{\lambda} + \psi)$$
avec 
$$x' = x \: cos \: \theta + y \: sin \: \theta ; \:\:\: y' = -x \: sin \: \theta + y \: cos \: \theta$$ 

où $\lambda > 0$, $\psi \in [0, 2\pi]$, $\sigma > 0$ et $\gamma >0$ sont des paramètres à optimiser. $\theta \in [0, \pi[$ prendra plusieurs valeurs déterminées pour obtenir des filtres avec différentes orientations.

Le code ci-dessous est fortement inspiré du code au lien suivant : https://gist.github.com/DerThorsten/7117b9b7a41da4e0a13d6500f9a1b657

[2] Alekseev, Andrey, and Anatoly Bobe. "GaborNet: Gabor filters with learnable parameters in deep convolutional neural network." 2019 International Conference on Engineering and Telecommunication (EnT). IEEE, 2019.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import matplotlib.pyplot as plt
from tqdm import tqdm 
from sklearn.metrics import accuracy_score, f1_score

En fait, on souhaiterait conserver les propriétés et fonctions d'une couche de convolution classique, mais en redéfinissant uniquement les poids de la convolution. On va donc écire une classe qui hérite de torch.nn.modules.conv.Conv2d.

AlexNet avait 64 convolutions par canal de dimensions spatiales (11x11) pour sa première couche. On fait la même chose ici : pour chaque canal, on apprend 64 filtres de Gabor de taille (11x11). Afin d'avoir des filtres invariants aux rotations, on fait varier le paramètre $\theta$, comme indiqué ci-dessus, pour 8 valeurs entre $0$ et $2\pi$. Pour un $\theta$ donné, il reste donc 8 couples différents de paramètres $(\lambda, \psi, \sigma, \gamma)$ à optimiser. 

On a donc des filtres de Gabor de tailles $(8 \times 8 \times 3 \times 11 \times 11)$ où la première dimension correspond aux couples de paramètres $(\lambda, \psi, \sigma, \gamma)$, la deuxième dimension correspond à $\theta$, la troisième dimension correspond aux canaux RGB et les deux dernières dimensions correspondent aux dimensions spatiales.

In [None]:
class GaborFilters(nn.modules.conv.Conv2d):
    def __init__(self, in_channels, out_channels, kernel_size,
                 stride=1, padding=0, dilation=1, groups=1, bias=False,
                 padding_mode='zeros', device=None, dtype=None, n_thetas=5):
        
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.dilation = dilation
        self.groups = groups
        self.bias_ = bias
        self.padding_mode = padding_mode
        self.device = device
        self.dtype=dtype
        self.n_thetas = self.out_channels//8
        
        super().__init__(self.in_channels, self.out_channels, self.kernel_size,
                 stride=self.stride, padding=self.padding, dilation=self.dilation, 
                 groups=self.groups, bias=self.bias_, padding_mode=self.padding_mode, 
                 device=self.device, dtype=self.dtype)
                
        # Initialisation des paramètres lambda, psi, sigma et gamma
        lambda_init_values = torch.rand(self.out_channels//self.n_thetas, 1, self.in_channels, 1, 1)*32
        psi_init_values = torch.rand(self.out_channels//self.n_thetas, 1, self.in_channels, 1, 1)*2*math.pi
        sigma_init_values = torch.rand(self.out_channels//self.n_thetas, 1, self.in_channels, 1, 1)*10
        gamma_init_values = torch.rand(self.out_channels//self.n_thetas, 1, self.in_channels, 1, 1)
        
        self.lambda_ = ...
        self.psi = ...
        self.sigma = ...
        self.gamma = ...
        
        # On fixe les valeurs de theta
        thetas = torch.linspace(0., math.pi*(self.n_thetas-1)/self.n_thetas, self.n_thetas)
        self.register_buffer('thetas', thetas)
        
    @property
    def ...(self):
        # A compléter 
        ... 

    def gabor_filter(self):
        # A compléter 
        x = torch.arange(self.kernel_size[0], dtype=torch.float32) -  (self.kernel_size[0] - 1)/2
        x = x.view(...)
        y = torch.arange(self.kernel_size[0], dtype=torch.float32) -  (self.kernel_size[0] - 1)/2
        y = y.view(...)
        
        thetas = self.thetas.view(1, -1, 1, 1, 1)

        x_ =  x * torch.cos(thetas) + y * torch.sin(thetas)
        y_ = -x * torch.sin(thetas) + y * torch.cos(thetas)

        gb = torch.exp(-0.5 * ((x_ ** 2 + self.gamma**2 * y_ ** 2) / (1e-2+self.sigma) ** 2)) \
                 * torch.cos(2.0 * math.pi  * x_ / (1e-2+F.relu(self.lambda_)) + self.psi)
        
        gb = gb.view(self.out_channels, self.in_channels, self.kernel_size[0], self.kernel_size[1])

        return gb

On définit un modèle équivalent au TinyAlexNet de la partie 2 mais on substitue la première couche de convolution à une couche de filtres de Gabor. 

In [None]:
class GaborAlexNet(nn.Module):
    def __init__(self, num_classes=10, dropout_rate=0):
        super(GaborAlexNet, self).__init__()
        
        self.features_gabor = GaborFilters(3, 64, kernel_size=11, stride=4, padding=2)
        
        self.features = nn.Sequential(
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 1, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )

        self.classifier = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(36, 16),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout_rate),
            nn.Linear(16, num_classes)
        )
        
        torch.manual_seed(0)
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)
                
    def extract_features(self, inputs):
        """ Returns output of the final convolution layer """
        x = self.features(inputs)
        return x

    def forward(self, inputs):
        x = self.features_gabor(inputs)
        x = self.features(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

Affichons les filtres de Gabor à l'initialisation du modèle :

In [None]:
model = GaborAlexNet(num_classes=2)

fig, ax = plt.subplots(8,8, figsize=(10,10))
k = 0
for i in range(8):
    for j in range(8):
        # A compléter 
        ax[i,j].imshow(..., cmap='gray')
        ax[i,j].set_xticks([])
        ax[i,j].set_yticks([])
        k += 1
plt.show()

Vérifions que nous avons bien défini les filtres de Gabor et que nous pouvons optimiser ses paramètres :

In [None]:
model = GaborAlexNet(num_classes=2)
print(model.gabor_filter.lambda_.grad)
x = torch.randn((1, 3, 125, 125)) # Image aléatoire
out = model(x)
loss = F.cross_entropy(out, torch.ones(1).long()) # Label aléatoire
loss.backward()
model.gabor_filter.lambda_.grad.mean()

## Régularisation

On peut facilement régulariser la norme L2 des poids d'un modèle avec l'argument "weight_decay" des optimizer Pytorch. Regardez par exemple les arguments de Adam (https://pytorch.org/docs/stable/generated/torch.optim.Adam.html?highlight=weight_decay). Néanmoins, on pourrait vouloir définir d'autres types de régularisation. 

Ici, on va régulariser la norme L1 des poids des couches denses de GaborAlexNet. 

In [None]:
def L1_regularization(submodel):
    """
    Arg:
        * submodel: a torch.nn.modules.container.Sequential object
    Output:
        * regularization term
    """
    # A compléter
    ... 
    return reg

On vérifie que la régularisation fonctionne bien :

In [None]:
model = GaborAlexNet(num_classes=2)
print(model.classifier[1].weight.grad)
x = torch.randn((1, 3, 125, 125))
out = model(x)
loss = L1_regularization(model.classifier)
loss.backward()
model.classifier[1].weight.grad.mean()

## Application au problème de la partie 2

In [None]:
import rasterio
from rasterio.windows import Window
from rasterio.warp import reproject, Resampling
import numpy as np

with rasterio.open('../data/UH_NAD83_272056_3289689.tif') as img:
    window = Window(0, 0, 1000, 1000)
    data = img.read(window=window)
    
with rasterio.open('../data/2018_IEEE_GRSS_DFC_GT_TR.tif') as gt_:
    gt = np.zeros(img.shape, dtype=np.uint8)
    reproject(
        source=gt_.read(1),
        destination=gt,
        src_transform=gt_.transform,
        src_crs=gt_.crs,
        dst_transform=img.transform,
        dst_crs=img.crs,
        resampling=Resampling.nearest)
    
img = data.transpose(1, 2, 0)
gt = gt[:1000, :1000]
for i, class_id in enumerate(np.unique(gt)):
    gt[class_id == gt] = i

In [None]:
class Dataset(torch.utils.data.Dataset):
    # A compléter (identique à la partie 2) 

In [None]:
hyperparams = {'ignored_labels': [0], 'patch_size': 125}
dataset = Dataset(img, gt, hyperparams)
# A compléter (identique à la partie 2)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") 
num_epochs = 1
learning_rate = 1e-2
best_val = np.inf
# A compléter de manière à optimiser les filtres de Gabor avec un learning rate de 5e-3 et les autres couches 
# avec un learning rate de 1e-3
optimizer = torch.optim.Adam(...)

In [None]:
# Optimiser le modèle (identique à la partie 2)

In [None]:
# Affichez les filtres de Gabor optimisés