In [1]:
import numpy as np

# Clipping helper
def clip(x, min_val, max_val):
    """Limit ``x`` to the interval ``[min_val, max_val]``.

    Delegates to ``np.clip``, so ``x`` may be a scalar or anything NumPy can
    treat as an array; values below ``min_val`` become ``min_val`` and values
    above ``max_val`` become ``max_val``.

    Args:
        x: Value(s) to clip.
        min_val: Lower bound.
        max_val: Upper bound.

    Returns:
        The clipped value(s), as returned by ``np.clip``.
    """
    bounded = np.clip(x, a_min=min_val, a_max=max_val)
    return bounded


In [3]:
# Worked example of the clipped surrogate objective (PPO-style) for one action.
# Inputs: probability of the action under the old and the current policy,
# the advantage estimate, and the clipping hyperparameter epsilon.
old_policy_prob, current_policy_prob = 0.2, 0.4
advantage = 0.5
epsilon = 0.2

# Probability ratio: current / old  (0.4 / 0.2 = 2)
prob_ratio = current_policy_prob / old_policy_prob
# Ratio limited to [1 - epsilon, 1 + epsilon]  (clip(2, 0.8, 1.2))
clipped_prob_ratio = clip(prob_ratio, 1 - epsilon, 1 + epsilon)

# The two candidate terms of the objective: unclipped and clipped
term1, term2 = prob_ratio * advantage, clipped_prob_ratio * advantage

# The objective keeps the smaller of the two terms
clip_objective = min(term1, term2)

# Report every intermediate quantity, one per line
print(
    f"概率比: {prob_ratio}",
    f"裁剪后的概率比: {clipped_prob_ratio}",
    f"目标函数中的两项: {term1}, {term2}",
    f"裁剪目标函数值: {clip_objective}",
    sep="\n",
)

概率比: 2.0
裁剪后的概率比: 1.2
目标函数中的两项: 1.0, 0.6
裁剪目标函数值: 0.6



# Critic 网络：状态-动作价值函数


具体计算步骤
- 输入拼接：将全局状态 $s_t$ 和联合动作 $a_t$ 拼接成一个向量 $h^0 = [s_t; a_t]$。
- 隐藏层计算：对于 $l = 1, 2, \cdots, L - 1$，计算 $h^l=\sigma(W^l h^{l - 1}+b^l)$。
- 输出计算：计算 $V_{\phi}(s_t, a_t)=W^L h^{L - 1}+b^L$


In [5]:
import torch
import torch.nn as nn

# Critic network definition
class CriticNetwork(nn.Module):
    """Fully connected critic that maps a (state, action) pair to one scalar.

    The state and action tensors are concatenated along their last dimension
    and passed through a stack of ``Linear -> ReLU`` layers followed by a
    final ``Linear`` layer with a single output unit.

    Args:
        state_dim: Size of the last dimension of the state input.
        action_dim: Size of the last dimension of the action input.
        hidden_dims: Iterable with the width of each hidden layer, in order.
            An empty iterable yields a single linear layer.
    """

    def __init__(self, state_dim, action_dim, hidden_dims):
        super().__init__()
        # Layer widths from the concatenated input through every hidden layer.
        widths = [state_dim + action_dim, *hidden_dims]
        modules = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            modules += [nn.Linear(fan_in, fan_out), nn.ReLU()]
        # Output head: one value per input row, no activation.
        modules.append(nn.Linear(widths[-1], 1))
        self.network = nn.Sequential(*modules)

    def forward(self, state, action):
        """Return the value estimate for ``state`` and ``action``.

        Both tensors must agree on every dimension except the last; the
        result has the same leading dimensions and a last dimension of 1.
        """
        joint_input = torch.cat((state, action), dim=-1)
        return self.network(joint_input)

# Unit test
def test_critic_network(seed=None):
    """Smoke-test ``CriticNetwork`` on one random state/action pair.

    Builds a critic sized for 3 intersections with 4 state features each,
    samples a random global state and a random binary joint action, runs one
    forward pass, verifies the output and prints the inputs and the estimate.

    Args:
        seed: Optional integer passed to ``torch.manual_seed`` before any
            random number is drawn (weight initialisation included), so the
            printed values can be reproduced. ``None`` (the default) leaves
            the global RNG untouched.

    Raises:
        AssertionError: If the critic does not return a single finite value
            of shape ``(1, 1)``.
    """
    if seed is not None:
        torch.manual_seed(seed)

    # Problem sizes
    num_intersections = 3  # number of intersections
    state_features_per_intersection = 4  # state features per intersection
    state_dim = num_intersections * state_features_per_intersection
    action_dim = num_intersections
    hidden_dims = [16, 8]  # hidden layer widths

    # Instantiate the critic
    critic = CriticNetwork(state_dim, action_dim, hidden_dims)

    # Sample one global state and one joint action (batch size 1)
    s_t = torch.randn(1, state_dim)  # global state at time step t
    a_t = torch.randint(0, 2, (1, action_dim)).float()  # joint action at time step t, entries in {0, 1}

    # Evaluate the state-action pair
    value = critic(s_t, a_t)

    # Actual checks: one scalar per batch row, and no NaN/inf in the output.
    assert value.shape == (1, 1), f"unexpected critic output shape: {tuple(value.shape)}"
    assert torch.isfinite(value).all(), "critic produced a non-finite value"

    print(f"全局状态: {s_t}")
    print(f"联合动作: {a_t}")
    print(f"状态 - 动作对的价值估计: {value.item()}")


In [8]:
# Run the critic smoke test; it prints the sampled global state, the joint action and the value estimate.
test_critic_network()

全局状态: tensor([[ 0.6693, -0.7343, -0.4339,  1.2496, -1.1443, -0.1215, -0.9654,  1.4177,
          0.2279, -0.3850, -0.3686, -0.6260]])
联合动作: tensor([[0., 1., 0.]])
状态 - 动作对的价值估计: 0.06937223672866821
