In [19]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym
from gym import spaces
from gym.wrappers import Monitor
from collections import deque
import random

In [20]:
# 自定义包装器以添加缺失的属性
class CustomEnvWrapper(gym.Wrapper):
    def __init__(self, env):
        super(CustomEnvWrapper, self).__init__(env)
        self.action_space = self.env.action_space
        self.observation_space = self.env.observation_space

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)

    def step(self, action):
        return self.env.step(action)

# 定义四足蚂蚁环境
class AntEnv:
    def __init__(self):
        self.env = CustomEnvWrapper(gym.make("Ant-v2"))
        self.state_dim = self.env.observation_space.shape[0]
        self.action_dim = self.env.action_space.shape[0]

    def reset(self):
        return self.env.reset()

    def step(self, action):
        return self.env.step(action)


In [21]:
# 定义深度神经网络Q函数
class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [22]:
# 定义DDPG智能体
class DDPGAgent:
    def __init__(self, state_dim, action_dim):
        self.q_network = QNetwork(state_dim, action_dim)
        self.target_q_network = QNetwork(state_dim, action_dim)
        self.target_q_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=0.001)
        self.memory = deque(maxlen=10000)
        self.state_dim = state_dim
        self.action_dim = action_dim

    def act(self, state):
        state = torch.tensor(state, dtype=torch.float32)
        return self.q_network(state).detach().numpy()

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return
        batch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in batch:
            state = torch.tensor(state, dtype=torch.float32)
            next_state = torch.tensor(next_state, dtype=torch.float32)
            action = torch.tensor(action, dtype=torch.float32)
            reward = torch.tensor(reward, dtype=torch.float32)
            q_value = self.q_network(state)
            next_q_value = self.target_q_network(next_state)
            if done:
                target = reward
            else:
                target = reward + 0.99 * next_q_value.max().item()
            loss = nn.MSELoss()(q_value, target)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

    def update_target_network(self):
        self.target_q_network.load_state_dict(self.q_network.state_dict())

In [23]:
def train():
    env = AntEnv()
    env = Monitor(env, './videos', force=True)  # 创建一个Monitor包装器以录制视频，视频将保存在'./videos'目录下
    state_dim = env.state_dim
    action_dim = env.action_dim
    agent = DDPGAgent(state_dim, action_dim)
    batch_size = 32

    for episode in range(1000):
        state = env.reset()
        episode_reward = 0

        for t in range(1000):
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            episode_reward += reward
            agent.replay(batch_size)
            if done:
                break

        if episode % 10 == 0:
            agent.update_target_network()

        print(f"Episode: {episode}, Reward: {episode_reward}")

In [24]:
if __name__ == "__main__":
    train()

AttributeError: 'AntEnv' object has no attribute 'action_space'