对于配方生产网络，有m个item、n个formula两种类型节点，同种节点不互相连接。设节点的权重为$\alpha$和$\beta$，代表物资和配方的单位数量，边的权重为$W_{m\times n}$（代表每种配方每单位消耗产出物资数量）。item中有一些特殊的节点为源节点$s=\alpha_{m\times 1} \odot Ms_{m \times 1}$，其中$Ms_{m \times 1}$为标示源节点的掩码向量。源节点的需求不应该由formula产出，但是要累计计算资源需求。现需要对一些目标物资的生产路线建模，优化源节点的物资需求。

可以设想一种传播途径，从某些item节点开始，初始化权重为向量$\alpha_{m\times 1}$。假设$item_a$权重为$\alpha_a$连接n个formula节点，权重为向量$w_{a|n}$，由于每个item可以由不同的formula输出，所以配置可训练向量$a_{a|n}$映射到概率向量$b_{a|n}=softmax(a_{a|n})$来表示每个item的产出需求由不同的formula输出提供的比例，满足$b_{ak}*\alpha_a=ReLU(w_{ak})*\beta_k$，即$\beta_k=\alpha_{ak} \cdot \frac{b_{ak}}{w_{ak}},k\in [1,n]$，同时由于每个formula有多个item产出，formula的实际需求权重应该由产出item中对formula需求最高的项决定，设$B_{m,n}=[b_1,b_2,...,b_m]$，有$\beta_k=max(\alpha \odot \frac{B_{:,k}}{W_{:,k}})$

那么推广到向量$\beta$，有
$$\beta_{n\times 1}= maxcol(\alpha_{m\times 1} \odot \frac{softmax(A_{m\times n}, dim=1)}{ReLU(W_{m\times n})}).$$

但是会有一个问题，即$W_{m\times n}$的元素可能小于等于0，对应的元素是无效的，不应该纳入计算，所以应该引入一个掩码和epsilon，即
$$M_{m\times n}=\mathbb{I}(W_{m\times n}>0) \in \{0,1\}^{m\times n}$$
$$\beta_{n\times 1}= maxcol(\alpha_{m\times 1} \odot \frac{softmax(A_{m\times n}+ (1−M_{m×n})\odot (-1/epsilon), dim=1)}{max(W_{m\times n}, epsilon)}\odot M_{m\times n}).$$
在实际运算中为了防止溢出，这里使用$-1/epsilon$代替$-\inf$，epsilon取一个极小数，比如1e-8。


以上网络进一步传播，由$\Delta\alpha_{m \times 1}=W_{m\times n} \cdot \beta_{n\times 1}$得到
$$\alpha'_{m\times 1}=ReLU(\alpha_{m\times 1} - \Delta\alpha_{m\times 1})$$

另外，item中有一些特殊的节点为源节点$s=\alpha_{m\times 1} \odot Ms_{m \times 1}$，其中$Ms_{m \times 1}$为标示源节点的掩码向量。源节点的需求不应该由formula产出，但是要累计计算资源需求。即
$$s=s+\alpha'_{m\times 1} \odot Ms_{m \times 1}$$
$$$$
$$\alpha''_{m\times 1}=\alpha'_{m\times 1} \odot (1-Ms_{m \times 1})$$

综上所述，已知边的权重矩阵为$W_{m\times n}$，标示源节点的掩码矩阵为$Ms_{m \times 1}\in \{0,1\}^{m\times 1}$，初始化需求矩阵为$\alpha^0_{m\times 1}$，建模方案如下：
$$\begin{aligned}
init:&\\
&Ma_{m \times 1}=1-Ms_{m \times 1}\\
&M_{m\times n}=\mathbb{I}(W_{m\times n}>0) \in \{0,1\}^{m\times n}\\
&s^0_{m \times 1}=\alpha^0_{m\times 1} \odot Ms_{m \times 1}\\
&Mp_{m\times n}=(1−M_{m×n})\odot (-1/epsilon)\\
&Wb_{m\times n}=\frac{1}{max(W_{m\times n}, epsilon)}\\
layer_1:&\\
&P^1_{m\times n} = M_{m\times n} \odot softmax(A^1_{m\times n}+ Mp_{m\times n}, dim=1)\\
&\beta^1_{n\times 1} = maxcol(\alpha^0_{m\times 1} \odot P^1_{m\times n}\odot Wb_{m\times n})\\
&\alpha'_{m\times 1}=ReLU(\alpha^0_{m\times 1} - W_{m\times n} \cdot \beta^1_{n\times 1})\\
&s^1_{m \times 1}=s^0_{m \times 1}+\alpha'_{m\times 1} \odot Ms_{m \times 1}\\
&\alpha^1_{m\times 1}=\alpha'_{m\times 1} \odot (1-Ms_{m \times 1})\\
......&\\
layer_i:&\\
&P^i_{m\times n} = M_{m\times n} \odot softmax(A^i_{m\times n}+ Mp_{m\times n}, dim=1)\\
&\beta^i_{n\times 1} = maxcol(\alpha^{i-1}_{m\times 1} \odot P^i_{m\times n}\odot Wb_{m\times n})\\
&\alpha'_{m\times 1}=ReLU(\alpha^{i-1}_{m\times 1} - W_{m\times n} \cdot \beta^i_{n\times 1})\\
&s^i_{m \times 1}=s^{i-1}_{m \times 1}+\alpha'_{m\times 1} \odot Ms_{m \times 1}\\
&\alpha^i_{m\times 1}=\alpha'_{m\times 1} \odot (1-Ms_{m \times 1})\\
\end{aligned}$$

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import networkx as nx
from typing import Callable
from factorio.satisfactory.formula import read_formula, parse_recipe_graph, parse_item, is_base_type, device

class RecipeGNN(nn.Module):
    """
    配方生产网络：基于item-formula双节点类型的多层传播网络。
    item（物资）→formula（配方）→item更新→源节点资源累计
    """
    def __init__(self, G: nx.DiGraph, target_dict: dict, 
                 num_layers: int=9,
                 epsilon=1e-8, device=device, dtype=torch.float32,
                 is_source: Callable[[nx.DiGraph, str], bool] = is_base_type):
        super().__init__()
        self.device = device
        self.dtype = dtype
        self.G = G
        self.epsilon = epsilon
        
        self.parse_recipe_graph(weight = "speed")
        
        self.parse_item(target_dict=target_dict, is_source=is_source)
        
        self.init_constants()
        # 可训练矩阵A^i（每层独立，按需扩展）
        self.init_layer(num_layers)

    
    def parse_recipe_graph(self, weight = "speed"):
        (
            self.raw_weight_matrix,
            self.item_nodes, self.formula_nodes,
            self.item2idx, self.formula2idx,
            self.idx2item, self.idx2formula
        ) = parse_recipe_graph(self.G, weight = weight)
        
    
    def parse_item(self, target_dict: dict, is_source = is_base_type):
        (
            self.raw_target_weights, self.base_items, self.base_idxs,
            self.target_items, self.target_idxs, self.other_items, self.other_items_idx
        ) = parse_item(self.G, item_nodes=self.item_nodes, item2idx=self.item2idx,
                       target_dict=target_dict, is_source=is_source)
    
    def init_layer(self, num_layers: int):
        """新增一层的可训练矩阵A^i"""
        self.A_layers = nn.ParameterList([
            nn.Parameter(torch.randn(len(self.item_nodes), len(self.formula_nodes))) for _ in range(num_layers) # 初始化可训练参数，可自定义
        ])
    
    def init_constants(self):
        """
        初始化所有常量（仅需调用一次）
        """
        device = self.device
        dtype = self.dtype
        epsilon = self.epsilon
        
        self.fixed_weight_matrix = torch.tensor(self.raw_weight_matrix, requires_grad=False, dtype=dtype, device=device) 
        self.source_mask = torch.zeros([len(self.item_nodes)], requires_grad=False, dtype=dtype, device=device)
        self.source_mask[self.base_idxs] = 1.0
        self.target_weights = torch.tensor(self.raw_target_weights, requires_grad=False, dtype=dtype, device=device)
        # 1. 边掩码矩阵 M = I(W>0) ，获取 item -> formula 的有效边
        self.weight_mask = (self.fixed_weight_matrix > 0).float()
        
        # 2. 概率掩码矩阵 Mp = (1-M) * (-1/epsilon) ，获取 item -> formula 的有效概率选择
        minus_1_over_epsilon = -1.0 / epsilon
        self.probability_mask = (1 - self.weight_mask) * minus_1_over_epsilon
        
        # 3. 倒数权重矩阵 Wb = 1 / max(W, epsilon) ，计算 item 的 formula 需求
        self.reciprocal_weight_matrix = 1.0 / torch.max(W, torch.tensor(epsilon, requires_grad=False, dtype=dtype, device=device))
        
        # 4. 非源节点掩码 Ma = 1 - Ms
        self.normal_mask = (1 - self.source_mask).float()
        
        self.alpha_0 = torch.tensor(self.raw_target_weights, requires_grad=False, dtype=dtype, device=device)
        self.s_0 = torch.zeros([len(self.item_nodes)], requires_grad=False, dtype=dtype, device=device)
        self.s_0 = self.s_0 + self.alpha_0 * self.source_mask
        
    
    def single_layer_forward(self, alpha_prev: torch.Tensor, layer_idx):
        """
        单轮layer_i前向计算（仅处理可变部分）
        参数：
            alpha_prev: (m,1) 上一轮item权重 α^{i-1}
            layer_idx: int 当前层索引（从0开始）
        返回：
            alpha_i: (m,1) 非源节点item权重 α^i
            s_increment: (m,1) 源节点资源增量（用于累计）
            beta_i: (n,1) 当前层配方权重
        """
        # 1. 获取当前层可训练矩阵A^i
        A_i = self.A_layers[layer_idx]
        
        # 2. 计算P^i = M ⊙ softmax(A^i + Mp, dim=1)
        P_i = self.weight_mask * F.softmax(A_i + self.probability_mask, dim=1)
        
        # 3. 计算beta^i = maxcol(α_prev ⊙ P_i ⊙ Wb)
        contribution = alpha_prev * P_i * self.reciprocal_weight_matrix  # 哈达玛积简化
        
        beta_i: torch.Tensor = contribution.max(dim=0, keepdim=True).T  # maxcol
        # 4. 计算alpha' = ReLU(α_prev - W·β^i)
        delta_alpha = self.fixed_weight_matrix @ beta_i  # W是init_constants中保存的原始矩阵
        alpha_prime = F.relu(alpha_prev - delta_alpha)
        
        # 5. 源节点资源增量 + 非源节点alpha_i
        s_increment = alpha_prime * self.source_mask
        alpha_i = alpha_prime * self.normal_mask
        
        return alpha_i, s_increment, beta_i, P_i
    
    def forward(self, alpha_0, s_0):
        """
        完整前向传播（含初始化+多轮计算）
        参数：
            alpha_0: (m,1) 初始item需求
            W: (m,n) 边权重矩阵
            Ms: (m,1) 源节点掩码
            num_layers: int 传播层数
        返回：
            s_list: list 各层源节点资源累计 [s^0, s^1, ..., s^num_layers]
            alpha_list: list 各层item权重 [α^0, α^1, ..., α^num_layers]
            beta_list: list 各层配方权重 [β^1, β^2, ..., β^num_layers]
        """
        
        # 4. 多轮传播迭代
        num_layers = len(self.A_layers)
        alpha_prev = alpha_0
        s_prev = s_0
        for i in range(num_layers):
            # 单轮层计算（仅处理可变参数）
            beta_i, alpha_prime, alpha_i, s_inc = self.single_layer_forward(
                alpha_prev, layer_idx=i
            )
            
            # 累计源节点资源 s^i = s^{i-1} + s_inc
            s_i = s_prev + s_inc
            
            # 保存结果
            beta_list.append(beta_i)
            alpha_list.append(alpha_i)
            s_list.append(s_i)
            
            # 更新迭代变量
            alpha_prev = alpha_i
            s_prev = s_i
        
        return s_list, alpha_list, beta_list


# -------------------------- 测试示例 --------------------------
if __name__ == "__main__":
    g = read_formula()
    # 1. 配置参数
    m = 3  # 物资(item)数量
    n = 2  # 配方(formula)数量
    num_layers = 2  # 传播层数
    epsilon = 1e-8  # 数值安全常数
    
    # 2. 初始化数据
    alpha_0 = torch.tensor([[1.0], [2.0], [3.0]])  # 初始物资需求（比如需要1单位item1，2单位item2）
    W = torch.tensor([[2.0, -1.0], [-0.5, 3.0], [4.0, 2.0]])  # 配方消耗：W[i,j]是配方j生产1单位需要item i的数量
    Ms = torch.tensor([[1.0], [0.0], [1.0]])  # 源节点掩码：item1、item3是源物资（无法由配方生产）
    
    # 3. 初始化模型
    model = FormulaProductionNetwork(m=m, n=n, epsilon=epsilon)
    
    # 4. 前向计算（优化后仅初始化一次常量）
    s_list, alpha_list, beta_list = model(
        alpha_0=alpha_0,
        W=W,
        Ms=Ms,
        num_layers=num_layers
    )
    
    # 5. 打印结果
    print("="*60)
    print(f"优化版配方生产网络 - 层数：{num_layers}，epsilon={epsilon}")
    print("="*60)
    # 打印预计算的常量（验证优化）
    print(f"\n预计算常量：")
    print(f"边掩码矩阵M：\n{model.M}")
    print(f"Mp=(1-M)*(-1/epsilon)：\n{model.Mp[:2,:]} （仅展示前2行）")
    print(f"Wb=1/max(W,epsilon)：\n{model.Wb}")
    
    # 打印各层结果
    for i in range(num_layers + 1):
        print(f"\n--- 第{i}轮 ---")
        if i == 0:
            print(f"初始源物资需求s^0：\n{s_list[i]}")
            print(f"初始物资需求α^0：\n{alpha_list[i]}")
        else:
            print(f"源物资累计需求s^{i}：\n{s_list[i]}")
            print(f"非源物资需求α^{i}：\n{alpha_list[i]}")
            print(f"配方需求β^{i}：\n{beta_list[i-1]}")