# Prédiction d'énergie moléculaire avec Transformer

Ce notebook implémente une approche plus sophistiquée utilisant un Transformer pour prédire l'énergie des molécules. Cette approche :
1. Traite chaque atome comme un token avec ses caractéristiques propres
2. Utilise l'attention pour capturer les interactions entre atomes
3. Intègre la géométrie 3D dans le modèle
4. Utilise une architecture plus profonde pour plus de capacité

In [13]:
# Importation des bibliothèques nécessaires
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import time
import os
from ase.io import read
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
import seaborn as sns

# Configuration de PyTorch
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)

In [14]:
class MolecularTransformerDataset(Dataset):
    def __init__(self, molecule_ids, energies=None, max_atoms=23, root_dir='../data/atoms/train/'):
        self.molecule_ids = molecule_ids
        self.energies = energies
        self.max_atoms = max_atoms
        self.root_dir = root_dir
        
    def __len__(self):
        return len(self.molecule_ids)
    
    def __getitem__(self, idx):
        mol_id = self.molecule_ids[idx]
        xyz_path = os.path.join(self.root_dir, f'id_{mol_id}.xyz')
        atoms = read(xyz_path)
        
        # Récupérer les positions et numéros atomiques
        positions = atoms.get_positions()
        atomic_numbers = atoms.get_atomic_numbers()
        n_atoms = len(positions)
        
        # Préparer les entrées du transformer
        atom_features = np.zeros((self.max_atoms, 4))  # [x, y, z, atomic_number]
        mask = np.zeros(self.max_atoms, dtype=bool)
        
        # Remplir avec les données réelles
        atom_features[:n_atoms, :3] = positions
        atom_features[:n_atoms, 3] = atomic_numbers
        mask[:n_atoms] = 1
        
        output = {
            'features': torch.FloatTensor(atom_features),
            'mask': torch.BoolTensor(mask),
            'n_atoms': n_atoms
        }
        
        if self.energies is not None:
            output['energy'] = torch.FloatTensor([self.energies[idx]])
            
        return output

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % num_heads == 0
        
        self.d_k = d_model // num_heads
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)
        
    def attention(self, q, k, v, mask=None, dropout=None):
        scores = torch.matmul(q, k.transpose(-2, -1)) / np.sqrt(self.d_k)
        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(~mask, float('-inf'))
        scores = F.softmax(scores, dim=-1)
        if dropout is not None:
            scores = dropout(scores)
        output = torch.matmul(scores, v)
        return output
    
    def forward(self, q, k, v, mask=None):
        bs = q.size(0)
        
        # Linear projections and split into heads
        k = self.k_linear(k).view(bs, -1, self.num_heads, self.d_k)
        q = self.q_linear(q).view(bs, -1, self.num_heads, self.d_k)
        v = self.v_linear(v).view(bs, -1, self.num_heads, self.d_k)
        
        # Transpose to get dimensions [batch, head, seq_length, d_k]
        k = k.transpose(1, 2)
        q = q.transpose(1, 2)
        v = v.transpose(1, 2)
        
        # Apply attention
        scores = self.attention(q, k, v, mask)
        
        # Concatenate heads and put through final linear layer
        concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
        output = self.out(concat)
        
        return output

class TransformerBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff=2048, dropout=0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model)
        )
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask=None):
        attention_out = self.attention(x, x, x, mask)
        x = self.norm1(x + self.dropout(attention_out))
        ff_out = self.ff(x)
        x = self.norm2(x + self.dropout(ff_out))
        return x

class MolecularTransformer(nn.Module):
    def __init__(self, d_model=256, num_heads=8, num_layers=6, dropout=0.1):
        super().__init__()
        
        # Embedding des caractéristiques atomiques
        self.feature_embedding = nn.Sequential(
            nn.Linear(4, d_model),
            nn.LayerNorm(d_model),
            nn.ReLU(),
            nn.Dropout(dropout)
        )
        
        # Couches Transformer
        self.transformer_layers = nn.ModuleList([
            TransformerBlock(d_model, num_heads, d_model * 4, dropout)
            for _ in range(num_layers)
        ])
        
        # Couches de sortie
        self.output_layers = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_model // 2, d_model // 4),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_model // 4, 1)
        )
        
    def forward(self, features, mask=None):
        # Embedding des caractéristiques
        x = self.feature_embedding(features)
        
        # Application des couches Transformer
        for layer in self.transformer_layers:
            x = layer(x, mask)
            
        # Global pooling avec masque
        if mask is not None:
            x = x * mask.unsqueeze(-1)
            x = x.sum(dim=1) / mask.sum(dim=1, keepdim=True)
        else:
            x = x.mean(dim=1)
            
        # Prédiction finale
        return self.output_layers(x).squeeze(-1)

In [15]:
# Chargement des données
train_energies = pd.read_csv('../data/energies/train.csv')
molecule_ids = train_energies['id'].values
energies = train_energies['energy'].values

# Séparation train/validation
train_ids, val_ids, y_train, y_val = train_test_split(
    molecule_ids, energies, test_size=0.2, random_state=42
)

# Création des datasets
train_dataset = MolecularTransformerDataset(train_ids, y_train)
val_dataset = MolecularTransformerDataset(val_ids, y_val)

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)

# Configuration du modèle
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MolecularTransformer(
    d_model=128,  # Réduit de 256 à 128
    num_heads=4,  # Réduit de 8 à 4
    num_layers=3,  # Réduit de 6 à 3
    dropout=0.1
).to(device)

# Optimiseur et fonction de perte avec un learning rate légèrement plus élevé
optimizer = optim.AdamW(model.parameters(), lr=2e-4, weight_decay=0.01)
criterion = nn.MSELoss()

# Afficher la taille du modèle et le device
print(f"Entraînement sur {device}")
print(f"Nombre total de paramètres: {sum(p.numel() for p in model.parameters()):,}")

Entraînement sur cuda
Nombre total de paramètres: 606,081


In [16]:
# Fonction d'entraînement
def train_epoch(model, loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    for batch in loader:
        features = batch['features'].to(device)
        mask = batch['mask'].to(device)
        target_energies = batch['energy'].to(device).squeeze(-1)
        
        optimizer.zero_grad()
        outputs = model(features, mask)
        loss = criterion(outputs, target_energies)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(loader)

# Fonction d'évaluation
def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    predictions = []
    true_energies = []
    
    with torch.no_grad():
        for batch in loader:
            features = batch['features'].to(device)
            mask = batch['mask'].to(device)
            target_energies = batch['energy'].to(device).squeeze(-1)
            
            outputs = model(features, mask)
            loss = criterion(outputs, target_energies)
            total_loss += loss.item()
            
            predictions.extend(outputs.cpu().numpy())
            true_energies.extend(target_energies.cpu().numpy())
            
    return np.array(predictions), np.array(true_energies), total_loss / len(loader)

# Entraînement
num_epochs = 200
best_val_loss = float('inf')
patience = 15
patience_counter = 0
history = []

print("Début de l'entraînement...")
start_time = time.time()

for epoch in range(num_epochs):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    val_predictions, val_true, val_loss = evaluate(model, val_loader, criterion, device)
    
    history.append((train_loss, val_loss))
    
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        best_model = model.state_dict()
    else:
        patience_counter += 1
    
    if patience_counter >= patience:
        print(f"Early stopping à l'époque {epoch+1}")
        break
        
    if (epoch + 1) % 10 == 0:
        rmse = np.sqrt(mean_squared_error(val_true, val_predictions))
        mae = mean_absolute_error(val_true, val_predictions)
        print(f"Époque [{epoch+1}/{num_epochs}]")
        print(f"Perte entraînement: {train_loss:.4f}")
        print(f"Perte validation: {val_loss:.4f}")
        print(f"RMSE validation: {rmse:.4f}")
        print(f"MAE validation: {mae:.4f}")
        print("--------------------")

total_time = time.time() - start_time
print(f"\nTemps d'entraînement total: {total_time/60:.2f} minutes")

# Charger le meilleur modèle
model.load_state_dict(best_model)

Début de l'entraînement...
Époque [10/200]
Perte entraînement: 94.8289
Perte validation: 21.5313
RMSE validation: 4.6395
MAE validation: 3.6669
--------------------
Époque [10/200]
Perte entraînement: 94.8289
Perte validation: 21.5313
RMSE validation: 4.6395
MAE validation: 3.6669
--------------------
Époque [20/200]
Perte entraînement: 85.5589
Perte validation: 24.1312
RMSE validation: 4.9117
MAE validation: 3.8207
--------------------
Époque [20/200]
Perte entraînement: 85.5589
Perte validation: 24.1312
RMSE validation: 4.9117
MAE validation: 3.8207
--------------------


KeyboardInterrupt: 

In [None]:
# Visualisation de l'évolution de l'entraînement
train_losses, val_losses = zip(*history)
plt.figure(figsize=(10, 6))
plt.plot(train_losses, label='Entraînement')
plt.plot(val_losses, label='Validation')
plt.xlabel('Époque')
plt.ylabel('Perte (MSE)')
plt.title('Évolution de la perte pendant l\'entraînement')
plt.legend()
plt.yscale('log')
plt.grid(True)
plt.show()

# Évaluation finale sur l'ensemble de validation
val_predictions = evaluate(model, val_loader, criterion, device)
val_rmse = np.sqrt(mean_squared_error(y_val, val_predictions))
val_mae = mean_absolute_error(y_val, val_predictions)

print("\nPerformances finales sur l'ensemble de validation:")
print(f"RMSE: {val_rmse:.4f}")
print(f"MAE: {val_mae:.4f}")

In [None]:
# Préparer les données de test
test_files = sorted(os.listdir('../data/atoms/test'))
test_ids = [int(f.split('_')[1].split('.')[0]) for f in test_files]

# Créer le dataset de test
test_dataset = MolecularTransformerDataset(test_ids, root_dir='../data/atoms/test/')
test_loader = DataLoader(test_dataset, batch_size=16)

# Prédictions sur l'ensemble de test
model.eval()
test_predictions, _, _ = evaluate(model, test_loader, criterion, device)

# Sauvegarder les prédictions
predictions_df = pd.DataFrame({
    'id': test_ids,
    'energy': test_predictions
})
predictions_df.to_csv('../data/energies/test_pred_transformer.csv', index=False)
print("Prédictions sauvegardées dans test_pred_transformer.csv")

## Conclusion

Cette approche utilise une architecture Transformer pour prédire l'énergie des molécules, avec plusieurs avantages :

1. Prise en compte explicite des relations entre atomes grâce au mécanisme d'attention
2. Traitement naturel de la géométrie 3D
3. Capacité à gérer des molécules de tailles différentes
4. Architecture plus profonde permettant de capturer des relations complexes

La convergence est plus lente que les approches précédentes, mais le modèle peut potentiellement atteindre de meilleures performances grâce à sa capacité à modéliser des interactions complexes entre les atomes.