## 1. Imports

In [None]:
import torch
import torch.nn as nn
import math

---
## 2. Embeddings

BERT utilise plusieurs embeddings :
- **Token embeddings** : pour représenter chaque token (mot ou sous-mot).
- **Segment embeddings** : pour distinguer les phrases dans une même séquence (phrase A vs phrase B).
- **Position embeddings** : pour encoder la position d’un token dans la séquence (indispensable car le Transformer n’a pas de mécanisme récurrent implicite).

Ici, on définit une classe `BertEmbeddings` qui fusionne ces trois embeddings :

In [None]:
class BertEmbeddings(nn.Module):
    def __init__(self, vocab_size, hidden_size, max_position_embeddings, type_vocab_size):
        super().__init__()
        # Embedding pour les tokens
        self.word_embeddings = nn.Embedding(vocab_size, hidden_size)
        # Embedding pour la position
        self.position_embeddings = nn.Embedding(max_position_embeddings, hidden_size)
        # Embedding pour le segment (type de phrase)
        self.token_type_embeddings = nn.Embedding(type_vocab_size, hidden_size)

        # Normalisation et dropout comme dans BERT
        self.LayerNorm = nn.LayerNorm(hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, input_ids, token_type_ids=None):
        """
        input_ids: [batch_size, seq_length]
        token_type_ids: [batch_size, seq_length] (facultatif, sinon 0)
        """
        seq_length = input_ids.size(1)

        # Si token_type_ids n'est pas fourni, on crée un tenseur de zéros
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        # Position ids: [0, 1, 2, ..., seq_length-1]
        position_ids = torch.arange(seq_length, dtype=torch.long, device=input_ids.device)
        position_ids = position_ids.unsqueeze(0).expand_as(input_ids)

        # Calcul des embeddings
        word_embeds = self.word_embeddings(input_ids)
        position_embeds = self.position_embeddings(position_ids)
        token_type_embeds = self.token_type_embeddings(token_type_ids)

        embeddings = word_embeds + position_embeds + token_type_embeds
        embeddings = self.LayerNorm(embeddings)
        embeddings = self.dropout(embeddings)

        return embeddings

---

## 3. Multi-Head Self-Attention

La self-attention multi-tête consiste à projeter le même input en plusieurs espaces (Query, Key, Value), appliquer une attention *Scaled Dot-Product*, puis concaténer les sorties pour obtenir un vecteur final.


In [None]:
class MultiHeadSelfAttention(nn.Module):
    def __init__(self, hidden_size, num_heads):
        super().__init__()
        assert hidden_size % num_heads == 0, "Le hidden_size doit être divisible par le nombre de têtes."

        self.num_heads = num_heads
        self.head_dim = hidden_size // num_heads

        # Matrices de projection pour Q, K, V
        self.query = nn.Linear(hidden_size, hidden_size)
        self.key   = nn.Linear(hidden_size, hidden_size)
        self.value = nn.Linear(hidden_size, hidden_size)

        # Projection finale après la concaténation
        self.out = nn.Linear(hidden_size, hidden_size)

    def forward(self, hidden_states, attention_mask=None):
        """
        hidden_states: [batch_size, seq_length, hidden_size]
        attention_mask: [batch_size, 1, 1, seq_length] ou [batch_size, seq_length] etc.
        """
        batch_size, seq_length, _ = hidden_states.size()

        # Projections
        Q = self.query(hidden_states)  # [batch_size, seq_length, hidden_size]
        K = self.key(hidden_states)
        V = self.value(hidden_states)

        # On reshape pour séparer les têtes
        # => [batch_size, seq_length, num_heads, head_dim]
        Q = Q.view(batch_size, seq_length, self.num_heads, self.head_dim)
        K = K.view(batch_size, seq_length, self.num_heads, self.head_dim)
        V = V.view(batch_size, seq_length, self.num_heads, self.head_dim)

        # On transpose pour que les têtes soient sur le deuxième axe
        # => [batch_size, num_heads, seq_length, head_dim]
        Q = Q.transpose(1, 2)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)

        # Calcul des scores d'attention : Q * K^T / sqrt(d)
        # Q: [batch_size, num_heads, seq_length, head_dim]
        # K: [batch_size, num_heads, seq_length, head_dim]

        # On transpose K pour avoir la dimension de head_dim en dernière position
        # => K: [batch_size, num_heads, head_dim, seq_length]
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
        # => scores: [batch_size, num_heads, seq_length, seq_length]

        # Application du masque d'attention (optionnel)
        if attention_mask is not None:
            # On suppose que le masque a la même forme ou est broadcastable
            # Les positions masquées sont mises à un score très bas (ex: -1e9)
            scores = scores + attention_mask

        # Softmax sur la dimension des "keys"
        attn_weights = torch.softmax(scores, dim=-1)

        # Calcul de la sortie en tenant compte des poids d'attention
        context = torch.matmul(attn_weights, V)
        # => [batch_size, num_heads, seq_length, head_dim]

        # On remet la dimension des têtes et de head_dim ensemble
        context = context.transpose(1, 2).contiguous()
        # => [batch_size, seq_length, num_heads, head_dim]
        context = context.view(batch_size, seq_length, self.num_heads * self.head_dim)
        # => [batch_size, seq_length, hidden_size]

        # Projection finale
        output = self.out(context)
        # => [batch_size, seq_length, hidden_size]

        return output

---
## 4. Feed-Forward

Chaque bloc Transformer contient aussi un sous-module feed-forward (MLP) appliqué en parallèle après la self-attention.


In [None]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, hidden_size, intermediate_size):
        super().__init__()
        self.fc1 = nn.Linear(hidden_size, intermediate_size)
        self.fc2 = nn.Linear(intermediate_size, hidden_size)
        self.activation = nn.GELU()  # BERT utilise GELU

    def forward(self, x):
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        return x

---

## 5. Bloc Encoder (Transformer Layer)

Un bloc BERT combine :
1. **Multi-Head Self-Attention** avec *Add & LayerNorm*.
2. **Feed Forward** avec *Add & LayerNorm*.

In [None]:
class BertLayer(nn.Module):
    def __init__(self, hidden_size, num_heads, intermediate_size):
        super().__init__()
        self.attention = MultiHeadSelfAttention(hidden_size, num_heads)
        self.attention_layer_norm = nn.LayerNorm(hidden_size, eps=1e-12)

        self.ff = PositionwiseFeedForward(hidden_size, intermediate_size)
        self.ff_layer_norm = nn.LayerNorm(hidden_size, eps=1e-12)

        self.dropout = nn.Dropout(0.1)

    def forward(self, hidden_states, attention_mask=None):
        # Self-attention
        attn_output = self.attention(hidden_states, attention_mask)
        # Add + Norm
        hidden_states = self.attention_layer_norm(hidden_states + self.dropout(attn_output))

        # Feed Forward
        ff_output = self.ff(hidden_states)
        # Add + Norm
        hidden_states = self.ff_layer_norm(hidden_states + self.dropout(ff_output))

        return hidden_states

---

## 6. L’encodeur BERT complet

BERT-Base a 12 couches d’encodeurs, BERT-Large en a 24, etc. Ici on peut choisir un nombre de couches plus réduit pour simplifier.


In [None]:
class BertEncoder(nn.Module):
    def __init__(self, num_layers, hidden_size, num_heads, intermediate_size):
        super().__init__()
        self.layers = nn.ModuleList([
            BertLayer(hidden_size, num_heads, intermediate_size)
            for _ in range(num_layers)
        ])

    def forward(self, hidden_states, attention_mask=None):
        for layer in self.layers:
            hidden_states = layer(hidden_states, attention_mask)
        return hidden_states

---

## 7. Modèle BERT complet

On assemble les embeddings et l’encodeur.

In [None]:
class SimpleBertModel(nn.Module):
    def __init__(self,
                 vocab_size=30522,          # Taille du vocab (ex: BERT base)
                 hidden_size=128,           # Dim cachée (768 pour BERT base)
                 num_heads=4,              # Nb de têtes (12 pour BERT base)
                 num_layers=4,             # Nombre de couches (12 pour BERT base)
                 intermediate_size=256,     # Dim intermédiaire (3072 pour BERT base)
                 max_position_embeddings=512,
                 type_vocab_size=2):
        super().__init__()

        self.embeddings = BertEmbeddings(
            vocab_size=vocab_size,
            hidden_size=hidden_size,
            max_position_embeddings=max_position_embeddings,
            type_vocab_size=type_vocab_size
        )

        self.encoder = BertEncoder(
            num_layers=num_layers,
            hidden_size=hidden_size,
            num_heads=num_heads,
            intermediate_size=intermediate_size
        )

    def forward(self, input_ids, token_type_ids=None, attention_mask=None):
        """
        input_ids: [batch_size, seq_length]
        token_type_ids: [batch_size, seq_length]
        attention_mask: [batch_size, seq_length] (1 pour token valide, 0 pour token masqué/padding)
        """
        # Embeddings
        embedding_output = self.embeddings(input_ids, token_type_ids)

        # Si on a un attention_mask au format [batch_size, seq_length],
        # on va le transformer pour qu'il soit broadcastable (ex: [batch_size, 1, 1, seq_length])
        if attention_mask is not None:
            # attention_mask: 1 => gardé, 0 => masqué
            # BERT attend -inf (ex: -1e9) sur positions masquées
            extended_mask = attention_mask.unsqueeze(1).unsqueeze(2)  # [batch_size, 1, 1, seq_length]
            extended_mask = extended_mask.to(dtype=embedding_output.dtype)  # correspondance de type
            extended_mask = (1.0 - extended_mask) * -1e9
        else:
            extended_mask = None

        # Passage par l'encodeur
        encoder_output = self.encoder(embedding_output, attention_mask=extended_mask)

        return encoder_output

---

## 8. Exemple d’utilisation

On crée un *batch* de données factices : `batch_size=2`, `seq_length=6`.

In [None]:
# Paramètres
batch_size = 2
seq_length = 6

# Instanciation du modèle
model = SimpleBertModel(
    vocab_size=1000,  # Petit vocab fictif
    hidden_size=128,
    num_heads=4,
    num_layers=2,
    intermediate_size=256,
    max_position_embeddings=512,
    type_vocab_size=2
)

# Données fictives
input_ids = torch.randint(0, 1000, (batch_size, seq_length))
token_type_ids = torch.zeros(batch_size, seq_length, dtype=torch.long)
attention_mask = torch.ones(batch_size, seq_length, dtype=torch.long)

# Passage dans le modèle
with torch.no_grad():
    output = model(input_ids, token_type_ids, attention_mask)

print("Shape de la sortie :", output.shape)
# Sortie attendue: [batch_size, seq_length, hidden_size]

---

### Points clés à retenir

1. **Embeddings** : BERT combine les embeddings de token, de position et de segment.
2. **Multi-Head Self-Attention** : permet à chaque position de s’attendre elle-même sur toutes les autres positions.
3. **Add & Norm** : on ajoute la sortie de l’attention à l’entrée (résidual) puis on normalise.
4. **Feed-Forward** : applique un MLP (souvent 2 couches) à chaque position indépendamment.
5. **Empilement** : BERT comporte plusieurs blocs (12 pour BERT Base).
6. **Masque d’attention** : gère les tokens *paddés* (ou masqués dans d’autres tâches).

Cette implémentation est **très** simplifiée et n’inclut pas :
- Le *pooler* final habituel de BERT (souvent un `Linear + Tanh` sur le premier token `[CLS]`).
- Les *head* de classification ou de *masked language modeling*.
- Des optimisations (mixed precision, etc.) et d’autres détails d’initialisation fidèles à l’article d’origine.

Elle sert surtout à montrer la structure générale d’un *Encoder-Only Transformer* à la BERT.