### Prérequis: Avoir fait le tp Transformer.ipynb

# Génération de texte
Félicitation d'avoir fini le tp sur les transformers (ou pas). Ici, on va s'amuser à générer du texte avec un transformer (ou presque).

## Idée
Je ne sais pas si vous avez remarqué, mais pour de la génération de texte, il n'y pas de phrase à encoder, contrairement à la traduction de texte où la phrase à encoder est la phrase à traduire.

On va donc utiliser juste la partie decodeur du transformer. Mais si vous vous souvenez, il y a la couche Cross Attention dans la partie decodeur, qui n'est pas nécessaire ici car on n'a pas de phrase à encoder. On va donc utiliser un decoduer sans la Cross Attention.

![decoderonly](https://raw.githubusercontent.com/Automatants/projets-de-permanence/master/image-hosting/transfo/decoderonly.png)

Bref, l'idée est la suivante: on va entraîner notre modèle à compléter des phrases.

Exemple:

Entrée: ["\<start>", "Je", "suis", "un", "chat", "<end>"]

Sortie: ["Je", "suis", "un", "chat", "\<end>"]

Le modèle va alors:
- Prédire "Je" à partir de "\<start>"
- Prédire "suis" à partir de "\<start> Je"
- Prédire "un" à partir de "\<start> Je suis"
- etc...

Puis, pendant l'inference, on va juste lui donner "\<start>" et il va compléter la phrase jusqu'à "\<end>".

Si vous vous rappelez, la sortie de notre modèle est des vecteurs de probabilité et dans le tp précédent, on a utilisé l'argmax de ces vecteurs de probabilité pour avoir le mot prédit.

Mais si ici on prend l'argmax du vecteur de probabilité à chaque fois, on va avoir juste avoir la même phrase à chaque fois qu'on génère du texte.

Donc on va utiliser un autre moyen pour générer du texte: le sampling.

Le sampling consiste à prendre un mot en fonction de sa probabilité. Par exemple, si le mot "chat" a une probabilité de 0.8, on va le prendre 80% du temps.

Donc avec les vecteurs de probabilité que notre modèle nous donne, on va prendre un mot en fonction de sa probabilité.

Avant de sampler, on va diviser les vecteurs (avant le softmax) par un facteur qu'on appelle la température. La température est un hyperparamètre qui va déterminer la diversité du texte généré. Plus la température est grande, plus le texte généré sera diversifié. Plus la température est petite, plus le texte généré sera similaire à l'entrée, mais plus robuste à l'erreur, car avec une température trop grande, les probabilités des mots vont être trop proches et on risque d'avoir des mots qui n'ont pas de sens.

Remarque: C'est exactement ce que fait ChatGPT. Il est aussi un decoder-only transformer mais il a 175 milliards de paramètres, un peu plus gros que le modèle qu'on va faire ici.

## Préparation des données
On va utiliser le dataset WikiText-2.

Il faut tout d'abord installer torchtext et portalocker.

In [None]:
!pip install torchtext portalocker

In [None]:
import torchtext
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.datasets import WikiText2
import torch

tokenizer = get_tokenizer('basic_english')
train_iter = WikiText2(split='train')

vocab = build_vocab_from_iterator(map(tokenizer, train_iter), specials=["<unk>", "<pad>", "<start>", "<end>"])
vocab.set_default_index(vocab["<unk>"])

In [None]:
x_train = []
max_len = 500
for sentence in train_iter:
    x_train.append(torch.tensor(vocab(["<start>"] + tokenizer(sentence)[:500] + ["<end>"])))

x_train = torch.nn.utils.rnn.pad_sequence(x_train, batch_first=True, padding_value=vocab["<pad>"])
pad_train = (x_train != vocab["<pad>"]).bool()

In [None]:
# dataloader pour le training containing x_train and pad_train
train_dataset = torch.utils.data.TensorDataset(x_train, pad_train)
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)

In [None]:
class PositionalEncoding(torch.nn.Module):
    def __init__(self, d_model, dropout=0.0, max_len=5000):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * (-torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch_size, seq_len, d_model)
        x = x + self.pe[:, :x.size(1)]
        return x


class MultiHeadAttention(torch.nn.Module):
    def __init__(self, d_model, num_heads, dropout=0.0):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads

        self.wq = torch.nn.Linear(d_model, d_model)
        self.wk = torch.nn.Linear(d_model, d_model)
        self.wv = torch.nn.Linear(d_model, d_model)

        self.fc = torch.nn.Linear(d_model, d_model)

    def forward(self, q, k, v, key_padding_mask=None, attn_mask=None):
        batch_size = q.size(0)

        q = self.wq(q).view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)
        k = self.wk(k).view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)
        v = self.wv(v).view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)

        attn_scores = torch.matmul(q, k.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.depth, dtype=torch.float))
        if attn_mask is not None:
            attn_scores = attn_scores.masked_fill(attn_mask == 0, -1e9)
        if key_padding_mask is not None:
            attn_scores = attn_scores.masked_fill(key_padding_mask.unsqueeze(1).unsqueeze(2) == 0, -1e9)
        attn_scores = torch.nn.functional.softmax(attn_scores, dim=-1)

        attn_output = torch.matmul(attn_scores, v)
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        attn_output = self.fc(attn_output)

        return attn_output, attn_scores


class DecoderLayer(torch.nn.Module):
    def __init__(self, d_model, num_heads, dim_feedforward=2048, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout=dropout)
        self.multihead_attn = MultiHeadAttention(d_model, num_heads, dropout=dropout)
        self.linear1 = torch.nn.Linear(d_model, dim_feedforward)
        self.linear2 = torch.nn.Linear(dim_feedforward, d_model)
        self.norm1 = torch.nn.LayerNorm(d_model)
        self.norm3 = torch.nn.LayerNorm(d_model)

    def forward(self, tgt, tgt_mask=None, tgt_key_padding_mask=None):
        tgt2 = self.self_attn(tgt, tgt, tgt, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask)[0]
        tgt = tgt + tgt2
        tgt = self.norm1(tgt)

        tgt2 = self.linear2(torch.nn.functional.relu(self.linear1(tgt)))
        tgt = tgt + tgt2
        tgt = self.norm3(tgt)

        return tgt


class Decoder(torch.nn.Module):
    def __init__(self, d_model, num_heads, dim_feedforward, num_layers, norm=None):
        super(Decoder, self).__init__()
        self.layers = torch.nn.ModuleList([DecoderLayer(d_model, num_heads, dim_feedforward) for _ in range(num_layers)])
        self.norm = norm

        self.max_len = max_len+1
        self.register_buffer('tgt_mask', torch.tril(torch.ones(self.max_len, self.max_len)))

    def forward(self, tgt, tgt_key_padding_mask=None):
        sen_len = tgt.size(1)
        for layer in self.layers:
            tgt = layer(tgt, tgt_mask=self.tgt_mask[:sen_len, :sen_len], tgt_key_padding_mask=tgt_key_padding_mask)
        if self.norm is not None:
            tgt = self.norm(tgt)
        return tgt


class Transformer(torch.nn.Module):
    def __init__(self, tgt_vocab_size, d_model, num_heads, num_layers, dim_feedforward, dropout=0.1):
        super(Transformer, self).__init__()
        self.encoder = torch.nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.decoder = Decoder(d_model, num_heads, dim_feedforward, num_layers)
        self.out = torch.nn.Linear(d_model, tgt_vocab_size)

        self.d_model = d_model
        self.num_layers = num_layers

    def forward(self, tgt, tgt_key_padding_mask=None):
        tgt = self.encoder(tgt)
        tgt = self.pos_encoder(tgt)

        tgt = self.decoder(tgt, tgt_key_padding_mask=tgt_key_padding_mask)

        output = self.out(tgt)

        return output

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Transformer(len(vocab), d_model=128, num_heads=4, num_layers=4, dim_feedforward=1024).to(device)

loss = torch.nn.CrossEntropyLoss(reduction='none')
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)


In [None]:
from tqdm import tqdm

# training
for epoch in range(2):
    model.train()
    pbar = tqdm(enumerate(train_dataloader))
    for idx, batch in pbar:
        tgt, tgt_key_padding_mask = batch
        tgt = tgt.to(device)
        tgt_key_padding_mask = tgt_key_padding_mask.to(device)

        ground_truth = tgt[:, 1:]
        tgt = tgt[:, :-1]

        output = model(tgt, tgt_key_padding_mask=tgt_key_padding_mask[:, :-1])

        loss_val = loss(output.transpose(1, 2), ground_truth)
        loss_val = loss_val.masked_select(ground_truth != vocab['<pad>']).mean()

        optimizer.zero_grad()
        loss_val.backward()
        optimizer.step()

        pbar.set_description(f'epoch {epoch} loss {loss_val.item():.4f}')
        break

        if idx % 100 == 0:

            print("Input: ", ' '.join(vocab.lookup_tokens(tgt[0].tolist())))
            print("Output: ", ' '.join(vocab.lookup_tokens(ground_truth[0].tolist())))

            # evaluation
            model.eval()
            cur_sentence = [vocab['<start>']]
            temperature = 0.8

            with torch.no_grad():
                for i in range(100):
                    cur_input = torch.tensor(cur_sentence).unsqueeze(0).to(device)
                    output = model(cur_input)
                    output = output[:, -1, :] / temperature
                    output = torch.nn.functional.softmax(output, dim=-1)
                    output = torch.multinomial(output, num_samples=1).squeeze(0)
                    cur_sentence.append(output.item())
                    if output.item() == vocab['<end>']:
                        break

            print(' '.join(vocab.lookup_tokens(cur_sentence)))

            model.train()


In [None]:
# evaluation
model.eval()
cur_sentence = [vocab['<start>']]
temperature = 0.8

with torch.no_grad():
    for i in range(100):
        cur_input = torch.tensor(cur_sentence).unsqueeze(0).to(device)
        output = model(cur_input)
        temperature_tensor = torch.full_like(output, temperature)
        # put 1000 to <unk> and <pad>
        temperature_tensor[:, :, vocab['<unk>']] = 1000
        output = output[:, -1, :] / temperature_tensor[:, -1, :]
        print(output.shape)
        output = torch.nn.functional.softmax(output, dim=-1)
        output = torch.multinomial(output, num_samples=1).squeeze(0)
        cur_sentence.append(output.item())
        if output.item() == vocab['<end>']:
            break

print(' '.join(vocab.lookup_tokens(cur_sentence)))