In [None]:
import os
from tqdm import tqdm
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from nes_py.wrappers import JoypadSpace
import gym_super_mario_bros
from gym.spaces import Box
from gym import Wrapper
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, COMPLEX_MOVEMENT, RIGHT_ONLY
import cv2
import matplotlib.pyplot as plt

In [None]:
# Build the training environment
def create_train_env(world, stage, action_type):
    """Create a Super Mario Bros environment wrapped for training.

    Args:
        world: World number, formatted into the environment id.
        stage: Stage number, formatted into the environment id.
        action_type: "right" selects RIGHT_ONLY, "simple" selects
            SIMPLE_MOVEMENT; any other value selects COMPLEX_MOVEMENT.

    Returns:
        The base environment wrapped, in order, by JoypadSpace,
        CustomReward and CustomSkipFrame.
    """
    # Anything other than "right"/"simple" falls back to the complex action set.
    action_sets = {"right": RIGHT_ONLY, "simple": SIMPLE_MOVEMENT}
    actions = action_sets.get(action_type, COMPLEX_MOVEMENT)

    # Base emulator environment restricted to the chosen button combinations.
    env_id = "SuperMarioBros-{}-{}-v3".format(world, stage)
    base_env = gym_super_mario_bros.make(env_id)
    joypad_env = JoypadSpace(base_env, actions)

    # Project-specific customisation: reward shaping, then frame skipping.
    rewarded_env = CustomReward(joypad_env, world, stage, monitor=None)
    return CustomSkipFrame(rewarded_env)
# Modify the raw environment to obtain better training behaviour
class CustomReward(Wrapper):
    """Wrapper that preprocesses frames and reshapes the reward signal.

    Each step passes the raw frame through ``process_frame``, adds the change
    in in-game score (divided by 40) to the reward, applies +50 / -50 when the
    episode ends with / without the flag, and applies hand-written position
    rules for worlds 7-4 and 4-4 that end the episode with a penalty. The
    reward handed back to the caller is divided by 10.
    """

    def __init__(self, env=None, world=None, stage=None, monitor=None):
        super().__init__(env)
        # NOTE(review): process_frame divides pixels by 255, so emitted frames
        # lie in [0, 1] even though the declared upper bound here is 255.
        self.observation_space = Box(low=0, high=255, shape=(1, 84, 84))
        self.curr_score = 0
        self.current_x = 40
        self.world = world
        self.stage = stage
        # Any falsy monitor is stored as None.
        self.monitor = monitor if monitor else None

    def step(self, action):
        """Step the wrapped env and return (frame, shaped_reward, done, info)."""
        raw_frame, reward, done, info = self.env.step(action)

        if self.monitor:
            self.monitor.record(raw_frame)
        state = process_frame(raw_frame)

        # Bonus proportional to the score gained on this step.
        score = info["score"]
        reward += (score - self.curr_score) / 40.
        self.curr_score = score

        # Terminal bonus when the flag was reached, terminal penalty otherwise.
        if done:
            reward += 50 if info["flag_get"] else -50

        # Level-specific rules for 7-4: listed (x, y) regions, or falling more
        # than 500 units behind the previous x position, end the episode.
        if self.world == 7 and self.stage == 4:
            x_pos, y_pos = info["x_pos"], info["y_pos"]
            wrong_way = (
                (506 <= x_pos <= 832 and y_pos > 127)
                or (832 < x_pos <= 1064 and y_pos < 80)
                or (1113 < x_pos <= 1464 and y_pos < 191)
                or (1579 < x_pos <= 1943 and y_pos < 191)
                or (1946 < x_pos <= 1964 and y_pos >= 191)
                or (1984 < x_pos <= 2060 and (y_pos >= 191 or y_pos < 127))
                or (2114 < x_pos < 2440 and y_pos < 191)
                or x_pos < self.current_x - 500
            )
            if wrong_way:
                reward -= 50
                done = True

        # Level-specific rules for 4-4. Unlike the 7-4 rule this overwrites the
        # reward with -50 instead of subtracting from it.
        if self.world == 4 and self.stage == 4:
            x_pos, y_pos = info["x_pos"], info["y_pos"]
            if (x_pos <= 1500 and y_pos < 127) or (1588 <= x_pos < 2380 and y_pos >= 127):
                reward = -50
                done = True

        # Disabled experiment: small positive bonus for jumping (simple action set).
        # if action in [2, 4]:
        #     reward += 0.5

        # Disabled experiment: penalty for standing still.
        # if info["x_pos"] == self.current_x:
        #     reward -= 0.3

        self.current_x = info["x_pos"]

        return state, reward / 10., done, info

    def reset(self):
        """Reset score/position tracking and return the first processed frame."""
        self.curr_score = 0
        self.current_x = 40
        first_frame = self.env.reset()
        return process_frame(first_frame)
    
def process_frame(frame):
    """Convert a raw RGB frame into a (1, 84, 84) array scaled by 1/255.

    Args:
        frame: Image array accepted by ``cv2.cvtColor`` with
            ``COLOR_RGB2GRAY``, or None.

    Returns:
        The grayscale frame resized to 84x84 with a leading axis of size 1
        and values divided by 255; an all-zero (1, 84, 84) array when
        ``frame`` is None.
    """
    if frame is None:
        return np.zeros((1, 84, 84))
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)  # grayscale conversion
    small = cv2.resize(gray, (84, 84))              # resize to 84x84
    return small[None, :, :] / 255.
class CustomSkipFrame(Wrapper):
    """Repeat each action for ``skip`` frames and keep a rolling frame stack.

    The stack ``self.states`` has ``skip`` entries. Every call to ``step``
    applies the same action ``skip`` times, sums the rewards, and pushes the
    element-wise maximum of the frames from the second half of those
    sub-steps onto the stack.
    """

    # An earlier note (dated 5.24) describes trying skip=2 so that finer
    # control might allow consecutive jumps; the default in this code is 4.
    def __init__(self, env, skip=4):
        super().__init__(env)
        self.observation_space = Box(low=0, high=255, shape=(skip, 84, 84))
        self.skip = skip
        self.states = np.zeros((skip, 84, 84), dtype=np.float32)

    def step(self, action):
        """Apply ``action`` for ``skip`` frames; return (stack, reward_sum, done, info)."""
        total_reward = 0
        recent_frames = []
        for frame_idx in range(self.skip):
            state, reward, done, info = self.env.step(action)
            total_reward += reward
            # Only frames from the second half of the window feed the max.
            if 2 * frame_idx >= self.skip:
                recent_frames.append(state)
            if done:
                # The episode ended mid-window: reset, which refills the stack,
                # and hand back that fresh stack with the reward gathered so far.
                self.reset()
                return self.states.astype(np.float32), total_reward, done, info

        newest = np.concatenate(recent_frames, 0).max(axis=0)
        # Drop the oldest entry and append the newest one.
        self.states[:-1] = self.states[1:]
        self.states[-1] = newest
        # Stack shape is (skip, 84, 84), i.e. (4, 84, 84) with the default skip.
        return self.states.astype(np.float32), total_reward, done, info

    def reset(self):
        """Reset the wrapped env and fill the stack with copies of its first frame."""
        state = self.env.reset()
        self.states = np.concatenate([state] * self.skip, 0)
        return self.states.astype(np.float32)

In [None]:
class ConvBaseNet(nn.Module):
    """Shared convolutional trunk: four stride-2 convolutions and a linear layer.

    The linear layer expects 32 * 6 * 6 flattened features (what four
    stride-2, padding-1, 3x3 convolutions produce from an 84x84 input) and
    outputs 512 features.
    """

    def __init__(self, num_inputs):
        super().__init__()
        self.conv1 = nn.Conv2d(num_inputs, 32, kernel_size=3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=3, stride=2, padding=1)
        self.conv4 = nn.Conv2d(32, 32, kernel_size=3, stride=2, padding=1)
        self.linear = nn.Linear(32 * 6 * 6, 512)
        # Runs before subclasses add their heads, so only the layers above
        # receive the orthogonal initialisation.
        self._initialize_weights()

    def _initialize_weights(self):
        """Orthogonal weights (ReLU gain) and zero biases for conv/linear layers."""
        relu_gain = nn.init.calculate_gain('relu')
        for layer in self.modules():
            if isinstance(layer, (nn.Conv2d, nn.Linear)):
                nn.init.orthogonal_(layer.weight, relu_gain)
                nn.init.constant_(layer.bias, 0)

    def forward_conv(self, x):
        """Map a (batch, num_inputs, H, W) tensor to (batch, 512) features."""
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = F.relu(conv(x))
        flat = x.view(x.size(0), -1)
        return self.linear(flat)


class PolicyNet(ConvBaseNet):
    """Actor network: the shared trunk followed by a softmax action head."""

    def __init__(self, num_inputs, num_actions):
        # num_inputs is forwarded to the parent trunk.
        super().__init__(num_inputs)
        self.actor_linear = nn.Linear(512, num_actions)

    def forward(self, x):
        """Return per-sample action probabilities of shape (batch, num_actions)."""
        features = self.forward_conv(x)
        logits = self.actor_linear(features)
        return F.softmax(logits, dim=1)


class ValueNet(ConvBaseNet):
    """Critic network: the shared trunk followed by a single-output linear head."""

    def __init__(self, num_inputs):
        super().__init__(num_inputs)
        self.critic_linear = nn.Linear(512, 1)

    def forward(self, x):
        """Return state-value estimates of shape (batch, 1)."""
        features = self.forward_conv(x)
        return self.critic_linear(features)

In [None]:
def compute_advantage(gamma, lmbda, td_delta):
    """Accumulate TD errors backwards into discounted advantage estimates.

    Computes ``A[t] = td_delta[t] + gamma * lmbda * A[t + 1]`` with the value
    after the final step taken as 0, treating the whole input as a single
    trajectory.

    Args:
        gamma: Discount factor.
        lmbda: Decay factor applied together with ``gamma``.
        td_delta: Tensor of TD errors ordered in time along dimension 0.

    Returns:
        A float32 CPU tensor with the same shape as ``td_delta``.
    """
    deltas = td_delta.detach().cpu().numpy()
    # Fill one preallocated array instead of building a Python list of small
    # arrays: torch.tensor() on such a list takes a very slow conversion path
    # and makes PyTorch emit a warning.
    advantages = np.zeros(deltas.shape, dtype=np.float32)
    running = 0.0
    for t in reversed(range(len(deltas))):
        running = gamma * lmbda * running + deltas[t]
        advantages[t] = running
    return torch.from_numpy(advantages)

In [None]:
class PPO:
    """PPO agent using the clipped surrogate objective."""

    def __init__(self, state_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        """Build the actor/critic networks and their Adam optimisers.

        Args:
            state_dim: Number of input channels passed to both networks.
            action_dim: Number of discrete actions.
            actor_lr: Learning rate of the actor optimiser.
            critic_lr: Learning rate of the critic optimiser.
            lmbda: Decay factor used when accumulating advantages.
            epochs: Number of optimisation passes over one collected trajectory.
            eps: Clipping range of the probability ratio.
            gamma: Discount factor.
            device: Torch device holding the networks and training tensors.
        """
        self.actor = PolicyNet(state_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
                                                lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs
        self.eps = eps
        self.device = device

    def _to_float_tensor(self, values):
        """Pack arrays/scalars into one float32 tensor on the agent's device.

        Going through a single numpy array avoids torch.tensor()'s very slow
        path (and warning) for Python lists of numpy arrays.
        """
        packed = np.asarray(values, dtype=np.float32)
        return torch.as_tensor(packed, device=self.device)

    def take_action(self, state):
        """Sample one action index from the current policy for a single state."""
        batch = self._to_float_tensor(state).unsqueeze(0)
        # No gradients are needed while collecting experience.
        with torch.no_grad():
            probs = self.actor(batch)
        return torch.distributions.Categorical(probs).sample().item()

    def update(self, transition_dict):
        """Run ``self.epochs`` clipped-PPO updates on one collected trajectory.

        Args:
            transition_dict: Dict with equally long lists under the keys
                'states', 'actions', 'next_states', 'rewards' and 'dones'.
                An empty trajectory is ignored.
        """
        if len(transition_dict['states']) == 0:
            return

        states = self._to_float_tensor(transition_dict['states'])
        next_states = self._to_float_tensor(transition_dict['next_states'])
        rewards = self._to_float_tensor(transition_dict['rewards']).view(-1, 1)
        dones = self._to_float_tensor(transition_dict['dones']).view(-1, 1)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
            self.device)

        # Targets, advantages and the old policy's log-probabilities are
        # constants for the optimisation below, so build them without autograd.
        with torch.no_grad():
            td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
            td_delta = td_target - self.critic(states)
            advantage = compute_advantage(self.gamma, self.lmbda,
                                          td_delta.cpu()).to(self.device)
            old_log_probs = torch.log(self.actor(states).gather(1, actions))

        for _ in range(self.epochs):
            log_probs = torch.log(self.actor(states).gather(1, actions))
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps,
                                1 + self.eps) * advantage  # clipping
            actor_loss = torch.mean(-torch.min(surr1, surr2))  # PPO objective
            # mse_loss already averages over all elements.
            critic_loss = F.mse_loss(self.critic(states), td_target)
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()

In [None]:
def moving_average(a, window_size):
    """Smooth ``a`` with a centred moving average of width ``window_size``.

    Interior points use the full window; the first and last points use
    progressively smaller odd-sized windows (1, 3, 5, ...) so that, for an
    odd ``window_size`` no longer than ``a``, the result has the same length
    as ``a``.

    Args:
        a: One-dimensional sequence of numbers.
        window_size: Width of the full averaging window.

    Returns:
        numpy array containing the smoothed values.
    """
    prefix_sums = np.cumsum(np.insert(a, 0, 0))
    interior = (prefix_sums[window_size:] - prefix_sums[:-window_size]) / window_size

    # Sizes of the shrinking windows used at both edges.
    edge_sizes = np.arange(1, window_size - 1, 2)
    head_sums = np.cumsum(a[:window_size - 1])
    tail_sums = np.cumsum(a[:-window_size:-1])
    leading = head_sums[::2] / edge_sizes
    trailing = (tail_sums[::2] / edge_sizes)[::-1]

    return np.concatenate((leading, interior, trailing))

In [None]:
def train_on_policy_agent(env, agent, num_episodes, save_dir, num_local_steps, resume=False,
                          actor_model_path=None, critic_model_path=None):
    """Train an on-policy agent, one update per collected episode.

    Training runs as 10 iterations of ``num_episodes // 10`` episodes each
    (so a remainder of ``num_episodes`` modulo 10 is not run). An episode
    ends when the environment reports ``done`` or after ``num_local_steps``
    environment steps, whichever comes first.

    Args:
        env: Environment exposing ``reset()`` and a 4-tuple ``step(action)``.
        agent: Agent exposing ``take_action(state)``, ``update(transitions)``
            and, for checkpointing, ``actor`` / ``critic`` modules.
        num_episodes: Total number of episodes; must be at least 10.
        save_dir: Directory that receives the checkpoint files.
        num_local_steps: Maximum number of environment steps per episode.
        resume: When True, load weights from the two paths below first.
        actor_model_path: Actor checkpoint; a trailing ``_<int>`` in its file
            name is used as the episode number to resume from.
        critic_model_path: Critic checkpoint.

    Returns:
        List with the return of every episode run by this call.

    Raises:
        ValueError: If ``num_episodes`` is below 10, or ``resume`` is set
            without both checkpoint paths.
    """
    episodes_per_iter = num_episodes // 10
    if episodes_per_iter < 1:
        raise ValueError("num_episodes must be at least 10.")

    min_save_episode = 250  # checkpoints are only written from this episode on
    return_list = []
    saved_models_info = []  # (episode_num, return) recorded at every checkpoint
    start_episode = 0

    # Load the requested checkpoint.
    if resume:
        if not (actor_model_path and critic_model_path):
            raise ValueError("To resume training, both actor_model_path and critic_model_path must be provided.")
        # Map onto the agent's device so that checkpoints written on a GPU
        # machine can be resumed on a CPU-only one.
        map_location = getattr(agent, 'device', None)
        agent.actor.load_state_dict(torch.load(actor_model_path, map_location=map_location))
        agent.critic.load_state_dict(torch.load(critic_model_path, map_location=map_location))
        print(f"Resumed training from provided model files:\n- Actor: {actor_model_path}\n- Critic: {critic_model_path}")

        # Best effort: recover the episode number from the file name.
        try:
            start_episode = int(os.path.basename(actor_model_path).split('_')[-1].split('.')[0])
        except ValueError:
            print("Warning: Failed to parse episode number from filename. Start_episode set to 0.")
            start_episode = 0

    for i in range(start_episode // episodes_per_iter, 10):
        with tqdm(total=episodes_per_iter, desc='Iteration %d' % i) as pbar:
            for i_episode in range(episodes_per_iter):
                current_episode_num = episodes_per_iter * i + i_episode + 1
                # Skip episodes already covered by the resumed checkpoint.
                if current_episode_num <= start_episode:
                    pbar.update(1)
                    continue

                episode_return = 0
                transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}
                state = env.reset()
                done = False
                step_count = 0

                while not done and step_count < num_local_steps:
                    action = agent.take_action(state)
                    next_state, reward, done, _ = env.step(action)
                    transition_dict['states'].append(state)
                    transition_dict['actions'].append(action)
                    transition_dict['next_states'].append(next_state)
                    transition_dict['rewards'].append(reward)
                    transition_dict['dones'].append(done)
                    state = next_state
                    episode_return += reward
                    # Count the step so the num_local_steps cap actually applies.
                    step_count += 1

                return_list.append(episode_return)
                agent.update(transition_dict)

                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % current_episode_num,
                                      'return': '%.3f' % np.mean(return_list[-10:])})
                    if current_episode_num >= min_save_episode:
                        torch.save(agent.actor.state_dict(), f'{save_dir}/ppo_mario_actor_{current_episode_num}.pth')
                        torch.save(agent.critic.state_dict(), f'{save_dir}/ppo_mario_critic_{current_episode_num}.pth')
                        # Records the return of this single episode, not the
                        # 10-episode mean shown in the progress bar.
                        saved_models_info.append((current_episode_num, episode_return))
                pbar.update(1)

    # Report the checkpoint with the highest recorded return.
    if saved_models_info:
        best_model = max(saved_models_info, key=lambda x: x[1])
        print(f"Best model saved at episode {best_model[0]} with return {best_model[1]:.3f}")
    else:
        print("No models were saved.")

    return return_list

In [None]:
# PPO hyperparameters
actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 2000
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)


# Environment settings
world, stage = 1, 1
action_type = "simple"

# Training-loop settings
save_dir = './model'  # directory that receives the checkpoints
os.makedirs(save_dir, exist_ok=True)
num_local_steps = 512
resume = True  # set to False to train from scratch
actor_model_path = './model/ppo_mario_actor_250.pth'  # None when not resuming
critic_model_path = './model/ppo_mario_critic_250.pth'  # None when not resuming

env = create_train_env(world, stage, action_type)
env.seed(0)
torch.manual_seed(0)

# The network input channels and action count come from the wrapped env.
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

agent = PPO(
    state_dim=state_dim,
    action_dim=action_dim,
    actor_lr=actor_lr,
    critic_lr=critic_lr,
    lmbda=lmbda,
    epochs=epochs,
    eps=eps,
    gamma=gamma,
    device=device,
)

return_list = train_on_policy_agent(
    env=env,
    agent=agent,
    num_episodes=num_episodes,
    save_dir=save_dir,
    num_local_steps=num_local_steps,
    resume=resume,
    actor_model_path=actor_model_path,
    critic_model_path=critic_model_path,
)


In [None]:
def _show_returns(x_values, y_values):
    """Draw one returns-vs-episode curve for the current level and show it."""
    plt.plot(x_values, y_values)
    plt.xlabel('Episodes')
    plt.ylabel('Returns')
    plt.title('PPO on {}'.format("SuperMarioBros-{}-{}-v3".format(world, stage)))
    plt.show()


# Raw per-episode returns.
episodes_list = list(range(len(return_list)))
_show_returns(episodes_list, return_list)

# Same curve smoothed with a 9-episode moving average.
mv_return = moving_average(return_list, 9)
_show_returns(episodes_list, mv_return)

In [None]:
import time

In [None]:
def test_model(test_actor_dir, test_world, test_stage, test_num_steps, num_test_episodes=1):
    """Render a saved actor playing a level with greedy action selection.

    Args:
        test_actor_dir: Path of the actor ``state_dict`` checkpoint to load.
        test_world: World number of the level to play.
        test_stage: Stage number of the level to play.
        test_num_steps: Maximum number of agent steps per episode.
        num_test_episodes: Number of episodes to play before returning.
            Pass None to keep playing until interrupted.

    The environment is always closed and the accumulated reward printed,
    including when the run is interrupted.
    """
    env = create_train_env(test_world, test_stage, action_type="simple")
    total_reward = 0
    try:
        state_dim = env.observation_space.shape[0]
        action_dim = env.action_space.n
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        actor = PolicyNet(state_dim, action_dim).to(device)
        actor.load_state_dict(torch.load(test_actor_dir, map_location=device))
        actor.eval()

        episode = 0
        while num_test_episodes is None or episode < num_test_episodes:
            state = env.reset()
            done = False
            step_count = 0
            while not done and step_count < test_num_steps:
                env.render()
                time.sleep(0.02)  # slow playback down so it can be watched

                # Add the batch axis expected by the network.
                state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
                with torch.no_grad():
                    probs = actor(state_tensor)
                # Greedy evaluation: always take the most probable action.
                action = torch.argmax(probs).item()
                state, reward, done, info = env.step(action)
                total_reward += reward
                step_count += 1
            episode += 1
    finally:
        # Release the emulator even if playback is interrupted.
        env.close()
        print(f'Total reward: {total_reward}')


In [None]:
# Evaluation settings: checkpoint to load, level to play, per-episode step cap.
test_actor_dir = './model/ppo_mario_actor_470.pth'
test_world, test_stage = 1, 1
test_num_steps = 200

test_model(
    test_actor_dir=test_actor_dir,
    test_world=test_world,
    test_stage=test_stage,
    test_num_steps=test_num_steps,
)

In [None]:
# Close the module-level training environment created in the training setup cell.
env.close()