In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

In [2]:
import sys
import os
sys.path.append(os.path.abspath(".."))
from utils_project import generate_csv,create_dataframe_from_xyz_files,create_X_y_from_dataframe


csv_path = "../../data/energies/train.csv"
path_data = "../../data/atoms/train"
df_train=create_dataframe_from_xyz_files(path_data,csv_path)
X=df_train[['positions', 'energy', 'charges']]

qm7 = X.to_dict("list")

#qm7 = fetch_qm7(align=True)
pos = np.array(qm7['positions'])
full_charges = np.array(qm7['charges'])

n_molecules = pos.shape[0]

In [3]:
class ElementwiseProd(nn.Module):
    def __init__(self, input_dim, q, k, act='sigmoid'):
        super().__init__()
        self.q = q
        self.k = k
        
        # Sélection de la fonction d'activation
        if act == 'sigmoid':
            self.activation = torch.sigmoid
        elif act == 'tanh':
            self.activation = torch.tanh
        elif act == 'relu':
            self.activation = F.relu
        else:
            raise ValueError(f"Activation '{act}' non supportée.")
        
        # Création des k couches linéaires
        self.hidden_layers = nn.ModuleList([
            nn.Linear(input_dim, q) for _ in range(k)
        ])

    def forward(self, x):
        output = torch.ones(x.size(0), self.q, device=x.device)
        for layer in self.hidden_layers:
            out = self.activation(layer(x))
            output *= out  # Produit élément par élément
        return output


In [4]:
from sklearn.base import BaseEstimator, RegressorMixin
import torch
import torch.nn as nn
import torch.optim as optim

class ElementwiseProdRegressor(BaseEstimator, RegressorMixin):
    def __init__(self, input_dim=1, q=10, k=3, act='sigmoid', epochs=100, lr=1e-3, verbose=False):
        self.input_dim = input_dim
        self.q = q
        self.k = k
        self.act = act
        self.epochs = epochs
        self.lr = lr
        self.verbose = verbose
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self._build_model()

    def _build_model(self):
        class FullModel(nn.Module):
            def __init__(self, input_dim, q, k, act):
                super().__init__()
                self.core = ElementwiseProd(input_dim, q, k, act)
                self.output = nn.Linear(q, 1)
            
            def forward(self, x):
                x = self.core(x)
                x = self.output(x)
                return x
        
        self.model = FullModel(self.input_dim, self.q, self.k, self.act).to(self.device)

    def fit(self, X, y):
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        y = torch.tensor(y, dtype=torch.float32).view(-1, 1).to(self.device)

        criterion = nn.MSELoss()
        optimizer = optim.SGD(self.model.parameters(), lr=self.lr)

        self.model.train()
        for epoch in range(self.epochs):
            optimizer.zero_grad()
            output = self.model(X)
            loss = criterion(output, y)
            loss.backward()
            optimizer.step()
            if self.verbose and epoch % 10 == 0:
                print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
        return self

    def predict(self, X):
        self.model.eval()
        X = torch.tensor(X, dtype=torch.float32).to(self.device)
        with torch.no_grad():
            output = self.model(X)
        return output.cpu().numpy().flatten()


In [5]:
M, N, O = 64, 64, 64 #192, 128, 96
grille = "64-64-64"
grid = np.mgrid[-M//2:-M//2+M, -N//2:-N//2+N, -O//2:-O//2+O]
grid = np.fft.ifftshift(grid)

In [6]:
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print(device)

cuda


In [7]:
saved_data = torch.load( f'../models_scattering/scattering_outputs_{grille}.pt', map_location=device)
order_0 = saved_data['order_0']
orders_1_and_2 = saved_data['orders_1_and_2']
order_0 = order_0.cpu().numpy()
orders_1_and_2 = orders_1_and_2.cpu().numpy()

order_0 = order_0.reshape((n_molecules, -1))
orders_1_and_2 = orders_1_and_2.reshape((n_molecules, -1))
scattering_coef = np.concatenate([order_0, orders_1_and_2], axis=1)
target = qm7['energy']



In [22]:
import numpy as np
from sklearn import linear_model, preprocessing, pipeline, model_selection
import joblib

# Supposons que scattering_coef et target soient déjà définis
cross_val_folds = 5  # Assurez-vous que cross_val_folds est défini

# Liste des modèles à tester
models = [
    ("Ridge Regression with alpha=0.1", linear_model.Ridge(alpha=0.1)),
    ("PyTorch ElementwiseProd", ElementwiseProdRegressor(input_dim=scattering_coef.shape[1], q=1000, k=3, epochs=50, lr=1e-2))

]

results = []

for name, model in models:
    scaler = preprocessing.StandardScaler()
    regressor = pipeline.make_pipeline(scaler, model)

    target_prediction = model_selection.cross_val_predict(regressor, X=scattering_coef, y=target, cv=cross_val_folds)

    MAE = np.mean(np.abs(target_prediction - target))
    RMSE = np.sqrt(np.mean((target_prediction - target) ** 2))

    results.append((name, model, MAE, RMSE))

    print('{}: MAE: {}, RMSE: {}'.format(name, MAE, RMSE))

# Trouver le modèle avec le RMSE le plus bas
best_result = min(results, key=lambda x: x[3])
best_model_name, best_model, best_mae, best_rmse = best_result

print(f"Le meilleur modèle est {best_model_name} avec un RMSE de {best_rmse}.")

  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T
  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T
  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T
  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T
  return linalg.solve(A, Xy, assume_a="pos", overwrite_a=True).T


Ridge Regression with alpha=0.1: MAE: 0.2799451085647929, RMSE: 0.6534790099081015
PyTorch ElementwiseProd: MAE: 2.5771977830805666, RMSE: 4.114753866037503
Le meilleur modèle est Ridge Regression with alpha=0.1 avec un RMSE de 0.6534790099081015.


In [21]:
# 1) Normalise les données avant le fit/predict

model = ElementwiseProdRegressor(input_dim=scattering_coef.shape[1], q=1000, k=3, epochs=200, lr=1e-4, verbose=True)

# 3) Entraîne sur X_scaled
model.fit(scattering_coef, target)

# 4) Prédiction
y_pred = model.predict(scattering_coef)

# 5) RMSE
rmse = np.sqrt(np.mean((y_pred - target)**2))
print("RMSE:", rmse)


Epoch 0, Loss: 6216.8462
Epoch 10, Loss: 3430.2117
Epoch 20, Loss: 1912.3687
Epoch 30, Loss: 1093.7592
Epoch 40, Loss: 651.7441
Epoch 50, Loss: 450.4674
Epoch 60, Loss: 319.3500
Epoch 70, Loss: 245.6996
Epoch 80, Loss: 195.6316
Epoch 90, Loss: 173.9612
Epoch 100, Loss: 156.6039
Epoch 110, Loss: 158.0332
Epoch 120, Loss: 151.3026
Epoch 130, Loss: 144.4747
Epoch 140, Loss: 145.3064
Epoch 150, Loss: 141.1523
Epoch 160, Loss: 138.7427
Epoch 170, Loss: 137.2773
Epoch 180, Loss: 136.3021
Epoch 190, Loss: 136.2119
RMSE: 11.652322475579757
