In [1]:
import torch
import torch.nn.functional as F
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import sys, os
import math
from sklearn.model_selection import train_test_split
module_path = os.path.abspath(os.path.join('../..'))

if module_path not in sys.path:
    sys.path.append(module_path)

import matplotlib.pyplot as plt
import static_token_div.tools.vocab_tools as vocab_tools

In [2]:
# Use the first CUDA device when PyTorch can see one; otherwise run on the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(f'Using gpu: {torch.cuda.is_available()} ')

Using gpu: False 


In [3]:
def read_corpus(file_path):
    """Read a UTF-8 text file and return its whitespace-separated tokens.

    Args:
        file_path: Path of the text file to read.

    Returns:
        list of str: The tokens in file order. Runs of whitespace (spaces,
        tabs, newlines) act as a single separator, so no empty token is
        produced.
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        content = handle.read()
    # split() without a separator already ignores leading/trailing whitespace.
    return content.split()

In [4]:
def create_vocab(tokens):
    """Assign an integer id to every distinct token, in order of first appearance.

    Args:
        tokens: Iterable of hashable tokens (words).

    Returns:
        dict: token -> id, where ids are 0, 1, 2, ... in the order each token
        is first seen.
    """
    token_to_id = {}
    for token in tokens:
        # setdefault leaves already-known tokens untouched; len() is evaluated
        # before the insertion, so a new token receives the next free id.
        token_to_id.setdefault(token, len(token_to_id))
    return token_to_id

In [5]:
# Context size: number of consecutive tokens used as input to predict the next one.
k = 5
file_path = "../../resources/tlnl_tp1_data/alexandre_dumas/fusion.txt"
# Tokenize the corpus on whitespace, then give each distinct token an integer id.
corpus = read_corpus(file_path)
vocab = create_vocab(corpus)

In [6]:
# Vocabulary size: number of distinct tokens in the corpus.
len(vocab)

35428

In [7]:
# Encode the corpus as a list of vocabulary ids, keeping the original token order.
data = [vocab[word] for word in corpus]

In [8]:
# Total number of tokens in the encoded corpus.
len(data)

2410384

In [9]:
def extract_data(data, k):
    """Slice a sequence into sliding (context, next item) training pairs.

    Args:
        data: Indexable, sliceable sequence (here: a list of token ids).
        k: Context length, i.e. how many consecutive items form one input.

    Returns:
        tuple: (all_input, all_target) where all_input[i] is data[i:i + k] and
        all_target[i] is data[i + k]. Both lists hold len(data) - k entries and
        are empty when the sequence is not longer than k.
    """
    n_windows = len(data) - k
    all_input = [data[start:start + k] for start in range(n_windows)]
    all_target = [data[start + k] for start in range(n_windows)]
    return all_input, all_target


In [10]:
# Build every (k-token context, next token) pair from the encoded corpus.
X_data, y_data = extract_data(data, k=k)

In [11]:
# Hold out 20% of the (context, target) pairs for evaluation. The split is a
# random shuffle, so random_state is pinned: a fresh "Restart & Run All" then
# reproduces the same partition and the reported losses stay comparable.
X_train, X_test, y_train, y_test = train_test_split(
    torch.tensor(X_data),
    torch.tensor(y_data),
    test_size=0.2,
    random_state=42,
)
X_train.shape, y_train.shape

(torch.Size([1928303, 5]), torch.Size([1928303]))

In [12]:
class MLP(nn.Module):
    """Feed-forward next-token predictor over a fixed window of k token ids.

    The k ids are embedded, the embeddings are flattened into a single vector
    per sample, and two ReLU hidden layers (128 then 32 units) feed a linear
    layer that emits one logit per vocabulary entry.
    """

    def __init__(self, k, vocab_size, embed_dim=100):
        """Create the layers.

        Args:
            k: Number of token ids per input sample (context length).
            vocab_size: Number of rows of the embedding table and number of
                output logits.
            embed_dim: Size of each token embedding.
        """
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim)
        self.relu = nn.ReLU()
        self.fc1 = nn.Linear(in_features=k * embed_dim, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=32)
        self.fc3 = nn.Linear(in_features=32, out_features=vocab_size)
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        """Return unnormalized scores (logits) of shape (batch, vocab_size).

        Args:
            x: Integer tensor of token ids; fc1 expects k ids per sample, i.e.
                shape (batch, k).
        """
        embedded = self.embedding(x)
        # Concatenate the k embeddings of each sample into one feature vector.
        flat = embedded.view(embedded.size(0), -1)
        # Dropout is applied after the first hidden layer only.
        hidden = self.dropout(self.relu(self.fc1(flat)))
        hidden = self.relu(self.fc2(hidden))
        return self.fc3(hidden)


# Size the embedding table and output layer from the actual vocabulary instead
# of a hard-coded count, so every token id taken from `vocab` is a valid index.
model = MLP(k=k, vocab_size=len(vocab))

In [13]:
def train(model, criterion, optimizer, X_train, y_train, X_test, y_test, device, batch_size=64, nb_epoch=10):
    """Train `model` with mini-batches and evaluate it on the test split after each epoch.

    Batches are moved to `device`; the model itself is not moved here, so the
    caller must have placed it on the same device beforehand.

    Args:
        model: Module to optimize.
        criterion: Callable returning a scalar loss for (outputs, targets).
        optimizer: Optimizer already bound to the model's parameters.
        X_train, y_train: Training inputs and targets (tensors).
        X_test, y_test: Held-out inputs and targets (tensors).
        device: Device the batches are sent to.
        batch_size: Number of samples per batch for both loaders.
        nb_epoch: Number of passes over the training data.

    Returns:
        dict: {"train_loss": [...], "test_loss": [...]} with one entry per
        epoch. Each entry is the sum of the per-batch criterion values divided
        by the number of batches.
    """
    # Ceiling division: a final, smaller batch still counts as one batch.
    n_train_batches = -(-X_train.size(0) // batch_size)

    train_dataloader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

    train_losses = []
    test_losses = []

    for epoch in range(nb_epoch):
        # --- optimization pass over the shuffled training batches ---
        model.train()
        running_loss = 0.0
        for inputs, targets in train_dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            batch_loss = criterion(model(inputs), targets)

            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()

        avg_train_loss = running_loss / n_train_batches
        train_losses.append(avg_train_loss)

        # --- evaluation pass: no gradients, dropout disabled ---
        model.eval()
        with torch.no_grad():
            total_test_loss = sum(
                criterion(model(inputs.to(device)), targets.to(device)).item()
                for inputs, targets in test_loader
            )

        avg_test_loss = total_test_loss / len(test_loader)
        test_losses.append(avg_test_loss)

        print(f'Epoch [{epoch+1}/{nb_epoch}], train loss: {avg_train_loss:.4f}, test loss: {avg_test_loss:.4f}')

    return {"train_loss": train_losses, "test_loss": test_losses}

In [14]:
def calculate_perplexity(model, data_loader, device):
    """Compute the perplexity of `model` over every sample yielded by `data_loader`.

    Perplexity is exp(mean cross-entropy), where the mean is taken per target:
    the cross-entropy is summed over each batch and divided once by the total
    number of targets, so a smaller last batch does not skew the result.

    Args:
        model: Module returning logits for a batch of inputs; it is switched to
            evaluation mode.
        data_loader: Iterable of (inputs, targets) batches.
        device: Device the batches are sent to.

    Returns:
        float: The perplexity.
    """
    model.eval()
    summed_cross_entropy = nn.CrossEntropyLoss(reduction='sum')
    nll_sum, n_targets = 0.0, 0

    with torch.no_grad():
        for inputs, targets in data_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            nll_sum += summed_cross_entropy(model(inputs), targets).item()
            n_targets += targets.size(0)

    return math.exp(nll_sum / n_targets)

In [15]:
# Instantiate the network with one output logit per vocabulary entry and place it on the chosen device.
model = MLP(k=k, vocab_size=len(vocab)).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)
criterion = nn.CrossEntropyLoss()
batch_size = 128

In [None]:
# Run the optimization; `history` keeps the per-epoch train and test losses.
history = train(
    model, criterion, optimizer,
    X_train, y_train,
    X_test, y_test,
    device,
    batch_size=batch_size,
    nb_epoch=20,
)

Epoch [1/20], train loss: 4.8714, test loss: 4.4344
Epoch [2/20], train loss: 4.2973, test loss: 4.1261
Epoch [3/20], train loss: 4.0782, test loss: 3.9738
Epoch [4/20], train loss: 3.9596, test loss: 3.8855
Epoch [5/20], train loss: 3.8850, test loss: 3.8254
Epoch [6/20], train loss: 3.8325, test loss: 3.7838
Epoch [7/20], train loss: 3.7931, test loss: 3.7508
Epoch [8/20], train loss: 3.7616, test loss: 3.7256
Epoch [9/20], train loss: 3.7348, test loss: 3.7031


In [None]:
# Unshuffled loader over the held-out split, then the model's perplexity on it.
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
perplexity = calculate_perplexity(model, test_loader, device)

In [None]:
# Both curves are the per-epoch cross-entropy losses recorded in `history`,
# so the y axis is labelled as a loss; the perplexity is a single number and
# only appears in the title.
plt.plot(history["train_loss"], c="blue", label="Train loss")
plt.plot(history["test_loss"], c="orange", label="Test loss")
plt.ylabel("Cross-entropy loss")
plt.xlabel("Epoch")
plt.title(f"Training loss, perplexity: {perplexity:.2f}")
plt.legend()
plt.ylim(0, 5)
# Render the figure explicitly so the cell does not echo the ylim() return value.
plt.show()

In [None]:
import torch

# Build the inverse vocabulary (index -> word)
inverse_vocab = {idx: word for word, idx in vocab.items()}

def generate_words(model, vocab, inverse_vocab, input_words, n, k, device):
    """
    Greedily generate n words from k seed words.

    At each step the model scores the current k-word context, the highest
    scoring vocabulary entry is emitted, and the context slides by one word
    (the oldest word is dropped, the new one is appended).

    Args:
        model (nn.Module): The trained model; it is switched to evaluation mode.
        vocab (dict): Vocabulary dictionary (word -> index).
        inverse_vocab (dict): Inverse dictionary (index -> word).
        input_words (list of str): The k seed words.
        n (int): Number of words to generate.
        k (int): Required number of seed words.
        device (torch.device): Device (CPU or GPU) the input tensor is created on.

    Returns:
        list of str: The generated words, in generation order. An index missing
        from inverse_vocab is rendered as "<UNK>".

    Raises:
        ValueError: If input_words does not contain exactly k words, or if one
            of them is not in vocab (the original KeyError is chained as the
            cause).
    """
    model.eval()  # Evaluation mode: disables dropout

    # Check that exactly k seed words were given
    if len(input_words) != k:
        raise ValueError(f"Le nombre de mots d'entrée doit être {k}, mais {len(input_words)} ont été fournis.")

    # Convert the seed words to vocabulary indices
    try:
        input_indices = [vocab[word] for word in input_words]
    except KeyError as e:
        raise ValueError(f"Le mot '{e.args[0]}' n'est pas dans le vocabulaire.") from e

    generated = []
    # No gradients are needed for generation; enter the context once for the whole loop.
    with torch.no_grad():
        for _ in range(n):
            # Wrap the context in a list to add the batch dimension: shape (1, k)
            input_tensor = torch.tensor([input_indices], dtype=torch.long, device=device)
            logits = model(input_tensor)

            # Softmax is monotonic, so the argmax of the logits is already the
            # most probable index; normalizing over the whole vocabulary first
            # would be wasted work.
            predicted_idx = torch.argmax(logits, dim=1).item()

            # Convert the predicted index back to a word
            generated.append(inverse_vocab.get(predicted_idx, "<UNK>"))

            # Slide the context window by one word
            input_indices = input_indices[1:] + [predicted_idx]

    return generated

# Usage example
input_words = ["je", "suis", "un", "exemple", "de"]  # Replace with your own seed words
n = 10  # Number of words to generate

generated_words = generate_words(model, vocab, inverse_vocab, input_words, n, k, device)
print("Mots générés :", ' '.join(generated_words))
