In [126]:
import numpy as np
import random
import string
import torch
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

In [127]:
def generate_counting_dataset(n, T, k, special_char='A', special_value=7, seed=None):
    """
    Generate a letter-counting dataset with one specially treated letter.

    Each sequence is drawn uniformly (with replacement) from the first ``k``
    uppercase letters. The target at every position is the number of times
    the letter at that position occurs in the whole sequence, except for
    ``special_char``, whose target is always ``special_value``.

    Args:
        n: number of sequences.
        T: length of each sequence.
        k: alphabet size (letters 'A', 'B', ... up to the k-th); 1 <= k <= 26.
        special_char: letter that receives the constant target.
        special_value: target assigned to ``special_char``.
        seed: optional seed; when given, re-seeds the global ``random`` and
            ``numpy.random`` generators.

    Returns:
        X: float32 tensor [n, T, k] of one-hot letter encodings.
        y: int64 tensor [n, T] of targets.
        char_to_id, id_to_char: letter <-> index lookup dictionaries.

    Raises:
        ValueError: if ``k`` is outside 1..26 (slicing the alphabet would
            otherwise silently yield a different number of letters).
        AssertionError: if ``special_char`` is not in the alphabet.
    """
    if not 1 <= k <= len(string.ascii_uppercase):
        raise ValueError(
            f"k must be between 1 and {len(string.ascii_uppercase)}, got {k}"
        )

    if seed is not None:
        random.seed(seed)
        np.random.seed(seed)

    # Lookup tables
    letters = list(string.ascii_uppercase[:k])
    assert special_char in letters, "Le caractère spécial doit être dans l'alphabet"
    char_to_id = {c: i for i, c in enumerate(letters)}
    id_to_char = {i: c for c, i in char_to_id.items()}
    D = len(letters)
    special_id = char_to_id[special_char]

    # Work on integer ids in preallocated arrays; the one-hot tensor is built
    # once at the end instead of from nested Python lists of ndarrays.
    ids = np.empty((n, T), dtype=np.int64)
    targets = np.empty((n, T), dtype=np.int64)

    for row in range(n):
        # random.choices keeps the same draw sequence as before for a given seed.
        seq = random.choices(letters, k=T)
        seq_ids = np.fromiter((char_to_id[c] for c in seq), dtype=np.int64, count=T)
        counts = np.bincount(seq_ids, minlength=D)  # occurrences of every letter
        ids[row] = seq_ids
        targets[row] = np.where(seq_ids == special_id, special_value, counts[seq_ids])

    X = torch.from_numpy(np.eye(D, dtype=np.float32)[ids])  # [n, T, D] one-hot
    y = torch.from_numpy(targets)                           # [n, T] int64
    return X, y, char_to_id, id_to_char


In [200]:
# Experiment size: number of sequences (n), tokens per sequence (T) and
# alphabet size (D), which is also the width of the one-hot inputs.
n, T, D = 30_000, 100, 10

In [201]:
# Seed the generator so that "Restart & Run All" rebuilds the same dataset.
# The seed is deliberately not 42: the demo sequence at the end of the notebook
# is drawn with seed=42 from the same global `random` generator, so sharing the
# seed would make it identical to the first sequence generated here.
X, y, char_to_id, id_to_char = generate_counting_dataset(
    n=n, T=T, k=D, special_char='A', special_value=7, seed=0
)
print(X.shape)  # torch.Size([n, T, D]) -> one-hot inputs
print(y.shape)  # torch.Size([n, T])    -> integer target per position
print()
print("Exemple :", y[0])


torch.Size([30000, 100, 10])
torch.Size([30000, 100])

Exemple : tensor([ 7,  8,  8, 10, 15,  8,  8, 14, 14, 10, 11, 15, 10,  8, 10, 11, 15,  8,
         8,  8,  8, 10,  8, 10,  7,  5,  7, 10, 11, 11,  5,  7, 11, 10,  7, 14,
         7,  8,  8,  8, 14, 10, 15, 11, 14, 15, 10,  8, 15,  5, 10,  8, 14, 10,
         8, 15,  5, 10, 15, 15,  7,  8, 15, 10, 11,  7, 15,  8, 15,  8, 10, 10,
        14,  8, 11,  8,  7, 14, 14, 14,  8,  8, 10,  7, 14,  5, 11, 15,  8, 14,
        14, 15, 15, 10, 14, 11, 10,  7, 10, 11])


In [202]:
# Hold out 20% of the sequences for validation; the fixed random_state keeps
# the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [203]:
# Inspect one training example: its one-hot inputs and its integer targets.
X_train[1], y_train[1]

(tensor([[0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 1.],
         [0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
         [0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],
         [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
         [1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
         [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
         [1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],
         [1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
         [0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
         [0., 0., 0., 0., 0., 0

In [204]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch of sequences.

    Even feature indices receive sin(position * freq) and odd indices receive
    cos(position * freq), with frequencies 10000 ** (-2i / d_model).

    Args:
        d_model: feature dimension of the inputs (odd values are supported).
        max_len: longest sequence length that can be encoded.
    """

    def __init__(self, d_model, max_len=100):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model)
        )
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer cosine column than sine
        # columns, so drop the surplus frequency instead of failing on a
        # shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Registered as a buffer so the table follows the module through
        # .to(device) / .cuda(); non-persistent so state_dict() keeps the same
        # keys as when `pe` was a plain attribute.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)  # [1, max_len, d_model]

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Raises:
            ValueError: if the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len={self.pe.size(1)} "
                "of the positional encoding"
            )
        # .to() is a no-op once the module lives on the same device as x.
        return x + self.pe[:, :seq_len].to(x.device)

In [205]:
class AttentionOnlyBlock(nn.Module):
    """Single-head self-attention followed by a residual connection and LayerNorm.

    The attention matrix of the most recent forward pass is kept in
    ``self.attn_weights`` so it can be inspected afterwards.

    Args:
        d_model: feature dimension of the inputs and of every projection.
    """

    def __init__(self, d_model):
        super().__init__()
        self.Wq = nn.Linear(d_model, d_model)  # query projection
        self.Wk = nn.Linear(d_model, d_model)  # key projection
        self.Wv = nn.Linear(d_model, d_model)  # value projection
        self.norm = nn.LayerNorm(d_model)
        self.attn_weights = None  # filled in by forward()

    def forward(self, x):
        """Apply scaled dot-product self-attention over the second-to-last axis of ``x``."""
        queries, keys, values = self.Wq(x), self.Wk(x), self.Wv(x)

        # Scores are scaled by the square root of the feature dimension.
        scale = np.sqrt(x.size(-1))
        scores = (queries @ keys.transpose(-2, -1)) / scale

        self.attn_weights = scores.softmax(dim=-1)
        attended = self.attn_weights @ values

        # Residual connection, then layer normalisation.
        return self.norm(x + attended)


In [206]:
class TransformerModel(nn.Module):
    """Attention-only sequence regressor: one scalar prediction per position.

    Pipeline: linear embedding -> positional encoding -> ``num_layers``
    attention-only blocks -> linear head producing one value per position.

    Args:
        input_dim: size of each input feature vector.
        d_model: hidden width used by the embedding and the attention blocks.
        nhead: accepted for interface compatibility but currently unused;
            every block is built as ``AttentionOnlyBlock(d_model)``.
        num_layers: number of stacked attention-only blocks.
        max_len: longest sequence length forwarded to ``PositionalEncoding``
            (default 100, the value that was previously implied).
    """

    def __init__(self, input_dim, d_model, nhead, num_layers, max_len=100):
        super().__init__()
        self.embedding = nn.Linear(input_dim, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len=max_len)
        self.transformer_blocks = nn.ModuleList(
            [AttentionOnlyBlock(d_model) for _ in range(num_layers)]
        )
        # Regression head
        self.fc = nn.Linear(d_model, 1)

    def forward(self, x):
        """Map ``x`` of shape [B, T, input_dim] to predictions of shape [B, T, 1]."""
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for block in self.transformer_blocks:
            x = block(x)
        return self.fc(x)

In [207]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return the element-weighted mean loss.

    The model is updated only when ``optimizer`` is given; otherwise the pass
    runs in eval mode with gradients disabled. Returns NaN for an empty loader.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    n_elements = 0
    with torch.set_grad_enabled(training):
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)  # xb: [B, T, D], yb: [B, T]
            preds = model(xb).squeeze(-1)          # [B, T, 1] -> [B, T]
            loss = criterion(preds, yb.float())

            if training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # criterion returns a per-batch mean; weight it by the number of
            # targets so a smaller final batch does not skew the epoch average.
            batch_elements = yb.numel()
            loss_sum += loss.item() * batch_elements
            n_elements += batch_elements

    return loss_sum / n_elements if n_elements else float("nan")


def train_model(model, train_loader, test_loader, epochs=10, lr=1e-3, device='cuda' if torch.cuda.is_available() else 'cpu'):
    """Train ``model`` with Adam on an MSE objective and print per-epoch losses.

    Args:
        model: module mapping [B, T, D] inputs to [B, T, 1] predictions.
        train_loader: iterable of (inputs, targets) batches used for updates.
        test_loader: iterable of (inputs, targets) batches used for validation.
        epochs: number of passes over ``train_loader``.
        lr: Adam learning rate.
        device: device the model and every batch are moved to.

    The reported losses are means over all target elements seen in the epoch.
    An empty loader is reported as ``nan`` instead of raising ZeroDivisionError.
    """
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    for epoch in range(1, epochs + 1):
        avg_train_loss = _run_epoch(model, train_loader, criterion, device, optimizer)
        # Validation
        avg_val_loss = _run_epoch(model, test_loader, criterion, device)

        print(f"Epoch {epoch}/{epochs} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")


In [208]:
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# DataLoader defaults to batch_size=1, which would run one forward pass per
# held-out sequence at every validation step. 250 divides the 6000 held-out
# sequences (20% of n=30000) evenly, so every validation batch has the same size.
test_loader = DataLoader(test_dataset, batch_size=250, shuffle=False)

In [209]:
# A single attention-only layer on top of a 128-wide embedding.
model = TransformerModel(
    input_dim=D,
    d_model=128,
    nhead=1,
    num_layers=1,
)

In [217]:
# Train for 20 epochs on the device selected at the top of the notebook.
train_model(
    model,
    train_loader,
    test_loader,
    epochs=20,
    lr=1e-3,
    device=device,
)

Epoch 1/20 | Train Loss: 2.0305 | Val Loss: 1.9892
Epoch 2/20 | Train Loss: 1.9842 | Val Loss: 2.0452
Epoch 3/20 | Train Loss: 1.9453 | Val Loss: 2.0130
Epoch 4/20 | Train Loss: 1.9681 | Val Loss: 2.0405
Epoch 5/20 | Train Loss: 1.9145 | Val Loss: 1.9863
Epoch 6/20 | Train Loss: 1.9215 | Val Loss: 1.9360
Epoch 7/20 | Train Loss: 1.8851 | Val Loss: 1.9128
Epoch 8/20 | Train Loss: 1.8846 | Val Loss: 1.8686
Epoch 9/20 | Train Loss: 1.8751 | Val Loss: 1.8285
Epoch 10/20 | Train Loss: 1.8807 | Val Loss: 1.8403
Epoch 11/20 | Train Loss: 1.8465 | Val Loss: 1.8352
Epoch 12/20 | Train Loss: 1.8481 | Val Loss: 1.9543
Epoch 13/20 | Train Loss: 1.8466 | Val Loss: 1.9729
Epoch 14/20 | Train Loss: 1.8247 | Val Loss: 1.9143
Epoch 15/20 | Train Loss: 1.8288 | Val Loss: 1.7639
Epoch 16/20 | Train Loss: 1.8272 | Val Loss: 1.8290
Epoch 17/20 | Train Loss: 1.8091 | Val Loss: 1.8928
Epoch 18/20 | Train Loss: 1.8174 | Val Loss: 1.9718
Epoch 19/20 | Train Loss: 1.8023 | Val Loss: 1.7548
Epoch 20/20 | Train L

In [234]:
def test_model_on_sequence(model, sequence, char_to_id, special_char='A', special_value=7, device='cuda' if torch.cuda.is_available() else 'cpu'):
    """
    Teste le modèle sur une séquence de lettres donnée.
    Affiche la séquence, les cibles attendues et les prédictions du modèle.
    Colore en vert les bons prédits, en rouge les erreurs. Les colonnes sont alignées.
    """
    import numpy as np
    D = len(char_to_id)
    embeddings = np.eye(D)
    
    x_seq = [embeddings[char_to_id[c]] for c in sequence]
    x_tensor = torch.tensor([x_seq], dtype=torch.float32).to(device)

    counts = {c: sequence.count(c) for c in char_to_id}
    y_target = [special_value if c == special_char else counts[c] for c in sequence]

    model.eval()
    with torch.no_grad():
        preds = model(x_tensor).squeeze(0).squeeze(-1).cpu().numpy()
        preds_rounded = [round(p) for p in preds]

    print("📝 Input sequence  : ", end="")
    for i, c in enumerate(sequence):
        color = "\033[92m" if preds_rounded[i] == y_target[i] else "\033[91m"
        print(f"{color}{c:>3}\033[0m", end="")  # lettre alignée à droite sur 3 espaces
    print()

    print("🎯 Expected counts : ", end="")
    for i, y in enumerate(y_target):
        color = "\033[92m" if preds_rounded[i] == y else "\033[91m"
        print(f"{color}{y:>3}\033[0m", end="")  # nombre aligné à droite sur 3 espaces
    print()

    print("🔮 Model predicts  : ", end="")
    for i, y in enumerate(preds_rounded):
        color = "\033[92m" if y == y_target[i] else "\033[91m"
        print(f"{color}{y:>3}\033[0m", end="")  # aligné à droite
    print()


In [235]:

def generate_random_letter_sequence(T, D, seed=None):
    """
    Draw T letters uniformly, with replacement, from the first D uppercase letters.

    When ``seed`` is given, the global ``random`` generator is re-seeded first.
    Returns the letters as a list of single-character strings.
    """
    assert D <= 26, "D ne peut pas dépasser 26 (nombre de lettres majuscules)."
    if seed is not None:
        random.seed(seed)
    pool = [*string.ascii_uppercase[:D]]
    return random.choices(pool, k=T)

In [236]:
# Draw a reproducible demo sequence and print the model's per-position
# predictions next to the expected targets.
sequence = generate_random_letter_sequence(T=T, D=D, seed=42)
test_model_on_sequence(
    model=model,
    sequence=sequence,
    char_to_id=char_to_id,
)


📝 Input sequence  : [91m  G[0m[92m  A[0m[91m  C[0m[91m  C[0m[92m  H[0m[91m  G[0m[91m  I[0m[92m  A[0m[91m  E[0m[92m  A[0m[91m  C[0m[91m  F[0m[92m  A[0m[91m  B[0m[91m  G[0m[91m  F[0m[91m  C[0m[91m  F[0m[91m  I[0m[92m  A[0m[91m  I[0m[91m  G[0m[91m  D[0m[91m  B[0m[91m  J[0m[91m  D[0m[92m  A[0m[92m  A[0m[91m  I[0m[91m  G[0m[91m  I[0m[92m  H[0m[91m  F[0m[91m  J[0m[91m  D[0m[91m  F[0m[91m  I[0m[91m  G[0m[91m  I[0m[91m  F[0m[92m  H[0m[92m  A[0m[91m  C[0m[91m  C[0m[92m  A[0m[91m  C[0m[91m  B[0m[91m  C[0m[91m  G[0m[91m  D[0m[91m  D[0m[91m  C[0m[91m  C[0m[91m  J[0m[91m  G[0m[91m  G[0m[91m  B[0m[92m  H[0m[91m  B[0m[91m  D[0m[91m  J[0m[91m  G[0m[91m  F[0m[91m  G[0m[92m  I[0m[92m  H[0m[91m  C[0m[92m  A[0m[91m  D[0m[91m  C[0m[91m  C[0m[91m  J[0m[92m  I[0m[91m  D[0m[91m  G[0m[91m  D[0m[91m  J[0m[91m  E[0m[91m  C[0m[91m  C[0m[91m  F[0m[91m  C