In [None]:
# ==================== IMPORTS ====================
import os
import pickle
import re
from collections import Counter

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

In [None]:
# ==================== INITIAL CONFIGURATION ====================
# Seed the NumPy and PyTorch global RNGs so runs are reproducible
np.random.seed(42)
torch.manual_seed(42)

# Compute device: the CUDA GPU when one is available, otherwise the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# ==================== PRÉTRAITEMENT DES DONNÉES ====================
def tokenizer_func(text):
    """Split a company name into lowercase word tokens.

    The text is lowercased first; every maximal run of word characters
    (letters, digits, underscore) then becomes one token, so punctuation
    and whitespace act as separators.

    Args:
        text: The string to tokenize.

    Returns:
        A list of lowercase token strings (empty if no word character is found).
    """
    lowered = text.lower()
    return [match.group(0) for match in re.finditer(r'\w+', lowered)]

def build_vocab(texts, specials=('<unk>', '<pad>')):
    """Build a token -> index vocabulary from an iterable of texts.

    Special tokens receive the first indices, in the order given. The
    remaining tokens (as produced by ``tokenizer_func``) are numbered by
    decreasing frequency.

    Args:
        texts: Iterable of strings to tokenize and count.
        specials: Sequence of special tokens placed at the start of the
            vocabulary. The default is an immutable tuple so the default
            value can never be shared and mutated between calls.

    Returns:
        A dict mapping each token to its integer index.
    """
    vocab = {token: index for index, token in enumerate(specials)}
    counter = Counter(token for text in texts for token in tokenizer_func(text))
    for token, _ in counter.most_common():
        # setdefault leaves tokens that are already present (e.g. specials) untouched.
        vocab.setdefault(token, len(vocab))
    return vocab

class CompanyDataset(Dataset):
    """Dataset turning company names into fixed-length token-id tensors.

    Each item is a pair ``(token_ids, label)`` where ``token_ids`` is a
    ``torch.long`` tensor of length ``max_length`` (truncated or right-padded
    with the ``<pad>`` index) and ``label`` is a ``torch.float`` scalar tensor.
    """

    def __init__(self, dataframe, vocab=None, max_length=50):
        """Store the data and resolve the vocabulary.

        Args:
            dataframe: DataFrame with a 'company_name' column and a 'label' column.
            vocab: Optional token -> index dict containing '<pad>' and '<unk>'.
                When None, a vocabulary is built from this dataframe's
                'company_name' column.
            max_length: Number of token ids every item is truncated/padded to.
        """
        self.data = dataframe
        self.max_length = max_length

        # Build the vocabulary only when the caller did not supply one.
        if vocab is None:
            self.vocab = build_vocab(self.data['company_name'])
        else:
            self.vocab = vocab

        self.pad_idx = self.vocab['<pad>']
        self.unk_idx = self.vocab['<unk>']

    def __len__(self):
        """Return the number of rows in the underlying dataframe."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` for the row at position ``idx``."""
        # Fetch the row once instead of materialising it separately per column.
        row = self.data.iloc[idx]

        # Tokenize, truncate to max_length, and map unknown tokens to <unk>.
        tokens = tokenizer_func(row['company_name'])[:self.max_length]
        numericalized = [self.vocab.get(token, self.unk_idx) for token in tokens]

        # Right-pad up to max_length (no-op when already at full length).
        numericalized += [self.pad_idx] * (self.max_length - len(numericalized))

        return (
            torch.tensor(numericalized, dtype=torch.long),
            torch.tensor(row['label'], dtype=torch.float),
        )


In [None]:
# ==================== DATA LOADING ====================
# Read the company dataset from disk
df = pd.read_csv('datasets/company-dataset/dataset_entreprises (2).csv')

# Two-stage split: 70% train, then the held-out 30% is halved into
# 15% validation and 15% test
train_df, temp_df = train_test_split(df, test_size=0.3, random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

# The training dataset builds the vocabulary; validation and test reuse it
train_dataset = CompanyDataset(train_df)
vocab = train_dataset.vocab
max_length = train_dataset.max_length
vocab_size = len(vocab)
val_dataset, test_dataset = (
    CompanyDataset(split_df, vocab=vocab) for split_df in (val_df, test_df)
)

# Data loaders: only the training loader shuffles
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=batch_size)
    for split_dataset in (val_dataset, test_dataset)
)


In [None]:
# ==================== ARCHITECTURE DU MODÈLE ====================
class Attention(nn.Module):
    """Attention pooling: scores each position and returns the weighted sum.

    A two-layer scorer (Linear -> Tanh -> Linear to one unit) produces one
    score per position; scores are softmax-normalised along dimension 1 and
    used to average the input along that same dimension.
    """

    def __init__(self, hidden_dim):
        """Create the scorer for inputs whose last dimension is ``hidden_dim``."""
        super().__init__()
        scoring_layers = [
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
        ]
        self.attention = nn.Sequential(*scoring_layers)

    def forward(self, x):
        """Pool ``x`` along dimension 1.

        Args:
            x: Tensor expected to be (batch, seq_len, hidden_dim), as produced
                by the batch_first LSTM in CompanyClassifier.

        Returns:
            Tuple ``(weighted, weights)``: the attention-weighted sum over
            dimension 1, and the attention weights with their trailing
            size-1 dimension removed.
        """
        # One score per position, normalised across dimension 1
        scores = self.attention(x)
        attention_weights = scores.softmax(dim=1)
        # Weighted sum of the input features
        weighted = (attention_weights * x).sum(dim=1)
        return weighted, attention_weights.squeeze(-1)

class CompanyClassifier(nn.Module):
    """Bidirectional-LSTM binary classifier with attention pooling.

    Pipeline: token embedding -> dropout -> bidirectional LSTM -> attention
    pooling over positions -> Linear + ReLU + dropout -> Linear -> sigmoid.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: LSTM hidden size per direction; also the width of the
            first classifier layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the embeddings, before the
            final layer, and between LSTM layers when ``num_layers > 1``.
        padding_idx: Index of the padding token in the vocabulary. Defaults to
            1, the index ``build_vocab`` gives '<pad>' with its default
            specials ('<unk>' is 0). Pass ``vocab['<pad>']`` when using a
            vocabulary with a different layout.
    """

    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=128, num_layers=1, dropout=0.5,
                 padding_idx=1):
        super().__init__()

        # Embedding layer; padding_idx must point at '<pad>' (not '<unk>') so that
        # only the padding row is excluded from gradient updates.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.embedding_dropout = nn.Dropout(dropout)

        # Bidirectional LSTM layer
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=num_layers,
            bidirectional=True,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )

        # Attention mechanism
        self.attention = Attention(hidden_dim * 2)  # *2 for the two LSTM directions

        # Classifier head
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_dim, 1)

        # Weight initialisation
        self._init_weights()

    def _init_weights(self):
        """Initialise weights: orthogonal for LSTM, Xavier elsewhere, zero biases."""
        for name, param in self.named_parameters():
            if 'weight' in name:
                if 'lstm' in name:
                    # Orthogonal initialisation, applied separately to each
                    # hidden_size-row chunk of the stacked LSTM weight matrices
                    for i in range(0, param.shape[0], self.lstm.hidden_size):
                        nn.init.orthogonal_(param[i:i+self.lstm.hidden_size])
                else:
                    nn.init.xavier_normal_(param)
            elif 'bias' in name:
                nn.init.constant_(param, 0.0)

        # xavier_normal_ above also overwrote the embedding's padding row;
        # restore it to zeros so padding tokens embed to the zero vector.
        if self.embedding.padding_idx is not None:
            with torch.no_grad():
                self.embedding.weight[self.embedding.padding_idx].zero_()

    def forward(self, x):
        """Classify a batch of token-id sequences.

        Args:
            x: Long tensor of token ids, expected shape (batch, seq_len).

        Returns:
            Tuple ``(probabilities, attention_weights)``: sigmoid outputs with
            the trailing size-1 dimension squeezed, and the per-position
            attention weights returned by the attention module.

        Note:
            Padding positions still pass through the LSTM and the attention
            softmax; no mask is applied here.
        """
        # Token embedding
        embedded = self.embedding_dropout(self.embedding(x))

        # LSTM pass
        lstm_out, _ = self.lstm(embedded)

        # Attention pooling
        attn_out, attn_weights = self.attention(lstm_out)

        # Final classification
        out = self.dropout(F.relu(self.fc1(attn_out)))
        return torch.sigmoid(self.fc2(out)).squeeze(-1), attn_weights


In [None]:
# ==================== MODEL INITIALIZATION ====================
# Build the classifier for the training vocabulary, then move it to the selected device
model = CompanyClassifier(vocab_size=vocab_size, embedding_dim=128, hidden_dim=128, num_layers=1, dropout=0.5)
model = model.to(device)


In [None]:
# ==================== TRAINING CONFIGURATION ====================
# Loss: binary cross-entropy on the model's sigmoid outputs
criterion = nn.BCELoss()

# Optimizer: AdamW with weight decay
optimizer = torch.optim.AdamW(model.parameters(), weight_decay=1e-4, lr=0.0005)

# Scheduler: multiplies the learning rate by 0.5 when the monitored value
# (minimised) plateaus, with a patience of 2
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

# Early-stopping state
patience = 5
patience_counter = 0
best_val_loss = float('inf')



In [None]:
# ==================== FONCTIONS D'ÉVALUATION ====================
def evaluate(model, loader, criterion):
    """Evaluate the model on a loader and return ``(avg_loss, accuracy)``.

    The model is switched to eval mode and gradients are disabled. Batches
    are moved to the device the model's parameters live on. The loss is
    averaged per sample (each batch's loss is weighted by its size), so a
    shorter final batch does not skew the result.

    Args:
        model: Module whose forward returns ``(probabilities, extra)``.
        loader: Iterable of ``(inputs, labels)`` tensor batches.
        criterion: Loss callable; assumed to return the batch *mean* loss
            (e.g. ``nn.BCELoss()`` with its default reduction).

    Returns:
        Tuple ``(avg_loss, accuracy)`` of Python floats, where predictions
        are ``probability > 0.5``.

    Raises:
        ValueError: If the loader yields no samples.
    """
    model.eval()

    # Use the model's own device rather than relying on a notebook global.
    try:
        model_device = next(model.parameters()).device
    except StopIteration:
        model_device = torch.device('cpu')

    total_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(model_device), labels.to(model_device)
            outputs, _ = model(inputs)
            loss = criterion(outputs, labels)

            n_in_batch = labels.size(0)
            # Undo the batch-mean reduction so the final average is per sample.
            total_loss += loss.item() * n_in_batch
            predicted = (outputs > 0.5).float()
            correct += (predicted == labels).sum().item()
            total += n_in_batch

    if total == 0:
        raise ValueError("evaluate() received a loader with no samples")

    return total_loss / total, correct / total



In [None]:
# ==================== MODEL TRAINING ====================
# Relies on model, criterion, optimizer, scheduler, the data loaders, and the
# early-stopping state (best_val_loss, patience, patience_counter) defined in
# earlier cells.
num_epochs = 30
train_losses = []
val_losses = []
train_accs = []
val_accs = []

for epoch in range(num_epochs):
    print(f"\nEpoch {epoch + 1}/{num_epochs}")

    # Training phase
    model.train()
    epoch_train_loss = 0.0
    train_correct = 0
    train_total = 0

    for inputs, labels in tqdm(train_loader, desc="Training"):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs, _ = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()

        # Gradient clipping (max norm 1.0)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        optimizer.step()

        # criterion returns the batch mean; weight it by the batch size so the
        # epoch loss is a true per-sample average even when the last batch is short.
        epoch_train_loss += loss.item() * labels.size(0)
        predicted = (outputs > 0.5).float()
        train_correct += (predicted == labels).sum().item()
        train_total += labels.size(0)

    train_loss = epoch_train_loss / train_total
    train_acc = train_correct / train_total

    # Validation phase
    val_loss, val_acc = evaluate(model, val_loader, criterion)

    # Record the metrics
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accs.append(train_acc)
    val_accs.append(val_acc)

    # Scheduler update, driven by the validation loss
    scheduler.step(val_loss)

    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

    # Early stopping: checkpoint on improvement, otherwise count towards patience
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save(model.state_dict(), 'best_model.pth')
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered")
            break



In [None]:
# ==================== FINAL EVALUATION ====================
# Restore the weights checkpointed to 'best_model.pth'
model.load_state_dict(torch.load('best_model.pth', map_location=device))

# Score the restored model on the test set and report both metrics
test_loss, test_acc = evaluate(model=model, loader=test_loader, criterion=criterion)
print(
    f"\nPerformance finale sur le test set:",
    f"Test Loss: {test_loss:.4f} | Test Accuracy: {test_acc:.4f}",
    sep="\n",
)



In [None]:
# ==================== FONCTION D'INFÉRENCE ====================
def predict_company_type(company_name, model, vocab, max_length=50, device='cpu'):
    """Predict the binary class of a single company name.

    The name is tokenized with ``tokenizer_func`` exactly as ``CompanyDataset``
    does for training data. No extra character stripping is applied, so
    accented or punctuated names map to the same tokens at inference as in
    training.

    Args:
        company_name: Raw company name string.
        model: Classifier whose forward returns ``(probabilities, attention_weights)``.
            It is switched to eval mode by this call.
        vocab: Token -> index dict containing '<unk>' and '<pad>'.
        max_length: Number of token ids the input is truncated/padded to.
        device: Device the input tensor is moved to; it should be the device
            the model is on.

    Returns:
        Dict with keys:
            'prediction': 1 if the probability is > 0.5, else 0 (the usage
                example in this notebook displays 1 as 'Startup').
            'probability': The model output as a Python float.
            'tokens': The tokens, truncated/padded with '<pad>' to ``max_length``.
            'attention': NumPy array of attention weights for the single input.
    """
    # Tokenize the raw name (same path as training) and truncate
    tokens = tokenizer_func(company_name)[:max_length]
    pad_count = max_length - len(tokens)

    # Numericalize, then right-pad to max_length
    unk_idx = vocab['<unk>']
    numericalized = [vocab.get(token, unk_idx) for token in tokens]
    numericalized += [vocab['<pad>']] * pad_count

    # Single-item batch
    input_tensor = torch.tensor([numericalized], dtype=torch.long).to(device)

    # Prediction
    model.eval()
    with torch.no_grad():
        output, attn_weights = model(input_tensor)
    prob = output.item()
    prediction = 1 if prob > 0.5 else 0

    return {
        'prediction': prediction,
        'probability': prob,
        # Tokens aligned position-by-position with the attention weights
        'tokens': tokens + ['<pad>'] * pad_count,
        'attention': attn_weights[0].cpu().numpy()
    }


In [None]:
# ==================== MODEL SAVING ====================
save_data = {
    'model_state_dict': model.state_dict(),
    'vocab': vocab,
    'max_length': max_length,
    # NOTE(review): a function is pickled by reference (module + name), not by
    # value, so loading this checkpoint needs a `tokenizer_func` defined under
    # the same module path. Recent PyTorch versions that default torch.load to
    # weights_only=True will presumably refuse it as well (confirm).
    # Kept for compatibility with existing consumers of this key.
    'tokenizer_func': tokenizer_func
}

# torch.save does not create missing parent directories, so make sure it exists
os.makedirs('models', exist_ok=True)
torch.save(save_data, 'models/company_classifier.pth')

In [None]:
# ==================== USAGE EXAMPLE ====================
example_company = "google"
result = predict_company_type(
    company_name=example_company,
    model=model,
    vocab=vocab,
    max_length=max_length,
    device=device,
)

print(f"\nExemple de prédiction pour '{example_company}':")
print(f"Classe prédite: {'Startup' if result['prediction'] == 1 else 'Grande entreprise'}")
print(f"Probabilité: {result['probability']:.4f}")

# Show the five positions with the highest attention weight
print("\nTokens les plus importants (par poids d'attention):")
token_weights = list(zip(result['tokens'], result['attention']))
token_weights.sort(key=lambda pair: pair[1], reverse=True)
del token_weights[5:]
for token, weight in token_weights:
    print(f"{token}: {weight:.4f}")