In [41]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import copy
import os
import random
import numpy as np
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import cv2
import math


In [42]:
def distance(point1, point2):
    return math.sqrt((point1[0] - point2[0]) ** 2 + (point1[1] - point2[1]) ** 2)


class Maze:
    def __init__(self, args, size=40):
        self.args = args
        self.size = size
        self.maze = np.zeros((size, size))
        self.start_pos = (0, 0)  # 起点位置
        self.goal_area = [(i, j) for i in range(30, 40) for j in range(30, 40)]
        self.paved_area = []  # 记录走过的路径（终点内）
        self.max_goal_distance = distance((1, 1), (30, 30))  # 距离终点最长的距离
        self.goal_pos = self._reset_goal()  # 初始化终点位置
        self.steps = 0
        self.max_steps = 500
        self.animation_set = []
        self.entered_goal_area = False  # 标注首次进入
        # History of agent's positions
        self.position_history = []
        # Dictionary to count occurrences of positions
        self.position_counts = {}

        # Initialize obstacles and cliffs
        self._obstacles_and_cliffs()
        self.guiding_points = [ (22, 14), (23, 14),
                               (24, 14), (25, 14), (26, 14), (27, 14), (28, 15)]

    def _reset_goal(self):
        return random.choice(self.goal_area)

    def _obstacles_and_cliffs(self):
        obstacles = [(1, 6), (1, 7), (1, 8), (1, 9), (1, 10), (1, 11), (1, 12), (1, 13), (1, 14), (1, 15),
                     (1, 16), (1, 17), (1, 18), (1, 19), (1, 20), (1, 21), (1, 22), (1, 23), (1, 24), (1, 25),
                     (10, 3), (11, 4), (12, 5), (13, 6), (14, 7), (15, 8), (16, 9), (17, 10), (18, 11), (19, 12),
                     (20, 13), (23, 4), (24, 4), (25, 4), (26, 4), (27, 4), (28, 4), (29, 4),
                     (30, 4), (31, 4), (32, 4), (33, 4), (34, 4), (35, 4), (36, 4),
                     (25, 22), (25, 23), (25, 24), (25, 25), (25, 26), (25, 27), (25, 28), (25, 29), (25, 30),
                     (25, 31), (25, 32), (25, 33), (25, 34), (25, 35), (25, 36), (25, 37), (25, 38), (25, 39), ]
        cliffs = [(2, 0), (2, 1),
                  (22, 2), (22, 3), (22, 4), (22, 5), (22, 6), (22, 7), (22, 8),
                  (24, 15), (24, 16), (24, 17), (24, 18), (24, 19), (24, 20), (24, 21),
                  (36, 25), (37, 25), (38, 25), (39, 25), ]

        guiding_points = [ ]

        for obs in obstacles:
            self.maze[obs] = -1  # obstacle

        for clf in cliffs:
            self.maze[clf] = -2  # cliff

        for gui in guiding_points:
            self.maze[gui] = -3

    def reset(self):
        self.animation_set = []  # 动画集合重置
        self.paved_area = []  # 走过路径重置
        self.entered_goal_area = False  # 是否进入终点区域重置
        self.agent_pos = self.start_pos
        self.steps = 0
        self.goal_pos = self._reset_goal()
        self.position_history = []
        self.position_counts = {}
        self.goal_reached = False  # 重置标志
        return self.agent_pos

    def step(self, action):
        # move
        x, y = self.agent_pos

        if action == 0:  # up
            x = max(0, x - 1)
        elif action == 1:  # down
            x = min(self.size - 1, x + 1)
        elif action == 2:  # left
            y = max(0, y - 1)
        elif action == 3:  # right
            y = min(self.size - 1, y + 1)

        # Update position history
        current_position = self.agent_pos
        self.position_history.append(current_position)
        if len(self.position_history) > 100:
            self.position_history.pop(0)

        self.steps += 1
        if not self.entered_goal_area:
            # reward = -1
            reward = -5 * (distance((x, y), (30, 30)) / self.max_goal_distance)  # 距离终点越近，惩罚越小
            if current_position in self.position_counts:
                self.position_counts[current_position] += 1
            else:
                self.position_counts[current_position] = 1

            # 检查重复数
            if self.position_counts[current_position] >= 3:
                reward += -20  # 重复超过3给予更大的负奖励
        else:
            if (x, y) in self.paved_area:
                reward = -5  # 走了走过的路，小惩罚
            elif (x, y) not in self.goal_area:
                reward = -20  # 走出去了，大惩罚
            else:
                reward = -1  # 积极探索，奖励
                self.paved_area.append((x, y))

        # # 检查引导点
        # if (x, y) in self.guiding_points:
        #     reward += 15  # Increase reward for reaching a guiding point
        #     self.guiding_points.remove((x, y))  # Remove the guiding point once reached

        if action == 1:         # 向下
            if self.size - 6 >= x:
                reward += 15
            else:
                reward += 0
        elif action == 3:
            if x <= 26 and x > 22:
                reward +=0
            else:
                reward += 5
        elif action == 0:
            if self.size - 6 >= x:
                reward += -2
            else:
                reward += 0
        # 检查是否遇到障碍
        if self.maze[x, y] == -1:
            reward += -8
            return self.agent_pos, reward, False  # meet obstacle

        # 是否掉入悬崖
        if self.maze[x, y] == -2:
            reward += -80  # 死了，给个大惩罚
            return (x, y), reward, True  # fell into cliff

        self.agent_pos = (x, y)

        if self.agent_pos == self.goal_pos:
            reward += 1000  # 到终点的大大奖励
            self.goal_reached = True  # 抵达终点
            done = True
        elif self.agent_pos in self.goal_area:
            # 如果还没有进入过目标区域，给予一次奖励并设置标志
            if not self.entered_goal_area:
                reward += 500  # 阶段性大奖励
                self.entered_goal_area = True
            done = False
        elif self.steps >= self.max_steps:
            done = True
        else:
            done = False

        if self.steps % 20 == 0:
            self.goal_pos = self._reset_goal()
            self.paved_area = []

        return self.agent_pos, reward, done

    def show(self):
        plt.clf()
        fig, ax = plt.subplots(figsize=(10, 10))
        ax.set_xlim(0, self.size)
        ax.set_ylim(0, self.size)
        for i in range(self.size):
            for j in range(self.size):
                if self.maze[i, j] == -1:
                    rect = patches.Rectangle((j, self.size - i - 1), 1, 1,
                                             linewidth=1, edgecolor='black', facecolor='grey')
                    ax.add_patch(rect)
                elif self.maze[i, j] == -2:
                    rect = patches.Rectangle((j, self.size - i - 1), 1, 1,
                                             linewidth=1, edgecolor='black', facecolor='red')
                    ax.add_patch(rect)
                elif self.maze[i, j] == -3:
                    rect = patches.Rectangle((j, self.size - i - 1), 1, 1,
                                             linewidth=1, edgecolor='black', facecolor='violet')
                    ax.add_patch(rect)

        rect = patches.Rectangle((self.start_pos[1], self.size - self.start_pos[0] - 1), 1, 1,
                                 linewidth=1, edgecolor='black', facecolor='yellow')
        ax.add_patch(rect)
        for pos in self.goal_area:
            if pos == self.goal_pos:
                rect = patches.Rectangle((pos[1], self.size - pos[0] - 1), 1, 1,
                                         linewidth=1, edgecolor='black',
                                         facecolor='green')
            else:
                rect = patches.Rectangle((pos[1], self.size - pos[0] - 1), 1, 1,
                                         linewidth=1, edgecolor='black',
                                         facecolor='orange')
            ax.add_patch(rect)
        rect = patches.Rectangle((self.agent_pos[1], self.size - self.agent_pos[0] - 1), 1, 1,
                                 linewidth=1, edgecolor='black', facecolor='blue')
        ax.add_patch(rect)
        fig.savefig('./temp/temp.png')
        image = cv2.imread('./temp/temp.png')
        self.animation_set.append(image)
        print(f"Added frame {len(self.animation_set)} to animation_set")  # 调试信息
        plt.close(fig)

    def show_animation(self, name):
        # 保存动画
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        video = cv2.VideoWriter(os.path.join(self.args.save_dir, name), fourcc, self.args.fps, (1000, 1000))
        for img in self.animation_set:
            video.write(img)
        video.release()


In [43]:
class NoisyLinear(nn.Module):
    '''From https://github.com/Lizhi-sjtu/DRL-code-pytorch/blob/main/3.Rainbow_DQN/network.py'''
    def __init__(self, in_features, out_features, sigma_init=0.5):
        super(NoisyLinear, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.sigma_init = sigma_init

        self.weight_mu = nn.Parameter(torch.FloatTensor(out_features, in_features))
        self.weight_sigma = nn.Parameter(torch.FloatTensor(out_features, in_features))
        self.register_buffer('weight_epsilon', torch.FloatTensor(out_features, in_features))

        self.bias_mu = nn.Parameter(torch.FloatTensor(out_features))
        self.bias_sigma = nn.Parameter(torch.FloatTensor(out_features))
        self.register_buffer('bias_epsilon', torch.FloatTensor(out_features))

        self.reset_parameters() # for mu and sigma
        self.reset_noise() # for epsilon

    def forward(self, x):
        if self.training:
            self.reset_noise()
            weight = self.weight_mu + self.weight_sigma.mul(self.weight_epsilon)  # mul是对应元素相乘
            bias = self.bias_mu + self.bias_sigma.mul(self.bias_epsilon)

        else:
            weight = self.weight_mu
            bias = self.bias_mu

        return F.linear(x, weight, bias)

    def reset_parameters(self):
        mu_range = 1 / math.sqrt(self.in_features)
        self.weight_mu.data.uniform_(-mu_range, mu_range)
        self.bias_mu.data.uniform_(-mu_range, mu_range)

        self.weight_sigma.data.fill_(self.sigma_init / math.sqrt(self.in_features))
        self.bias_sigma.data.fill_(self.sigma_init / math.sqrt(self.out_features))

    def reset_noise(self):
        epsilon_i = self.scale_noise(self.in_features)
        epsilon_j = self.scale_noise(self.out_features)
        self.weight_epsilon.copy_(torch.ger(epsilon_j, epsilon_i))
        self.bias_epsilon.copy_(epsilon_j)

    def scale_noise(self, size):
        x = torch.randn(size)
        x = x.sign().mul(x.abs().sqrt())
        return x

In [44]:
def make_maze_env():
    env = Maze(args)
    return env

In [45]:
# 构建网络
def build_net(layer_shape, activation, output_activation):
    layers = []
    for j in range(len(layer_shape) - 1):
        if j < len(layer_shape) - 2:
            layers += [nn.Linear(layer_shape[j], layer_shape[j + 1]), activation()]
        else:
            layers += [NoisyLinear(layer_shape[j], layer_shape[j + 1], sigma_init=0.25), output_activation()]
    return nn.Sequential(*layers)


In [46]:
class ReplayBuffer:
    def __init__(self, state_dim, device, max_size=int(1e6)):
        self.max_size = max_size
        self.device = device
        self.ptr = 0
        self.size = 0
        self.s = torch.zeros((max_size, state_dim), dtype=torch.float, device=device)
        self.a = torch.zeros((max_size, 1), dtype=torch.long, device=device)
        self.r = torch.zeros((max_size, 1), dtype=torch.float, device=device)
        self.s_next = torch.zeros((max_size, state_dim), dtype=torch.float, device=device)
        self.dw = torch.zeros((max_size, 1), dtype=torch.bool, device=device)

    def add(self, s, a, r, s_next, dw):
        self.s[self.ptr] = torch.FloatTensor(s).to(self.device)
        self.a[self.ptr] = torch.LongTensor([a]).to(self.device)
        self.r[self.ptr] = torch.FloatTensor([r]).to(self.device)
        self.s_next[self.ptr] = torch.FloatTensor(s_next).to(self.device)
        self.dw[self.ptr] = torch.BoolTensor([dw]).to(self.device)
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size):
        ind = torch.randint(0, self.size, size=(batch_size,), device=self.device)
        return self.s[ind], self.a[ind], self.r[ind], self.s_next[ind], self.dw[ind]

In [47]:
class Noisy_Q_Net(nn.Module):
    def __init__(self, state_dim, action_dim, hid_shape):
        super(Noisy_Q_Net, self).__init__()
        layers = [state_dim] + list(hid_shape) + [action_dim]
        self.Q = build_net(layers, nn.ReLU, nn.Identity)

    def forward(self, s):
        q = self.Q(s)
        return q

class NoisyNetDQN_agent:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)
        self.tau = 0.005
        self.replay_buffer = ReplayBuffer(self.state_dim, self.device, max_size=self.buffer_size)
        self.q_net = Noisy_Q_Net(self.state_dim, self.action_dim, (self.net_width, self.net_width)).to(self.device)
        self.q_net_optimizer = optim.Adam(self.q_net.parameters(), lr=self.lr)
        self.q_target = copy.deepcopy(self.q_net)
        for p in self.q_target.parameters():
            p.requires_grad = False

    def select_action(self, state):
        with torch.no_grad():
            if isinstance(state, tuple):
                state = np.array(state)
            state = torch.FloatTensor(state.reshape(1, -1)).to(self.device)
            action = self.q_net(state).argmax().item()
        return action

    def train(self):
        s, a, r, s_next, dw = self.replay_buffer.sample(self.batch_size)

        with torch.no_grad():
            max_q_next = self.q_target(s_next).max(1)[0].unsqueeze(1)
            target_Q = r + (~dw) * self.gamma * max_q_next

        current_q = self.q_net(s).gather(1, a)
        q_loss = F.mse_loss(current_q, target_Q)

        self.q_net_optimizer.zero_grad()
        q_loss.backward()
        self.q_net_optimizer.step()

        for param, target_param in zip(self.q_net.parameters(), self.q_target.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

    def save(self, algo, steps):
        print(f"algo:{algo}steps:{steps}")
        torch.save(self.q_net.state_dict(), f"./{args.save_dir}/{algo}_{steps}k.pth")
        print(f"模型已保存在{args.save_dir}/{algo}_{steps}")

    def load(self, algo, steps):
        self.q_net.load_state_dict(torch.load(f"./{args.save_dir}/{algo}_{steps}k.pth", map_location=self.device))
        self.q_target.load_state_dict(torch.load(f"./{args.save_dir}/{algo}_{steps}k.pth", map_location=self.device))


In [48]:
class Args:
    args = type('', (), {})()
    fps = 30
    num_steps = 5000
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    lr = 1e-3
    memory_size = 1000
    batch_size = 64
    gamma = 0.99
    epsilon = 1.0
    net_width = 128  # 网络宽度
    target_update = 10
    update_target_every = 1000
    save_dir = "./model_w/Noisy_DQN"
    load_model = False

In [49]:
if __name__ == "__main__":
    args = Args()
    env = make_maze_env()


    agent = NoisyNetDQN_agent(
        state_dim= 2,
        action_dim= 4,
        buffer_size=args.memory_size,
        net_width=args.net_width,
        lr=args.lr,
        gamma=0.99,
        batch_size=args.batch_size,
        device=args.device
    )

    # if args.load_model is not None:
    #     agent.load(args.load_model)

    state = env.reset()
    episode_reward = 0
    episode_length = 0
    for t in range(args.num_steps):
        action = agent.select_action(state)
        next_state, reward, done = env.step(action)
        agent.replay_buffer.add(state, action, reward, next_state, done)
        state = next_state
        episode_reward += reward
        episode_length += 1

        if done:
            state = env.reset()
            print(f"Episode Reward: {episode_reward}, Episode Length: {episode_length}")
            episode_reward = 0
            episode_length = 0

        if t > args.batch_size:
            agent.train()
        if t+1 == args.num_steps :
            print("111111")
            agent.save("NoisyNetDQN", t )

Episode Reward: -13904.470353364728, Episode Length: 806
Episode Reward: -60.089909179475455, Episode Length: 2
Episode Reward: -60.089909179475455, Episode Length: 2
Episode Reward: -60.089909179475455, Episode Length: 2
Episode Reward: -60.089909179475455, Episode Length: 2
Episode Reward: -60.089909179475455, Episode Length: 2
Episode Reward: -60.089909179475455, Episode Length: 2
Episode Reward: -60.001486657127984, Episode Length: 3
Episode Reward: -60.001486657127984, Episode Length: 3
Episode Reward: -60.001486657127984, Episode Length: 3
Episode Reward: -60.001486657127984, Episode Length: 3
Episode Reward: -60.001486657127984, Episode Length: 3
Episode Reward: -60.001486657127984, Episode Length: 3
Episode Reward: -60.001486657127984, Episode Length: 3
Episode Reward: -6163.199541275671, Episode Length: 502
Episode Reward: -8383.33844028067, Episode Length: 500
Episode Reward: -20380.992636310326, Episode Length: 1397
Episode Reward: -6231.643645310853, Episode Length: 500
Epi

In [50]:
args = Args()
env = make_maze_env()


agent = NoisyNetDQN_agent(
    state_dim= 2,
    action_dim= 4,
    buffer_size=args.memory_size,
    net_width=args.net_width,
    lr=args.lr,
    gamma=0.99,
    batch_size=args.batch_size,
    device=args.device
)

def test_noisynet_dqn(agent, env, num_episodes=1):
    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        episode_length = 0

        for t in range(100):
            action = agent.select_action(state)
            next_state, reward, done = env.step(action)
            state = next_state
            episode_reward += reward
            episode_length += 1
            env.show()
            if done:
                break
        env.show_animation('test_animation.avi')
        print('Video saved as test_animation.avi')


model_path = args.save_dir+"NoisyNetDQN_24999.pth"  # 替换为实际模型文件路径
agent.load("NoisyNetDQN", '4999')

# 运行测试
test_noisynet_dqn(agent, env, num_episodes=1)

Added frame 1 to animation_set
Added frame 2 to animation_set
Added frame 3 to animation_set
Added frame 4 to animation_set
Added frame 5 to animation_set
Added frame 6 to animation_set
Added frame 7 to animation_set
Added frame 8 to animation_set
Added frame 9 to animation_set
Added frame 10 to animation_set
Added frame 11 to animation_set
Added frame 12 to animation_set
Added frame 13 to animation_set
Added frame 14 to animation_set
Added frame 15 to animation_set
Added frame 16 to animation_set
Added frame 17 to animation_set
Added frame 18 to animation_set
Added frame 19 to animation_set
Added frame 20 to animation_set
Added frame 21 to animation_set
Added frame 22 to animation_set
Added frame 23 to animation_set
Added frame 24 to animation_set
Added frame 25 to animation_set
Added frame 26 to animation_set
Added frame 27 to animation_set
Added frame 28 to animation_set
Added frame 29 to animation_set
Added frame 30 to animation_set
Added frame 31 to animation_set
Added frame 32 to

<Figure size 640x480 with 0 Axes>