# TorchMD-NET

## Funciones recurrentes

Importacion de modulos

In [122]:
import yaml  # Para cargar configuraciones desde archivos YAML
import argparse  # Para manejar argumentos de línea de comandos
import numpy as np  # Para operaciones matemáticas con arrays
import torch  # Biblioteca principal para deep learning con PyTorch
from os.path import dirname, join, exists  # Para trabajar con rutas de archivos y directorios
from pytorch_lightning.utilities import rank_zero_warn  # Para mostrar advertencias específicas de PyTorch Lightning


Definicion de masas atomicas

In [123]:
# fmt: off
# Atomic masses are based on:
#
#   Meija, J., Coplen, T., Berglund, M., et al. (2016). Atomic weights of
#   the elements 2013 (IUPAC Technical Report). Pure and Applied Chemistry,
#   88(3), pp. 265-291. Retrieved 30 Nov. 2016,
#   from doi:10.1515/pac-2015-0305
#
# Standard atomic weights are taken from Table 1: "Standard atomic weights
# 2013", with the uncertainties ignored.
# For hydrogen, helium, boron, carbon, nitrogen, oxygen, magnesium, silicon,
# sulfur, chlorine, bromine and thallium, where the weights are given as a
# range the "conventional" weights are taken from Table 3 and the ranges are
# given in the comments.
# The mass of the most stable isotope (in Table 4) is used for elements
# where there the element has no stable isotopes (to avoid NaNs): Tc, Pm,
# Po, At, Rn, Fr, Ra, Ac, everything after N

# Definición de masas atómicas de varios elementos.
atomic_masses = np.array([
    1.0, 1.008, 4.002602, 6.94, 9.0121831,
    10.81, 12.011, 14.007, 15.999, 18.998403163,
    20.1797, 22.98976928, 24.305, 26.9815385, 28.085,
    30.973761998, 32.06, 35.45, 39.948, 39.0983,
    40.078, 44.955908, 47.867, 50.9415, 51.9961,
    54.938044, 55.845, 58.933194, 58.6934, 63.546,
    65.38, 69.723, 72.63, 74.921595, 78.971,
    79.904, 83.798, 85.4678, 87.62, 88.90584,
    91.224, 92.90637, 95.95, 97.90721, 101.07,
    102.9055, 106.42, 107.8682, 112.414, 114.818,
    118.71, 121.76, 127.6, 126.90447, 131.293,
    132.90545196, 137.327, 138.90547, 140.116, 140.90766,
    144.242, 144.91276, 150.36, 151.964, 157.25,
    158.92535, 162.5, 164.93033, 167.259, 168.93422,
    173.054, 174.9668, 178.49, 180.94788, 183.84,
    186.207, 190.23, 192.217, 195.084, 196.966569,
    200.592, 204.38, 207.2, 208.9804, 208.98243,
    209.98715, 222.01758, 223.01974, 226.02541, 227.02775,
    232.0377, 231.03588, 238.02891, 237.04817, 244.06421,
    243.06138, 247.07035, 247.07031, 251.07959, 252.083,
    257.09511, 258.09843, 259.101, 262.11, 267.122,
    268.126, 271.134, 270.133, 269.1338, 278.156,
    281.165, 281.166, 285.177, 286.182, 289.19,
    289.194, 293.204, 293.208, 294.214,
])
# fmt: on

Funcion que divide un conjunto de datos en conjuntos de entrenamiento, validación y prueba

In [124]:
def train_val_test_split(dset_len, train_size, val_size, test_size, seed, order=None):
    # Asegura que solo uno de train_size, val_size o test_size sea None.
    assert (train_size is None) + (val_size is None) + (
        test_size is None
    ) <= 1, "Only one of train_size, val_size, test_size is allowed to be None."
    
    # Comprueba si train_size, val_size, test_size son flotantes.
    is_float = (
        isinstance(train_size, float),
        isinstance(val_size, float),
        isinstance(test_size, float),
    )

    # Redondea las fracciones a números enteros si son flotantes.
    train_size = round(dset_len * train_size) if is_float[0] else train_size
    val_size = round(dset_len * val_size) if is_float[1] else val_size
    test_size = round(dset_len * test_size) if is_float[2] else test_size

    # Calcula las dimensiones faltantes si alguna de ellas es None.
    if train_size is None:
        train_size = dset_len - val_size - test_size
    elif val_size is None:
        val_size = dset_len - train_size - test_size
    elif test_size is None:
        test_size = dset_len - train_size - val_size

    # Ajusta los tamaños si la suma supera el tamaño del dataset.
    if train_size + val_size + test_size > dset_len:
        if is_float[2]:
            test_size -= 1
        elif is_float[1]:
            val_size -= 1
        elif is_float[0]:
            train_size -= 1

    # Asegura que los tamaños resultantes sean no negativos.
    assert train_size >= 0 and val_size >= 0 and test_size >= 0, (
        f"One of training ({train_size}), validation ({val_size}) or "
        f"testing ({test_size}) splits ended up with a negative size."
    )

    # Calcula el tamaño total de las divisiones y compara con el tamaño del dataset.
    total = train_size + val_size + test_size
    assert dset_len >= total, (
        f"The dataset ({dset_len}) is smaller than the "
        f"combined split sizes ({total})."
    )
    
    # Advierte si se excluyeron muestras del dataset.
    if total < dset_len:
        rank_zero_warn(f"{dset_len - total} samples were excluded from the dataset")

    # Genera índices de forma secuencial para el dataset.
    idxs = np.arange(dset_len, dtype=int)

    # Permuta los índices si se proporciona una semilla y un orden específico.
    if order is None:
        idxs = np.random.default_rng(seed).permutation(idxs)

    # Divide los índices en conjuntos de entrenamiento, validación y prueba.
    idx_train = idxs[:train_size]
    idx_val = idxs[train_size : train_size + val_size]
    idx_test = idxs[train_size + val_size : total]

    # Reorganiza los índices según el orden proporcionado si existe.
    if order is not None:
        idx_train = [order[i] for i in idx_train]
        idx_val = [order[i] for i in idx_val]
        idx_test = [order[i] for i in idx_test]

    # Devuelve los índices en forma de arrays NumPy.
    return np.array(idx_train), np.array(idx_val), np.array(idx_test)

Genera divisiones (splits) de un conjunto de datos en conjuntos de entrenamiento, validación y prueba. 

In [125]:
def make_splits(
    dataset_len,
    train_size,
    val_size,
    test_size,
    seed,
    filename=None,
    splits=None,
    order=None,
):
    # Comprueba si ya existen divisiones cargadas desde un archivo.
    if splits is not None:
        splits = np.load(splits)
        idx_train = splits["idx_train"]
        idx_val = splits["idx_val"]
        idx_test = splits["idx_test"]
    else:
        # Si no hay divisiones preexistentes, llama a la función train_val_test_split
        # para generar nuevas divisiones de los datos.
        idx_train, idx_val, idx_test = train_val_test_split(
            dataset_len, train_size, val_size, test_size, seed, order
        )

    # Si se proporciona un nombre de archivo, guarda las divisiones en un archivo NPZ.
    if filename is not None:
        np.savez(filename, idx_train=idx_train, idx_val=idx_val, idx_test=idx_test)

    # Devuelve los índices de las divisiones como tensores de PyTorch.
    return (
        torch.from_numpy(idx_train),
        torch.from_numpy(idx_val),
        torch.from_numpy(idx_test),
    )

Define una clase llamada LoadFromFile que se utiliza como acción personalizada en la creación de argumentos de línea de comandos utilizando el módulo argparse.

In [126]:
class LoadFromFile(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        # Comprueba si el archivo proporcionado es un archivo YAML (con extensión .yaml o .yml).
        if values.name.endswith("yaml") or values.name.endswith("yml"):
            # Abre el archivo en modo lectura.
            with values as f:
                # Carga el contenido del archivo YAML en un diccionario utilizando el cargador FullLoader de PyYAML.
                config = yaml.load(f, Loader=yaml.FullLoader)
            
            # Comprueba si las claves del archivo YAML son argumentos válidos.
            for key in config.keys():
                if key not in namespace:
                    raise ValueError(f"Unknown argument in config file: {key}")
            
            # Si la clave "load_model" está presente en el archivo YAML y se especificó el argumento --load_model
            # en la línea de comandos, se muestra una advertencia y se ignora la clave "load_model" del archivo YAML.
            if (
                "load_model" in config
                and namespace.load_model is not None
                and config["load_model"] != namespace.load_model
            ):
                rank_zero_warn(
                    f"The load model argument was specified as a command line argument "
                    f"({namespace.load_model}) and in the config file ({config['load_model']}). "
                    f"Ignoring 'load_model' from the config file and loading {namespace.load_model}."
                )
                del config["load_model"]
            
            # Actualiza el espacio de nombres (namespace) con las claves y valores del archivo YAML.
            namespace.__dict__.update(config)
        else:
            # Si el archivo no tiene extensión .yaml o .yml, se genera un error.
            raise ValueError("Configuration file must end with yaml or yml")

Define una clase que se utiliza como acción personalizada en la creación de argumentos de línea de comandos utilizando el módulo argparse.

In [127]:
class LoadFromCheckpoint(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        # Construye la ruta al archivo 'hparams.yaml' a partir de la ruta del archivo del checkpoint.
        hparams_path = join(dirname(values), "hparams.yaml")
        
        # Verifica si el archivo 'hparams.yaml' existe en la ubicación especificada.
        if not exists(hparams_path):
            # Si no existe, imprime un mensaje indicando que no se pudo encontrar el archivo y se confía en los argumentos de la línea de comandos.
            print(
                "Failed to locate the checkpoint's hparams.yaml file. Relying on command line args."
            )
            return
        
        # Si el archivo 'hparams.yaml' existe, lo abre en modo lectura.
        with open(hparams_path, "r") as f:
            # Carga el contenido del archivo YAML en un diccionario utilizando el cargador FullLoader de PyYAML.
            config = yaml.load(f, Loader=yaml.FullLoader)
        
        # Comprueba si las claves del archivo YAML son argumentos válidos, excepto la clave "prior_args".
        for key in config.keys():
            if key not in namespace and key != "prior_args":
                raise ValueError(f"Unknown argument in the model checkpoint: {key}")
        
        # Actualiza el espacio de nombres (namespace) con las claves y valores del archivo YAML.
        # También agrega una clave 'load_model' al espacio de nombres con el valor del archivo de checkpoint.
        namespace.__dict__.update(config)
        namespace.__dict__.update(load_model=values)


Define una función que se utiliza para guardar argumentos (posiblemente excluyendo algunos) en un archivo de configuración YAML o JSON. 

In [128]:
def save_argparse(args, filename, exclude=None):
    import json  # Importa el módulo json

    # Comprueba si el nombre de archivo tiene una extensión YAML o YML.
    if filename.endswith("yaml") or filename.endswith("yml"):
        # Si se proporciona un valor para excluir argumentos, conviértelo en una lista.
        if isinstance(exclude, str):
            exclude = [exclude]

        # Copia los argumentos en un nuevo diccionario.
        args = args.__dict__.copy()

        # Elimina los argumentos especificados en la lista de exclusión.
        for exl in exclude:
            del args[exl]

        # Obtén el valor del argumento "dataset_arg".
        ds_arg = args.get("dataset_arg")

        # Si "dataset_arg" existe y es una cadena, intenta cargarlo como un objeto JSON.
        if ds_arg is not None and isinstance(ds_arg, str):
            args["dataset_arg"] = json.loads(args["dataset_arg"])

        # Guarda los argumentos en el archivo de configuración YAML.
        yaml.dump(args, open(filename, "w"))
    else:
        # Si el archivo no tiene una extensión YAML o YML, genera un error.
        raise ValueError("Configuration file should end with yaml or yml")

Define una función que se utiliza para convertir una cadena de texto en un número, ya sea un número entero (int) o un número de punto flotante (float). 

In [129]:
def number(text):
    # Comprueba si el texto es None o "None" y devuelve None en ese caso.
    if text is None or text == "None":
        return None

    try:
        # Intenta convertir el texto en un número entero.
        num_int = int(text)
    except ValueError:
        # Si la conversión a entero falla, establece num_int como None.
        num_int = None
    
    # Convierte el texto en un número de punto flotante.
    num_float = float(text)

    # Compara el número entero y el número de punto flotante.
    # Si son iguales, devuelve el número entero, de lo contrario, devuelve el número de punto flotante.
    if num_int == num_float:
        return num_int
    return num_float

Define una clase llamada MissingEnergyException que hereda de la clase base Exception. 

In [130]:
class MissingEnergyException(Exception):
    pass

## Funciones recurrentes del modelo

Importacion de modulos

In [131]:
import math  # Importa el módulo math para operaciones matemáticas.
from typing import Optional, Tuple  # Importa tipos y anotaciones de tipos.
import torch  # Importa PyTorch, una biblioteca para aprendizaje profundo.
from torch import Tensor  # Importa el tipo de datos Tensor de PyTorch.
from torch import nn  # Importa el módulo de redes neuronales de PyTorch.
import torch.nn.functional as F  # Importa funciones de activación y capas de PyTorch.
from torch_geometric.nn import MessagePassing  # Importa MessagePassing de PyTorch Geometric.
from torch_cluster import radius_graph  # Importa radius_graph de Torch Cluster.
import warnings  # Importa el módulo de advertencias.


Define una función que se utiliza para visualizar una base específica de funciones. 

In [132]:
def visualize_basis(basis_type, num_rbf=50, cutoff_lower=0, cutoff_upper=5):
    """
    Function for quickly visualizing a specific basis. This is useful for inspecting
    the distance coverage of basis functions for non-default lower and upper cutoffs.

    Args:
        basis_type (str): Specifies the type of basis functions used. Can be one of
            ['gauss', 'expnorm']
        num_rbf (int, optional): The number of basis functions.
            (default: :obj:`50`)
        cutoff_lower (float, optional): The lower cutoff of the basis.
            (default: :obj:`0`)
        cutoff_upper (float, optional): The upper cutoff of the basis.
            (default: :obj:`5`)
    """
    import matplotlib.pyplot as plt  # Importa el módulo Matplotlib para graficar.

    # Genera una serie de distancias para visualización.
    distances = torch.linspace(cutoff_lower - 1, cutoff_upper + 1, 1000)
    
    # Define los parámetros de la base de funciones.
    basis_kwargs = {
        "num_rbf": num_rbf,
        "cutoff_lower": cutoff_lower,
        "cutoff_upper": cutoff_upper,
    }
    
    # Crea una instancia de la base de funciones especificada.
    basis_expansion = rbf_class_mapping[basis_type](**basis_kwargs)
    
    # Calcula las distancias expandidas utilizando la base de funciones.
    expanded_distances = basis_expansion(distances)

    # Grafica las distancias expandidas para cada función base.
    for i in range(expanded_distances.shape[-1]):
        plt.plot(distances.numpy(), expanded_distances[:, i].detach().numpy())
    
    # Muestra la gráfica.
    plt.show()

Define una clase que hereda de la clase MessagePassing de PyTorch Geometric. Esta clase se utiliza para implementar la arquitectura de aprendizaje profundo de Equivariant Transformer (ET) 

In [133]:
class NeighborEmbedding(MessagePassing):
    def __init__(self, hidden_channels, num_rbf, cutoff_lower, cutoff_upper, max_z=100, dtype=torch.float32):
        """
        The ET architecture assigns two learned vectors to each atom type zi. 
        One is used to encode information specific to an atom, the other (this class) 
        takes the role of a neighborhood embedding. The neighborhood embedding, 
        which is an embedding of the types of neighboring atoms, is multiplied by a 
        distance filter. This embedding allows the network to store information about 
        the interaction of atom pairs.
        
        See eq. 3 in https://arxiv.org/pdf/2202.02541.pdf for more details.
        """
        super(NeighborEmbedding, self).__init__(aggr="add")
        
        # Inicializa la capa de embedding para representar el tipo de átomo.
        self.embedding = nn.Embedding(max_z, hidden_channels, dtype=dtype)
        
        # Inicializa la capa de proyección para las distancias.
        self.distance_proj = nn.Linear(num_rbf, hidden_channels, dtype=dtype)
        
        # Inicializa la capa de combinación de características.
        self.combine = nn.Linear(hidden_channels * 2, hidden_channels, dtype=dtype)
        
        # Inicializa la función de corte basada en el coseno.
        self.cutoff = CosineCutoff(cutoff_lower, cutoff_upper)

        # Reinicia los parámetros de la red.
        self.reset_parameters()

    def reset_parameters(self):
        # Reinicia los parámetros de embedding.
        self.embedding.reset_parameters()
        
        # Inicializa los pesos de las capas con xavier_uniform.
        nn.init.xavier_uniform_(self.distance_proj.weight)
        nn.init.xavier_uniform_(self.combine.weight)
        
        # Inicializa los sesgos de la capa de proyección de distancias.
        self.distance_proj.bias.data.fill_(0)
        
        # Inicializa los sesgos de la capa de combinación de características.
        self.combine.bias.data.fill_(0)

    def forward(
        self,
        z: Tensor,
        x: Tensor,
        edge_index: Tensor,
        edge_weight: Tensor,
        edge_attr: Tensor,
    ):
        """
        Args:
            z (Tensor): Atomic numbers of shape :obj:`[num_nodes]`
            x (Tensor): Node feature matrix (atom positions) of shape :obj:`[num_nodes, 3]`
            edge_index (Tensor): Graph connectivity (list of neighbor pairs) with shape :obj:`[2, num_edges]`
            edge_weight (Tensor): Edge weight vector of shape :obj:`[num_edges]`
            edge_attr (Tensor): Edge attribute matrix of shape :obj:`[num_edges, 3]`
        Returns:
            x_neighbors (Tensor): The embedding of the neighbors of each atom of shape :obj:`[num_nodes, hidden_channels]`
        """
        # Elimina auto conexiones (self loops).
        mask = edge_index[0] != edge_index[1]
        if not mask.all():
            edge_index = edge_index[:, mask]
            edge_weight = edge_weight[mask]
            edge_attr = edge_attr[mask]

        # Calcula el filtro de corte (cutoff) basado en el coseno.
        C = self.cutoff(edge_weight)
        
        # Proyecta las distancias en un espacio de características.
        W = self.distance_proj(edge_attr) * C.view(-1, 1)

        # Obtiene la representación de embedding para el tipo de átomo.
        x_neighbors = self.embedding(z)
        
        # Realiza la propagación de mensajes entre nodos.
        x_neighbors = self.propagate(edge_index, x=x_neighbors, W=W, size=None)
        
        # Combina las características de los nodos con las de los vecinos.
        x_neighbors = self.combine(torch.cat([x, x_neighbors], dim=1))
        
        return x_neighbors

    def message(self, x_j, W):
        return x_j * W


Define una clase que es una capa personalizada en PyTorch. Esta capa se utiliza para calcular la lista de vecinos para un valor de corte (cutoff) dado en una estructura de átomos tridimensional.

In [134]:
class OptimizedDistance(torch.nn.Module):
    def __init__(
        self,
        cutoff_lower=0.0,
        cutoff_upper=5.0,
        max_num_pairs=-32,
        return_vecs=False,
        loop=False,
        strategy="brute",
        include_transpose=True,
        resize_to_fit=True,
        check_errors=True,
        box=None,
    ):
        super(OptimizedDistance, self).__init__()
        """ Compute the neighbor list for a given cutoff.
        This operation can be placed inside a CUDA graph in some cases.
        In particular, resize_to_fit and check_errors must be False.
        Note that this module returns neighbors such that distance(i,j) >= cutoff_lower and distance(i,j) < cutoff_upper.
        This function optionally supports periodic boundary conditions with
        arbitrary triclinic boxes.  The box vectors `a`, `b`, and `c` must satisfy
        certain requirements:

        `a[1] = a[2] = b[2] = 0`
        `a[0] >= 2*cutoff, b[1] >= 2*cutoff, c[2] >= 2*cutoff`
        `a[0] >= 2*b[0]`
        `a[0] >= 2*c[0]`
        `b[1] >= 2*c[1]`

        These requirements correspond to a particular rotation of the system and
        reduced form of the vectors, as well as the requirement that the cutoff be
        no larger than half the box width.

        Parameters
        ----------
        cutoff_lower : float
            Lower cutoff for the neighbor list.
        cutoff_upper : float
            Upper cutoff for the neighbor list.
        max_num_pairs : int
            Maximum number of pairs to store, if the number of pairs found is less than this, the list is padded with (-1,-1) pairs up to max_num_pairs unless resize_to_fit is True, in which case the list is resized to the actual number of pairs found.
            If the number of pairs found is larger than this, the pairs are randomly sampled. When check_errors is True, an exception is raised in this case.
            If negative, it is interpreted as (minus) the maximum number of neighbors per atom.
        strategy : str
            Strategy to use for computing the neighbor list. Can be one of
            ["shared", "brute", "cell"].
            Shared: An O(N^2) algorithm that leverages CUDA shared memory, best for large number of particles.
            Brute: A brute force O(N^2) algorithm, best for small number of particles.
            Cell:  A cell list algorithm, best for large number of particles, low cutoffs and low batch size.
        box : torch.Tensor, optional
            The vectors defining the periodic box.  This must have shape `(3, 3)`,
            where `box_vectors[0] = a`, `box_vectors[1] = b`, and `box_vectors[2] = c`.
            If this is omitted, periodic boundary conditions are not applied.
        loop : bool, optional
            Whether to include self-interactions.
            Default: False
        include_transpose : bool, optional
            Whether to include the transpose of the neighbor list.
            Default: True
        resize_to_fit : bool, optional
            Whether to resize the neighbor list to the actual number of pairs found. When False, the list is padded with (-1,-1) pairs up to max_num_pairs
            Default: True
            If this is True the operation is not CUDA graph compatible.
        check_errors : bool, optional
            Whether to check for too many pairs. If this is True the operation is not CUDA graph compatible.
            Default: True
        return_vecs : bool, optional
            Whether to return the distance vectors.
            Default: False
        """
        # Inicializa los parámetros relacionados con la lista de vecinos.
        self.cutoff_upper = cutoff_upper
        self.cutoff_lower = cutoff_lower
        self.max_num_pairs = max_num_pairs
        self.strategy = strategy
        self.box: Optional[Tensor] = box
        self.loop = loop
        self.return_vecs = return_vecs
        self.include_transpose = include_transpose
        self.resize_to_fit = resize_to_fit
        self.use_periodic = True
        
        # Configura el box periódico si se proporciona, de lo contrario, se asume que no hay condiciones periódicas.
        if self.box is None:
            self.use_periodic = False
            self.box = torch.empty((0, 0))
            if self.strategy == "cell":
                # Establece un box predeterminado para la estrategia "cell" (lista de celdas).
                lbox = cutoff_upper * 3.0
                self.box = torch.tensor([[lbox, 0, 0], [0, lbox, 0], [0, 0, lbox]])
        
        # Mueve el box a la memoria CPU, ya que todas las estrategias esperan que esté en la CPU.
        self.box = self.box.cpu()
        
        # Configura si se deben verificar errores relacionados con el número de pares de vecinos.
        self.check_errors = check_errors
        
        # Importa la función del kernel que se utiliza para calcular la lista de vecinos.
        from torchmdnet.neighbors import get_neighbor_pairs_kernel
        self.kernel = get_neighbor_pairs_kernel

    def forward(
        self, pos: Tensor, batch: Optional[Tensor] = None
    ) -> Tuple[Tensor, Tensor, Optional[Tensor]]:
        """Compute the neighbor list for a given cutoff.
        Parameters
        ----------
        pos : torch.Tensor
            shape (N, 3)
        batch : torch.Tensor or None
            shape (N,)
        Returns
        -------
        edge_index : torch.Tensor
          List of neighbors for each atom in the batch.
        shape (2, num_found_pairs or max_num_pairs)
        edge_weight : torch.Tensor
            List of distances for each atom in the batch.
        shape (num_found_pairs or max_num_pairs,)
        edge_vec : torch.Tensor
            List of distance vectors for each atom in the batch.
        shape (num_found_pairs or max_num_pairs, 3)

        If resize_to_fit is True, the tensors will be trimmed to the actual number of pairs found.
        otherwise the tensors will have size max_num_pairs, with neighbor pairs (-1, -1) at the end.

        """
        # Asegura que el box esté en el mismo tipo de datos que las posiciones.
        self.box = self.box.to(pos.dtype)

        # Calcula el número máximo de pares de vecinos a almacenar.
        max_pairs = self.max_num_pairs
        if self.max_num_pairs < 0:
            max_pairs = -self.max_num_pairs * pos.shape[0]

        # Si batch es None, se asigna a todas las partículas al mismo lote (batch 0).
        if batch is None:
            batch = torch.zeros(pos.shape[0], dtype=torch.long, device=pos.device)

        # Llama al kernel para calcular la lista de vecinos.
        edge_index, edge_vec, edge_weight, num_pairs = self.kernel(
            strategy=self.strategy,
            positions=pos,
            batch=batch,
            max_num_pairs=max_pairs,
            cutoff_lower=self.cutoff_lower,
            cutoff_upper=self.cutoff_upper,
            loop=self.loop,
            include_transpose=self.include_transpose,
            box_vectors=self.box,
            use_periodic=self.use_periodic,
        )

        # Verifica errores si se habilita la comprobación de errores.
        if self.check_errors:
            if num_pairs[0] > max_pairs:
                raise RuntimeError(
                    "Encontrado num_pairs({}) > max_num_pairs({})".format(
                        num_pairs[0], max_pairs
                    )
                )

        # Convierte edge_index a tipo de datos de enteros.
        edge_index = edge_index.to(torch.long)

        # Elimina pares de vecinos (-1, -1) si resize_to_fit es True.
        if self.resize_to_fit:
            mask = edge_index[0] != -1
            edge_index = edge_index[:, mask]
            edge_weight = edge_weight[mask]
            edge_vec = edge_vec[mask, :]

        # Retorna los resultados, incluyendo los vectores de distancia si se habilita return_vecs.
        if self.return_vecs:
            return edge_index, edge_weight, edge_vec
        else:
            return edge_index, edge_weight, None

Define una clase que se utiliza para aplicar un suavizado gaussiano a una distancia dada.

In [135]:
class GaussianSmearing(nn.Module):
    # El método __init__ se llama al crear una instancia de la clase.
    def __init__(self, cutoff_lower=0.0, cutoff_upper=5.0, num_rbf=50, trainable=True, dtype=torch.float32):
        super(GaussianSmearing, self).__init__()
        
        # Guardar los valores de los argumentos en las variables de la instancia.
        self.cutoff_lower = cutoff_lower
        self.cutoff_upper = cutoff_upper
        self.num_rbf = num_rbf
        self.trainable = trainable
        self.dtype = dtype
        
        # Llamar al método _initial_params para obtener los valores iniciales de offset y coeff.
        offset, coeff = self._initial_params()
        
        # Si trainable es True, se registran los parámetros como variables que se pueden entrenar.
        if trainable:
            self.register_parameter("coeff", nn.Parameter(coeff))
            self.register_parameter("offset", nn.Parameter(offset))
        # Si trainable es False, se registran los parámetros como buffers (variables no entrenables).
        else:
            self.register_buffer("coeff", coeff)
            self.register_buffer("offset", offset)

    # Método privado para calcular los valores iniciales de offset y coeff.
    def _initial_params(self):
        # Crear un tensor que contiene valores equidistantes entre cutoff_lower y cutoff_upper.
        offset = torch.linspace(self.cutoff_lower, self.cutoff_upper, self.num_rbf, dtype=self.dtype)
        
        # Calcular el valor de coeff, que se utiliza en la función de suavizado gaussiano.
        coeff = -0.5 / (offset[1] - offset[0]) ** 2
        
        return offset, coeff

    # Método para restablecer los parámetros a sus valores iniciales.
    def reset_parameters(self):
        offset, coeff = self._initial_params()
        self.offset.data.copy_(offset)
        self.coeff.data.copy_(coeff)

    # Método forward que se llama cuando se pasa una distancia (dist) a la instancia.
    def forward(self, dist):
        # Restar cada valor de dist de offset.
        dist = dist.unsqueeze(-1) - self.offset
        
        # Calcular el suavizado gaussiano aplicando exp(coeff * dist^2).
        return torch.exp(self.coeff * torch.pow(dist, 2))

Define una clase que se utiliza para aplicar un suavizado exponencial a una distancia dada.

In [136]:
class ExpNormalSmearing(nn.Module):
    # El método __init__ se llama al crear una instancia de la clase.
    def __init__(self, cutoff_lower=0.0, cutoff_upper=5.0, num_rbf=50, trainable=True, dtype=torch.float32):
        super(ExpNormalSmearing, self).__init()
        
        # Guardar los valores de los argumentos en las variables de la instancia.
        self.cutoff_lower = cutoff_lower
        self.cutoff_upper = cutoff_upper
        self.num_rbf = num_rbf
        self.trainable = trainable
        self.dtype = dtype
        
        # Crear una instancia de la clase CosineCutoff con un rango de corte entre 0 y cutoff_upper.
        self.cutoff_fn = CosineCutoff(0, cutoff_upper)
        
        # Calcular el valor de alpha para suavizar la función exponencial.
        self.alpha = 5.0 / (cutoff_upper - cutoff_lower)
        
        # Llamar al método _initial_params para obtener los valores iniciales de means y betas.
        means, betas = self._initial_params()
        
        # Si trainable es True, se registran los parámetros como variables que se pueden entrenar.
        if trainable:
            self.register_parameter("means", nn.Parameter(means))
            self.register_parameter("betas", nn.Parameter(betas))
        # Si trainable es False, se registran los parámetros como buffers (variables no entrenables).
        else:
            self.register_buffer("means", means)
            self.register_buffer("betas", betas)

    # Método privado para calcular los valores iniciales de means y betas.
    def _initial_params(self):
        # initialize means and betas according to the default values in PhysNet
        # https://pubs.acs.org/doi/10.1021/acs.jctc.9b00181
        start_value = torch.exp(
            torch.scalar_tensor(-self.cutoff_upper + self.cutoff_lower, dtype=self.dtype)
        )
        means = torch.linspace(start_value, 1, self.num_rbf, dtype=self.dtype)
        betas = torch.tensor(
            [(2 / self.num_rbf * (1 - start_value)) ** -2] * self.num_rbf, dtype=self.dtype
        )
        return means, betas

    # Método para restablecer los parámetros a sus valores iniciales.
    def reset_parameters(self):
        means, betas = self._initial_params()
        self.means.data.copy_(means)
        self.betas.data.copy_(betas)

    # Método forward que se llama cuando se pasa una distancia (dist) a la instancia.
    def forward(self, dist):
        dist = dist.unsqueeze(-1)
        
        # Aplicar la función de corte (cutoff_fn) y el suavizado exponencial a la distancia.
        return self.cutoff_fn(dist) * torch.exp(
            -self.betas
            * (torch.exp(self.alpha * (-dist + self.cutoff_lower)) - self.means) ** 2
        )


Define una clase que es una subclase de nn.Module en PyTorch y se utiliza para aplicar la función Shifted Softplus a los valores de entrada.

In [137]:
class ShiftedSoftplus(nn.Module):
    r"""Applies the ShiftedSoftplus function :math:`\text{ShiftedSoftplus}(x) = \frac{1}{\beta} *
    \log(1 + \exp(\beta * x))-\log(2)` element-wise.

    SoftPlus is a smooth approximation to the ReLU function and can be used
    to constrain the output of a machine to always be positive.
    """
    # El método __init__ se llama al crear una instancia de la clase.
    def __init__(self):
        super(ShiftedSoftplus, self).__init__()
        # Calcular el valor de desplazamiento (shift) como el logaritmo de 2.
        self.shift = torch.log(torch.tensor(2.0)).item()

    # Método forward que se llama cuando se pasa un tensor x a la instancia.
    def forward(self, x):
        # Aplicar la función Softplus a los valores de entrada x y luego restar el desplazamiento (shift).
        return F.softplus(x) - self.shift

Dfine una clase llamada CosineCutoff, que es una subclase de nn.Module en PyTorch y se utiliza para aplicar una función de corte basada en el coseno a las distancias de entrada. 

In [138]:
class CosineCutoff(nn.Module):
    # El método __init__ se llama al crear una instancia de la clase.
    def __init__(self, cutoff_lower=0.0, cutoff_upper=5.0):
        super(CosineCutoff, self).__init()
        
        # Guardar los valores de los argumentos en las variables de la instancia.
        self.cutoff_lower = cutoff_lower
        self.cutoff_upper = cutoff_upper

    # Método forward que se llama cuando se pasa un tensor de distancias (distances) a la instancia.
    def forward(self, distances):
        # Comprueba si el valor de corte inferior (cutoff_lower) es mayor que cero.
        if self.cutoff_lower > 0:
            # Calcula los valores de corte basados en el coseno.
            cutoffs = 0.5 * (
                torch.cos(
                    math.pi
                    * (
                        2
                        * (distances - self.cutoff_lower)
                        / (self.cutoff_upper - self.cutoff_lower)
                        + 1.0
                    )
                )
                + 1.0
            )
            # Elimina contribuciones por debajo del radio de corte.
            cutoffs = cutoffs * (distances < self.cutoff_upper)
            cutoffs = cutoffs * (distances > self.cutoff_lower)
            return cutoffs
        else:
            # Calcula los valores de corte basados en el coseno sin corte inferior.
            cutoffs = 0.5 * (torch.cos(distances * math.pi / self.cutoff_upper) + 1.0)
            # Elimina contribuciones más allá del radio de corte.
            cutoffs = cutoffs * (distances < self.cutoff_upper)
            return cutoffs

Dfine una clase llamada Distance, que es una subclase de nn.Module en PyTorch y se utiliza para calcular distancias entre átomos en una estructura atómica tridimensional

In [139]:
class Distance(nn.Module):
    def __init__(
        self,
        cutoff_lower,
        cutoff_upper,
        max_num_neighbors=32,
        return_vecs=False,
        loop=False,
    ):
        super(Distance, self).__init__()

        # Guardar los valores de los argumentos en las variables de la instancia.
        self.cutoff_lower = cutoff_lower
        self.cutoff_upper = cutoff_upper
        self.max_num_neighbors = max_num_neighbors
        self.return_vecs = return_vecs
        self.loop = loop

    # Método forward que se llama cuando se pasa un tensor de posiciones (pos) y una asignación de lotes (batch) a la instancia.
    def forward(self, pos, batch):
        # Utilizar la función `radius_graph` para encontrar los vecinos dentro del radio de corte.
        edge_index = radius_graph(
            pos,
            r=self.cutoff_upper,
            batch=batch,
            loop=self.loop,
            max_num_neighbors=self.max_num_neighbors + 1,
        )

        # Asegurarse de que no se hayan omitido vecinos debido a max_num_neighbors.
        assert not (
            torch.unique(edge_index[0], return_counts=True)[1] > self.max_num_neighbors
        ).any(), (
            "The neighbor search missed some atoms due to max_num_neighbors being too low. "
            "Please increase this parameter to include the maximum number of atoms within the cutoff."
        )

        # Calcular vectores de borde a partir de las posiciones de los átomos.
        edge_vec = pos[edge_index[0]] - pos[edge_index[1]]

        mask: Optional[torch.Tensor] = None
        if self.loop:
            # Mascarar los bucles de autocorrección al calcular las distancias porque
            # la norma de 0 produce gradientes NaN
            # NOTA: podría influir en las predicciones de fuerza, ya que los gradientes de autocorrección se ignoran
            mask = edge_index[0] != edge_index[1]
            edge_weight = torch.zeros(
                edge_vec.size(0), device=edge_vec.device, dtype=edge_vec.dtype
            )
            edge_weight[mask] = torch.norm(edge_vec[mask], dim=-1)
        else:
            edge_weight = torch.norm(edge_vec, dim=-1)
        # Aplicar un umbral inferior para las distancias basado en el valor de cutoff_lower.
        lower_mask = edge_weight >= self.cutoff_lower
        if self.loop and mask is not None:
            # Mantener los bucles de autocorrección incluso si están por debajo del umbral inferior.
            lower_mask = lower_mask | ~mask
        edge_index = edge_index[:, lower_mask]
        edge_weight = edge_weight[lower_mask]

        if self.return_vecs:
            edge_vec = edge_vec[lower_mask]
            return edge_index, edge_weight, edge_vec
        # TODO: return only `edge_index` and `edge_weight` once
        # Union typing works with TorchScript (https://github.com/pytorch/pytorch/pull/53180)
        return edge_index, edge_weight, None


In [140]:
class GatedEquivariantBlock(nn.Module):
    """Gated Equivariant Block as defined in Schütt et al. (2021):
    Equivariant message passing for the prediction of tensorial properties and molecular spectra
    """

    def __init__(
        self,
        hidden_channels,
        out_channels,
        intermediate_channels=None,
        activation="silu",
        scalar_activation=False,
        dtype=torch.float,
    ):
        super(GatedEquivariantBlock, self).__init__()
        # Guardar los canales de salida y otros hiperparámetros.
        self.out_channels = out_channels

        if intermediate_channels is None:
            intermediate_channels = hidden_channels

        # Definir capas lineales para proyectar vectores de entrada.
        self.vec1_proj = nn.Linear(hidden_channels, hidden_channels, bias=False, dtype=dtype)
        self.vec2_proj = nn.Linear(hidden_channels, out_channels, bias=False, dtype=dtype)

        act_class = act_class_mapping[activation]

        # Definir una red secuencial para actualizar los vectores de entrada.
        self.update_net = nn.Sequential(
            nn.Linear(hidden_channels * 2, intermediate_channels, dtype=dtype),
            act_class(),
            nn.Linear(intermediate_channels, out_channels * 2, dtype=dtype),
        )

        # Definir una función de activación si se especifica.
        self.act = act_class() if scalar_activation else None

    def reset_parameters(self):
        # Inicializar los parámetros de las capas lineales.
        nn.init.xavier_uniform_(self.vec1_proj.weight)
        nn.init.xavier_uniform_(self.vec2_proj.weight)
        nn.init.xavier_uniform_(self.update_net[0].weight)
        self.update_net[0].bias.data.fill_(0)
        nn.init.xavier_uniform_(self.update_net[2].weight)
        self.update_net[2].bias.data.fill_(0)

    def forward(self, x, v):
        # Proyectar el vector de entrada v.
        vec1_buffer = self.vec1_proj(v)

        # Separar vectores con ceros para evitar gradientes NaN durante la propagación hacia atrás.
        vec1 = torch.zeros(
            vec1_buffer.size(0), vec1_buffer.size(2), device=vec1_buffer.device, dtype=vec1_buffer.dtype
        )
        mask = (vec1_buffer != 0).view(vec1_buffer.size(0), -1).any(dim=1)
        if not mask.all():
            warnings.warn(
                (
                    f"Skipping gradients for {(~mask).sum()} atoms due to vector features being zero. "
                    "This is likely due to atoms being outside the cutoff radius of any other atom. "
                    "These atoms will not interact with any other atom unless you change the cutoff."
                )
            )
        vec1[mask] = torch.norm(vec1_buffer[mask], dim=-2)

        # Proyectar v nuevamente.
        vec2 = self.vec2_proj(v)

        # Concatenar x y vec1 y pasarlos por la red secuencial de actualización.
        x = torch.cat([x, vec1], dim=-1)
        x, v = torch.split(self.update_net(x), self.out_channels, dim=-1)

        # Realizar la actualización de vectores con vec2 utilizando una compuerta.
        v = v.unsqueeze(1) * vec2

        if self.act is not None:
            x = self.act(x)
        return x, v

In [141]:
# Diccionario de mapeo para funciones de base radial (RBF).
rbf_class_mapping = {
    "gauss": GaussianSmearing,    # Mapea "gauss" a la clase GaussianSmearing
    "expnorm": ExpNormalSmearing   # Mapea "expnorm" a la clase ExpNormalSmearing
}

# Diccionario de mapeo para funciones de activación.
act_class_mapping = {
    "ssp": ShiftedSoftplus,    # Mapea "ssp" a la clase ShiftedSoftplus
    "silu": nn.SiLU,           # Mapea "silu" a la clase nn.SiLU (Sigmoid-Weighted Linear Unit)
    "tanh": nn.Tanh,           # Mapea "tanh" a la clase nn.Tanh (tangente hiperbólica)
    "sigmoid": nn.Sigmoid     # Mapea "sigmoid" a la clase nn.Sigmoid (función sigmoide)
}

# Diccionario de mapeo para tipos de datos en PyTorch.
dtype_mapping = {
    16: torch.float16,   # Mapea 16 a torch.float16
    32: torch.float,     # Mapea 32 a torch.float
    64: torch.float64    # Mapea 64 a torch.float64
}

## Datos de entrenamiento: QM9

In [142]:
# Importamos la biblioteca PyTorch, que se utiliza para crear y entrenar modelos de aprendizaje automático.
import torch

# Importamos algunas clases y funciones específicas de PyTorch Geometric que vamos a utilizar.
# Compose se utiliza para componer transformaciones (transformers) en el conjunto de datos.
from torch_geometric.transforms import Compose

# Importamos el conjunto de datos QM9 de PyTorch Geometric.
from torch_geometric.datasets import QM9 as QM9_geometric

# Importamos el diccionario de objetivos (targets) relacionados con el conjunto de datos QM9
# que se utiliza con el modelo Schnet en PyTorch Geometric.
from torch_geometric.nn.models.schnet import qm9_target_dict


Definición de una clase llamada QM9 que hereda de QM9_geometric, un conjunto de datos específico de PyTorch Geometric.

In [143]:
class QM9(QM9_geometric):
    def __init__(self, root, transform=None, label=None):
        # Creamos un diccionario que asigna nombres de propiedades químicas a índices.
        label2idx = dict(zip(qm9_target_dict.values(), qm9_target_dict.keys()))
        
        # Verificamos que la propiedad química deseada esté en el diccionario.
        assert label in label2idx, (
            "Please pass the desired property to "
            'train on via "label". Available '
            f'properties are {", ".join(label2idx)}.'
        )
        
        # Guardamos la propiedad deseada en el objeto.
        self.label = label
        self.label_idx = label2idx[self.label]

        # Si no se proporciona una transformación personalizada, usamos la transformación predeterminada (self._filter_label).
        if transform is None:
            transform = self._filter_label
        else:
            # Componemos la transformación personalizada junto con la predeterminada.
            transform = Compose([transform, self._filter_label])

        # Llamamos al constructor de la clase padre (QM9_geometric) para inicializar el conjunto de datos.
        super(QM9, self).__init__(root, transform=transform)

    def get_atomref(self, max_z=100):
        # Llama a la función atomref para obtener los valores de referencia asociados a los índices de etiqueta (label_idx).
        atomref = self.atomref(self.label_idx)

        # Comprueba si no se encontraron valores de referencia y devuelve None si es así.
        if atomref is None:
            return None
        # Comprueba si el tamaño de atomref no coincide con el valor máximo permitido (max_z).
        if atomref.size(0) != max_z:
            # Si no coinciden, crea un tensor de ceros de tamaño max_z y le asigna los valores de atomref hasta el índice mínimo entre max_z y el tamaño de atomref.
            tmp = torch.zeros(max_z).unsqueeze(1)
            idx = min(max_z, atomref.size(0))
            tmp[:idx] = atomref[:idx]
            return tmp
        # Si atomref ya tiene el tamaño correcto, simplemente devuelve atomref.
        return atomref

    # Función para filtrar la etiqueta deseada en los datos.
    def _filter_label(self, batch):
        batch.y = batch.y[:, self.label_idx].unsqueeze(1)
        return batch

    # Función para descargar los datos del conjunto de datos (heredada de la clase padre).
    def download(self):
        super(QM9, self).download()

    # Función para procesar los datos del conjunto de datos (heredada de la clase padre).
    def process(self):
        super(QM9, self).process()
    

## Procesamiento de los datos

In [144]:

from os.path import join # Importamos la función 'join' desde el módulo 'os.path' para combinar rutas de archivos.
from tqdm import tqdm # Importamos la función 'tqdm' para mostrar barras de progreso durante iteraciones.
import torch # Importamos la biblioteca 'torch' para trabajar con PyTorch.
from torch.utils.data import Subset # Importamos la clase 'Subset' desde 'torch.utils.data' para crear subconjuntos de datos.
from torch_geometric.loader import DataLoader # Importamos la clase 'DataLoader' desde 'torch_geometric.loader' para cargar datos de manera eficiente.
from pytorch_lightning import LightningDataModule # Importamos la clase 'LightningDataModule' desde 'pytorch_lightning' para manejar los datos en un proyecto PyTorch Lightning.
from pytorch_lightning.utilities import rank_zero_warn # Importamos la función 'rank_zero_warn' desde 'pytorch_lightning.utilities' para mostrar advertencias en una única GPU.
# from torchmdnet import datasets # Comentamos la importación de 'datasets' que parece estar comentada actualmente.
from torch_geometric.data import Dataset # Importamos la clase 'Dataset' desde 'torch_geometric.data' para trabajar con conjuntos de datos en formato PyTorch Geometric.
# from torchmdnet.utils import make_splits, MissingEnergyException # Comentamos la importación de 'make_splits' y 'MissingEnergyException' que parecen estar comentadas actualmente.
from torch_scatter import scatter # Importamos la función 'scatter' desde 'torch_scatter' para realizar operaciones de dispersión en tensores.
# from torchmdnet.models.utils import dtype_mapping # Comentamos la importación de 'dtype_mapping' que parece estar comentada actualmente.


In [145]:
class FloatCastDatasetWrapper(Dataset):
    def __init__(self, dataset, dtype=torch.float64):
        # Llamamos al constructor de la clase base 'Dataset' y pasamos los atributos necesarios.
        super(FloatCastDatasetWrapper, self).__init__(
            dataset.root, dataset.transform, dataset.pre_transform, dataset.pre_filter
        )
        # Almacenamos una referencia al dataset original y el tipo de dato a utilizar.
        self.dataset = dataset
        self.dtype = dtype

    def len(self):
        # Devuelve la longitud del dataset, que es la misma que la del dataset original.
        return len(self.dataset)

    def get(self, idx):
        # Obtenemos un dato del dataset original en el índice 'idx'.
        data = self.dataset.get(idx)
        # Recorremos las claves y valores del dato.
        for key, value in data:
            # Verificamos si el valor es un tensor de punto flotante.
            if torch.is_tensor(value) and torch.is_floating_point(value):
                # Si es un tensor de punto flotante, lo convertimos al tipo de dato especificado.
                setattr(data, key, value.to(self.dtype))
        # Devolvemos el dato modificado.
        return data

    def __getattr__(self, name):
        # Comprobamos si el atributo existe en el dataset subyacente.
        if hasattr(self.dataset, name):
            # Si existe, lo obtenemos desde el dataset original.
            return getattr(self.dataset, name)
        # Si no existe, generamos un error de atributo.
        raise AttributeError(
            f"'{type(self).__name__}' and its underlying dataset have no attribute '{name}'"
        )

In [146]:
# Definición de una clase llamada DataModule que hereda de LightningDataModule
class DataModule(LightningDataModule):
    # Constructor de la clase, toma dos argumentos: hparams y dataset
    def __init__(self, hparams, dataset=None):
        # Llama al constructor de la clase padre (LightningDataModule)
        super(DataModule, self).__init__()
        
        # Guarda los hiperparámetros (hparams) en la instancia de la clase
        self.save_hyperparameters(hparams)
        
        # Inicializa dos atributos (_mean y _std) como None
        self._mean, self._std = None, None
        
        # Inicializa un diccionario llamado _saved_dataloaders para almacenar dataloaders
        self._saved_dataloaders = dict()
        
        # Asigna el valor de dataset (si se proporciona) al atributo dataset de la clase
        self.dataset = dataset


    def setup(self, stage):
     # Comienza la función 'setup' que se llama durante la configuración del módulo Lightning.
        if self.dataset is None:
            # Si 'self.dataset' aún no está configurado, significa que no se ha creado un conjunto de datos todavía.
            if self.hparams["dataset"] == "Custom":
            # Si el nombre del conjunto de datos en los hiperparámetros es "Custom", se crea un conjunto de datos personalizado llamado 'Custom'.
                self.dataset = datasets.Custom(
                    self.hparams["coord_files"],
                    self.hparams["embed_files"],
                    self.hparams["energy_files"],
                    self.hparams["force_files"],
                )
            else:
                # Si el nombre del conjunto de datos no es "Custom", se utiliza el nombre del conjunto de datos de los hiperparámetros para crear un conjunto de datos predeterminado.
                dataset_arg = {}
                if self.hparams["dataset_arg"] is not None:
                    dataset_arg = self.hparams["dataset_arg"]
                self.dataset = QM9(
                    self.hparams["dataset_root"], **dataset_arg
                )
#                self.dataset = getattr(datasets, self.hparams["dataset"])(
#                    self.hparams["dataset_root"], **dataset_arg
#                )
         # Después de crear el conjunto de datos, lo envolvemos en una clase llamada 'FloatCastDatasetWrapper' con el tipo de datos especificado en los hiperparámetros.
        self.dataset = FloatCastDatasetWrapper(
            self.dataset, dtype_mapping[self.hparams["precision"]]
        )
        # Divide los índices del conjunto de datos en conjuntos de entrenamiento, validación y prueba utilizando la función make_splits.
        self.idx_train, self.idx_val, self.idx_test = make_splits(
            len(self.dataset),
            self.hparams["train_size"],
            self.hparams["val_size"],
            self.hparams["test_size"],
            self.hparams["seed"],
            join(self.hparams["log_dir"], "splits.npz"),
            self.hparams["splits"],
        )

        # Imprime las longitudes de los conjuntos de entrenamiento, validación y prueba.
        print(
            f"train {len(self.idx_train)}, val {len(self.idx_val)}, test {len(self.idx_test)}"
        )

        # Crea conjuntos de datos de entrenamiento, validación y prueba utilizando los índices generados.
        self.train_dataset = Subset(self.dataset, self.idx_train)
        self.val_dataset = Subset(self.dataset, self.idx_val)
        self.test_dataset = Subset(self.dataset, self.idx_test)

        # Si se establece 'standardize' en True en los hiperparámetros, se llama al método '_standardize'.
        if self.hparams["standardize"]:
            self._standardize()

    # Método para obtener el dataloader de entrenamiento
    def train_dataloader(self):
        return self._get_dataloader(self.train_dataset, "train")

    # Método para obtener el dataloader de validación
    def val_dataloader(self):
        loaders = [self._get_dataloader(self.val_dataset, "val")]
        # Comprueba si hay un conjunto de datos de prueba y si es necesario cargarlo, según la configuración de los hiperparámetros y la época actual del entrenamiento.
        if (
            len(self.test_dataset) > 0
            and (self.trainer.current_epoch + 1) % self.hparams["test_interval"] == 0
        ):
            loaders.append(self._get_dataloader(self.test_dataset, "test"))
        return loaders

    # Método para obtener el dataloader de prueba
    def test_dataloader(self):
        return self._get_dataloader(self.test_dataset, "test")

    # Propiedad 'atomref' que proporciona una referencia a átomos (si está disponible en el conjunto de datos)
    @property
    def atomref(self):
        if hasattr(self.dataset, "get_atomref"):
            return self.dataset.get_atomref()
        return None

    # Propiedad 'mean' que proporciona el valor del atributo '_mean'
    @property
    def mean(self):
        return self._mean

    # Propiedad 'std' que proporciona el valor del atributo '_std'
    @property
    def std(self):
        return self._std

    # Método privado '_get_dataloader' utilizado para obtener un dataloader para un conjunto de datos específico.
    def _get_dataloader(self, dataset, stage, store_dataloader=True):
        # Verifica si se debe almacenar el dataloader en la memoria.
        store_dataloader = (
            store_dataloader and self.trainer.reload_dataloaders_every_n_epochs <= 0
        )
        # Comprueba si ya existe un dataloader almacenado para la etapa 'stage' y si se debe almacenar uno nuevo.
        if stage in self._saved_dataloaders and store_dataloader:
            # Si ya existe y se debe almacenar, se devuelve el dataloader existente para evitar recrearlo.
            # storing the dataloaders like this breaks calls to trainer.reload_train_val_dataloaders
            # but makes it possible that the dataloaders are not recreated on every testing epoch
            return self._saved_dataloaders[stage]

        # Configuración del tamaño de lote (batch_size) y si se debe realizar un barajado (shuffle) en función de la etapa (train, val, test).
        if stage == "train":
            batch_size = self.hparams["batch_size"]
            shuffle = True
        elif stage in ["val", "test"]:
            batch_size = self.hparams["inference_batch_size"]
            shuffle = False

        # Creación de un DataLoader con las configuraciones especificadas.
        dl = DataLoader(
            dataset=dataset,
            batch_size=batch_size,
            num_workers=self.hparams["num_workers"],
            pin_memory=True,
            shuffle=shuffle,
        )
        # Si se debe almacenar el dataloader, se guarda en el diccionario '_saved_dataloaders' para su uso futuro.
        if store_dataloader:
            self._saved_dataloaders[stage] = dl
        # Devuelve el dataloader recién creado o el almacenado si ya existía.
        return dl

    # Método privado '_standardize' para estandarizar los datos del conjunto de entrenamiento.
    def _standardize(self):
        # Función interna 'get_energy' para obtener las energías de un lote de datos y ajustarlas si existe una referencia a átomos.
        def get_energy(batch, atomref):
            if "y" not in batch or batch.y is None:
                raise MissingEnergyException()

            if atomref is None:
                return batch.y.clone()

            # Resta las energías de referencia de átomos de las energías objetivo.
            # remove atomref energies from the target energy
            atomref_energy = scatter(atomref[batch.z], batch.batch, dim=0)
            return (batch.y.squeeze() - atomref_energy.squeeze()).clone()

        # Utiliza un dataloader para calcular la media y la desviación estándar de las energías del conjunto de validación.
        data = tqdm(
            self._get_dataloader(self.train_dataset, "val", store_dataloader=False),
            desc="computing mean and std",
        )
        try:
            # Solo elimina las energías de referencia de átomos si se utiliza el modelo de referencia a átomos (Atomref) en la configuración.
            # only remove atomref energies if the atomref prior is used
            atomref = self.atomref if self.hparams["prior_model"] == "Atomref" else None
            # Extrae las energías de los datos y las ajusta si existe una referencia a átomos.
            # extract energies from the data
            ys = torch.cat([get_energy(batch, atomref) for batch in data])
        except MissingEnergyException:
            # Advierte si la estandarización está habilitada pero no se pueden calcular la media y la desviación estándar, posiblemente porque el conjunto de datos solo contiene fuerzas en lugar de energías.
            rank_zero_warn(
                "Standardize is true but failed to compute dataset mean and "
                "standard deviation. Maybe the dataset only contains forces."
            )
            return
        
        # Calcula la media y la desviación estándar de las energías.
        # compute mean and standard deviation
        self._mean = ys.mean(dim=0)
        self._std = ys.std(dim=0)

## Envoltorios o encapsulados (Wrappers) 

In [147]:
# Importación de módulos y clases necesarios desde las bibliotecas estándar de Python y PyTorch.
from abc import abstractmethod, ABCMeta  # Importa las clases para definir clases abstractas y metaclasses.
from typing import Optional, Tuple  # Importa tipos opcionales y tuplas.
from torch import nn, Tensor  # Importa el módulo de redes neuronales y el tipo Tensor de PyTorch.

Este fragmento de código define una clase abstracta llamada BaseWrapper, que sirve como una clase base para envolver modelos en el contexto de redes neuronales. 

In [148]:
# Definición de la clase abstracta BaseWrapper que hereda de nn.Module y utiliza la metacalse ABCMeta.
class BaseWrapper(nn.Module, metaclass=ABCMeta):
    r"""Base class for model wrappers.

    Children of this class should implement the `forward` method,
    which calls `self.model(z, pos, batch=batch)` at some point.
    Wrappers that are applied before the REDUCE operation should return
    the model's output, `z`, `pos`, `batch` and potentially vector
    features`v`. Wrappers that are applied after REDUCE should only
    return the model's output.
    """
    # Constructor de la clase BaseWrapper que toma un modelo como argumento.
    def __init__(self, model):
        super(BaseWrapper, self).__init__()
        self.model = model

    # Método para restablecer los parámetros del modelo.
    def reset_parameters(self):
        self.model.reset_parameters()

    # Método abstracto que debe ser implementado por las clases hijas.
    @abstractmethod
    def forward(self, z, pos, batch=None):
        return

Este fragmento de código define una clase llamada AtomFilter, que hereda de BaseWrapper. La clase AtomFilter se utiliza para aplicar un filtro a la salida del modelo, eliminando átomos con una carga atómica (Z) mayor que un umbral específico (remove_threshold).

In [149]:
# Definición de la clase AtomFilter que hereda de BaseWrapper.
class AtomFilter(BaseWrapper):
    """
    Remove atoms with Z > remove_threshold from the model's output.
    """
    # Constructor de la clase AtomFilter que toma un modelo y un umbral de eliminación como argumentos.
    def __init__(self, model, remove_threshold):
        super(AtomFilter, self).__init__(model)
        self.remove_threshold = remove_threshold

    # Método 'forward' para aplicar el filtro de átomos.
    def forward(
        self,
        z: Tensor,
        pos: Tensor,
        batch: Tensor,
        q: Optional[Tensor] = None,
        s: Optional[Tensor] = None,
    ) -> Tuple[Tensor, Optional[Tensor], Tensor, Tensor, Tensor]:
        # Llama al modelo subyacente y obtiene la salida.
        x, v, z, pos, batch = self.model(z, pos, batch=batch, q=q, s=s)

        n_samples = len(batch.unique())

        # Aplica el filtro de átomos eliminando aquellos con Z > remove_threshold.
        # drop atoms according to the filter
        atom_mask = z > self.remove_threshold
        x = x[atom_mask]
        if v is not None:
            v = v[atom_mask]
        z = z[atom_mask]
        pos = pos[atom_mask]
        batch = batch[atom_mask]

        # Asegura que no se eliminen por completo muestras, al menos un átomo con Z > remove_threshold por muestra.
        assert len(batch.unique()) == n_samples, (
            "Some samples were completely filtered out by the atom filter. "
            f"Make sure that at least one atom per sample exists with Z > {self.remove_threshold}."
        )
        return x, v, z, pos, batch

## Modelos de salida

In [150]:
# Importación de módulos y clases necesarios desde las bibliotecas estándar de Python y PyTorch.
from abc import abstractmethod, ABCMeta  # Importa las clases para definir clases abstractas y metaclasses.
from torch_scatter import scatter  # Importa la función 'scatter' desde 'torch_scatter'.
from typing import Optional  # Importa el tipo de datos opcional para indicar que un valor puede ser None.
#import torchmdnet.models.utils.act_class_mapping, GatedEquivariantBlock  # Importaciones que están actualmente comentadas.
#import torchmdnet.utils.atomic_masses  # Importación que está actualmente comentada.
import torch  # Importa el módulo 'torch' de PyTorch.
from torch import nn  # Importa el módulo de redes neuronales de PyTorch.

Este fragmento de código define la clase OutputModel, que es una clase abstracta destinada a representar modelos de salida en el contexto de un proyecto de aprendizaje automático.

In [151]:
# Definición de la clase OutputModel que hereda de nn.Module y utiliza la metacalse ABCMeta.
class OutputModel(nn.Module, metaclass=ABCMeta):
    # Constructor de la clase OutputModel que toma como argumentos 'allow_prior_model' y 'reduce_op'.
    def __init__(self, allow_prior_model, reduce_op):
        super(OutputModel, self).__init__()
        self.allow_prior_model = allow_prior_model
        self.reduce_op = reduce_op

    # Método para restablecer los parámetros del modelo (método vacío).
    def reset_parameters(self):
        pass

    # Método abstracto 'pre_reduce' que debe ser implementado por las clases hijas.
    @abstractmethod
    def pre_reduce(self, x, v, z, pos, batch):
        return

    # Método 'reduce' que utiliza la función 'scatter' para realizar una operación de reducción.
    def reduce(self, x, batch):
        return scatter(x, batch, dim=0, reduce=self.reduce_op)

    # Método 'post_reduce' que devuelve la salida sin cambios.
    def post_reduce(self, x):
        return x


En este fragmento de código se define la clase Scalar, que es un tipo de modelo de salida. 

In [152]:
# Visible
# Definición de la clase Scalar que hereda de OutputModel.
class Scalar(OutputModel):
    # Constructor de la clase Scalar que toma varios argumentos para configurar el modelo.
    def __init__(
        self,
        hidden_channels,
        activation="silu",
        allow_prior_model=True,
        reduce_op="sum",
        dtype=torch.float
    ):
        super(Scalar, self).__init__(
            allow_prior_model=allow_prior_model, reduce_op=reduce_op
        )
        
        # Mapeo de la función de activación a una clase correspondiente.
        act_class = act_class_mapping[activation]

        # Definición de la red de salida como una secuencia de capas lineales y funciones de activación.
        self.output_network = nn.Sequential(
            nn.Linear(hidden_channels, hidden_channels // 2, dtype=dtype),
            act_class(),
            nn.Linear(hidden_channels // 2, 1, dtype=dtype),
        )

        # Llama al método reset_parameters para inicializar los parámetros de la red de salida.
        self.reset_parameters()

    # Método para restablecer los parámetros de la red de salida.
    def reset_parameters(self):
        nn.init.xavier_uniform_(self.output_network[0].weight)
        self.output_network[0].bias.data.fill_(0)
        nn.init.xavier_uniform_(self.output_network[2].weight)
        self.output_network[2].bias.data.fill_(0)

    # Método 'pre_reduce' para realizar operaciones antes de la reducción en los datos.
    def pre_reduce(self, x, v: Optional[torch.Tensor], z, pos, batch):
        return self.output_network(x)

In [153]:
# Definición de la clase EquivariantScalar que hereda de OutputModel.
class EquivariantScalar(OutputModel):
    # Constructor de la clase EquivariantScalar que toma varios argumentos para configurar el modelo.
    def __init__(
        self,
        hidden_channels,
        activation="silu",
        allow_prior_model=True,
        reduce_op="sum",
        dtype=torch.float
    ):
        super(EquivariantScalar, self).__init__(
            allow_prior_model=allow_prior_model, reduce_op=reduce_op
        )
        
        # Definición de la red de salida como una secuencia de bloques equivariantes y activaciones.
        self.output_network = nn.ModuleList(
            [
                GatedEquivariantBlock(
                    hidden_channels,
                    hidden_channels // 2,
                    activation=activation,
                    scalar_activation=True,
                    dtype=dtype
                ),
                GatedEquivariantBlock(hidden_channels // 2, 1, activation=activation, dtype=dtype),
            ]
        )

        # Llama al método reset_parameters para inicializar los parámetros de la red de salida.
        self.reset_parameters()

    # Método para restablecer los parámetros de la red de salida.
    def reset_parameters(self):
        for layer in self.output_network:
            layer.reset_parameters()

    # Método 'pre_reduce' para realizar operaciones antes de la reducción en los datos.
    def pre_reduce(self, x, v, z, pos, batch):
        for layer in self.output_network:
            x, v = layer(x, v)
        # Incluye 'v' en la salida para asegurar que todos los parámetros tengan un gradiente.
        # include v in output to make sure all parameters have a gradient
        return x + v.sum() * 0

Este fragmento de código define la clase DipoleMoment, que hereda de la clase Scalar. La clase DipoleMoment se utiliza para calcular el momento dipolar en un sistema.

In [154]:
# Visible
# Definición de la clase DipoleMoment que hereda de Scalar.
class DipoleMoment(Scalar):
    # Constructor de la clase DipoleMoment que toma varios argumentos para configurar el modelo.
    def __init__(self, hidden_channels, activation="silu", reduce_op="sum", dtype=torch.float):
        super(DipoleMoment, self).__init__(
            hidden_channels, activation, allow_prior_model=False, reduce_op=reduce_op, dtype=dtype
        )
        
        # Cálculo de la masa atómica a partir de 'atomic_masses' y registro como un búfer en el modelo.
        atomic_mass = torch.from_numpy(atomic_masses).to(dtype)
        self.register_buffer("atomic_mass", atomic_mass)

    # Método 'pre_reduce' para realizar operaciones antes de la reducción en los datos.
    def pre_reduce(self, x, v: Optional[torch.Tensor], z, pos, batch):
        x = self.output_network(x)

        # Cálculo del centro de masas.
        # Get center of mass.
        mass = self.atomic_mass[z].view(-1, 1)
        c = scatter(mass * pos, batch, dim=0) / scatter(mass, batch, dim=0)
        
        # Cálculo del momento dipolar.
        x = x * (pos - c[batch])
        return x

    # Método 'post_reduce' para realizar operaciones después de la reducción en los datos.
    def post_reduce(self, x):
        # Cálculo de la norma del momento dipolar.
        return torch.norm(x, dim=-1, keepdim=True)

EquivariantDipoleMoment es una clase especializada para calcular el momento dipolar en sistemas que requieren consideraciones de invariancia y transformaciones específicas. 

In [155]:
# Definición de la clase EquivariantDipoleMoment que hereda de EquivariantScalar.
class EquivariantDipoleMoment(EquivariantScalar):
    # Constructor de la clase EquivariantDipoleMoment que toma varios argumentos para configurar el modelo.
    def __init__(self, hidden_channels, activation="silu", reduce_op="sum", dtype=torch.float):
        super(EquivariantDipoleMoment, self).__init__(
            hidden_channels, activation, allow_prior_model=False, reduce_op=reduce_op, dtype=dtype
        )
        
        # Cálculo de la masa atómica a partir de 'atomic_masses' y registro como un búfer en el modelo.
        atomic_mass = torch.from_numpy(atomic_masses).to(dtype)
        self.register_buffer("atomic_mass", atomic_mass)

    # Método 'pre_reduce' para realizar operaciones antes de la reducción en los datos.
    def pre_reduce(self, x, v, z, pos, batch):
        for layer in self.output_network:
            x, v = layer(x, v)

        # Cálculo del centro de masas.
        # Get center of mass.
        mass = self.atomic_mass[z].view(-1, 1)
        c = scatter(mass * pos, batch, dim=0) / scatter(mass, batch, dim=0)
        # Cálculo del momento dipolar.
        x = x * (pos - c[batch])
        
        # Añade 'v' a la salida para asegurar que todos los parámetros tengan un gradiente.
        return x + v.squeeze()

    # Método 'post_reduce' para realizar operaciones después de la reducción en los datos.
    def post_reduce(self, x):
        # Cálculo de la norma del momento dipolar.
        return torch.norm(x, dim=-1, keepdim=True)

ElectronicSpatialExtent es una clase que calcula y representa la extensión espacial electrónica de un sistema, teniendo en cuenta las posiciones de los átomos y la masa atómica.

In [156]:
# Visible
# Definición de la clase ElectronicSpatialExtent que hereda de OutputModel.
class ElectronicSpatialExtent(OutputModel):
    # Constructor de la clase ElectronicSpatialExtent que toma varios argumentos para configurar el modelo.
    def __init__(self, hidden_channels, activation="silu", reduce_op="sum", dtype=torch.float):
        super(ElectronicSpatialExtent, self).__init__(
            allow_prior_model=False, reduce_op=reduce_op
        )
        
        # Mapeo de la función de activación a una clase correspondiente.
        act_class = act_class_mapping[activation]

        # Definición de la red de salida como una secuencia de capas lineales y funciones de activación.
        self.output_network = nn.Sequential(
            nn.Linear(hidden_channels, hidden_channels // 2, dtype=dtype),
            act_class(),
            nn.Linear(hidden_channels // 2, 1, dtype=dtype),
        )

        # Cálculo de la masa atómica a partir de 'atomic_masses' y registro como un búfer en el modelo.
        atomic_mass = torch.from_numpy(atomic_masses).to(dtype)
        self.register_buffer("atomic_mass", atomic_mass)

        # Llama al método reset_parameters para inicializar los parámetros de la red de salida.
        self.reset_parameters()

    # Método para restablecer los parámetros de la red de salida.
    def reset_parameters(self):
        nn.init.xavier_uniform_(self.output_network[0].weight)
        self.output_network[0].bias.data.fill_(0)
        nn.init.xavier_uniform_(self.output_network[2].weight)
        self.output_network[2].bias.data.fill_(0)

    # Método 'pre_reduce' para realizar operaciones antes de la reducción en los datos.
    def pre_reduce(self, x, v: Optional[torch.Tensor], z, pos, batch):
        x = self.output_network(x)

        # Cálculo del centro de masas.
        # Get center of mass.
        mass = self.atomic_mass[z].view(-1, 1)
        c = scatter(mass * pos, batch, dim=0) / scatter(mass, batch, dim=0)

        # Cálculo de la distancia al cuadrado entre las posiciones y el centro de masas.
        x = torch.norm(pos - c[batch], dim=1, keepdim=True) ** 2 * x
        return x

In [157]:
class EquivariantElectronicSpatialExtent(ElectronicSpatialExtent):
    pass

EquivariantVectorOutput es una subclase de EquivariantScalar. Aunque la clase base se llama "EquivariantScalar", esta subclase está diseñada para manejar salidas vectoriales en lugar de escalares. Esto significa que se utiliza para generar vectores de salida en lugar de un solo valor escalar.

In [158]:
# Definición de la clase EquivariantVectorOutput que hereda de EquivariantScalar.
class EquivariantVectorOutput(EquivariantScalar):
    # Constructor de la clase EquivariantVectorOutput que toma varios argumentos para configurar el modelo.
    def __init__(self, hidden_channels, activation="silu", reduce_op="sum", dtype=torch.float):
        super(EquivariantVectorOutput, self).__init__(
            hidden_channels, activation, allow_prior_model=False, reduce_op="sum", dtype=dtype
        )

    # Método 'pre_reduce' para realizar operaciones antes de la reducción en los datos.
    def pre_reduce(self, x, v, z, pos, batch):
        for layer in self.output_network:
            x, v = layer(x, v)
        
        # Devuelve 'v' después de aplicar las capas de la red de salida.
        return v.squeeze()

## Base previos (Priors)

In [159]:
# Importamos las clases 'nn' y 'Tensor' desde el módulo 'torch'
# Estas clases son proporcionadas por PyTorch, un popular marco de trabajo para el aprendizaje profundo.
from torch import nn, Tensor

# Importamos el tipo 'Optional' y el tipo 'Dict' desde el módulo 'typing'
# Estos tipos se utilizan para proporcionar información adicional sobre las variables y argumentos en el código.
from typing import Optional, Dict

 define una clase llamada BasePrior, que sirve como una plantilla base para modelos de prior. Los modelos de prior son utilizados en aplicaciones de aprendizaje profundo para preprocesar predicciones atomísticas o moleculares antes de ser utilizadas en tareas posteriores.

In [160]:
# Definimos una clase llamada 'BasePrior' que hereda de 'nn.Module', lo que significa que es un módulo de PyTorch.
class BasePrior(nn.Module):
    r"""Base class for prior models.
    Derive this class to make custom prior models, which take some arguments and a dataset as input.
    As an example, have a look at the `torchmdnet.priors.atomref.Atomref` prior.
    """
    # El método '__init__' es el constructor de la clase.
    def __init__(self, dataset=None):
        # Llamamos al constructor de la clase base 'nn.Module' utilizando 'super()'.
        super().__init__()

    # Definimos un método llamado 'get_init_args'.
    def get_init_args(self):
        r"""A function that returns all required arguments to construct a prior object.
        The values should be returned inside a dict with the keys being the arguments' names.
        All values should also be saveable in a .yaml file as this is used to reconstruct the
        prior model from a checkpoint file.
        """
        return {}
    
    # Definimos un método llamado 'pre_reduce'.
    def pre_reduce(self, x, z, pos, batch, extra_args: Optional[Dict[str, Tensor]]):
        r"""Pre-reduce method of the prior model.

        Args:
            x (torch.Tensor): scalar atom-wise predictions from the model.
            z (torch.Tensor): atom types of all atoms.
            pos (torch.Tensor): 3D atomic coordinates.
            batch (torch.Tensor): tensor containing the sample index for each atom.
            extra_args (dict): any addition fields provided by the dataset

        Returns:
            torch.Tensor: updated scalar atom-wise predictions
        """
        return x
    
        # Este método recibe varios argumentos y devuelve un tensor 'x'. 
        # Puede ser usado para realizar operaciones de preprocesamiento en las predicciones atomísticas.

    # Definimos un método llamado 'post_reduce'.
    def post_reduce(self, y, z, pos, batch, extra_args: Optional[Dict[str, Tensor]]):
        r"""Post-reduce method of the prior model.

        Args:
            y (torch.Tensor): scalar molecule-wise predictions from the model.
            z (torch.Tensor): atom types of all atoms.
            pos (torch.Tensor): 3D atomic coordinates.
            batch (torch.Tensor): tensor containing the sample index for each atom.
            extra_args (dict): any addition fields provided by the dataset

        Returns:
            torch.Tensor: updated scalar molecular-wise predictions
        """
        return y
    
# La clase 'BasePrior' actúa como una plantilla base para la creación de modelos de prior específicos que heredan de ella.


## Priors: Atomref

In [161]:
# from torchmdnet.priors.base import BasePrior
from typing import Optional, Dict  # Importamos tipos para anotaciones de funciones.
import torch  # Importamos PyTorch, un marco de trabajo para aprendizaje profundo.
from torch import nn, Tensor  # Importamos clases y tipos específicos de PyTorch.
from pytorch_lightning.utilities import rank_zero_warn  # Importamos una función para emitir advertencias.


Esta parte del código define una clase llamada 'Atomref' que hereda de 'BasePrior'. La clase 'Atomref' se utiliza para implementar un modelo de referencia de átomos.

In [162]:
class Atomref(BasePrior):
    r"""Atomref prior model.
    When using this in combination with some dataset, the dataset class must implement
    the function `get_atomref`, which returns the atomic reference values as a tensor.
    """
    # El método '__init__' es el constructor de la clase.
    def __init__(self, max_z=None, dataset=None):
        # Llamamos al constructor de la clase base 'BasePrior' utilizando 'super()'.
        super().__init__()
        # Comprobamos los argumentos pasados al constructor.
        if max_z is None and dataset is None:
            # Si tanto 'max_z' como 'dataset' son 'None', lanzamos una excepción ValueError.
            raise ValueError("Can't instantiate Atomref prior, all arguments are None.")
        if dataset is None:
            # Si 'dataset' es 'None', creamos un tensor de referencia de átomos lleno de ceros.
            atomref = torch.zeros(max_z, 1)
        else:
            # Si 'dataset' no es 'None', obtenemos el tensor de referencia de átomos utilizando el método 'get_atomref' del dataset.
            atomref = dataset.get_atomref()
            if atomref is None:
                # Si el tensor de referencia de átomos es 'None', emitimos una advertencia y utilizamos un tensor lleno de ceros con un máximo número atómico de 99.
                rank_zero_warn(
                    "The atomref returned by the dataset is None, defaulting to zeros with max. "
                    "atomic number 99. Maybe atomref is not defined for the current target."
                )
                atomref = torch.zeros(100, 1)

        if atomref.ndim == 1:
            # Si el tensor de referencia de átomos tiene una dimensión, lo redimensionamos a una columna.
            atomref = atomref.view(-1, 1)

        # Registramos el tensor de referencia de átomos como un buffer para que se incluya en la lista de parámetros del modelo.
        self.register_buffer("initial_atomref", atomref)
        # Creamos una capa de embedding llamada 'atomref' que asigna índices a valores de referencia de átomos.
        self.atomref = nn.Embedding(len(atomref), 1)
        # Inicializamos los pesos de la capa de embedding con el tensor de referencia de átomos.
        self.atomref.weight.data.copy_(atomref)

    # Definimos un método llamado 'reset_parameters'.
    def reset_parameters(self):
        # Restablecemos los pesos de la capa de embedding 'atomref' a los valores iniciales.
        self.atomref.weight.data.copy_(self.initial_atomref)

    # Definimos un método llamado 'get_init_args'.
    def get_init_args(self):
        # Este método devuelve un diccionario con el valor de 'max_z', que es la longitud del tensor de referencia de átomos.
        return dict(max_z=self.initial_atomref.size(0))

    # Definimos un método llamado 'pre_reduce'.
    def pre_reduce(self, x: Tensor, z: Tensor, pos: Tensor, batch: Tensor, extra_args: Optional[Dict[str, Tensor]]):
        # Este método toma predicciones 'x', tipos de átomos 'z', coordenadas 'pos', índices de lotes 'batch', y argumentos adicionales 'extra_args'.
        # Luego, ajusta las predicciones 'x' agregando los valores de referencia de átomos correspondientes a los tipos de átomos 'z'.
        return x + self.atomref(z)

## Modelo: equivariant-transformer

In [163]:
# Importamos los siguientes módulos y tipos para su uso en el código.
from typing import Optional, Tuple  # Importamos tipos para anotaciones de funciones.
import torch  # Importamos PyTorch, un marco de trabajo para aprendizaje profundo.
from torch import Tensor, nn  # Importamos clases y tipos específicos de PyTorch.
from torch_geometric.nn import MessagePassing  # Importamos una clase de PyTorch Geometric para pasar mensajes entre nodos en grafos.
from torch_scatter import scatter  # Importamos una función para realizar reducciones en datos dispersos.
#from torchmdnet.models.utils import (
#    NeighborEmbedding,
#    CosineCutoff,
#    Distance,
#    rbf_class_mapping,
#    act_class_mapping,
#)

In [164]:
class TorchMD_ET(nn.Module):
    r"""The TorchMD equivariant Transformer architecture.

    Args:
        hidden_channels (int, optional): Hidden embedding size.
            (default: :obj:`128`)
        num_layers (int, optional): The number of attention layers.
            (default: :obj:`6`)
        num_rbf (int, optional): The number of radial basis functions :math:`\mu`.
            (default: :obj:`50`)
        rbf_type (string, optional): The type of radial basis function to use.
            (default: :obj:`"expnorm"`)
        trainable_rbf (bool, optional): Whether to train RBF parameters with
            backpropagation. (default: :obj:`True`)
        activation (string, optional): The type of activation function to use.
            (default: :obj:`"silu"`)
        attn_activation (string, optional): The type of activation function to use
            inside the attention mechanism. (default: :obj:`"silu"`)
        neighbor_embedding (bool, optional): Whether to perform an initial neighbor
            embedding step. (default: :obj:`True`)
        num_heads (int, optional): Number of attention heads.
            (default: :obj:`8`)
        distance_influence (string, optional): Where distance information is used inside
            the attention mechanism. (default: :obj:`"both"`)
        cutoff_lower (float, optional): Lower cutoff distance for interatomic interactions.
            (default: :obj:`0.0`)
        cutoff_upper (float, optional): Upper cutoff distance for interatomic interactions.
            (default: :obj:`5.0`)
        max_z (int, optional): Maximum atomic number. Used for initializing embeddings.
            (default: :obj:`100`)
        max_num_neighbors (int, optional): Maximum number of neighbors to return for a
            given node/atom when constructing the molecular graph during forward passes.
            This attribute is passed to the torch_cluster radius_graph routine keyword
            max_num_neighbors, which normally defaults to 32. Users should set this to
            higher values if they are using higher upper distance cutoffs and expect more
            than 32 neighbors per node/atom.
            (default: :obj:`32`)
    """

    # El método '__init__' es el constructor de la clase y se utiliza para inicializar sus atributos.
    def __init__(
        self,
        hidden_channels=128,
        num_layers=6,
        num_rbf=50,
        rbf_type="expnorm",
        trainable_rbf=True,
        activation="silu",
        attn_activation="silu",
        neighbor_embedding=True,
        num_heads=8,
        distance_influence="both",
        cutoff_lower=0.0,
        cutoff_upper=5.0,
        max_z=100,
        max_num_neighbors=32,
        dtype=torch.float32,
    ):
        # Llamamos al constructor de la clase base 'nn.Module'.
        super(TorchMD_ET, self).__init()

        # Validamos los valores de algunos de los parámetros pasados al constructor.
        assert distance_influence in ["keys", "values", "both", "none"]
        assert rbf_type in rbf_class_mapping, (
            f'Unknown RBF type "{rbf_type}". '
            f'Choose from {", ".join(rbf_class_mapping.keys())}.'
        )
        assert activation in act_class_mapping, (
            f'Unknown activation function "{activation}". '
            f'Choose from {", ".join(act_class_mapping.keys())}.'
        )
        assert attn_activation in act_class_mapping, (
            f'Unknown attention activation function "{attn_activation}". '
            f'Choose from {", ".join(act_class_mapping.keys())}.'
        )

        # Asignamos los parámetros pasados al constructor como atributos de la clase.
        self.hidden_channels = hidden_channels
        self.num_layers = num_layers
        self.num_rbf = num_rbf
        self.rbf_type = rbf_type
        self.trainable_rbf = trainable_rbf
        self.activation = activation
        self.attn_activation = attn_activation
        self.neighbor_embedding = neighbor_embedding
        self.num_heads = num_heads
        self.distance_influence = distance_influence
        self.cutoff_lower = cutoff_lower
        self.cutoff_upper = cutoff_upper
        self.max_z = max_z
        self.dtype = dtype

        # Obtenemos la clase de función de activación adecuada según el parámetro 'activation'.
        act_class = act_class_mapping[activation]

        # Creamos una capa de embedding llamada 'embedding' para representar los tipos de átomos.
        self.embedding = nn.Embedding(self.max_z, hidden_channels, dtype=dtype)

        # Creamos una instancia de la clase 'Distance' para calcular distancias entre átomos.
        self.distance = Distance(
            cutoff_lower,
            cutoff_upper,
            max_num_neighbors=max_num_neighbors,
            return_vecs=True,
            loop=True,
        )

        # Creamos una instancia de la función de base radial (RBF) para la expansión de distancia.
        self.distance_expansion = rbf_class_mapping[rbf_type](
            cutoff_lower, cutoff_upper, num_rbf, trainable_rbf
        )

        # Creamos una instancia de la clase 'NeighborEmbedding' para la representación de vecinos.
        # Esto se hace si 'neighbor_embedding' es verdadero, de lo contrario, se establece en 'None'.
        self.neighbor_embedding = (
            NeighborEmbedding(
                hidden_channels, num_rbf, cutoff_lower, cutoff_upper, self.max_z, dtype
            ).jittable()
            if neighbor_embedding
            else None
        )

        # Creamos una lista de capas de atención llamada 'attention_layers' utilizando 'nn.ModuleList'.
        self.attention_layers = nn.ModuleList()
        for _ in range(num_layers):
            layer = EquivariantMultiHeadAttention(
                hidden_channels,
                num_rbf,
                distance_influence,
                num_heads,
                act_class,
                attn_activation,
                cutoff_lower,
                cutoff_upper,
                dtype,
            ).jittable()
            self.attention_layers.append(layer)

        # Creamos una capa de normalización llamada 'out_norm'.
        self.out_norm = nn.LayerNorm(hidden_channels, dtype=dtype)

        # Inicializamos los parámetros de la clase llamando al método 'reset_parameters'.
        self.reset_parameters()

    # Definimos un método llamado 'reset_parameters'.
    def reset_parameters(self):
        # Reiniciamos los parámetros de la capa de embedding 'embedding'.
        self.embedding.reset_parameters()

        # Reiniciamos los parámetros de la expansión de distancia 'distance_expansion'.
        self.distance_expansion.reset_parameters()

        # Si 'neighbor_embedding' no es 'None', reiniciamos sus parámetros.
        if self.neighbor_embedding is not None:
            self.neighbor_embedding.reset_parameters()

        # Reiniciamos los parámetros de cada capa de atención en 'attention_layers'.
        for attn in self.attention_layers:
            attn.reset_parameters()

        # Reiniciamos los parámetros de la capa de normalización 'out_norm'.
        self.out_norm.reset_parameters()


    # Definimos un método llamado 'forward', que es utilizado para realizar el pase hacia adelante (forward pass) de la arquitectura.
    def forward(
        self,
        z: Tensor,  # Tipos de átomos
        pos: Tensor,  # Coordenadas de los átomos
        batch: Tensor,  # Índices de lote
        q: Optional[Tensor] = None,  # Opcional: cargas de átomos
        s: Optional[Tensor] = None,  # Opcional: masas de átomos
    ) -> Tuple[Tensor, Tensor, Tensor, Tensor, Tensor]:
        # Comenzamos por pasar los tipos de átomos 'z' a través de la capa de embedding 'self.embedding'.
        x = self.embedding(z)

        # Calculamos las distancias entre los átomos utilizando el módulo 'self.distance'.
        edge_index, edge_weight, edge_vec = self.distance(pos, batch)

        # Agregamos una aserción (assert) para garantizar que 'edge_vec' no sea 'None'.
        # Esto es necesario para convencer a TorchScript de que 'edge_vec' no es opcional.
        # 'edge_vec' contiene información direccional sobre las distancias y es crucial para el cálculo de las transformaciones equivariantes.
        # This assert must be here to convince TorchScript that edge_vec is not None
        # If you remove it TorchScript will complain down below that you cannot use an Optional[Tensor]
        assert (
            edge_vec is not None
        ), "Distance module did not return directional information"

        # Aplicamos una expansión de distancia a los pesos de los bordes utilizando 'self.distance_expansion'.
        edge_attr = self.distance_expansion(edge_weight)

        # Creamos una máscara para identificar los bordes válidos.
        mask = edge_index[0] != edge_index[1]

        # Normalizamos los vectores de borde que no sean cero.
        edge_vec[mask] = edge_vec[mask] / torch.norm(edge_vec[mask], dim=1).unsqueeze(1)

        # Si 'self.neighbor_embedding' no es 'None', aplicamos la representación de vecinos a los datos 'x'.
        if self.neighbor_embedding is not None:
            x = self.neighbor_embedding(z, x, edge_index, edge_weight, edge_attr)

        # Inicializamos un tensor 'vec' con ceros para almacenar información direccional.
        vec = torch.zeros(x.size(0), 3, x.size(1), device=x.device, dtype=x.dtype)

        # Aplicamos múltiples capas de atención en un bucle.
        for attn in self.attention_layers:
            dx, dvec = attn(x, vec, edge_index, edge_weight, edge_attr, edge_vec)
            x = x + dx
            vec = vec + dvec

        # Normalizamos la salida 'x' utilizando 'self.out_norm'.
        x = self.out_norm(x)

        # Devolvemos un conjunto de tensores como resultado de la propagación hacia adelante.
        return x, vec, z, pos, batch

    # Definimos el método '__repr__' que se utiliza para proporcionar una representación de cadena del objeto de la clase.
    def __repr__(self):
        # Construimos una cadena que describe los atributos de la instancia de la clase.
        return (
            f"{self.__class__.__name__}("
            f"hidden_channels={self.hidden_channels}, "
            f"num_layers={self.num_layers}, "
            f"num_rbf={self.num_rbf}, "
            f"rbf_type={self.rbf_type}, "
            f"trainable_rbf={self.trainable_rbf}, "
            f"activation={self.activation}, "
            f"attn_activation={self.attn_activation}, "
            f"neighbor_embedding={self.neighbor_embedding}, "
            f"num_heads={self.num_heads}, "
            f"distance_influence={self.distance_influence}, "
            f"cutoff_lower={self.cutoff_lower}, "
            f"cutoff_upper={self.cutoff_upper}), "
            f"dtype={self.dtype}"
        )

In [165]:
# Definimos una nueva clase llamada 'EquivariantMultiHeadAttention' que hereda de 'MessagePassing'.
class EquivariantMultiHeadAttention(MessagePassing):
    # El método '__init__' es el constructor de la clase y se utiliza para inicializar sus atributos.
    def __init(
        self,
        hidden_channels,  # Número de canales ocultos
        num_rbf,  # Número de funciones de base radial (RBF)
        distance_influence,  # Influencia de la distancia
        num_heads,  # Número de cabezas de atención
        activation,  # Función de activación
        attn_activation,  # Función de activación de atención
        cutoff_lower,  # Valor de corte inferior
        cutoff_upper,  # Valor de corte superior
        dtype=torch.float32,  # Tipo de datos
    ):
        # Llamamos al constructor de la clase base 'MessagePassing' con 'aggr="add"' y 'node_dim=0'.
        super(EquivariantMultiHeadAttention, self).__init__(aggr="add", node_dim=0)

        # Validamos que 'hidden_channels' sea divisible de manera uniforme por 'num_heads'.
        assert hidden_channels % num_heads == 0, (
            f"The number of hidden channels ({hidden_channels}) "
            f"must be evenly divisible by the number of "
            f"attention heads ({num_heads})"
        )

        # Asignamos los parámetros pasados al constructor como atributos de la clase.
        self.distance_influence = distance_influence
        self.num_heads = num_heads
        self.hidden_channels = hidden_channels
        self.head_dim = hidden_channels // num_heads
        self.layernorm = nn.LayerNorm(hidden_channels, dtype=dtype)
        self.act = activation()
        self.attn_activation = act_class_mapping[attn_activation]()
        self.cutoff = CosineCutoff(cutoff_lower, cutoff_upper)

        # Definimos proyecciones lineales para 'q', 'k', 'v', y 'o'.
        self.q_proj = nn.Linear(hidden_channels, hidden_channels, dtype=dtype)
        self.k_proj = nn.Linear(hidden_channels, hidden_channels, dtype=dtype)
        self.v_proj = nn.Linear(hidden_channels, hidden_channels * 3, dtype=dtype)
        self.o_proj = nn.Linear(hidden_channels, hidden_channels * 3, dtype=dtype)

        # Definimos una proyección lineal para 'vec'.
        self.vec_proj = nn.Linear(hidden_channels, hidden_channels * 3, bias=False, dtype=dtype)

        # Si 'distance_influence' contiene "keys" o "both", definimos una proyección para 'dk'.
        self.dk_proj = None
        if distance_influence in ["keys", "both"]:
            self.dk_proj = nn.Linear(num_rbf, hidden_channels, dtype=dtype)

        # Si 'distance_influence' contiene "values" o "both", definimos una proyección para 'dv'.
        self.dv_proj = None
        if distance_influence in ["values", "both"]:
            self.dv_proj = nn.Linear(num_rbf, hidden_channels * 3, dtype=dtype)

        # Inicializamos los parámetros de la clase llamando al método 'reset_parameters'.
        self.reset_parameters()

    # Definimos una función llamada 'reset_parameters' utilizada para inicializar los parámetros del modelo.
    def reset_parameters(self):
        # Reiniciamos los parámetros de la capa de normalización 'layernorm'.
        self.layernorm.reset_parameters()

        # Inicializamos los parámetros de las proyecciones lineales 'q_proj', 'k_proj', 'v_proj', y 'o_proj'
        # utilizando la inicialización Xavier uniforme para los pesos y llenando los sesgos con ceros.
        nn.init.xavier_uniform_(self.q_proj.weight)
        self.q_proj.bias.data.fill_(0)
        nn.init.xavier_uniform_(self.k_proj.weight)
        self.k_proj.bias.data.fill_(0)
        nn.init.xavier_uniform_(self.v_proj.weight)
        self.v_proj.bias.data.fill_(0)
        nn.init.xavier_uniform_(self.o_proj.weight)
        self.o_proj.bias.data.fill_(0)

        # Inicializamos los parámetros de la proyección lineal 'vec_proj' utilizando Xavier uniforme para los pesos.
        nn.init.xavier_uniform_(self.vec_proj.weight)

        # Si 'dk_proj' existe (no es 'None'), inicializamos sus parámetros de manera similar a las proyecciones anteriores.
        if self.dk_proj:
            nn.init.xavier_uniform_(self.dk_proj.weight)
            self.dk_proj.bias.data.fill_(0)

        # Si 'dv_proj' existe (no es 'None'), inicializamos sus parámetros de manera similar a las proyecciones anteriores.
        if self.dv_proj:
            nn.init.xavier_uniform_(self.dv_proj.weight)
            self.dv_proj.bias.data.fill_(0)

    # Definimos un método llamado 'forward' utilizado para realizar el pase hacia adelante de la capa de atención multi-cabeza.
    def forward(self, x, vec, edge_index, r_ij, f_ij, d_ij):
        # Aplicamos la capa de normalización 'layernorm' a la entrada 'x'.
        x = self.layernorm(x)

        # Proyectamos 'x' en los espacios de consulta, clave y valor ('q', 'k', 'v') y reformateamos para la atención multi-cabeza.
        q = self.q_proj(x).reshape(-1, self.num_heads, self.head_dim)
        k = self.k_proj(x).reshape(-1, self.num_heads, self.head_dim)
        v = self.v_proj(x).reshape(-1, self.num_heads, self.head_dim * 3)

        # Dividimos 'vec_proj' en tres partes y realizamos una operación de producto escalar para obtener 'vec_dot'.
        vec1, vec2, vec3 = torch.split(self.vec_proj(vec), self.hidden_channels, dim=-1)
        vec = vec.reshape(-1, 3, self.num_heads, self.head_dim)
        vec_dot = (vec1 * vec2).sum(dim=1)

        # Proyectamos 'f_ij' en 'dk' (claves) y 'dv' (valores) si las proyecciones existen.
        dk = (
            self.act(self.dk_proj(f_ij)).reshape(-1, self.num_heads, self.head_dim)
            if self.dk_proj is not None
            else None
        )
        dv = (
            self.act(self.dv_proj(f_ij)).reshape(-1, self.num_heads, self.head_dim * 3)
            if self.dv_proj is not None
            else None
        )

        # Llamamos al método 'propagate' para realizar la atención multi-cabeza y obtener resultados.
        # propagate_type: (q: Tensor, k: Tensor, v: Tensor, vec: Tensor, dk: Tensor, dv: Tensor, r_ij: Tensor, d_ij: Tensor)
        x, vec = self.propagate(
            edge_index,
            q=q,
            k=k,
            v=v,
            vec=vec,
            dk=dk,
            dv=dv,
            r_ij=r_ij,
            d_ij=d_ij,
            size=None,
        )
        # Reformateamos los resultados para que tengan la forma adecuada.
        x = x.reshape(-1, self.hidden_channels)
        vec = vec.reshape(-1, 3, self.hidden_channels)

        # Dividimos la proyección de salida 'o_proj' en tres partes y calculamos 'dx' y 'dvec'.
        o1, o2, o3 = torch.split(self.o_proj(x), self.hidden_channels, dim=1)
        dx = vec_dot * o2 + o3
        dvec = vec3 * o1.unsqueeze(1) + vec
        # Devolvemos 'dx' y 'dvec' como resultados de la atención multi-cabeza.
        return dx, dvec

    def message(self, q_i, k_j, v_j, vec_j, dk, dv, r_ij, d_ij):
        # attention mechanism
        if dk is None:
            # Si 'dk' no está definido (influencia de la distancia en las claves no está activa),
            # calculamos la atención como el producto escalar entre 'q_i' y 'k_j'.
            attn = (q_i * k_j).sum(dim=-1)
        else:
            # Si 'dk' está definido (influencia de la distancia en las claves está activa),
            # calculamos la atención como el producto escalar entre 'q_i', 'k_j' y 'dk'.
            attn = (q_i * k_j * dk).sum(dim=-1)

        # attention activation function
        attn = self.attn_activation(attn) * self.cutoff(r_ij).unsqueeze(1)

        # value pathway
        if dv is not None:
            # Si 'dv' está definido (influencia de la distancia en los valores está activa),
            # aplicamos 'dv' a 'v_j'.
            v_j = v_j * dv
        # Dividimos 'v_j' en tres partes ('x', 'vec1', 'vec2') de acuerdo a la dimensión de cabezas.
        x, vec1, vec2 = torch.split(v_j, self.head_dim, dim=2)

        # update scalar features
        x = x * attn.unsqueeze(2)
        # update vector features
        vec = vec_j * vec1.unsqueeze(1) + vec2.unsqueeze(1) * d_ij.unsqueeze(
            2
        ).unsqueeze(3)
        # Devolvemos las características actualizadas 'x' y 'vec'.
        return x, vec

    # Definimos un método llamado 'aggregate' que se utiliza para realizar la agregación de características.
    def aggregate(
        self,
        features: Tuple[torch.Tensor, torch.Tensor],
        index: torch.Tensor,
        ptr: Optional[torch.Tensor],
        dim_size: Optional[int],
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        # Desempaquetamos las características en 'x' y 'vec'.
        x, vec = features
        # Utilizamos la función 'scatter' para agregar 'x' y 'vec' de acuerdo al índice 'index'.
        # Esto permite agrupar las características correspondientes a los mismos nodos.
        x = scatter(x, index, dim=self.node_dim, dim_size=dim_size)
        vec = scatter(vec, index, dim=self.node_dim, dim_size=dim_size)
        # Devolvemos las características agregadas 'x' y 'vec'.
        return x, vec

    # Definimos un método llamado 'update' que se utiliza para actualizar las características después de la agregación.
    def update(
        self, inputs: Tuple[torch.Tensor, torch.Tensor]
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        # En esta implementación actual, la función no realiza ninguna operación.
        # Simplemente devuelve las mismas características de entrada.

        # Devolvemos las mismas características de entrada 'inputs'.
        return inputs

## Envoltorio del Modelo

In [166]:
import re # Importamos el módulo 're' para trabajar con expresiones regulares.
from typing import Optional, List, Tuple, Dict # Importamos tipos de Python para anotaciones de tipos.
import torch
from torch.autograd import grad
from torch import nn, Tensor
from torch_scatter import scatter
from pytorch_lightning.utilities import rank_zero_warn# Importamos una función 'rank_zero_warn' desde el módulo 'rank_zero_warn' en 'pytorch_lightning.utilities'.
#from torchmdnet.models import output_modules
#from torchmdnet.models.wrappers import AtomFilter
#from torchmdnet.models.utils import dtype_mapping
#from torchmdnet import priors
import warnings # Importamos el módulo 'warnings' para gestionar advertencias en Python.

In [167]:
def create_model(args, prior_model=None, mean=None, std=None):
    """Create a model from the given arguments.
    See :func:`get_args` in scripts/train.py for a description of the arguments.
    Parameters
    ----------
        args (dict): Arguments for the model.
        prior_model (nn.Module, optional): Prior model to use. Defaults to None.
        mean (torch.Tensor, optional): Mean of the training data. Defaults to None.
        std (torch.Tensor, optional): Standard deviation of the training data. Defaults to None.
    Returns
    -------
        nn.Module: An instance of the TorchMD_Net model.
    """
    # Mapeamos el tipo de datos (dtype) según la precisión especificada en los argumentos.
    dtype = dtype_mapping[args["precision"]]

    # Definimos un diccionario con argumentos compartidos necesarios para crear el modelo.
    shared_args = dict(
        hidden_channels=args["embedding_dimension"],
        num_layers=args["num_layers"],
        num_rbf=args["num_rbf"],
        rbf_type=args["rbf_type"],
        trainable_rbf=args["trainable_rbf"],
        activation=args["activation"],
        cutoff_lower=args["cutoff_lower"],
        cutoff_upper=args["cutoff_upper"],
        max_z=args["max_z"],
        max_num_neighbors=args["max_num_neighbors"],
        dtype=dtype
    )

    # representation network
    if args["model"] == "graph-network":
        from torchmdnet.models.torchmd_gn import TorchMD_GN

        is_equivariant = False
        representation_model = TorchMD_GN(
            num_filters=args["embedding_dimension"],
            aggr=args["aggr"],
            neighbor_embedding=args["neighbor_embedding"],
            **shared_args
        )
    elif args["model"] == "transformer":
        from torchmdnet.models.torchmd_t import TorchMD_T

        is_equivariant = False
        representation_model = TorchMD_T(
            attn_activation=args["attn_activation"],
            num_heads=args["num_heads"],
            distance_influence=args["distance_influence"],
            neighbor_embedding=args["neighbor_embedding"],
            **shared_args,
        )
    elif args["model"] == "equivariant-transformer":
        from torchmdnet.models.torchmd_et import TorchMD_ET

        is_equivariant = True
        representation_model = TorchMD_ET(
            attn_activation=args["attn_activation"],
            num_heads=args["num_heads"],
            distance_influence=args["distance_influence"],
            neighbor_embedding=args["neighbor_embedding"],
            **shared_args,
        )
    elif args["model"] == "tensornet":
        from torchmdnet.models.tensornet import TensorNet
	# Setting is_equivariant to False to enforce the use of Scalar output module instead of EquivariantScalar
        is_equivariant = False
        representation_model = TensorNet(
	    equivariance_invariance_group=args["equivariance_invariance_group"],
            **shared_args,
        )
    else:
        raise ValueError(f'Unknown architecture: {args["model"]}')

    # atom filter
    if not args["derivative"] and args["atom_filter"] > -1:
        representation_model = AtomFilter(representation_model, args["atom_filter"])
    elif args["atom_filter"] > -1:
        raise ValueError("Derivative and atom filter can't be used together")

    # prior model
    if args["prior_model"] and prior_model is None:
        # instantiate prior model if it was not passed to create_model (i.e. when loading a model)
        prior_model = create_prior_models(args)

    # create output network
    output_prefix = "Equivariant" if is_equivariant else ""
#    output_model = getattr(output_modules, output_prefix + args["output_model"])(
    output_model = EquivariantScalar(
        args["embedding_dimension"],
        activation=args["activation"],
        reduce_op=args["reduce_op"],
        dtype=dtype,
    )

    # combine representation and output network
    model = TorchMD_Net(
        representation_model,
        output_model,
        prior_model=prior_model,
        mean=mean,
        std=std,
        derivative=args["derivative"],
        dtype=dtype,
    )
    return model

In [168]:
def load_model(filepath, args=None, device="cpu", **kwargs):
    # Cargamos el estado del modelo desde el archivo
    ckpt = torch.load(filepath, map_location="cpu")
    # Si no se proporcionaron los argumentos 'args', los obtenemos del checkpoint
    if args is None:
        args = ckpt["hyper_parameters"]

    # Actualizamos los argumentos con los valores proporcionados en kwargs
    for key, value in kwargs.items():
        if not key in args:
            warnings.warn(f"Unknown hyperparameter: {key}={value}")
        args[key] = value

    # Creamos un modelo nuevo utilizando los argumentos
    model = create_model(args)

    # Preparamos el estado del modelo cargado
    state_dict = {re.sub(r"^model\.", "", k): v for k, v in ckpt["state_dict"].items()}
    # The following are for backward compatibility with models created when atomref was
    # the only supported prior.
    if 'prior_model.initial_atomref' in state_dict:
        state_dict['prior_model.0.initial_atomref'] = state_dict['prior_model.initial_atomref']
        del state_dict['prior_model.initial_atomref']
    if 'prior_model.atomref.weight' in state_dict:
        state_dict['prior_model.0.atomref.weight'] = state_dict['prior_model.atomref.weight']
        del state_dict['prior_model.atomref.weight']

    # Cargamos el estado del modelo en el modelo creado
    model.load_state_dict(state_dict)
    # Transferimos el modelo al dispositivo especificado (por defecto, "cpu")
    return model.to(device)

In [169]:
def create_prior_models(args, dataset=None):
    """Parse the prior_model configuration option and create the prior models."""
    prior_models = [] # Lista para almacenar los modelos previos creados
    if args['prior_model']:
        prior_model = args['prior_model']
        prior_names = []
        prior_args = []
        # Si prior_model es una lista, iteramos sobre sus elementos
        if not isinstance(prior_model, list):
            prior_model = [prior_model]
        # Procesamos cada elemento de prior_model
        for prior in prior_model:
            if isinstance(prior, dict):
                # Si el elemento es un diccionario, se espera que contenga el nombre del modelo y sus argumentos
                for key, value in prior.items():
                    prior_names.append(key)
                    if value is None:
                        prior_args.append({})
                    else:
                        prior_args.append(value)
            else:
                # Si el elemento no es un diccionario, se considera que es el nombre del modelo sin argumentos
                prior_names.append(prior)
                prior_args.append({})

        # Si se proporcionan argumentos específicos en 'prior_args', los utilizamos en lugar de los argumentos encontrados
        if 'prior_args' in args:
            prior_args = args['prior_args']
            if not isinstance(prior_args, list):
                prior_args = [prior_args]

        # Creamos los modelos previos
        for name, arg in zip(prior_names, prior_args):
#            assert hasattr(priors, name), (
#                f"Unknown prior model {name}. "
#                f"Available models are {', '.join(priors.__all__)}"
#            )
            # initialize the prior model
#            prior_models.append(getattr(priors, name)(dataset=dataset, **arg))
            prior_models.append(Atomref(dataset=dataset, **arg))
    return prior_models

In [170]:
class TorchMD_Net(nn.Module):
    """The  TorchMD_Net class  combines a  given representation  model
    (such as  the equivariant transformer),  an output model  (such as
    the scalar output  module) and a prior model (such  as the atomref
    prior), producing a  Module that takes as input a  series of atoms
    features  and  outputs  a  scalar   value  (i.e  energy  for  each
    batch/molecule) and,  derivative is True, the  negative of  its derivative
    with respect to the positions (i.e forces for each atom).

    """
    def __init__(
        self,
        representation_model,
        output_model,
        prior_model=None,
        mean=None,
        std=None,
        derivative=False,
        dtype=torch.float32,
    ):
        super(TorchMD_Net, self).__init__()
        self.representation_model = representation_model.to(dtype=dtype)
        self.output_model = output_model.to(dtype=dtype)

        if not output_model.allow_prior_model and prior_model is not None:
            prior_model = None
            rank_zero_warn(
                (
                    "Prior model was given but the output model does "
                    "not allow prior models. Dropping the prior model."
                )
            )
#        if isinstance(prior_model, priors.base.BasePrior):
#            prior_model = [prior_model]
        self.prior_model = None if prior_model is None else torch.nn.ModuleList(prior_model).to(dtype=dtype)

        self.derivative = derivative

        mean = torch.scalar_tensor(0) if mean is None else mean
        self.register_buffer("mean", mean.to(dtype=dtype))
        std = torch.scalar_tensor(1) if std is None else std
        self.register_buffer("std", std.to(dtype=dtype))

        self.reset_parameters()

    def reset_parameters(self):
        self.representation_model.reset_parameters()
        self.output_model.reset_parameters()
        if self.prior_model is not None:
            for prior in self.prior_model:
                prior.reset_parameters()

    def forward(
        self,
        z: Tensor,
        pos: Tensor,
        batch: Optional[Tensor] = None,
        q: Optional[Tensor] = None,
        s: Optional[Tensor] = None,
        extra_args: Optional[Dict[str, Tensor]] = None
    ) -> Tuple[Tensor, Optional[Tensor]]:
        """Compute the output of the model.
        Args:
            z (Tensor): Atomic numbers of the atoms in the molecule. Shape (N,).
            pos (Tensor): Atomic positions in the molecule. Shape (N, 3).
            batch (Tensor, optional): Batch indices for the atoms in the molecule. Shape (N,).
            q (Tensor, optional): Atomic charges in the molecule. Shape (N,).
            s (Tensor, optional): Atomic spins in the molecule. Shape (N,).
            extra_args (Dict[str, Tensor], optional): Extra arguments to pass to the prior model.
        """

        assert z.dim() == 1 and z.dtype == torch.long
        batch = torch.zeros_like(z) if batch is None else batch

        if self.derivative:
            pos.requires_grad_(True)

        # run the potentially wrapped representation model
        x, v, z, pos, batch = self.representation_model(z, pos, batch, q=q, s=s)

        # apply the output network
        x = self.output_model.pre_reduce(x, v, z, pos, batch)

        # scale by data standard deviation
        if self.std is not None:
            x = x * self.std

        # apply atom-wise prior model
        if self.prior_model is not None:
            for prior in self.prior_model:
                x = prior.pre_reduce(x, z, pos, batch, extra_args)

        # aggregate atoms
        x = self.output_model.reduce(x, batch)

        # shift by data mean
        if self.mean is not None:
            x = x + self.mean

        # apply output model after reduction
        y = self.output_model.post_reduce(x)

        # apply molecular-wise prior model
        if self.prior_model is not None:
            for prior in self.prior_model:
                y = prior.post_reduce(y, z, pos, batch, extra_args)

        # compute gradients with respect to coordinates
        if self.derivative:
            grad_outputs: List[Optional[torch.Tensor]] = [torch.ones_like(y)]
            dy = grad(
                [y],
                [pos],
                grad_outputs=grad_outputs,
                create_graph=True,
                retain_graph=True,
            )[0]
            if dy is None:
                raise RuntimeError("Autograd returned None for the force prediction.")

            return y, -dy
        # TODO: return only `out` once Union typing works with TorchScript (https://github.com/pytorch/pytorch/pull/53180)
        return y, None

## Compilación del modelo

Modulos de la compilación del modelo

In [171]:
import torch
from torch.optim import AdamW #se utiliza para optimizar modelos durante el entrenamiento
from torch.optim.lr_scheduler import ReduceLROnPlateau #un programador de tasa de aprendizaje que ajusta la tasa de aprendizaje en función de las métricas de entrenamiento
from torch.nn.functional import mse_loss, l1_loss # mse_loss (pérdida de error cuadrático medio) y l1_loss (pérdida L1)
from torch import Tensor #tipo de dato 'Tensor' de PyTorch
from typing import Optional, Dict, Tuple #anotaciones de tipo opcionales, diccionarios y tuplas del módulo 'typing'

from pytorch_lightning import LightningModule #clase 'LightningModule' de PyTorch Lightning, que se utiliza para crear módulos de entrenamiento
# from torchmdnet.models.model import create_model, load_model

Compilación del modelo

In [172]:
# Definición de la clase LNNP que hereda de LightningModule, lo que permite el uso de PyTorch Lightning para entrenar y evaluar modelos.
class LNNP(LightningModule):
    """
    Lightning wrapper for the Neural Network Potentials in TorchMD-Net.
    """
    def __init__(self, hparams, prior_model=None, mean=None, std=None):
        # Llamamos al constructor de la clase padre (LightningModule) utilizando 'super'.
        super(LNNP, self).__init__()
        
        # Verificamos si 'charge' y 'spin' están definidos en los hiperparámetros; si no, los establecemos como 'False'.
        if "charge" not in hparams:
            hparams["charge"] = False
        if "spin" not in hparams:
            hparams["spin"] = False

        # Guardamos los hiperparámetros en el modelo para que estén disponibles en todo momento.
        self.save_hyperparameters(hparams)
    
        # Si se especifica un modelo previamente entrenado, lo cargamos utilizando 'load_model'; de lo contrario, creamos un nuevo modelo utilizando 'create_model'.
        if self.hparams.load_model:
            self.model = load_model(self.hparams.load_model, args=self.hparams)
        else:
            self.model = create_model(self.hparams, prior_model, mean, std)

        # Inicializamos el suavizado exponencial (exponential moving average, EMA).
        # initialize exponential smoothing
        self.ema = None
        self._reset_ema_dict() # Llamamos a la función para configurar la EMA.

        # Inicializamos una colección de pérdidas (losses).
        # initialize loss collection
        self.losses = None
        self._reset_losses_dict()    # Llamamos a la función para configurar la colección de pérdidas.

        # Inicializamos una lista para almacenar las pérdidas de validación de la suma de errores cuadráticos ('val_sqr_e').
        self.losses["val_sqr_e"] = []

    def configure_optimizers(self):
        # Se configura el optimizador AdamW para actualizar los parámetros del modelo durante el entrenamiento.
        optimizer = AdamW(
            self.model.parameters(), # Parámetros del modelo a ser optimizados.
            lr=self.hparams.lr,  # Tasa de aprendizaje, obtenida de los hiperparámetros.
            weight_decay=self.hparams.weight_decay, # Término de decaimiento de peso (regularización L2).
        )
        # Se configura un programador de la tasa de aprendizaje (scheduler) ReduceLROnPlateau para ajustar la tasa de aprendizaje durante el entrenamiento.
        scheduler = ReduceLROnPlateau(
            optimizer,    # Optimizador al que se aplica el programador.
            "min",    # Modo de reducción: "min" significa reducir la tasa de aprendizaje cuando la métrica disminuye.
            factor=self.hparams.lr_factor,    # Factor de reducción de la tasa de aprendizaje.
            patience=self.hparams.lr_patience,    # Paciencia: número de épocas sin mejoras antes de reducir la tasa de aprendizaje.
            min_lr=self.hparams.lr_min,    # Tasa de aprendizaje mínima permitida.
        )
        # Se configura un diccionario 'lr_scheduler' que contiene información sobre el programador de la tasa de aprendizaje.
        lr_scheduler = {
            "scheduler": scheduler,    # El programador previamente configurado.
            "monitor": getattr(self.hparams, "lr_metric", "val_loss"),   # Métrica a monitorizar para ajustar la tasa de aprendizaje.
            "interval": "epoch",  # Intervalo de ajuste de la tasa de aprendizaje por época.
            "frequency": 1, # Frecuencia de ajuste de la tasa de aprendizaje (cada época en este caso).
        }
        # Se devuelve una lista con el optimizador y una lista con el programador de tasa de aprendizaje.
        return [optimizer], [lr_scheduler]

    def forward(self,
                z: Tensor,  # Tensor de características de átomos.
                pos: Tensor, # Tensor de coordenadas tridimensionales de átomos.
                batch: Optional[Tensor] = None,  # Tensor de índices de lotes (opcional).
                q: Optional[Tensor] = None,  # Tensor de cargas atómicas (opcional).
                s: Optional[Tensor] = None,  # Tensor de espines (opcional).
                extra_args: Optional[Dict[str, Tensor]] = None  # Argumentos adicionales (opcional).
                ) -> Tuple[Tensor, Optional[Tensor]]:
        # El método simplemente redirige la llamada al método 'forward' del modelo interno 'self.model'.
        return self.model(z, pos, batch=batch, q=q, s=s, extra_args=extra_args)

    # Método para una etapa de entrenamiento de un modelo de aprendizaje automático.
    def training_step(self, batch, batch_idx):
        # Devuelve el resultado de la función 'step' con los parámetros de lote, la función de pérdida MSE y la etiqueta "train".
        return self.step(batch, mse_loss, "train")

    # Método para una etapa de validación de un modelo de aprendizaje automático.
    def validation_step(self, batch, batch_idx, *args):
        # Verifica si hay argumentos adicionales y si el primer argumento es cero o no se proporciona ningún argumento.
        if len(args) == 0 or (len(args) > 0 and args[0] == 0):
            # Paso de validación. Devuelve el resultado de la función 'step' con los parámetros de lote, la función de pérdida MSE y la etiqueta "val".
            # validation step
            return self.step(batch, mse_loss, "val")
        # Paso de prueba. Devuelve el resultado de la función 'step' con los parámetros de lote, la función de pérdida L1 y la etiqueta "test".
        # test step
        return self.step(batch, l1_loss, "test")
    # Método para una etapa de prueba de un modelo de aprendizaje automático.
    def test_step(self, batch, batch_idx):
        # Devuelve el resultado de la función 'step' con los parámetros de lote, la función de pérdida L1 y la etiqueta "test".
        return self.step(batch, l1_loss, "test")

    # Método principal que se utiliza para realizar un paso de procesamiento en una etapa específica (entrenamiento, validación o prueba).
    # Recibe un lote de datos (batch), una función de pérdida (loss_fn) y una etiqueta de etapa (stage).
    def step(self, batch, loss_fn, stage):
        # Habilita o deshabilita el cálculo de gradientes según si estamos en la etapa de entrenamiento o si se requieren derivadas (self.hparams.derivative).
        with torch.set_grad_enabled(stage == "train" or self.hparams.derivative):
            # Crea un diccionario 'extra_args' a partir de los atributos del lote 'batch'.
            # Luego, elimina ciertos elementos ('y', 'neg_dy', 'z', 'pos', 'batch', 'q', 's') de 'extra_args'.
            extra_args = batch.to_dict()
            for a in ('y', 'neg_dy', 'z', 'pos', 'batch', 'q', 's'):
                if a in extra_args:
                    del extra_args[a]
            # TODO: the model doesn't necessarily need to return a derivative once
            # Union typing works under TorchScript (https://github.com/pytorch/pytorch/pull/53180)

            # Realiza la inferencia del modelo utilizando los datos del lote y obtiene las predicciones 'y' y las derivadas negativas 'neg_dy'.            
            y, neg_dy = self(
                batch.z,
                batch.pos,
                batch=batch.batch,
                q=batch.q if self.hparams.charge else None,
                s=batch.s if self.hparams.spin else None,
                extra_args=extra_args
            )

        # Inicializa las variables para las pérdidas 'loss_y' y 'loss_neg_dy'.
        loss_y, loss_neg_dy = 0, 0
        # Si se requieren derivadas (self.hparams.derivative) y no se proporcionó la etiqueta "y" en el lote.
        if self.hparams.derivative:
            if "y" not in batch:
                # "use" both outputs of the model's forward function but discard the first
                # to only use the negative derivative and avoid 'Expected to have finished reduction
                # in the prior iteration before starting a new one.', which otherwise get's
                # thrown because of setting 'find_unused_parameters=False' in the DDPPlugin
                # Suma los valores de 'y' pero los descarta, lo que evita ciertos errores de reducción en paralelo.
                neg_dy = neg_dy + y.sum() * 0

            # Calcula la pérdida de derivadas negativas ('loss_neg_dy') utilizando la función de pérdida proporcionada.
            # negative derivative loss
            loss_neg_dy = loss_fn(neg_dy, batch.neg_dy)

            # Realiza un suavizado exponencial de la pérdida de derivadas negativas si se encuentra en etapa de entrenamiento o validación.
            if stage in ["train", "val"] and self.hparams.ema_alpha_neg_dy < 1:
                if self.ema[stage + "_neg_dy"] is None:
                    self.ema[stage + "_neg_dy"] = loss_neg_dy.detach()
                # apply exponential smoothing over batches to neg_dy
                loss_neg_dy = (
                    self.hparams.ema_alpha_neg_dy * loss_neg_dy
                    + (1 - self.hparams.ema_alpha_neg_dy) * self.ema[stage + "_neg_dy"]
                )
                self.ema[stage + "_neg_dy"] = loss_neg_dy.detach()

            # Agrega la pérdida de derivadas negativas al registro de pérdidas si su peso es mayor que cero.
            if self.hparams.neg_dy_weight > 0:
                self.losses[stage + "_neg_dy"].append(loss_neg_dy.detach())

        # Si la etiqueta "y" está presente en el lote.
        if "y" in batch:
            if batch.y.ndim == 1:
                batch.y = batch.y.unsqueeze(1)

            # Calcula el error cuadrático medio y la diferencia de valor absoluto entre las predicciones y las etiquetas reales.
            # Calcula la pérdida 'loss_y' utilizando la función de pérdida proporcionada.
            # y loss
            loss_y = loss_fn(y, batch.y)
            squared_errors = mse_loss(y, batch.y)
            absolute_value_difference = l1_loss(y, batch.y)

            # Realiza un suavizado exponencial de la pérdida 'loss_y' si se encuentra en etapa de entrenamiento o validación.
            if stage in ["train", "val"] and self.hparams.ema_alpha_y < 1:
                if self.ema[stage + "_y"] is None:
                    self.ema[stage + "_y"] = loss_y.detach()
                # apply exponential smoothing over batches to y
                loss_y = (
                    self.hparams.ema_alpha_y * loss_y
                    + (1 - self.hparams.ema_alpha_y) * self.ema[stage + "_y"]
                )
                self.ema[stage + "_y"] = loss_y.detach()

            # Agrega la pérdida 'loss_y' al registro de pérdidas si su peso es mayor que cero.
            if self.hparams.y_weight > 0:
                self.losses[stage + "_y"].append(loss_y.detach())

        # Calcula la pérdida total como una combinación ponderada de 'loss_y' y 'loss_neg_dy'.
        # total loss
        loss = loss_y * self.hparams.y_weight + loss_neg_dy * self.hparams.neg_dy_weight

        # Agrega la pérdida total al registro de pérdidas para la etapa actual.
        self.losses[stage].append(loss.detach())
        self.losses[stage + "_sqr_e"].append(squared_errors.detach())
        self.losses[stage + "_avd_e"].append(absolute_value_difference.detach())
        # Devuelve la pérdida total calculada.
        return loss

    def optimizer_step(self, *args, **kwargs):
        # Obtiene el optimizador del diccionario de argumentos 'kwargs' si está presente, de lo contrario, toma el tercer argumento en 'args'.
        optimizer = kwargs["optimizer"] if "optimizer" in kwargs else args[2]
        # Verifica si el paso global actual del entrenador es menor que el número de pasos de calentamiento de la tasa de aprendizaje (lr_warmup_steps).
        if self.trainer.global_step < self.hparams.lr_warmup_steps:
            # Calcula un factor de escala 'lr_scale' para la tasa de aprendizaje basado en el progreso del paso global actual.
            lr_scale = min(
                1.0,
                float(self.trainer.global_step + 1)
                / float(self.hparams.lr_warmup_steps),
            )
            # Ajusta la tasa de aprendizaje en cada grupo de parámetros del optimizador.
            for pg in optimizer.param_groups:
                pg["lr"] = lr_scale * self.hparams.lr
        # Llama al método 'optimizer_step' de la clase base (superclase) para realizar la actualización estándar del optimizador.
        super().optimizer_step(*args, **kwargs)
        # Limpia los gradientes acumulados en el optimizador para evitar que se acumulen gradientes de lotes anteriores.
        optimizer.zero_grad()

    def training_epoch_end(self, training_step_outputs):
        # Obtiene una referencia al DataModule (dm) desde el entrenador (trainer).
        dm = self.trainer.datamodule
        # Verifica si el DataModule tiene un conjunto de datos de prueba ('test_dataset') y si tiene al menos un elemento.
        if hasattr(dm, "test_dataset") and len(dm.test_dataset) > 0:
            # Determina si se debe restablecer el conjunto de validación antes y después de la época de prueba.
            should_reset = (
                self.current_epoch % self.hparams.test_interval == 0
                or (self.current_epoch + 1) % self.hparams.test_interval == 0
            )
            # Si se determina que se debe restablecer el conjunto de validación, lo hace.
            if should_reset:
                # reset validation dataloaders before and after testing epoch, which is faster
                # than skipping test validation steps by returning None
                self.trainer.reset_val_dataloader(self)

    # Método llamado al final de cada época de validación.
    # Recibe una lista de salidas de los pasos de validación de la época (validation_step_outputs).
    def validation_epoch_end(self, validation_step_outputs):
        # Verifica si no se está realizando una comprobación de cordura (sanity_checking) del entrenador.
        if not self.trainer.sanity_checking:
            # Filtra y selecciona las pérdidas de entrenamiento (train_avd_e) que no contienen valores NaN o infinitos.
            valid_losses_train = [loss for loss in self.losses["train_avd_e"] if not torch.isnan(loss).any()]
            valid_losses_train = [loss for loss in valid_losses_train if not torch.isinf(loss).any() and torch.max(loss) < float("inf")]
            # Calcula la desviación estándar de las pérdidas de entrenamiento.            
            std_train = torch.std(torch.stack(valid_losses_train))

            # Filtra y selecciona las pérdidas de validación (val_avd_e) que no contienen valores NaN o infinitos.
            valid_losses_val = [loss for loss in self.losses["val_avd_e"] if not torch.isnan(loss).any()]
            valid_losses_val = [loss for loss in valid_losses_val if not torch.isinf(loss).any() and torch.max(loss) < float("inf")]
            # Calcula la desviación estándar de las pérdidas de validación.
            std_val = torch.std(torch.stack(valid_losses_val))

            # Construye un diccionario que contiene diversas métricas y estadísticas calculadas.
            # construct dict of logged metrics
            result_dict = {
                "epoch": float(self.current_epoch), # Número de la época actual.
                "lr": self.trainer.optimizers[0].param_groups[0]["lr"], # Tasa de aprendizaje.
                "train_loss": torch.stack(self.losses["train"]).mean(), # Pérdida promedio de entrenamiento.
                "val_loss": torch.stack(self.losses["val"]).mean(), # Pérdida promedio de validación.
                "std_train":std_train, # Desviación estándar de las pérdidas de entrenamiento.
                "mse_train":torch.stack(self.losses["train_sqr_e"]).mean(), # Error cuadrático medio de entrenamiento.
                "rmse_train":torch.sqrt(torch.stack(self.losses["train_sqr_e"]).mean()), # Raíz del error cuadrático medio de entrenamiento.
                "mae_train":torch.stack(self.losses["train_avd_e"]).mean(), # Valor absoluto medio de entrenamiento.
                "std_val":std_val, # Desviación estándar de las pérdidas de validación.
                "mse_val":torch.stack(self.losses["val_sqr_e"]).mean(), # Error cuadrático medio de validación.
                "rmse_val":torch.sqrt(torch.stack(self.losses["val_sqr_e"]).mean()), # Raíz del error cuadrático medio de validación.
                "mae_val":torch.stack(self.losses["val_avd_e"]).mean(), # Valor absoluto medio de validación.
            }

            # Si hay pérdidas disponibles para el conjunto de prueba (test), realiza lo siguiente.
            # add test loss if available
            if len(self.losses["test"]) > 0:
                # Filtra y selecciona las pérdidas de prueba (test_avd_e) que no contienen valores NaN o infinitos.
                valid_losses_test = [loss for loss in self.losses["test_avd_e"] if not torch.isnan(loss).any()]
                valid_losses_test = [loss for loss in valid_losses_test if not torch.isinf(loss).any() and torch.max(loss) < float("inf")]
                # Calcula la desviación estándar de las pérdidas de prueba.                
                std_test = torch.std(torch.stack(valid_losses_test))
                # Agrega métricas relacionadas con el conjunto de prueba al diccionario 'result_dict'.
                result_dict["test_loss"] = torch.stack(self.losses["test"]).mean()
                result_dict["std_test"] = std_test
                result_dict["mse_test"] = torch.stack(self.losses["test_sqr_e"]).mean()
                result_dict["rmse_test"] = torch.sqrt(torch.stack(self.losses["test_sqr_e"]).mean())
                result_dict["mae_test"] = torch.stack(self.losses["test_avd_e"]).mean()
            # Si se disponen de pérdidas para predicciones ('train_y' y 'train_neg_dy') y derivadas ('train_neg_dy'), también las registra por separado.            
            # if prediction and derivative are present, also log them separately
            if len(self.losses["train_y"]) > 0 and len(self.losses["train_neg_dy"]) > 0:
                result_dict["train_loss_y"] = torch.stack(self.losses["train_y"]).mean()
                result_dict["train_loss_neg_dy"] = torch.stack(
                    self.losses["train_neg_dy"]
                ).mean()
                result_dict["val_loss_y"] = torch.stack(self.losses["val_y"]).mean()
                result_dict["val_loss_neg_dy"] = torch.stack(self.losses["val_neg_dy"]).mean()

                # Si hay pérdidas disponibles para el conjunto de prueba, también las registra por separado.
                if len(self.losses["test"]) > 0:
                    result_dict["test_loss_y"] = torch.stack(
                        self.losses["test_y"]
                    ).mean()
                    result_dict["test_loss_neg_dy"] = torch.stack(
                        self.losses["test_neg_dy"]
                    ).mean()
            # Registra las métricas en el registro y las sincroniza si se está utilizando distribución (sync_dist=True).
            self.log_dict(result_dict, sync_dist=True)
        # Restablece el diccionario de pérdidas para futuras épocas.
        self._reset_losses_dict()

    def _reset_losses_dict(self):
        self.losses = {
            "train": [],
            "val": [],
            "train_sqr_e": [],
            "train_avd_e": [],
            "val_sqr_e": [],
            "val_avd_e": [],
            "test": [],
            "test_sqr_e": [],
            "test_avd_e": [],
            "train_y": [],
            "val_y": [],
            "test_y": [],
            "train_neg_dy": [],
            "val_neg_dy": [],
            "test_neg_dy": [],
        }

    def _reset_ema_dict(self):
        self.ema = {"train_y": None, "val_y": None, "train_neg_dy": None, "val_neg_dy": None}

## Entrenamiento

Modulos del entrenamiento

In [173]:
import sys
import os
import argparse
import logging
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.callbacks.model_checkpoint import ModelCheckpoint
from pytorch_lightning.loggers import CSVLogger, WandbLogger
from pytorch_lightning.strategies.ddp import DDPStrategy
import yaml
#from torchmdnet.module import LNNP
#from torchmdnet import datasets, priors, models
#from torchmdnet.data import DataModule
#from torchmdnet.models import output_modules
#from torchmdnet.models.model import create_prior_models
#from torchmdnet.models.utils import rbf_class_mapping, act_class_mapping, dtype_mapping
#from torchmdnet.utils import LoadFromFile, LoadFromCheckpoint, save_argparse, number
import torch

Función para tomar argumentos de la terminal

In [174]:
def get_args():
    # fmt: off
    parser = argparse.ArgumentParser(description='Training')
    parser.add_argument('--load-model', action=LoadFromCheckpoint, help='Restart training using a model checkpoint')  # keep first
    parser.add_argument('--conf', '-c', type=open, action=LoadFromFile, help='Configuration yaml file')  # keep second
    parser.add_argument('--num-epochs', default=300, type=int, help='number of epochs')
    parser.add_argument('--batch-size', default=32, type=int, help='batch size')
    parser.add_argument('--inference-batch-size', default=None, type=int, help='Batchsize for validation and tests.')
    parser.add_argument('--lr', default=1e-4, type=float, help='learning rate')
    parser.add_argument('--lr-patience', type=int, default=10, help='Patience for lr-schedule. Patience per eval-interval of validation')
    parser.add_argument('--lr-metric', type=str, default='val_loss', choices=['train_loss', 'val_loss'], help='Metric to monitor when deciding whether to reduce learning rate')
    parser.add_argument('--lr-min', type=float, default=1e-6, help='Minimum learning rate before early stop')
    parser.add_argument('--lr-factor', type=float, default=0.8, help='Factor by which to multiply the learning rate when the metric stops improving')
    parser.add_argument('--lr-warmup-steps', type=int, default=0, help='How many steps to warm-up over. Defaults to 0 for no warm-up')
    parser.add_argument('--early-stopping-patience', type=int, default=30, help='Stop training after this many epochs without improvement')
    parser.add_argument('--reset-trainer', type=bool, default=False, help='Reset training metrics (e.g. early stopping, lr) when loading a model checkpoint')
    parser.add_argument('--weight-decay', type=float, default=0.0, help='Weight decay strength')
    parser.add_argument('--ema-alpha-y', type=float, default=1.0, help='The amount of influence of new losses on the exponential moving average of y')
    parser.add_argument('--ema-alpha-neg-dy', type=float, default=1.0, help='The amount of influence of new losses on the exponential moving average of dy')
    parser.add_argument('--ngpus', type=int, default=-1, help='Number of GPUs, -1 use all available. Use CUDA_VISIBLE_DEVICES=1, to decide gpus')
    parser.add_argument('--num-nodes', type=int, default=1, help='Number of nodes')
    parser.add_argument('--precision', type=int, default=32, choices=[16, 32, 64], help='Floating point precision')
    parser.add_argument('--log-dir', '-l', default='output', help='log file')
    parser.add_argument('--splits', default=None, help='Npz with splits idx_train, idx_val, idx_test')
    parser.add_argument('--train-size', type=number, default=None, help='Percentage/number of samples in training set (None to use all remaining samples)')
    parser.add_argument('--val-size', type=number, default=0.05, help='Percentage/number of samples in validation set (None to use all remaining samples)')
    parser.add_argument('--test-size', type=number, default=0.1, help='Percentage/number of samples in test set (None to use all remaining samples)')
    parser.add_argument('--test-interval', type=int, default=10, help='Test interval, one test per n epochs (default: 10)')
    parser.add_argument('--save-interval', type=int, default=10, help='Save interval, one save per n epochs (default: 10)')
    parser.add_argument('--seed', type=int, default=1, help='random seed (default: 1)')
    parser.add_argument('--num-workers', type=int, default=4, help='Number of workers for data prefetch')
    parser.add_argument('--redirect', type=bool, default=False, help='Redirect stdout and stderr to log_dir/log')
    parser.add_argument('--gradient-clipping', type=float, default=0.0, help='Gradient clipping norm')

    # dataset specific
    parser.add_argument('--dataset', default=None, type=str, help='Name of the torch_geometric dataset')
    parser.add_argument('--dataset-root', default='input/data', type=str, help='Data storage directory (not used if dataset is "CG")')
    parser.add_argument('--dataset-arg', default=None, type=str, help='Additional dataset arguments, e.g. target property for QM9 or molecule for MD17. Need to be specified in JSON format i.e. \'{"molecules": "aspirin,benzene"}\'')
    parser.add_argument('--coord-files', default=None, type=str, help='Custom coordinate files glob')
    parser.add_argument('--embed-files', default=None, type=str, help='Custom embedding files glob')
    parser.add_argument('--energy-files', default=None, type=str, help='Custom energy files glob')
    parser.add_argument('--force-files', default=None, type=str, help='Custom force files glob')
    parser.add_argument('--y-weight', default=1.0, type=float, help='Weighting factor for y label in the loss function')
    parser.add_argument('--neg-dy-weight', default=1.0, type=float, help='Weighting factor for neg_dy label in the loss function')

    # model architecture
    parser.add_argument('--model', type=str, default='graph-network', help='Which model to train')
    parser.add_argument('--output-model', type=str, default='Scalar', help='The type of output model')
    parser.add_argument('--prior-model', type=str, default=None, help='Which prior model to use')

    # architectural args
    parser.add_argument('--charge', type=bool, default=False, help='Model needs a total charge')
    parser.add_argument('--spin', type=bool, default=False, help='Model needs a spin state')
    parser.add_argument('--embedding-dimension', type=int, default=256, help='Embedding dimension')
    parser.add_argument('--num-layers', type=int, default=6, help='Number of interaction layers in the model')
    parser.add_argument('--num-rbf', type=int, default=64, help='Number of radial basis functions in model')
    parser.add_argument('--activation', type=str, default='silu', choices=list(act_class_mapping.keys()), help='Activation function')
    parser.add_argument('--rbf-type', type=str, default='expnorm', choices=list(rbf_class_mapping.keys()), help='Type of distance expansion')
    parser.add_argument('--trainable-rbf', type=bool, default=False, help='If distance expansion functions should be trainable')
    parser.add_argument('--neighbor-embedding', type=bool, default=False, help='If a neighbor embedding should be applied before interactions')
    parser.add_argument('--aggr', type=str, default='add', help='Aggregation operation for CFConv filter output. Must be one of \'add\', \'mean\', or \'max\'')

    # Transformer specific
    parser.add_argument('--distance-influence', type=str, default='both', choices=['keys', 'values', 'both', 'none'], help='Where distance information is included inside the attention')
    parser.add_argument('--attn-activation', default='silu', choices=list(act_class_mapping.keys()), help='Attention activation function')
    parser.add_argument('--num-heads', type=int, default=8, help='Number of attention heads')
    
    # TensorNet specific
    parser.add_argument('--equivariance-invariance-group', type=str, default='O(3)', help='Equivariance and invariance group of TensorNet')

    # other args
    parser.add_argument('--derivative', default=False, type=bool, help='If true, take the derivative of the prediction w.r.t coordinates')
    parser.add_argument('--cutoff-lower', type=float, default=0.0, help='Lower cutoff in model')
    parser.add_argument('--cutoff-upper', type=float, default=5.0, help='Upper cutoff in model')
    parser.add_argument('--atom-filter', type=int, default=-1, help='Only sum over atoms with Z > atom_filter')
    parser.add_argument('--max-z', type=int, default=100, help='Maximum atomic number that fits in the embedding matrix')
    parser.add_argument('--max-num-neighbors', type=int, default=32, help='Maximum number of neighbors to consider in the network')
    parser.add_argument('--standardize', type=bool, default=False, help='If true, multiply prediction by dataset std and add mean')
    parser.add_argument('--reduce-op', type=str, default='add', choices=['add', 'mean'], help='Reduce operation to apply to atomic predictions')
    parser.add_argument('--wandb-use', default=False, type=bool, help='Defines if wandb is used or not')
    parser.add_argument('--wandb-name', default='training', type=str, help='Give a name to your wandb run')
    parser.add_argument('--wandb-project', default='training_', type=str, help='Define what wandb Project to log to')
    parser.add_argument('--wandb-resume-from-id', default=None, type=str, help='Resume a wandb run from a given run id. The id can be retrieved from the wandb dashboard')
    parser.add_argument('--tensorboard-use', default=False, type=bool, help='Defines if tensor board is used or not')

    # fmt: on

    args = parser.parse_args()

    # Carga los valores desde un archivo YAML
    with open('input/ET-QM9.yaml', 'r') as file:
        yaml_data = yaml.safe_load(file)

    # Asigna los valores del archivo YAML a los argumentos que coinciden
    for key, value in yaml_data.items():
        if hasattr(args, key):
            setattr(args, key, value)

    if args.redirect:
        sys.stdout = open(os.path.join(args.log_dir, "log"), "w")
        sys.stderr = sys.stdout
        logging.getLogger("pytorch_lightning").addHandler(
            logging.StreamHandler(sys.stdout)
        )

    if args.inference_batch_size is None:
        args.inference_batch_size = args.batch_size

    save_argparse(args, os.path.join(args.log_dir, "input.yaml"), exclude=["conf"])

    return args

Función principal

In [175]:
import sys
sys.argv = sys.argv[:1]  # Esto elimina los argumentos adicionales de la celda de Jupyter

In [176]:
args = get_args()
pl.seed_everything(args.seed, workers=True)

Global seed set to 30


30

In [177]:
# initialize data module
data = DataModule(args)
data.prepare_data()
data.setup("fit")

train 110000, val 10000, test 10831


In [178]:
prior_models = create_prior_models(vars(args), data.dataset)
args.prior_args = [p.get_init_args() for p in prior_models]

In [179]:
# initialize lightning module
model = LNNP(args, prior_model=prior_models, mean=data.mean, std=data.std)

In [180]:
checkpoint_callback = ModelCheckpoint(
    dirpath=args.log_dir,
    monitor="val_loss",
    save_top_k=10,  # -1 to save all
    every_n_epochs=args.save_interval,
    filename="{epoch}-{val_loss:.4f}-{test_loss:.4f}",
)
early_stopping = EarlyStopping("val_loss", patience=args.early_stopping_patience)

In [181]:
csv_logger = CSVLogger(args.log_dir, name="", version="")
_logger = [csv_logger]
if args.wandb_use:
    wandb_logger = WandbLogger(
        project=args.wandb_project,
        name=args.wandb_name,
        save_dir=args.log_dir,
        resume="must" if args.wandb_resume_from_id is not None else None,
        id=args.wandb_resume_from_id,
    )
    _logger.append(wandb_logger)

In [182]:
if args.tensorboard_use:
    tb_logger = pl.loggers.TensorBoardLogger(
        args.log_dir, name="tensorbord", version="", default_hp_metric=False
    )
    _logger.append(tb_logger)

In [183]:
trainer = pl.Trainer(
#    strategy=DDPStrategy(find_unused_parameters=False),
#    strategy='dp',
    max_epochs=args.num_epochs,
    gpus=args.ngpus,
    num_nodes=args.num_nodes,
    default_root_dir=args.log_dir,
    auto_lr_find=False,
    resume_from_checkpoint=None if args.reset_trainer else args.load_model,
    callbacks=[early_stopping, checkpoint_callback],
    logger=_logger,
    precision=args.precision,
    gradient_clip_val=args.gradient_clipping,
)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [184]:
trainer.fit(model, data)

train 110000, val 10000, test 10831


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type        | Params
--------------------------------------
0 | model | TorchMD_Net | 6.9 M 
--------------------------------------
6.9 M     Trainable params
0         Non-trainable params
6.9 M     Total params
27.460    Total estimated model params size (MB)


Epoch 0:   0%|          | 5/1798 [00:03<21:16,  1.40it/s, loss=5.22e+03, v_num=]  

Epoch 0:   2%|▏         | 33/1798 [00:12<10:44,  2.74it/s, loss=5.19e+03, v_num=]

Exception ignored in: Exception ignored in: Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790><function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790>
<function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790>Traceback (most recent call last):
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__


Traceback (most recent call last):
    Traceback (most recent call last):
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__
self._shutdown_workers()      File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__

self._shutdown_workers()  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1341, in _shutdown_workers
    
self._shutdown_workers()  File "/root/mambaforge/envs

Epoch 0:   2%|▏         | 40/1798 [1:00:49<44:33:34, 91.25s/it, loss=5.17e+03, v_num=]
Epoch 0:   2%|▏         | 40/1798 [1:00:49<44:33:36, 91.25s/it, loss=5.17e+03, v_num=]
Epoch 0:   2%|▏         | 40/1798 [1:00:50<44:33:37, 91.25s/it, loss=5.17e+03, v_num=]


Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790>
Traceback (most recent call last):
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__
    Exception ignored in: Exception ignored in: self._shutdown_workers()<function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790><function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790>

Traceback (most recent call last):
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1341, in _shutdown_workers

      File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__
Traceback (most recent call last):
if w.is_alive():    
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/multiprocessing/process.py", line 160, in is_alive
self._shutdown_workers()      File "/root/mambaforge/envs/torchmd-net/lib/pytho

Epoch 0:   2%|▏         | 35/1798 [00:13<11:44,  2.50it/s, loss=5.21e+03, v_num=]

Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790>
Traceback (most recent call last):
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__
    self._shutdown_workers()
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1341, in _shutdown_workers
    if w.is_alive():
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/multiprocessing/process.py", line 160, in is_alive
    assert self._parent_pid == os.getpid(), 'can only test a child process'
AssertionError: can only test a child process


Epoch 0:   2%|▏         | 40/1798 [1:00:51<44:34:50, 91.29s/it, loss=5.17e+03, v_num=]


Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790>
Traceback (most recent call last):
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__
    self._shutdown_workers()
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1341, in _shutdown_workers
    if w.is_alive():
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/multiprocessing/process.py", line 160, in is_alive
    Exception ignored in: assert self._parent_pid == os.getpid(), 'can only test a child process'
AssertionError: can only test a child process
<function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790>
Traceback (most recent call last):
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__
    self._shutdown_workers()
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/s

Epoch 0:   2%|▏         | 40/1798 [1:00:52<44:35:10, 91.30s/it, loss=5.17e+03, v_num=]


Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790>
Traceback (most recent call last):
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__
    self._shutdown_workers()
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1341, in _shutdown_workers
    if w.is_alive():
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/multiprocessing/process.py", line 160, in is_alive
    assert self._parent_pid == os.getpid(), 'can only test a child process'
AssertionError: can only test a child process


Epoch 0:   2%|▏         | 36/1798 [00:15<12:20,  2.38it/s, loss=5.21e+03, v_num=]

Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790>
Traceback (most recent call last):
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__
    self._shutdown_workers()
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1341, in _shutdown_workers
    if w.is_alive():
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/multiprocessing/process.py", line 160, in is_alive
    assert self._parent_pid == os.getpid(), 'can only test a child process'
AssertionError: can only test a child process


Epoch 0:   2%|▏         | 40/1798 [1:00:52<44:35:43, 91.32s/it, loss=5.17e+03, v_num=]


Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7fb8f939f790>
Traceback (most recent call last):
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1358, in __del__
    self._shutdown_workers()
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/site-packages/torch/utils/data/dataloader.py", line 1341, in _shutdown_workers
    if w.is_alive():
  File "/root/mambaforge/envs/torchmd-net/lib/python3.9/multiprocessing/process.py", line 160, in is_alive
    
assert self._parent_pid == os.getpid(), 'can only test a child process'AssertionError: can only test a child process


Epoch 0:   2%|▏         | 40/1798 [1:00:57<44:39:00, 91.43s/it, loss=5.17e+03, v_num=]
Epoch 0:  17%|█▋        | 313/1798 [01:29<07:06,  3.49it/s, loss=165, v_num=]     

In [None]:
# run test set after completing the fit
model = LNNP.load_from_checkpoint(trainer.checkpoint_callback.best_model_path)
trainer = pl.Trainer(logger=_logger)
trainer.test(model, data)

Ejecución de la función principal

```python
main()
```