In [2]:
import math
from typing import Dict, Optional

import torch

from e3nn import o3
from e3nn.math import soft_one_hot_linspace
from e3nn.nn import ExtractIr, FullyConnectedNet, Gate
from e3nn.o3 import FullyConnectedTensorProduct, TensorProduct
from e3nn.util.jit import compile_mode
import torch
from torch import nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer


In [3]:
def scatter(src: torch.Tensor, index: torch.Tensor, dim_size: int) -> torch.Tensor:
    # special case of torch_scatter.scatter with dim=0
    out = src.new_zeros(dim_size, src.shape[1])
    index = index.reshape(-1, 1).expand_as(src)
    return out.scatter_add_(0, index, src)


def radius_graph(pos, r_max, batch) -> torch.Tensor:
    # naive and inefficient version of torch_cluster.radius_graph
    r = torch.cdist(pos, pos)
    index = ((r < r_max) & (r > 0)).nonzero().T
    index = index[:, batch[index[0]] == batch[index[1]]]
    return index


@compile_mode("script")
class Convolution(torch.nn.Module):
    r"""equivariant convolution

    Parameters
    ----------
    irreps_in : `e3nn.o3.Irreps`
        representation of the input node features

    irreps_node_attr : `e3nn.o3.Irreps`
        representation of the node attributes

    irreps_edge_attr : `e3nn.o3.Irreps`
        representation of the edge attributes

    irreps_out : `e3nn.o3.Irreps` or None
        representation of the output node features

    number_of_edge_features : int
        number of scalar (0e) features of the edge used to feed the FC network

    radial_layers : int
        number of hidden layers in the radial fully connected network

    radial_neurons : int
        number of neurons in the hidden layers of the radial fully connected network

    num_neighbors : float
        typical number of nodes convolved over
    """

    def __init__(
        self,
        irreps_in: o3.Irreps,
        irreps_node_attr: o3.Irreps,
        irreps_edge_attr: o3.Irreps,
        irreps_out: Optional[o3.Irreps],
        number_of_edge_features: int,
        radial_layers: int,
        radial_neurons: int,
        num_neighbors: float,
    ) -> None:
        super().__init__()
        self.irreps_in = o3.Irreps(irreps_in)
        self.irreps_node_attr = o3.Irreps(irreps_node_attr)
        self.irreps_edge_attr = o3.Irreps(irreps_edge_attr)
        self.irreps_out = o3.Irreps(irreps_out)
        self.num_neighbors = num_neighbors

        self.sc = FullyConnectedTensorProduct(self.irreps_in, self.irreps_node_attr, self.irreps_out)

        self.lin1 = FullyConnectedTensorProduct(self.irreps_in, self.irreps_node_attr, self.irreps_in)

        irreps_mid = []
        instructions = []
        for i, (mul, ir_in) in enumerate(self.irreps_in):
            for j, (_, ir_edge) in enumerate(self.irreps_edge_attr):
                for ir_out in ir_in * ir_edge:
                    if ir_out in self.irreps_out:
                        k = len(irreps_mid)
                        irreps_mid.append((mul, ir_out))
                        instructions.append((i, j, k, "uvu", True))
        irreps_mid = o3.Irreps(irreps_mid)
        irreps_mid, p, _ = irreps_mid.sort()

        instructions = [(i_1, i_2, p[i_out], mode, train) for i_1, i_2, i_out, mode, train in instructions]

        tp = TensorProduct(
            self.irreps_in,
            self.irreps_edge_attr,
            irreps_mid,
            instructions,
            internal_weights=False,
            shared_weights=False,
        )
        self.fc = FullyConnectedNet(
            [number_of_edge_features] + radial_layers * [radial_neurons] + [tp.weight_numel], torch.nn.functional.silu
        )
        self.tp = tp

        self.lin2 = FullyConnectedTensorProduct(irreps_mid, self.irreps_node_attr, self.irreps_out)

    def forward(self, node_input, node_attr, edge_src, edge_dst, edge_attr, edge_features) -> torch.Tensor:
        weight = self.fc(edge_features)

        x = node_input

        s = self.sc(x, node_attr)
        x = self.lin1(x, node_attr)

        edge_features = self.tp(x[edge_src], edge_attr, weight)
        x = scatter(edge_features, edge_dst, dim_size=x.shape[0]).div(self.num_neighbors**0.5)

        x = self.lin2(x, node_attr)

        c_s, c_x = math.sin(math.pi / 8), math.cos(math.pi / 8)
        m = self.sc.output_mask
        c_x = (1 - m) + c_x * m
        return c_s * s + c_x * x


def smooth_cutoff(x):
    u = 2 * (x - 1)
    y = (math.pi * u).cos().neg().add(1).div(2)
    y[u > 0] = 0
    y[u < -1] = 1
    return y


def tp_path_exists(irreps_in1, irreps_in2, ir_out) -> bool:
    irreps_in1 = o3.Irreps(irreps_in1).simplify()
    irreps_in2 = o3.Irreps(irreps_in2).simplify()
    ir_out = o3.Irrep(ir_out)

    for _, ir1 in irreps_in1:
        for _, ir2 in irreps_in2:
            if ir_out in ir1 * ir2:
                return True
    return False


class Compose(torch.nn.Module):
    def __init__(self, first, second) -> None:
        super().__init__()
        self.first = first
        self.second = second
        self.irreps_in = self.first.irreps_in
        self.irreps_out = self.second.irreps_out

    def forward(self, *input):
        x = self.first(*input)
        return self.second(x)


class Network(torch.nn.Module):
    r"""equivariant neural network

    Parameters
    ----------
    irreps_in : `e3nn.o3.Irreps` or None
        representation of the input features
        can be set to ``None`` if nodes don't have input features

    irreps_hidden : `e3nn.o3.Irreps`
        representation of the hidden features

    irreps_out : `e3nn.o3.Irreps`
        representation of the output features

    irreps_node_attr : `e3nn.o3.Irreps` or None
        representation of the nodes attributes
        can be set to ``None`` if nodes don't have attributes

    irreps_edge_attr : `e3nn.o3.Irreps`
        representation of the edge attributes
        the edge attributes are :math:`h(r) Y(\vec r / r)`
        where :math:`h` is a smooth function that goes to zero at ``max_radius``
        and :math:`Y` are the spherical harmonics polynomials

    layers : int
        number of gates (non linearities)

    max_radius : float
        maximum radius for the convolution

    number_of_basis : int
        number of basis on which the edge length are projected

    radial_layers : int
        number of hidden layers in the radial fully connected network

    radial_neurons : int
        number of neurons in the hidden layers of the radial fully connected network

    num_neighbors : float
        typical number of nodes at a distance ``max_radius``

    num_nodes : float
        typical number of nodes in a graph
    """

    def __init__(
        self,
        ntoken,
        d_model,
        nhead,
        d_hid,
        nlayers,
        dropout,

        irreps_in: o3.Irreps,
        irreps_hidden: o3.Irreps,
        irreps_out: o3.Irreps,
        irreps_node_attr: o3.Irreps,
        irreps_edge_attr: o3.Irreps,
        layers: int,
        max_radius: float,
        number_of_basis: int,
        radial_layers: int,
        radial_neurons: int,
        num_neighbors: float,
        num_nodes: float,
        reduce_output: bool = True,
    ) -> None:
        super().__init__()
        self.max_radius = max_radius
        self.number_of_basis = number_of_basis
        self.num_neighbors = num_neighbors
        self.num_nodes = num_nodes
        self.reduce_output = reduce_output

        self.irreps_in = o3.Irreps(irreps_in) if irreps_in is not None else None
        self.irreps_hidden = o3.Irreps(irreps_hidden)
        self.irreps_out = o3.Irreps(irreps_out)
        self.irreps_node_attr = o3.Irreps(irreps_node_attr) if irreps_node_attr is not None else o3.Irreps("0e")
        self.irreps_edge_attr = o3.Irreps(irreps_edge_attr)

        self.input_has_node_in = irreps_in is not None
        self.input_has_node_attr = irreps_node_attr is not None

        self.ext_z = ExtractIr(self.irreps_node_attr, "0e")
        number_of_edge_features = number_of_basis + 2 * self.irreps_node_attr.count("0e")

        irreps = self.irreps_in if self.irreps_in is not None else o3.Irreps("0e")

        act = {
            1: torch.nn.functional.silu,
            -1: torch.tanh,
        }
        act_gates = {
            1: torch.sigmoid,
            -1: torch.tanh,
        }

        self.layers = torch.nn.ModuleList()

        for _ in range(layers):
            irreps_scalars = o3.Irreps(
                [
                    (mul, ir)
                    for mul, ir in self.irreps_hidden
                    if ir.l == 0 and tp_path_exists(irreps, self.irreps_edge_attr, ir)
                ]
            )
            irreps_gated = o3.Irreps(
                [(mul, ir) for mul, ir in self.irreps_hidden if ir.l > 0 and tp_path_exists(irreps, self.irreps_edge_attr, ir)]
            )
            ir = "0e" if tp_path_exists(irreps, self.irreps_edge_attr, "0e") else "0o"
            irreps_gates = o3.Irreps([(mul, ir) for mul, _ in irreps_gated])

            gate = Gate(
                irreps_scalars,
                [act[ir.p] for _, ir in irreps_scalars],  # scalar
                irreps_gates,
                [act_gates[ir.p] for _, ir in irreps_gates],  # gates (scalars)
                irreps_gated,  # gated tensors
            )
            conv = Convolution(
                irreps,
                self.irreps_node_attr,
                self.irreps_edge_attr,
                gate.irreps_in,
                number_of_edge_features,
                radial_layers,
                radial_neurons,
                num_neighbors,
            )
            irreps = gate.irreps_out
            self.layers.append(Compose(conv, gate))

        self.layers.append(
            Convolution(
                irreps,
                self.irreps_node_attr,
                self.irreps_edge_attr,
                self.irreps_out,
                number_of_edge_features,
                radial_layers,
                radial_neurons,
                num_neighbors,
            )
        )
        self.transformer = TransformerModel(ntoken, d_model, nhead, d_hid, nlayers, dropout)

    def forward(self, data: Dict[str, torch.Tensor],src_mask) -> torch.Tensor:
        """evaluate the network

        Parameters
        ----------
        data : `torch_geometric.data.Data` or dict
            data object containing
            - ``pos`` the position of the nodes (atoms)
            - ``x`` the input features of the nodes, optional
            - ``z`` the attributes of the nodes, for instance the atom type, optional
            - ``batch`` the graph to which the node belong, optional
        """
        if "batch" in data:
            batch = data["batch"]
        else:
            batch = data["pos"].new_zeros(data["pos"].shape[0], dtype=torch.long)

        edge_index = radius_graph(data["pos"], self.max_radius, batch)
        edge_src = edge_index[0]
        edge_dst = edge_index[1]
        edge_vec = data["pos"][edge_src] - data["pos"][edge_dst]
        edge_sh = o3.spherical_harmonics(self.irreps_edge_attr, edge_vec, True, normalization="component")
        edge_length = edge_vec.norm(dim=1)
        edge_length_embedded = soft_one_hot_linspace(
            x=edge_length, start=0.0, end=self.max_radius, number=self.number_of_basis, basis="gaussian", cutoff=False
        ).mul(self.number_of_basis**0.5)
        edge_attr = smooth_cutoff(edge_length / self.max_radius)[:, None] * edge_sh

        if self.input_has_node_in and "x" in data:
            assert self.irreps_in is not None
            x = data["x"]
        else:
            assert self.irreps_in is None
            x = data["pos"].new_ones((data["pos"].shape[0], 1))

        if self.input_has_node_attr and "z" in data:
            z = data["z"]
        else:
            assert self.irreps_node_attr == o3.Irreps("0e")
            z = data["pos"].new_ones((data["pos"].shape[0], 1))

        scalar_z = self.ext_z(z)
        edge_features = torch.cat([edge_length_embedded, scalar_z[edge_src], scalar_z[edge_dst]], dim=1)

        for lay in self.layers:
            x = lay(x, z, edge_src, edge_dst, edge_attr, edge_features)
        x = x.long()
        x = self.transformer(x, src_mask)
        x = x.float()
        if self.reduce_output:
            return scatter(x, batch, dim_size=int(batch.max()) + 1).div(self.num_nodes**0.5)
        else:
            return x





In [4]:
class TransformerModel(nn.Module):
    def __init__(self, ntoken, d_model, nhead, d_hid, nlayers, dropout=0.5):
        super(TransformerModel, self).__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.encoder = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, ntoken)

        self.init_weights()

    def generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def init_weights(self):
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src, src_mask):
        src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_mask)
        output = self.decoder(output)
        return output

# 辅助的位置编码类
'''
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)
'''
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model // 2) * -(math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [16]:
import torch
from torch_geometric.data import Data
import numpy as np


def generate_random_graph_data(num_nodes, max_radius, num_features):
    # 生成随机三维坐标
    positions = torch.randn(num_nodes, 3)

    # 计算所有节点对之间的距离
    distance_matrix = torch.cdist(positions, positions)

    # 确定邻居（即距离小于最大半径的节点）
    edge_index = (distance_matrix < max_radius) & (distance_matrix > 0)  # 排除自环
    edge_index = edge_index.nonzero(as_tuple=False).t()

    # 生成每个节点的特征
    x = torch.randn(num_nodes, num_features)

    # 创建 PyTorch Geometric Data 对象
    data = Data(x=x, edge_index=edge_index, pos=positions)
    return data


# 设定参数
'''num_nodes = 100  # 节点数
max_radius = 3.0  # 最大半径
num_features = 5  # 特征维度

# 生成图数据
graph_data = generate_random_graph_data(num_nodes, max_radius, num_features)

# 显示数据信息
print("Graph data:", graph_data)
print("Number of nodes:", graph_data.num_nodes)
print("Number of edges:", graph_data.num_edges)
print("Node features shape:", graph_data.x.shape)
print("Edge indices shape:", graph_data.edge_index.shape)
print("Positions shape:", graph_data.pos.shape)
'''
# 接下来就可以将这个 graph_data 用于模型的训练了
from torch_geometric.data import DataLoader

def generate_graph_dataset(num_graphs, num_nodes, max_radius, num_features):
    dataset = []
    for _ in range(num_graphs):
        data = generate_random_graph_data(num_nodes, max_radius, num_features)
        dataset.append(data)
    return dataset

# 生成图数据集
#num_graphs = 50  # 生成50张图
#graph_dataset = generate_graph_dataset(num_graphs, num_nodes, max_radius, num_features)

# 使用 DataLoader 加载数据集
#graph_loader = DataLoader(graph_dataset, batch_size=10, shuffle=True)

# 示例：遍历数据加载器
'''
for batch in graph_loader:
    print("Batch size:", batch.num_graphs)
    print("Number of nodes in batch:", batch.num_nodes)
    print("Number of edges in batch:", batch.num_edges)
    # 此处可以调用模型进行训练步骤
'''


'\nfor batch in graph_loader:\n    print("Batch size:", batch.num_graphs)\n    print("Number of nodes in batch:", batch.num_nodes)\n    print("Number of edges in batch:", batch.num_edges)\n    # 此处可以调用模型进行训练步骤\n'

In [18]:
# 生成数据
from torch import nn, optim
num_features = 5  # 每个节点的特征维度
d_model = 5  # Transformer中的特征维度
nhead = 5  # 多头注意力中的头数
d_hid = 256  # 前馈网络中的隐藏单元数量
nlayers = 3  # Transformer堆栈中的层数
dropout = 0.1  # Dropout比率
max_radius = 3.0  # 最大半径定义邻居
number_of_basis = 10  # 基函数数量
radial_layers = 2  # 径向网络层数
radial_neurons = 64  # 每个径向层的神经元数
num_neighbors = 6  # 平均邻居数
num_nodes = 100  # 节点数，用于归一化
num_graphs = 40
dataloader = generate_graph_dataset(num_graphs, num_nodes, max_radius, num_features)

# 转换为数据加载器
#dataset = TensorDataset(features, features)  # 假设我们在一个回归或自编码任务中使用特征作为目标
#dataloader = DataLoader(dataset, batch_size=10, shuffle=True)

# 定义优化器和损失函数
#optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_func = nn.MSELoss()

In [20]:
dataloader

[Data(x=[100, 5], edge_index=[2, 7185], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7566], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 8184], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7977], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7204], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7368], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7994], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 8068], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 8214], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7777], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7703], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7988], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 8556], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 8187], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7402], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7581], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7763], pos=[100, 3]),
 Data(x=[100, 5], edge_index=[2, 7693], pos=[100, 3]),
 Data(x=[1

In [8]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset

from e3nn import o3
# 模型参数
num_features = 5  # 每个节点的特征维度
d_model = 5  # Transformer中的特征维度
nhead = 8  # 多头注意力中的头数
d_hid = 256  # 前馈网络中的隐藏单元数量
nlayers = 3  # Transformer堆栈中的层数
dropout = 0.1  # Dropout比率
max_radius = 3.0  # 最大半径定义邻居
number_of_basis = 10  # 基函数数量
radial_layers = 2  # 径向网络层数
radial_neurons = 64  # 每个径向层的神经元数
num_neighbors = 6  # 平均邻居数
num_nodes = 100  # 节点数，用于归一化
num_graphs = 40
# 初始化模型
model = Network(

    ntoken=num_features,
    d_model=d_model,
    nhead=nhead,
    d_hid=d_hid,
    nlayers=nlayers,
    dropout=dropout,

    irreps_in=o3.Irreps("5x0e"),
    irreps_hidden=o3.Irreps("10x0e + 5x1o"),
    irreps_out=o3.Irreps("5x0e"),
    irreps_node_attr=None,
    irreps_edge_attr=o3.Irreps.spherical_harmonics(3),
    layers=3,
    max_radius=max_radius,
    number_of_basis=number_of_basis,
    radial_layers=radial_layers,
    radial_neurons=radial_neurons,
    num_neighbors=num_neighbors,
    num_nodes=num_nodes,
    reduce_output=False
)

# 生成数据
dataloader = generate_graph_dataset(num_graphs, num_nodes, max_radius, num_features)

# 转换为数据加载器
#dataset = TensorDataset(features, features)  # 假设我们在一个回归或自编码任务中使用特征作为目标
#dataloader = DataLoader(dataset, batch_size=10, shuffle=True)

# 定义优化器和损失函数
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_func = nn.MSELoss()



RuntimeError: The expanded size of the tensor (3) must match the existing size (2) at non-singleton dimension 1.  Target sizes: [5000, 3].  Tensor sizes: [5000, 2]

In [26]:
class Network1(torch.nn.Module):
    r"""equivariant neural network

    Parameters
    ----------
    irreps_in : `e3nn.o3.Irreps` or None
        representation of the input features
        can be set to ``None`` if nodes don't have input features

    irreps_hidden : `e3nn.o3.Irreps`
        representation of the hidden features

    irreps_out : `e3nn.o3.Irreps`
        representation of the output features

    irreps_node_attr : `e3nn.o3.Irreps` or None
        representation of the nodes attributes
        can be set to ``None`` if nodes don't have attributes

    irreps_edge_attr : `e3nn.o3.Irreps`
        representation of the edge attributes
        the edge attributes are :math:`h(r) Y(\vec r / r)`
        where :math:`h` is a smooth function that goes to zero at ``max_radius``
        and :math:`Y` are the spherical harmonics polynomials

    layers : int
        number of gates (non linearities)

    max_radius : float
        maximum radius for the convolution

    number_of_basis : int
        number of basis on which the edge length are projected

    radial_layers : int
        number of hidden layers in the radial fully connected network

    radial_neurons : int
        number of neurons in the hidden layers of the radial fully connected network

    num_neighbors : float
        typical number of nodes at a distance ``max_radius``

    num_nodes : float
        typical number of nodes in a graph
    """

    def __init__(
        self,
        

        irreps_in: o3.Irreps,
        irreps_hidden: o3.Irreps,
        irreps_out: o3.Irreps,
        irreps_node_attr: o3.Irreps,
        irreps_edge_attr: o3.Irreps,
        layers: int,
        max_radius: float,
        number_of_basis: int,
        radial_layers: int,
        radial_neurons: int,
        num_neighbors: float,
        num_nodes: float,
        reduce_output: bool = True,
    ) -> None:
        super().__init__()
        self.max_radius = max_radius
        self.number_of_basis = number_of_basis
        self.num_neighbors = num_neighbors
        self.num_nodes = num_nodes
        self.reduce_output = reduce_output

        self.irreps_in = o3.Irreps(irreps_in) if irreps_in is not None else None
        self.irreps_hidden = o3.Irreps(irreps_hidden)
        self.irreps_out = o3.Irreps(irreps_out)
        self.irreps_node_attr = o3.Irreps(irreps_node_attr) if irreps_node_attr is not None else o3.Irreps("0e")
        self.irreps_edge_attr = o3.Irreps(irreps_edge_attr)

        self.input_has_node_in = irreps_in is not None
        self.input_has_node_attr = irreps_node_attr is not None

        self.ext_z = ExtractIr(self.irreps_node_attr, "0e")
        number_of_edge_features = number_of_basis + 2 * self.irreps_node_attr.count("0e")

        irreps = self.irreps_in if self.irreps_in is not None else o3.Irreps("0e")

        act = {
            1: torch.nn.functional.silu,
            -1: torch.tanh,
        }
        act_gates = {
            1: torch.sigmoid,
            -1: torch.tanh,
        }

        self.layers = torch.nn.ModuleList()

        for _ in range(layers):
            irreps_scalars = o3.Irreps(
                [
                    (mul, ir)
                    for mul, ir in self.irreps_hidden
                    if ir.l == 0 and tp_path_exists(irreps, self.irreps_edge_attr, ir)
                ]
            )
            irreps_gated = o3.Irreps(
                [(mul, ir) for mul, ir in self.irreps_hidden if ir.l > 0 and tp_path_exists(irreps, self.irreps_edge_attr, ir)]
            )
            ir = "0e" if tp_path_exists(irreps, self.irreps_edge_attr, "0e") else "0o"
            irreps_gates = o3.Irreps([(mul, ir) for mul, _ in irreps_gated])

            gate = Gate(
                irreps_scalars,
                [act[ir.p] for _, ir in irreps_scalars],  # scalar
                irreps_gates,
                [act_gates[ir.p] for _, ir in irreps_gates],  # gates (scalars)
                irreps_gated,  # gated tensors
            )
            conv = Convolution(
                irreps,
                self.irreps_node_attr,
                self.irreps_edge_attr,
                gate.irreps_in,
                number_of_edge_features,
                radial_layers,
                radial_neurons,
                num_neighbors,
            )
            irreps = gate.irreps_out
            self.layers.append(Compose(conv, gate))

        self.layers.append(
            Convolution(
                irreps,
                self.irreps_node_attr,
                self.irreps_edge_attr,
                self.irreps_out,
                number_of_edge_features,
                radial_layers,
                radial_neurons,
                num_neighbors,
            )
        )
        #self.transformer = TransformerModel(ntoken, d_model, nhead, d_hid, nlayers, dropout)

    def forward(self, data: Dict[str, torch.Tensor]) -> torch.Tensor:
        """evaluate the network

        Parameters
        ----------
        data : `torch_geometric.data.Data` or dict
            data object containing
            - ``pos`` the position of the nodes (atoms)
            - ``x`` the input features of the nodes, optional
            - ``z`` the attributes of the nodes, for instance the atom type, optional
            - ``batch`` the graph to which the node belong, optional
        """
        if "batch" in data:
            batch = data["batch"]
        else:
            batch = data["pos"].new_zeros(data["pos"].shape[0], dtype=torch.long)

        edge_index = radius_graph(data["pos"], self.max_radius, batch)
        edge_src = edge_index[0]
        edge_dst = edge_index[1]
        edge_vec = data["pos"][edge_src] - data["pos"][edge_dst]
        edge_sh = o3.spherical_harmonics(self.irreps_edge_attr, edge_vec, True, normalization="component")
        edge_length = edge_vec.norm(dim=1)
        edge_length_embedded = soft_one_hot_linspace(
            x=edge_length, start=0.0, end=self.max_radius, number=self.number_of_basis, basis="gaussian", cutoff=False
        ).mul(self.number_of_basis**0.5)
        edge_attr = smooth_cutoff(edge_length / self.max_radius)[:, None] * edge_sh

        if self.input_has_node_in and "x" in data:
            assert self.irreps_in is not None
            x = data["x"]
        else:
            assert self.irreps_in is None
            x = data["pos"].new_ones((data["pos"].shape[0], 1))

        if self.input_has_node_attr and "z" in data:
            z = data["z"]
        else:
            assert self.irreps_node_attr == o3.Irreps("0e")
            z = data["pos"].new_ones((data["pos"].shape[0], 1))

        scalar_z = self.ext_z(z)
        edge_features = torch.cat([edge_length_embedded, scalar_z[edge_src], scalar_z[edge_dst]], dim=1)

        for lay in self.layers:
            x = lay(x, z, edge_src, edge_dst, edge_attr, edge_features)
        
        if self.reduce_output:
            return scatter(x, batch, dim_size=int(batch.max()) + 1).div(self.num_nodes**0.5)
        else:
            return x

In [83]:
num_features = 5  # 每个节点的特征维度
d_model = 5  # Transformer中的特征维度
nhead = 5  # 多头注意力中的头数
d_hid = 256  # 前馈网络中的隐藏单元数量
nlayers = 3  # Transformer堆栈中的层数
dropout = 0.1  # Dropout比率
max_radius = 3.0  # 最大半径定义邻居
number_of_basis = 10  # 基函数数量
radial_layers = 2  # 径向网络层数
radial_neurons = 64  # 每个径向层的神经元数
num_neighbors = 6  # 平均邻居数
num_nodes = 100  # 节点数，用于归一化
num_graphs = 40
# 初始化模型
model = Network1(

    
    irreps_in=o3.Irreps("1x1o+2x0e"),
    irreps_hidden=o3.Irreps("10x0e + 5x1o"),
    irreps_out=o3.Irreps("1x1o+2x0e"),
    irreps_node_attr=None,
    irreps_edge_attr=o3.Irreps.spherical_harmonics(3),
    layers=3,
    max_radius=max_radius,
    number_of_basis=number_of_basis,
    radial_layers=radial_layers,
    radial_neurons=radial_neurons,
    num_neighbors=num_neighbors,
    num_nodes=num_nodes,
    reduce_output=False
)



In [84]:
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_func = nn.MSELoss()

In [92]:
def train_model(model, dataloader, optimizer, loss_func, epochs=10):
    model.train()  # 设置模型为训练模式
    for epoch in range(epochs):
        total_loss = 0
        for i in range(0, len(dataloader)):
            optimizer.zero_grad()  # 梯度清零

            # 数据解包，假设 batch 是一个 Data 对象，我们需要将节点特征和边索引提取出来
            # 以下代码需要根据你的数据加载方式做相应的调整
            x=dataloader[i]

            # 生成 src_mask，这里的 src_mask 生成需要与数据的具体形状相匹配
            # 假设每个图的节点数相同，可以这样生成 mask
            


            # 执行模型前向计算
            output = model(x)  # 注意：这里假设模型的 forward 已经适配这些输入

            # 计算损失，假设我们的目标也是 x
            loss = loss_func(output, x)
            loss.backward()  # 反向传播
            optimizer.step()  # 优化器更新参数

            total_loss += loss.item() * x.size(0)  # 累计损失

        average_loss = total_loss / len(dataloader)
        print(f'Epoch {epoch + 1}, Loss: {average_loss:.4f}')


In [85]:
dataloader[1].x.size(-1)
x=dataloader[1]
output = model(x)
type(output)


torch.Tensor

In [86]:
output.shape

torch.Size([100, 5])

In [87]:
import torch
from torch import nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer

class GraphTransformer(nn.Module):
    def __init__(self, ntoken, d_model, nhead, nhid, nlayers, dropout=0.5):
        super(GraphTransformer, self).__init__()
        self.node_encoder = nn.Linear(ntoken, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        transformer_layer = TransformerEncoderLayer(d_model, nhead, nhid, dropout)
        self.transformer_encoder = TransformerEncoder(transformer_layer, nlayers)
        self.decoder = nn.Linear(d_model, ntoken)
        self.d_model = d_model

    def forward(self, x, src_mask=None):
        x = self.node_encoder(x) * math.sqrt(self.d_model)
        x = self.pos_encoder(x)
        output = self.transformer_encoder(x, src_mask)
        output = self.decoder(output)
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout, max_len=500):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

# 实例化模型
modelt = GraphTransformer(ntoken=5, d_model=100, nhead=2, nhid=256, nlayers=3, dropout=0.1)




In [88]:
output1=modelt(output)

In [89]:
type(output1)

torch.Tensor

In [93]:
train_model(model, dataloader, optimizer, loss_func, epochs=5)

  return F.mse_loss(input, target, reduction=self.reduction)


TypeError: expected Tensor as element 1 in argument 0, but got Data

In [95]:
len(dataloader)

40

In [98]:
train_size = int(len(dataloader) * 0.7)
test_size = int(len(dataloader) * 0.3)

In [99]:
train_dataset, test_dataset = torch.utils.data.random_split(dataloader, [train_size, test_size])


In [137]:
len(train_dataset.dataset)

40

In [100]:
type(train_dataset)

torch.utils.data.dataset.Subset

In [101]:
from torch.utils.data import Dataset, DataLoader

In [102]:
dataloader1 = DataLoader(train_dataset, batch_size=64, shuffle=True)

In [103]:
dataloader2 = DataLoader(test_dataset, batch_size=64, shuffle=True)

In [104]:
type(dataloader1)

torch.utils.data.dataloader.DataLoader

In [127]:
dataloader1.dataset[28]

IndexError: list index out of range

In [108]:
dataloader[1]

Data(x=[100, 5], edge_index=[2, 7566], pos=[100, 3])

In [134]:
def train_model(model, dataloader, optimizer, loss_func, epochs=10):
    train = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test = DataLoader(test_dataset, batch_size=64, shuffle=True)
    model.train()  # 设置模型为训练模式
    for epoch in range(epochs):
        total_loss = 0
        for i in range(0, 27):
            optimizer.zero_grad()  # 梯度清零

            # 数据解包，假设 batch 是一个 Data 对象，我们需要将节点特征和边索引提取出来
            # 以下代码需要根据你的数据加载方式做相应的调整
            x=train.dataset[i]
            y=train.dataset[i].x
            # 生成 src_mask，这里的 src_mask 生成需要与数据的具体形状相匹配
            # 假设每个图的节点数相同，可以这样生成 mask
            


            # 执行模型前向计算
            output = model(x)  # 注意：这里假设模型的 forward 已经适配这些输入
            outputq = modelt(output)
            # 计算损失，假设我们的目标也是 x
            loss = loss_func(outputq, y)
            loss.backward()  # 反向传播
            optimizer.step()  # 优化器更新参数

            total_loss += loss.item() * x.size(0)  # 累计损失

        average_loss = total_loss / len(dataloader)
        print(f'Epoch {epoch + 1}, Loss: {average_loss:.4f}')

In [135]:
train_model(model, dataloader, optimizer, loss_func, epochs=5)

Epoch 1, Loss: 86.3561
Epoch 2, Loss: 86.5288
Epoch 3, Loss: 87.3641
Epoch 4, Loss: 86.9321
Epoch 5, Loss: 87.2541


In [None]:
'''
Epoch 1, Loss: 33210945.8789
Epoch 2, Loss: 15571050.4590
Epoch 3, Loss: 11033990.7227
Epoch 4, Loss: 8943594.9023
Epoch 5, Loss: 7526531.2695
'''

In [141]:
for data in dataloader1:
    type(data)

TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class 'torch_geometric.data.data.Data'>

In [143]:
from torch_geometric.data import DataLoader


In [144]:
dataloader1 = DataLoader(train_dataset, batch_size=64, shuffle=True)




In [146]:
for data in dataloader1:
    print(type(data))

<class 'torch_geometric.data.batch.DataBatch'>


In [147]:
from torch_geometric.data import Dataset, Data
import torch

class TemporalGraphDataset(Dataset):
    def __init__(self, time_series_data):
        super(TemporalGraphDataset, self).__init__()
        self.time_series_data = time_series_data
    
    def len(self):
        return len(self.time_series_data[0].x)  # 假设所有时间步长都有相同数量的图

    def get(self, idx):
        # 提取所有时间步中索引为 idx 的图
        return [Data(x=data.x[idx], edge_index=data.edge_index, edge_attr=data.edge_attr if hasattr(data, 'edge_attr') else None) for data in self.time_series_data]

# 模拟一些数据
time_steps = 10
num_nodes = 5
num_features = 3
graphs = []
for _ in range(time_steps):
    x = torch.randn(num_nodes, num_features)  # Node features
    edge_index = torch.randint(0, num_nodes, (2, num_nodes * 2))  # Edges
    graph = Data(x=x, edge_index=edge_index)
    graphs.append(graph)


In [152]:
from torch_geometric.loader import DataLoader

# 创建 Dataset
dataset = graphs

# 创建 DataLoader
loader = DataLoader(dataset, batch_size=1, follow_batch=['x'])

# 在训练循环中使用 DataLoader
for step, batch_graphs in enumerate(loader):
    # `batch_graphs` 将是一个列表，其中包含批次中所有时间步的图数据
    # 你可以在这里将它们输入到 LSTM 或其他适合处理序列数据的模型
    pass


In [153]:
type(loader)

torch_geometric.loader.dataloader.DataLoader

In [155]:
loader.dataset

[Data(x=[5, 3], edge_index=[2, 10]),
 Data(x=[5, 3], edge_index=[2, 10]),
 Data(x=[5, 3], edge_index=[2, 10]),
 Data(x=[5, 3], edge_index=[2, 10]),
 Data(x=[5, 3], edge_index=[2, 10]),
 Data(x=[5, 3], edge_index=[2, 10]),
 Data(x=[5, 3], edge_index=[2, 10]),
 Data(x=[5, 3], edge_index=[2, 10]),
 Data(x=[5, 3], edge_index=[2, 10]),
 Data(x=[5, 3], edge_index=[2, 10])]