> 📌 **Avant de commencer :**
>
> [![Ouvrir dans Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1RoYOImTMfhTV9iPxXop4CrW9scX_Dm3g?usp=sharing)
>
> 🔁 *Pensez à faire une copie dans votre Google Drive (`Fichier > Enregistrer une copie dans Drive`) pour pouvoir l’éditer librement.*


# Modèle linguistique avec les Transformers

### Tâche

L'objectif est d'apprendre à prédire le mot suivant à partir d'une séquence d'entrée à l'aide d'un modèle de langage de type Transformer.

Le jeu de données utilisé est PTB (Penn Treebank).

In [None]:
import requests
from bs4 import BeautifulSoup
import os

def download_github_files(repo_url, folder_path, output_dir="data"):
    """
    Télécharger tous les fichiers d'un dossier spécifique sur GitHub.

    Arguments :
        repo_url (str) : L'URL du dépôt GitHub (ex. : "https://github.com/user/repo").
        folder_path (str) : Le chemin vers le dossier dans le dépôt (ex. : "notebooks/demo").
        output_dir (str) : Répertoire local pour enregistrer les fichiers téléchargés (par défaut : "data").
    """
    # Vérifie que l'URL est bien celle d’un dépôt GitHub
    if "github.com" not in repo_url:
        raise ValueError("URL GitHub invalide.")

    # Construit l’URL vers l’API GitHub pour accéder aux contenus bruts
    repo_parts = repo_url.rstrip("/").split("/")
    user, repo = repo_parts[-2], repo_parts[-1]
    api_url = f"https://api.github.com/repos/{user}/{repo}/contents/{folder_path}"

    # Récupère le contenu du dossier depuis l’API GitHub
    response = requests.get(api_url)
    if response.status_code != 200:
        raise Exception(f"Échec lors de la récupération du contenu du dossier. Code HTTP : {response.status_code}")

    files = response.json()

    # Crée le répertoire de sortie s'il n’existe pas
    os.makedirs(output_dir, exist_ok=True)

    # Télécharge chaque fichier
    for file in files:
        if file["type"] == "file":
            file_url = file["download_url"]
            file_name = file["name"]
            file_path = os.path.join(output_dir, file_name)

            print(f"Téléchargement de {file_name}...")
            file_response = requests.get(file_url)
            with open(file_path, "wb") as f:
                f.write(file_response.content)
            print(f"{file_name} enregistré dans le dossier {output_dir}")

    print("Tous les fichiers ont été téléchargés avec succès.")


repo_url = "https://github.com/atouammi/basemodel"
folder_path = "data/ptb"
download_github_files(repo_url, folder_path)

Téléchargement de generate_ptb.ipynb...
generate_ptb.ipynb enregistré dans le dossier data
Téléchargement de idx2word.pt...
idx2word.pt enregistré dans le dossier data
Téléchargement de test_data.pt...
test_data.pt enregistré dans le dossier data
Téléchargement de train_data.pt...
train_data.pt enregistré dans le dossier data
Téléchargement de word2idx.pt...
word2idx.pt enregistré dans le dossier data
Tous les fichiers ont été téléchargés avec succès.


In [None]:
! ls data

generate_ptb.ipynb  idx2word.pt  test_data.pt  train_data.pt  word2idx.pt


In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import math
import os.path


path_data = 'data/'
# _ = check_ptb_dataset_exists(path_data)
word2idx  =  torch.load(path_data + '/word2idx.pt')
idx2word  =  torch.load(path_data + '/idx2word.pt')


def normalize_gradient(net):

    grad_norm_sq=0

    for p in net.parameters():
        # grad_norm_sq += p.grad.data.norm()**2
        if p.grad is not None:
            grad_norm_sq += p.grad.data.norm()**2

    grad_norm=math.sqrt(grad_norm_sq)

    if grad_norm<1e-4:
        net.zero_grad()
        print('La norme du gradient est proche de zéro')
    else:
        for p in net.parameters():
            if p.grad is not None:
                p.grad.data.div_(grad_norm)

    return grad_norm


def display_num_param(net):
    nb_param = 0
    for param in net.parameters():
        nb_param += param.numel()
    print('Ce réseau de neurones contient {} paramètres (soit {:.2f} millions)'.format(
        nb_param, nb_param/1e6)
         )

def sentence2vector(sentence):
    words = sentence.split()
    x = torch.LongTensor(len(words),1)
    for idx, word in enumerate(words):

         if word not in word2idx:
            print('Vous avez entré un mot qui n\'est pas dans le vocabulaire.')
            print('Assurez-vous qu\'il n\'y a pas de lettres majuscules.')
         else:
            x[idx,0]=word2idx[word]
    return x


def show_next_word(scores):
    num_word_display=30
    prob=F.softmax(scores,dim=2)
    p=prob[-1].squeeze()
    p,word_idx = torch.topk(p,num_word_display)

    for i,idx in enumerate(word_idx):
        percentage= p[i].item()*100
        word=  idx2word[idx.item()]
        print("{:.1f}%\t".format(percentage), word)


In [None]:
import torch
import torch.nn.functional as F
import torch.nn as nn
import math
import time
#import utils

### GPU

Il est recommandé d'exécuter ce code sur un GPU :  
* Temps pour 1 époque sur GPU : 48 secondes avec Google Colab Tesla P100-PCIE-16GB  


In [None]:
# Vérifier si le GPU est disponible

if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Utilisation du périphérique: {device}")

Utilisation du périphérique: cuda


### Téléchargement de Penn Tree Bank

Le tenseur `train_data` est composé de 20 colonnes contenant 46 479 mots.  
Le tenseur `test_data` est composé de 20 colonnes contenant 4 121 mots.  


In [None]:
#from utils import check_ptb_dataset_exists
#data_path=check_ptb_dataset_exists()

train_data  =  torch.load(path_data+'/train_data.pt')
test_data   =  torch.load(path_data+'/test_data.pt')

print(  train_data.size()  )
print(  test_data.size()   )

torch.Size([46479, 20])
torch.Size([4121, 20])


In [None]:
! pwd

/content


### Quelques constantes associées avec le jeu de données

In [None]:
bs = 20
vocab_size = 10000

### Créer une classe de réseau à mécanisme d'attention

In [None]:
def generate_positional_encoding(seq_length, dim):
    assert dim == 2* (dim//2)  # vérifier si la dimension est divisible par 2
    pe = torch.zeros(seq_length, dim)
    position = torch.arange(0, seq_length, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, dim, 2).float() * (-torch.log(torch.tensor(10000.0)) / dim))
    pe[:,0::2] = torch.sin(position * div_term)
    pe[:,1::2] = torch.cos(position * div_term)
    return pe

class AttentionHead(nn.Module):
    def __init__(self, d, d_head, dropout):
        super().__init__()
        self.LN_MHA = nn.LayerNorm(d_head)
        self.LN_MLP = nn.LayerNorm(d_head)
        self.query = nn.Linear(d, d_head, bias=False)  # couche d'embedding pour la requête
        self.key = nn.Linear(d, d_head, bias=False)     # couche d'embedding pour la clé
        self.value = nn.Linear(d, d_head)               # couche d'embedding pour la valeur
        self.dropout = nn.Dropout(dropout)
    def forward(self, H):  # taille(H) = [batch_size, seq_length, d]
        batch_size = H.size(0); batch_len = H.size(1)
        H_in = H  # Ajouter une connexion résiduelle (RC)
        # Calcul d'une tête d'attention unique : H = Softmax( QK^T / √d ) V
        Q = self.query(H)  # taille = [batch_size, batch_length, d]
        K = self.key(H)    # taille = [batch_size, batch_length, d]
        V = self.value(H)  # taille = [batch_size, batch_length, d]
        attention_score = Q @ K.transpose(2,1) * Q.size(2)**-0.5  # QK^T/√d, (B,L,d) @ (B,d,L) => (B,L,L)
        mask = torch.tril(torch.ones(batch_len,batch_len)).long().to(device)  # masque pour n'utiliser que les tokens précédents : { token(≤t) }, taille = [batch_len, batch_len]
        attention_score = attention_score.masked_fill(mask==0, value=float('-inf'))  # softmax(-inf)=0 empêche l'utilisation des tokens suivants pour la prédiction
        attention_score = torch.softmax(attention_score, dim=2)  # somme des poids = 1
        attention_score = self.dropout(attention_score)  # appliquer dropout sur les scores d'attention
        H_HA = attention_score @ V  # softmax( QK^T / √d ) V, (B,L,L) @ (B,L,d) => (B,L,d)
        return H_HA  # retourne les scores de prédiction pour le prochain token

class MultipleAttentionHead(nn.Module):
    def __init__(self, d, num_heads, dropout):
        super().__init__()
        d_head = d // num_heads  # dim_head = d // num_heads, généralement 64
        assert d == d_head * num_heads  # vérifier la divisibilité
        self.MHA = nn.ModuleList([ AttentionHead(d, d_head, dropout) for _ in range(num_heads) ])
        self.WO = nn.Linear(d, d)  # couche de combinaison
        self.dropout = nn.Dropout(dropout)
    def forward(self, H):  # taille(H) = [batch_size, seq_length, d]
        batch_size = H.size(0); seq_length = H.size(1)
        H_heads = []
        for HA_layer in self.MHA:
            H_heads.append(HA_layer(H))  # taille = [batch_size, seq_length, d_head]
        H_heads = torch.cat(H_heads, dim=2)  # taille = [batch_size, seq_length, d]
        H_heads = self.dropout(H_heads)  # appliquer dropout sur les activations d'attention
        H = self.WO(H_heads)  # taille = [batch_size, seq_length, d]
        return H

class TransformerBlock(nn.Module):
    def __init__(self, d, num_heads, dropout):
        super().__init__()
        self.LN_MHA = nn.LayerNorm(d)
        self.LN_MLP = nn.LayerNorm(d)
        self.MHA = MultipleAttentionHead(d, num_heads, dropout)
        self.MLP = nn.Sequential(nn.Linear(d, 4*d), nn.ReLU(), nn.Dropout(dropout), nn.Linear(4*d, d))
    def forward(self, H):  # taille = [batch_size, seq_length, d]
        # Têtes d'attention multiples avec normalisation couche (LN) et connexion résiduelle (RC)
        H = H + self.MHA(self.LN_MHA(H))  # taille = [batch_size, seq_length, d]
        # MLP avec normalisation couche (LN) et connexion résiduelle (RC)
        H = H + self.MLP(self.LN_MLP(H))  # taille = [batch_size, seq_length, d]
        return H  # taille = [batch_size, seq_length, d]

class Transformer_decoder(nn.Module):
    def __init__(self, d, num_heads, num_blocks, seq_length, dropout):
        super().__init__()
        self.TR_Blocks = nn.ModuleList([ TransformerBlock(d, num_heads, dropout) for _ in range(num_blocks) ])
    def forward(self, batch_seq, pos_enc):
        H = batch_seq.transpose(1,0)  # taille = [batch_size, seq_length, d]
        batch_size = H.size(0); batch_len = H.size(1)
        # Ajouter l'encodage positionnel
        pos_enc = pos_enc.unsqueeze(dim=0)  # taille = [1, seq_length, d]
        H = H + pos_enc                    # taille = [batch_size, seq_length, d]
        # Appliquer les blocs transformer
        for TR_Block in self.TR_Blocks:
            H = TR_Block(H)
        # Sortie
        H = H.permute(1,0,2)  # taille = [batch_length, batch_size, d]
        return H  # retourne les scores de prédiction pour le prochain token

class ANN(nn.Module):

    def __init__(self, d, num_heads, num_blocks, seq_length, dropout):
        super(ANN, self).__init__()
        self.decoder = Transformer_decoder(d, num_heads, num_blocks, seq_length, dropout)

    def forward(self, g_seq , pos ):
        h_dec_seq = self.decoder( g_seq , pos )
        return h_dec_seq

class attention_net(nn.Module):

    def __init__(self, d, num_heads, num_blocks, seq_length, dropout):
        super(attention_net, self).__init__()
        self.layer1 = nn.Embedding( vocab_size  , hidden_size  )
        self.layer2 = ANN(d, num_heads, num_blocks, seq_length, dropout)
        self.layer3 = nn.Linear( hidden_size , vocab_size )

    def forward(self, word_seq, pos ):
        g_seq     =   self.layer1( word_seq )  # taille = (seq_length, batch_size, hidden_dim)
        h_seq     =   self.layer2( g_seq , pos )  # taille = (seq_length, batch_size, hidden_dim)
        score_seq =   self.layer3( h_seq )  # taille = (seq_length, batch_size, vocab_size)
        return score_seq


### Fonction pour évaluer le réseau sur un ensemble de test

In [None]:
def eval_on_test_set():

    net.eval()

    running_loss=0
    num_batches=0

    for count in range( 0 , 4120-seq_length ,  seq_length) :

        minibatch_data =  test_data[ count   : count+seq_length   ]
        minibatch_label = test_data[ count+1 : count+seq_length+1 ]
        pos = generate_positional_encoding(seq_length, hidden_size)

        minibatch_data = minibatch_data.to(device)
        minibatch_label = minibatch_label.to(device)
        pos = pos.to(device)

        scores = net( minibatch_data, pos )

        minibatch_label = minibatch_label.view(  bs*seq_length )
        scores = scores.view(  bs*seq_length , vocab_size)

        loss = criterion(scores, minibatch_label)

        running_loss += loss.item()
        num_batches += 1

    total_loss = running_loss/num_batches
    print('test: exp(loss) = ', math.exp(total_loss)  )


### Construire le réseau. Choisir une taille cachée de 128, le nombre de têtes à 4 et le nombre de blocs à 2.
### Combien de paramètres au total ?


In [None]:
hidden_size = 128
num_heads = 4
num_blocks = 2
dropout = 0.95
seq_length = 100

net = attention_net(hidden_size, num_heads, num_blocks, seq_length, dropout)
print(net)
display_num_param(net)

attention_net(
  (layer1): Embedding(10000, 128)
  (layer2): ANN(
    (decoder): Transformer_decoder(
      (TR_Blocks): ModuleList(
        (0-1): 2 x TransformerBlock(
          (LN_MHA): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
          (LN_MLP): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
          (MHA): MultipleAttentionHead(
            (MHA): ModuleList(
              (0-3): 4 x AttentionHead(
                (LN_MHA): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
                (LN_MLP): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
                (query): Linear(in_features=128, out_features=32, bias=False)
                (key): Linear(in_features=128, out_features=32, bias=False)
                (value): Linear(in_features=128, out_features=32, bias=True)
                (dropout): Dropout(p=0.95, inplace=False)
              )
            )
            (WO): Linear(in_features=128, out_features=128, bias=True)
            (dropout): 

### Transférer le réseau sur le GPU

In [None]:
net = net.to(device)

### Choisir la fonction de perte comme l'entropie croisée et l'optimiseur comme Adam, ainsi que les hyperparamètres importants suivants :

* initial learning rate = 0.001
* sequence length = 30

In [None]:
criterion = nn.CrossEntropyLoss()

my_lr = 0.001
optimizer = torch.optim.Adam(net.parameters(), lr=my_lr)

pos = generate_positional_encoding(seq_length, hidden_size) # size=(seq_length, hidden_dim)

### Faire 5 passages sur l’ensemble d’entraînement  
### Observer la perplexité sur l’ensemble d’entraînement et sur l’ensemble de test  


In [None]:
epochs = 5
start=time.time()
for epoch in range(epochs):

    # diviser le taux d'apprentissage par 1.1 sauf après la première époque
    if epoch >= 2:
        optimizer.param_groups[0]['lr'] /= 1.1
        my_lr = optimizer.param_groups[0]['lr']

    # initialiser les quantités cumulées à zéro au début de l'époque
    running_loss=0
    num_batches=0
    for count in range(0, 46478 - seq_length, seq_length):

        # Remettre les gradients à zéro
        optimizer.zero_grad()

        # créer un mini-batch et l'encodage positionnel
        minibatch_data = train_data[count : count + seq_length]
        minibatch_label = train_data[count + 1 : count + seq_length + 1]
        pos = generate_positional_encoding(seq_length, hidden_size)  # taille=(seq_length, hidden_dim)

        # envoyer les tenseurs sur le GPU
        minibatch_data = minibatch_data.to(device)
        minibatch_label = minibatch_label.to(device)
        pos = pos.to(device)

        # passage avant du mini-batch dans le réseau
        scores = net(minibatch_data, pos)  # taille=(seq_length, bs, vocab_size)

        # remodeler scores et labels en un grand batch de taille bs*seq_length
        scores = scores.view(bs * seq_length, vocab_size)  # taille=(bs*seq_length, vocab_size)
        minibatch_label = minibatch_label.view(bs * seq_length)  # taille=(bs*seq_length)

        # calculer la moyenne des pertes sur ce grand batch
        loss = criterion(scores, minibatch_label)

        # rétropropagation pour calculer dL/dR, dL/dV et dL/dW
        loss.backward()

        # faire un pas de descente de gradient stochastique : R = R - lr*(dL/dR), etc.
        optimizer.step()

        # mettre à jour la perte cumulée
        running_loss += loss.item()
        num_batches += 1

    # calculer les statistiques pour l'ensemble complet d'entraînement
    total_loss = running_loss / num_batches
    elapsed = time.time() - start

    print('')
    print('époque =', epoch, '\t temps =', elapsed, '\t lr =', my_lr, '\t exp(loss) =', math.exp(total_loss))

    eval_on_test_set()



époque = 0 	 temps = 9.094287633895874 	 lr = 0.001 	 exp(loss) = 680.3077307862844
test: exp(loss) =  365.23720805758177

époque = 1 	 temps = 16.386220455169678 	 lr = 0.001 	 exp(loss) = 294.13487818708074
test: exp(loss) =  257.01290844500113

époque = 2 	 temps = 24.889225959777832 	 lr = 0.0009090909090909091 	 exp(loss) = 207.11752464353142
test: exp(loss) =  214.31581592394815

époque = 3 	 temps = 32.57538723945618 	 lr = 0.0008264462809917355 	 exp(loss) = 161.85380147916237
test: exp(loss) =  192.47670005076125

époque = 4 	 temps = 40.2217652797699 	 lr = 0.0007513148009015777 	 exp(loss) = 134.0087874013894
test: exp(loss) =  179.95568609705285


### Choisir une phrase (prise de l'ensemble de test)


In [None]:
sentence1 = "some analysts expect oil prices to remain relatively"

sentence2 = "over the next days and weeks they say investors should look for stocks to"

sentence3 = "prices averaging roughly $ N a barrel higher in the third"

sentence4 = "i think my line has been very consistent mrs. hills said at a news"

sentence5 = "this appears particularly true at gm which had strong sales in"

# ou créez votre propre phrase. Pas de lettres majuscules ni de ponctuation autorisées. Chaque mot doit appartenir au vocabulaire autorisé.
sentence6 = "be your self,the world"

# CHOISIR LA PHRASE ICI
mysentence = sentence5

### Afficher la prédiction du réseau pour le mot suivant


In [None]:
minibatch_data = sentence2vector(mysentence)
minibatch_data = torch.cat((minibatch_data, minibatch_data), dim=0)  # dupliquer la séquence de test pour utiliser la même taille de fenêtre d'attention pour chaque mot
pos = generate_positional_encoding(minibatch_data.size(0), hidden_size)

minibatch_data = minibatch_data.to(device)
pos = pos.to(device)

net.eval()
scores = net(minibatch_data, pos)
scores = scores[-1, :]  # sélectionner le dernier vecteur de score pour la prédiction du mot suivant à partir de la séquence d'entrée
scores = scores[0].unsqueeze(0).unsqueeze(0)

print(mysentence, '... \n')
show_next_word(scores)


this appears particularly true at gm which had strong sales in ... 

25.8%	 the
7.3%	 N
4.7%	 a
2.9%	 <unk>
2.6%	 new
2.1%	 its
1.8%	 august
1.4%	 july
1.1%	 this
1.0%	 an
0.9%	 london
0.9%	 recent
0.7%	 april
0.7%	 september
0.6%	 january
0.6%	 early
0.6%	 europe
0.5%	 his
0.5%	 their
0.5%	 cash
0.4%	 late
0.4%	 which
0.4%	 california
0.4%	 march
0.4%	 chicago
0.4%	 <eos>
0.4%	 connection
0.4%	 u.s.
0.3%	 japan
0.3%	 part
