In [1]:
import os

def load_imdb_data(imdb_path, split="train", max_per_class=None):
    """
    Charge les reviews et labels depuis aclImdb/train ou aclImdb/test.
    Labels : 1 pour positif, 0 pour négatif.
    """
    texts = []
    labels = []
    
    for label in ['pos', 'neg']:
        path = os.path.join(imdb_path, split, label)
        files = os.listdir(path)
        if max_per_class:
            files = files[:max_per_class]
        for file in files:
            with open(os.path.join(path, file), encoding='utf-8') as f:
                texts.append(f.read())
                labels.append(1 if label == 'pos' else 0)
    
    return texts, labels

# Exemple d'utilisation
imdb_path = "aclImdb"  # <- Remplace par le chemin réel
train_texts, train_labels = load_imdb_data(imdb_path, split="train", max_per_class=1000)
test_texts, test_labels = load_imdb_data(imdb_path, split="test", max_per_class=1000)


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import re
from collections import Counter

def tokenize(text):
    text = re.sub(r"<br />", " ", text)
    return re.findall(r"\b\w+\b", text.lower())

def build_vocab(texts, max_vocab=5000, min_freq=5):
    counter = Counter()
    for text in texts:
        counter.update(tokenize(text))
    vocab = {"<PAD>": 0, "<UNK>": 1}
    for word, freq in counter.most_common():
        if freq >= min_freq and len(vocab) < max_vocab:
            vocab[word] = len(vocab)
    return vocab

def encode(text, vocab, max_len=400):
    tokens = tokenize(text)
    ids = [vocab.get(token, vocab["<UNK>"]) for token in tokens[:max_len]]
    if len(ids) < max_len:
        ids += [vocab["<PAD>"]] * (max_len - len(ids))
    return ids

vocab = build_vocab(train_texts)
vocab_size = len(vocab)

class IMDBDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len=400):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoded = encode(self.texts[idx], self.vocab, self.max_len)
        label = float(self.labels[idx])
        return torch.tensor(encoded), torch.tensor(label)

train_dataset = IMDBDataset(train_texts, train_labels, vocab)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

class MaasWordVectorModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.bias = nn.Parameter(torch.zeros(vocab_size))
        self.sentiment_head = nn.Linear(embedding_dim, 1)

    def forward(self, x, theta):
        emb = self.embedding(x)
        avg_emb = emb.mean(dim=1)
        sentiment_logits = self.sentiment_head(avg_emb).squeeze(1)
        sentiment_probs = torch.sigmoid(sentiment_logits)

        logits = torch.matmul(self.embedding.weight, theta.T).T + self.bias
        return sentiment_probs, logits


In [3]:
def train(model, train_loader, vocab_size, embedding_dim,
          sentiment_weight=1.0, semantic_weight=1.0, 
          lambda_theta=1e-3, nu_R=1e-5, epochs=5, device="cpu"):

    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        total_acc = 0

        for batch in train_loader:
            x, y = batch
            x, y = x.to(device), y.to(device)

            batch_size = x.size(0)
            theta = torch.randn(batch_size, embedding_dim, requires_grad=True, device=device)
            theta_optimizer = torch.optim.SGD([theta], lr=0.1)

            for _ in range(5):
                _, logits = model(x, theta)
                flat_x = x.view(-1)
                flat_logits = logits.repeat_interleave(x.size(1), dim=0)
                semantic_loss = F.cross_entropy(flat_logits, flat_x, reduction='mean')
                reg_theta = lambda_theta * torch.norm(theta, p=2) ** 2
                loss = semantic_loss + reg_theta
                theta_optimizer.zero_grad()
                loss.backward()
                theta_optimizer.step()

            sentiment_probs, logits = model(x, theta.detach())
            sentiment_loss = F.binary_cross_entropy(sentiment_probs, y)

            flat_x = x.view(-1)
            flat_logits = logits.repeat_interleave(x.size(1), dim=0)
            semantic_loss = F.cross_entropy(flat_logits, flat_x, reduction='mean')

            R = model.embedding.weight
            reg_R = nu_R * torch.norm(R, p='fro') ** 2

            total_batch_loss = (
                sentiment_weight * sentiment_loss +
                semantic_weight * semantic_loss +
                reg_R
            )

            optimizer.zero_grad()
            total_batch_loss.backward()
            optimizer.step()

            acc = ((sentiment_probs > 0.5) == y).float().mean()
            total_loss += total_batch_loss.item()
            total_acc += acc.item()

        print(f"Epoch {epoch+1} | Loss: {total_loss / len(train_loader):.4f} | Acc: {total_acc / len(train_loader):.4f}")


In [5]:
# === Évaluation sur données de test ===
test_dataset = IMDBDataset(test_texts, test_labels, vocab)
test_loader = DataLoader(test_dataset, batch_size=32)

def evaluate(model, data_loader, embedding_dim, device="cpu"):
    model.eval()
    total_acc = 0
    with torch.no_grad():
        for x, y in data_loader:
            x, y = x.to(device), y.to(device)
            batch_size = x.size(0)
            theta = torch.randn(batch_size, embedding_dim).to(device)
            sentiment_probs, _ = model(x, theta)
            acc = ((sentiment_probs > 0.5) == y).float().mean()
            total_acc += acc.item()
    print(f"Test Accuracy: {total_acc / len(data_loader):.4f}")

# Exemple d'utilisation après entraînement :
evaluate(model, test_loader, embedding_dim=50, device="cpu")


NameError: name 'model' is not defined