In [None]:
# Reload automatically when the file is changed.
%load_ext autoreload
%autoreload 2

# Importing Modules

In [None]:
import numpy as np
import torch
import time
import os
from sklearn import (linear_model, model_selection, preprocessing,
                     pipeline)
from scipy.spatial.distance import pdist
from kymatio.torch import HarmonicScattering3D
from kymatio.scattering3d.utils import generate_weighted_sum_of_gaussians
from kymatio.datasets import fetch_qm7
from kymatio.caching import get_cache_dir
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
# Import torch backend 
from kymatio.scattering3d.backend.torch_backend import TorchBackend3D

import os 
import sys
MODULES_PATH = "../../Modules/Scattering"
MODELS_PATH = "../../Models/"

sys.path.append(MODULES_PATH)
sys.path.append(MODELS_PATH)


from Preprocessing import *
from Dataloaders_Preprocessing import *

# Importing Data

In [None]:
List_Data_Train = Load_Data('../../Data/atoms/train', '../../Data/energies/train.csv')
List_Data_Test= Load_Test_Data('../../Data/atoms/test')

In [None]:
List_Data_Train[0]

In [None]:
Display_Molecule_From_Atom_List(List_Data_Train[500]['Atoms_List'], Width=800, Height=800, Background_Color='lightblue')


In [None]:
Train_Dataset = XYZDataset_V2(List_Data_Train)
Test_Dataset = XYZDataset_V2(List_Data_Test,return_energy=False)

In [None]:
Test_Dataset[0]

# Scattering

In [None]:
LOAD_SCATTERING = True

In [None]:
def config(device):
    M = 192
    N = 128
    O = 96
    grid  = torch.tensor(np.fft.ifftshift(np.mgrid[-M//2:-M//2+M, -N//2:-N//2+N, -O//2:-O//2+O]))
    
    J =2
    L = 3
    sigma = 2.0
    integral_powers = [0.5, 1.0, 2.0, 3.0,5.0]
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    scattering = HarmonicScattering3D(J=J, shape=(M, N, O),
                                        L=L, sigma_0=sigma,
                                        integral_powers=integral_powers, max_order=2).to(device)
    
    return grid, scattering, sigma, integral_powers, M,N,O,J,L

In [None]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Define output directory and ensure it exists
output_dir_train = "../../Saved_Features/Scattering/Train"
os.makedirs(output_dir_train, exist_ok=True)

output_dir_test = "../../Saved_Features/Scattering/Test"
os.makedirs(output_dir_test, exist_ok=True)


In [None]:
if not LOAD_SCATTERING:

    order_0, orders_1_and_2 = [], []
    Ids = []
    energies = []
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    grid, scattering, sigma, integral_powers, M,N,O,J,L = config(device)

    test_loader = DataLoader(Test_Dataset, batch_size=16, shuffle=True)
    train_loader = DataLoader(Train_Dataset, batch_size=16, shuffle=True)


    import torch
    from tqdm.auto import tqdm
    import os

    # Assuming order_0, orders_1_and_2, and energies are lists
    order_0 = []
    orders_1_and_2 = []
    energies = []

    # Your existing loop
    for batch in tqdm(train_loader):
        Id_Batch = batch[0]
        full_charge_batch = batch[1]
        valence_batch = batch[2]
        pos_batch = batch[3]
        energie = batch[4]

        full_density_batch = generate_weighted_sum_of_gaussians(grid, pos_batch, full_charge_batch, sigma)
        full_density_batch = torch.from_numpy(full_density_batch)
        full_density_batch = full_density_batch.to(device).float()
        full_order_0 = TorchBackend3D.compute_integrals(full_density_batch, integral_powers)
        full_scattering = scattering(full_density_batch)

        val_density_batch = generate_weighted_sum_of_gaussians(grid, pos_batch, valence_batch, sigma)
        val_density_batch = torch.from_numpy(val_density_batch)
        val_density_batch = val_density_batch.to(device).float()
        val_order_0 = TorchBackend3D.compute_integrals(val_density_batch, integral_powers)
        val_scattering = scattering(val_density_batch)

        core_density_batch = full_density_batch - val_density_batch
        core_order_0 = TorchBackend3D.compute_integrals(core_density_batch, integral_powers)
        core_scattering = scattering(core_density_batch)

        batch_order_0 = torch.stack((full_order_0, val_order_0, core_order_0), dim=-1)
        batch_orders_1_and_2 = torch.stack((full_scattering, val_scattering, core_scattering), dim=-1)
        
        order_0.append(batch_order_0)
        orders_1_and_2.append(batch_orders_1_and_2)
        energies.append(energie)
        Ids.append(Id_Batch)
else:
    order_0_train = torch.load(os.path.join(output_dir_train, 'order_0.pt'))
    orders_1_and_2_train = torch.load(os.path.join(output_dir_train, 'orders_1_and_2.pt'))
    energies_train = torch.load(os.path.join(output_dir_train, 'energies.pt'))
    Ids_train = torch.load(os.path.join(output_dir_train, 'Ids.pt'))

    


In [None]:
if not LOAD_SCATTERING:
    # After the loop, concatenate the lists into single tensors
    order_0_train = torch.cat(order_0, dim=0)  # Adjust dim as needed
    orders_1_and_2_train = torch.cat(orders_1_and_2, dim=0)  # Adjust dim as needed
    energies_train = torch.cat(energies, dim=0)  # Adjust dim as needed
    Ids_train = torch.cat(Ids, dim=0)  # Adjust dim as needed

    # Save tensors to files
    torch.save(order_0_train, os.path.join(output_dir_train, "order_0.pt"))
    torch.save(orders_1_and_2_train, os.path.join(output_dir_train, "orders_1_and_2.pt"))
    torch.save(energies_train, os.path.join(output_dir_train, "energies.pt"))

    # Save Ids
    torch.save(Ids_train, os.path.join(output_dir_train, "Ids.pt"))



In [None]:
if not LOAD_SCATTERING:
    order_0, orders_1_and_2 = [], []
    energies = []
    Ids = []
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    grid, scattering, sigma, integral_powers, M,N,O,J,L = config(device)

    test_loader = DataLoader(Test_Dataset, batch_size=16, shuffle=False)


    import torch
    from tqdm.auto import tqdm
    import os

    # Assuming order_0, orders_1_and_2, and energies are lists
    order_0 = []
    orders_1_and_2 = []
    energies = []

    # Your existing loop
    for batch in tqdm(test_loader):
        Id_batch = batch[0]
        full_charge_batch = batch[1]
        valence_batch = batch[2]
        pos_batch = batch[3]
        energie = 0

        full_density_batch = generate_weighted_sum_of_gaussians(grid, pos_batch, full_charge_batch, sigma)
        full_density_batch = torch.from_numpy(full_density_batch)
        full_density_batch = full_density_batch.to(device).float()
        full_order_0 = TorchBackend3D.compute_integrals(full_density_batch, integral_powers)
        full_scattering = scattering(full_density_batch)

        val_density_batch = generate_weighted_sum_of_gaussians(grid, pos_batch, valence_batch, sigma)
        val_density_batch = torch.from_numpy(val_density_batch)
        val_density_batch = val_density_batch.to(device).float()
        val_order_0 = TorchBackend3D.compute_integrals(val_density_batch, integral_powers)
        val_scattering = scattering(val_density_batch)

        core_density_batch = full_density_batch - val_density_batch
        core_order_0 = TorchBackend3D.compute_integrals(core_density_batch, integral_powers)
        core_scattering = scattering(core_density_batch)

        batch_order_0 = torch.stack((full_order_0, val_order_0, core_order_0), dim=-1)
        batch_orders_1_and_2 = torch.stack((full_scattering, val_scattering, core_scattering), dim=-1)
        
        order_0.append(batch_order_0)
        orders_1_and_2.append(batch_orders_1_and_2)
        energies.append(energie)
        Ids.append(Id_batch)
else:
    order_0_test = torch.load(os.path.join(output_dir_test, 'order_0.pt'))
    orders_1_and_2_test = torch.load(os.path.join(output_dir_test, 'orders_1_and_2.pt'))
    Ids_test = torch.load(os.path.join(output_dir_test, 'Ids.pt'))

    



In [None]:
if not LOAD_SCATTERING:
    # After the loop, concatenate the lists into single tensors
    order_0_test = torch.cat(order_0, dim=0)  # Adjust dim as needed
    orders_1_and_2_test = torch.cat(orders_1_and_2, dim=0)  # Adjust dim as needed
    # energies_tensor = torch.cat(energies, dim=0)  # Adjust dim as needed
    Ids_tensor = torch.cat(Ids, dim=0)  # Adjust dim as needed
    torch.save(order_0_test, os.path.join(output_dir_test, "order_0.pt"))
    torch.save(orders_1_and_2_test, os.path.join(output_dir_test, "orders_1_and_2.pt"))
    torch.save(Ids_tensor, os.path.join(output_dir_test, "Ids.pt"))


# MLP

In [None]:
import torch
import torch.nn as nn
# Dataset
from torch.utils.data import Dataset, DataLoader

In [None]:
class MLP(nn.Module):
    def __init__(self, Input_Dim,Nb_Hidden_Layers, Hidden_Layers_Size_List, Output_Dim, Activation_Name):
        super(MLP, self).__init__()
        
        self.Input_Dim = Input_Dim
        self.Nb_Hidden_Layers = Nb_Hidden_Layers
        self.Hidden_Layers_Size_List = Hidden_Layers_Size_List
        self.Output_Dim = Output_Dim
        self.Activation_Name = Activation_Name
        
        # Define the layers of the MLP
        layers = []
        
        # Input layer
        layers.append(nn.Linear(Input_Dim, Hidden_Layers_Size_List[0]))
        
        # Hidden layers
        for i in range(Nb_Hidden_Layers - 1):
            layers.append(self.get_activation_function(Activation_Name))
            layers.append(nn.Linear(Hidden_Layers_Size_List[i], Hidden_Layers_Size_List[i + 1]))
        
        # Output layer
        layers.append(self.get_activation_function(Activation_Name))
        layers.append(nn.Linear(Hidden_Layers_Size_List[-1], Output_Dim))
        
        self.model = nn.Sequential(*layers)


    def get_activation_function(self, activation_name):
        if activation_name == 'ReLU':
            return nn.ReLU()
        elif activation_name == 'Sigmoid':
            return nn.Sigmoid()
        elif activation_name == 'Tanh':
            return nn.Tanh()
        elif activation_name == 'LeakyReLU':
            return nn.LeakyReLU()
        elif activation_name == 'PReLU':
            return nn.PReLU()
        elif activation_name == 'ELU':
            return nn.ELU()
        elif activation_name == 'Softmax':
            return nn.Softmax(dim=1)
        elif activation_name == 'GELU':
            return nn.GELU()
        elif activation_name == 'linear' :
            return nn.Identity()
        else:
            raise ValueError(f"Activation function '{activation_name}' is not supported.")
        

    def forward(self, x):
        """
        Forward pass through the MLP.
        
        :param x: Input tensor of shape (batch_size, Input_Dim).
        :return: Output tensor of shape (batch_size, Output_Dim).
        """
        return self.model(x)
    


In [None]:
class Scattering_Dataset(Dataset):
    def __init__(self, order_0, orders_1_and_2, energies=None, Ids=None):
        self.order_0 = order_0
        self.orders_1_and_2 = orders_1_and_2
        self.energies = energies
        self.Ids = Ids

    def __len__(self):
        return len(self.order_0)

    def __getitem__(self, idx):
        Ids = self.Ids[idx] if self.Ids is not None else None
        order_0 = self.order_0[idx].reshape(-1)
        orders_1_and_2 = self.orders_1_and_2[idx].reshape(-1)
        energy = self.energies[idx] if self.energies is not None else None


        Features = torch.cat((order_0, orders_1_and_2), dim=-1)

        if self.energies is not None:
            return Ids, Features, energy
        else:
            return Ids, Features
        




Dataset_Train = Scattering_Dataset(order_0_train, orders_1_and_2_train, energies_train, Ids_train)
Dataset_Test = Scattering_Dataset(order_0_test, orders_1_and_2_test, None, Ids_test)

# Split the dataset into training and validation sets
train_size = int(0.8 * len(Dataset_Train))
val_size = len(Dataset_Train) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(Dataset_Train, [train_size, val_size])



In [None]:
from tqdm.auto import tqdm
def Train_One_Epoch(Model,Train_Loader,Optimizer,Criterion,List_Train_Losses_Per_Batches, Device):

    Progress_Bar_Batch = tqdm(Train_Loader,desc="Batches")

    Train_Loss = 0
    Num_Batches = 0
    for Ids, Positions,Energies in Progress_Bar_Batch:
        Ids = Ids.to(Device)
        Positions = Positions.to(Device)
        Energies = Energies.to(Device)

        # Forward pass

        Optimizer.zero_grad()

        outputs = Model(Positions)

        loss = Criterion(outputs,Energies)

        loss.backward()

        Optimizer.step()

        Running_Loss = loss.item()

        # Calculate the accuracy
        List_Train_Losses_Per_Batches.append(Running_Loss)

        Progress_Bar_Batch.set_description(f"Num Batches: {Num_Batches} Running Train Loss: {Running_Loss:.3f} ")

        Train_Loss += Running_Loss

        Num_Batches += 1

    Train_Loss = Train_Loss/Num_Batches
    return Train_Loss, List_Train_Losses_Per_Batches


def Test_One_Epoch(Model,Test_Loader,Criterion,List_Test_Losses_Per_Batches, Device):
    Progress_Bar_Batch = tqdm(Test_Loader,desc="Batches",leave=False)

    Test_Loss = 0

    Num_Batches = 0

    for Ids, images, labels in Progress_Bar_Batch:

        Ids = Ids.to(Device)
        images = images.to(Device)
        labels = labels.to(Device)

        
        outputs = Model(images)

        loss = Criterion(outputs,labels)

        Running_Loss = loss.item()
        List_Test_Losses_Per_Batches.append(Running_Loss)

        Test_Loss += Running_Loss




        Num_Batches += 1

        Progress_Bar_Batch.set_description(f"Num Batches: {Num_Batches} Running Test Loss: {Running_Loss:.3f} ")


    Test_Loss = Test_Loss/Num_Batches

    return Test_Loss,  List_Test_Losses_Per_Batches


def Train(Model, Train_Loader, Test_Loader, Optimizer, Criterion, Num_Epochs, Device, Save_Path="best_model.pth", Best_Test_Loss=float('inf')):
    # Set the model to training mode
    Model.train()

    # Move the model to the device
    Model.to(Device)

    List_Train_Losses_Per_Epochs = []
    List_Test_Losses_Per_Epochs = []
    List_Train_Losses_Per_Batches = []
    List_Test_Losses_Per_Batches = []

    Progress_Bar_Epochs = tqdm(range(Num_Epochs), desc="Epochs")

    for epoch in Progress_Bar_Epochs:
        Train_Loss, List_Train_Losses_Per_Batches = Train_One_Epoch(Model, Train_Loader, Optimizer, Criterion, List_Train_Losses_Per_Batches, Device)

        Test_Loss, List_Test_Losses_Per_Batches = Test_One_Epoch(Model, Test_Loader, Criterion, List_Test_Losses_Per_Batches, Device)

        List_Train_Losses_Per_Epochs.append(Train_Loss)
        List_Test_Losses_Per_Epochs.append(Test_Loss)

        # Save model if test loss is the best so far
        if Test_Loss < Best_Test_Loss:
            Best_Test_Loss = Test_Loss
            torch.save(Model, Save_Path)
            print(f"New best test loss: {Test_Loss:.3f}, model saved to {Save_Path}")

        print(f"Epoch: {epoch+1}/{Num_Epochs} Train Loss: {Train_Loss:.3f} Test Loss: {Test_Loss:.3f} ")

        Progress_Bar_Epochs.set_description(f"Epochs Train Loss: {Train_Loss:.3f} Test Loss: {Test_Loss:.3f}")

    return List_Train_Losses_Per_Epochs, List_Test_Losses_Per_Epochs, List_Train_Losses_Per_Batches, List_Test_Losses_Per_Batches



def Plot_Losses(List_Train_Losses_Per_Epochs,List_Test_Losses_Per_Epochs,List_Train_Losses_Per_Batches,List_Test_Losses_Per_Batches, Save = False, Save_Path = None):

    plt.figure(figsize=(15,15))

    fig, axes = plt.subplots(2, 2, figsize=(15, 15))

    axes[0, 0].plot(List_Train_Losses_Per_Epochs)
    axes[0, 0].set_title("Train Losses Per Epochs")
    axes[0, 0].set_xlabel("Epochs")
    axes[0, 0].set_ylabel("Losses")

    axes[0, 1].plot(List_Test_Losses_Per_Epochs)
    axes[0, 1].set_title("Test Losses Per Epochs")
    axes[0, 1].set_xlabel("Epochs")
    axes[0, 1].set_ylabel("Losses")

    axes[1, 0].plot(List_Train_Losses_Per_Batches)
    axes[1, 0].set_title("Train Losses Per Batches")
    axes[1, 0].set_xlabel("Batches")
    axes[1, 0].set_ylabel("Losses")

    axes[1, 1].plot(List_Test_Losses_Per_Batches)
    axes[1, 1].set_title("Test Losses Per Batches")
    axes[1, 1].set_xlabel("Batches")
    axes[1, 1].set_ylabel("Losses")


    plt.show()

    if Save:
        plt.savefig(Save_Path)



In [None]:


Nb_Hidden_Layers = 1
Hidden_Layers_Size_List = [500]
Input_Dim = 375
Ouput_Dim = 1

Activation_Name = 'Identity'

Model = MLP(Input_Dim, Nb_Hidden_Layers, Hidden_Layers_Size_List, Ouput_Dim, Activation_Name)


# Define the optimizer and loss function
Optimizer = torch.optim.Adam(Model.parameters(), lr=3e-4)

Criterion = nn.MSELoss()

# Define the device
Device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Model = Model.to(Device)

# Create the DataLoaders
Train_DataLoader = DataLoader(train_dataset, batch_size=64, shuffle=True)
Val_DataLoader = DataLoader(val_dataset, batch_size=64, shuffle=False)
Test_DataLoader = DataLoader(Dataset_Test, batch_size=64, shuffle=False)



In [None]:
Train(Model, Train_DataLoader, Val_DataLoader, Optimizer, Criterion, Num_Epochs=100, Device=Device, Save_Path="best_model.pth")
