<a href="https://colab.research.google.com/github/Snails-tian/mpcVSrl/blob/main/testPetri.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
import math
import numpy as np
import torch

In [4]:
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable

# 定义连续 Petri 网模拟器（这里只是一个简化的示例）
class PetriNetSimulator:
    """Placeholder simulator for a continuous Petri net.

    This is a simplified example: no net dynamics are modelled yet.
    """

    def simulate(self, rate):
        """Return a stand-in value for the state reached after one step.

        Args:
            rate: Rate chosen by the controller. Currently ignored.

        Returns:
            A single random float produced by ``np.random.rand()``.
        """
        # A real implementation would evolve the Petri net according to the
        # system's characteristics and return its actual state.
        random_state = np.random.rand()
        return random_state

# 定义深度确定性策略梯度（DDPG）网络
class DDPGActor(nn.Module):
    """Deterministic policy network for DDPG.

    Two hidden layers of 32 ReLU units followed by a tanh output, so every
    action component lies in [-1, 1].

    Args:
        input_size: Number of features in a state vector.
        output_size: Number of action components to produce.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, 32)
        self.fc2 = nn.Linear(32, 32)
        self.output_layer = nn.Linear(32, output_size)

    def forward(self, state):
        """Map ``state`` to an action squashed by tanh."""
        hidden = self.fc1(state).relu()
        hidden = self.fc2(hidden).relu()
        return self.output_layer(hidden).tanh()

class DDPGCritic(nn.Module):
    """Action-value network for DDPG.

    The state and action are concatenated along dim 1 and passed through two
    hidden layers of 32 ReLU units; the output is one unbounded value per row.

    Args:
        input_size: Number of features in a state vector.
        action_size: Number of components in an action vector.
    """

    def __init__(self, input_size, action_size):
        super().__init__()
        joint_size = input_size + action_size
        self.fc1 = nn.Linear(joint_size, 32)
        self.fc2 = nn.Linear(32, 32)
        self.output_layer = nn.Linear(32, 1)

    def forward(self, state, action):
        """Return the estimated Q-value for each (state, action) row."""
        joint_input = torch.cat((state, action), dim=1)
        hidden = self.fc1(joint_input).relu()
        hidden = self.fc2(hidden).relu()
        q_value = self.output_layer(hidden)
        return q_value

# 定义 DDPG 算法
class DDPGAlgorithm:
    """Minimal Deep Deterministic Policy Gradient (DDPG) learner.

    Holds an actor, a critic, a target copy of each, and one Adam optimizer
    per online network. There is no replay buffer or exploration noise here;
    callers feed transitions directly to the ``update_*`` methods.

    Args:
        input_size: Number of features in a state vector.
        action_size: Number of components in an action vector.
        alpha_actor: Learning rate of the actor optimizer.
        alpha_critic: Learning rate of the critic optimizer.
        gamma: Discount factor applied to the bootstrapped next-state value.
        tau: Soft-update coefficient; each target parameter becomes
            ``(1 - tau) * target + tau * online``.
    """

    def __init__(self, input_size, action_size, alpha_actor, alpha_critic, gamma, tau):
        self.actor_network = DDPGActor(input_size, action_size)
        self.target_actor_network = DDPGActor(input_size, action_size)
        # Target networks start as exact copies of the online networks.
        self.target_actor_network.load_state_dict(self.actor_network.state_dict())
        self.actor_optimizer = optim.Adam(self.actor_network.parameters(), lr=alpha_actor)

        self.critic_network = DDPGCritic(input_size, action_size)
        self.target_critic_network = DDPGCritic(input_size, action_size)
        self.target_critic_network.load_state_dict(self.critic_network.state_dict())
        self.critic_optimizer = optim.Adam(self.critic_network.parameters(), lr=alpha_critic)

        self.huber_loss = nn.SmoothL1Loss()
        self.gamma = gamma
        self.tau = tau

    def update_actor_network(self, state):
        """Take one gradient step on the actor to increase the critic's value.

        Args:
            state: Batch of states, shaped as the actor and critic expect
                (the critic concatenates state and action along dim 1).
        """
        self.actor_optimizer.zero_grad()

        action = self.actor_network(state)
        # Minimizing -Q(s, pi(s)) is gradient ascent on the critic's estimate.
        loss = -self.critic_network(state, action).mean()

        # This also accumulates gradients on the critic's parameters; they are
        # cleared by ``critic_optimizer.zero_grad()`` before the next critic step.
        loss.backward()
        self.actor_optimizer.step()

    def update_critic_network(self, state, action, reward, next_state):
        """Take one gradient step on the critic towards the one-step TD target.

        Args:
            state: Batch of states.
            action: Batch of actions taken in ``state``. It is detached here,
                so passing a live actor output does not send critic-loss
                gradients into the actor.
            reward: Reward for the transition (a scalar, or anything that
                broadcasts against the critic output).
            next_state: Batch of successor states.
        """
        self.critic_optimizer.zero_grad()

        # The bootstrap target is a constant for this update, so no graph is
        # built through the target networks.
        with torch.no_grad():
            target_action = self.target_actor_network(next_state)
            next_q_value = self.target_critic_network(next_state, target_action)
            target_q_value = reward + self.gamma * next_q_value

        predicted_q_value = self.critic_network(state, action.detach())
        loss = self.huber_loss(predicted_q_value, target_q_value)

        loss.backward()
        self.critic_optimizer.step()

    def update_target_networks(self):
        """Move both target networks a fraction ``tau`` towards the online ones."""
        self._soft_update(self.target_actor_network, self.actor_network)
        self._soft_update(self.target_critic_network, self.critic_network)

    def _soft_update(self, target_network, source_network):
        """Blend ``source_network``'s parameters into ``target_network`` in place."""
        with torch.no_grad():
            for target_param, param in zip(target_network.parameters(), source_network.parameters()):
                target_param.copy_((1 - self.tau) * target_param + self.tau * param)

# 训练 DDPG
def train_ddpg_algorithm(ddpg_algorithm, petri_net_simulator, num_episodes,
                         state_size=5, log_interval=100):
    """Run a simplified DDPG training loop with one update per episode.

    Each episode draws a random current state and a random next state,
    queries the actor for an action, asks the simulator for a reward, and
    then updates the critic, the actor and the target networks once.
    This is only a demonstration loop; a real application must supply its
    own state transitions and reward function.

    Args:
        ddpg_algorithm: Object exposing ``actor_network`` and the
            ``update_critic_network`` / ``update_actor_network`` /
            ``update_target_networks`` methods.
        petri_net_simulator: Object whose ``simulate(rate)`` returns the reward.
        num_episodes: Number of episodes (and update rounds) to run.
        state_size: Length of the randomly generated state vectors.
        log_interval: Print the reward every this many episodes; a falsy
            value disables logging.
    """
    for episode in range(num_episodes):
        # Simplified continuous state with a leading batch dimension of 1.
        current_state = torch.rand(state_size).unsqueeze(0)

        # Acting needs no autograd graph; the actor update recomputes the
        # action itself, and the critic must see the action as a constant.
        with torch.no_grad():
            action = ddpg_algorithm.actor_network(current_state)

        next_state = torch.rand(state_size).unsqueeze(0)
        # ``.item()`` requires a single-element action (one rate).
        reward = petri_net_simulator.simulate(action.item())

        ddpg_algorithm.update_critic_network(current_state, action, reward, next_state)
        ddpg_algorithm.update_actor_network(current_state)
        ddpg_algorithm.update_target_networks()

        if log_interval and episode % log_interval == 0:
            print(f"Episode: {episode}, Reward: {reward}")

# 示例使用
# Problem dimensions.
input_size = 5  # dimensionality of the input state
action_size = 1  # dimensionality of the output action, assumed here to be a rate

# Optimisation hyperparameters.
alpha_actor = 0.001  # actor learning rate
alpha_critic = 0.001  # critic learning rate
gamma = 0.9  # discount factor
tau = 0.001  # soft-update coefficient

# Create the continuous Petri net simulator.
petri_net_simulator = PetriNetSimulator()

# Create the DDPG algorithm instance.
ddpg_algorithm = DDPGAlgorithm(
    input_size=input_size,
    action_size=action_size,
    alpha_actor=alpha_actor,
    alpha_critic=alpha_critic,
    gamma=gamma,
    tau=tau,
)

# Train DDPG.
train_ddpg_algorithm(
    ddpg_algorithm=ddpg_algorithm,
    petri_net_simulator=petri_net_simulator,
    num_episodes=1000,
)


Episode: 0, Reward: 0.11521904659768711
Episode: 100, Reward: 0.5910840113156018
Episode: 200, Reward: 0.0009260925356918692
Episode: 300, Reward: 0.2432375156080261
Episode: 400, Reward: 0.5580831001061329
Episode: 500, Reward: 0.6052840288340768
Episode: 600, Reward: 0.9669169393118394
Episode: 700, Reward: 0.07403718259400172
Episode: 800, Reward: 0.8647708687974665
Episode: 900, Reward: 0.6611007897190799
