In [None]:
import os
import shutil

import gym
import numpy as np
import torch
from torch.utils.tensorboard import SummaryWriter

In [None]:
class ActionNet(torch.nn.Module):
    """Small policy network: state vector in, action probabilities out.

    Layout: ``Linear(input_dims, hidden_dims) -> Tanh ->
    Linear(hidden_dims, output_dims) -> Softmax`` over the last dimension.

    Args:
        input_dims: Size of the input feature vector.
        output_dims: Number of output probabilities (one per action).
        hidden_dims: Width of the hidden layer. Defaults to 30.
    """

    def __init__(self, input_dims, output_dims, hidden_dims=30):
        super().__init__()
        self.fc1 = torch.nn.Linear(input_dims, hidden_dims)
        self.fc2 = torch.nn.Linear(hidden_dims, output_dims)
        self.tanh = torch.nn.Tanh()
        # Normalise over the last (action) dimension explicitly. Leaving `dim`
        # unset is deprecated, emits a warning, and silently normalises over
        # dim 0 for 3-D inputs.
        self.softmax = torch.nn.Softmax(dim=-1)
        self.initialize_weights()

    def initialize_weights(self):
        """Draw every Linear layer's weights from N(0, 0.3^2); biases are left untouched."""
        for m in self.modules():
            if isinstance(m, torch.nn.Linear):
                torch.nn.init.normal_(m.weight, std=0.3)

    def forward(self, x):
        """Return action probabilities for ``x``.

        Args:
            x: Float tensor whose last dimension has size ``input_dims``.

        Returns:
            Tensor with the same leading dimensions as ``x`` and a last
            dimension of size ``output_dims`` that sums to 1.
        """
        x = self.tanh(self.fc1(x))
        x = self.softmax(self.fc2(x))
        return x

In [None]:
class PolicyGradient:
    """Monte-Carlo policy-gradient agent for a discrete action space.

    Transitions of one episode are buffered with ``store_transition`` and
    consumed by ``learn``, which performs a single optimiser step on the
    return-weighted negative log-likelihood of the actions taken.

    Args:
        n_state: Size of the state vector fed to the policy network.
        n_actions: Number of discrete actions.
        learning_rate: Adam learning rate.
        reward_decay: Discount factor applied to future rewards.
    """

    def __init__(self,
                 n_state=4,
                 n_actions=2,
                 learning_rate=0.01,
                 reward_decay=0.95):
        self.n_actions = n_actions
        self.n_features = n_state
        self.lr = learning_rate
        self.gamma = reward_decay

        # Per-episode trajectory buffers; emptied at the end of learn().
        self.s_lst = []
        self.a_lst = []
        self.r_lst = []

        self.net = ActionNet(n_state, n_actions)
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=learning_rate)

    def choose_action(self, states):
        """Sample an action from the policy's distribution for one state.

        Args:
            states: A single 1-D state (NumPy array or tensor).

        Returns:
            The sampled action index.
        """
        self.net.eval()
        # Sampling needs no gradients; skip building the autograd graph.
        with torch.no_grad():
            state = torch.as_tensor(states, dtype=torch.float32).unsqueeze(0)
            probs = self.net(state).view(-1).numpy()
        # Pass the float32 array as-is: np.random.choice relaxes its
        # "sums to 1" tolerance based on the dtype of `p`.
        return np.random.choice(range(probs.shape[0]), p=probs)

    def store_transition(self, s, a, r):
        """Append one (state, action, reward) step to the episode buffers."""
        self.a_lst.append(a)
        self.r_lst.append(r)
        self.s_lst.append(s)

    def to_onehot(self, a_lst, action_n):
        """One-hot encode a list of action indices.

        Args:
            a_lst: Sequence of integer action indices in ``[0, action_n)``.
            action_n: Width of the encoding (number of actions).

        Returns:
            Float tensor of shape ``(len(a_lst), action_n)``.
        """
        indices = torch.as_tensor(a_lst, dtype=torch.long).view(-1, 1)
        # Width comes from action_n rather than a hard-coded 2.
        return torch.zeros(len(a_lst), action_n).scatter(1, indices, 1.0)

    def learn(self):
        """Run one policy-gradient update on the buffered episode and clear it.

        Returns:
            Tuple ``(discounted, loss)``: the normalised discounted returns as
            a float64 NumPy array, and the scalar loss tensor.

        Raises:
            ValueError: If no transitions have been stored.
        """
        if not self.r_lst:
            raise ValueError("learn() called with no stored transitions")

        self.net.train()
        # Explicit float buffer: zeros_like() on integer rewards would yield
        # an integer array and truncate the discounted returns.
        discounted = np.zeros(len(self.r_lst), dtype=np.float64)
        running_add = 0.0
        for i in reversed(range(len(self.r_lst))):
            running_add = running_add * self.gamma + self.r_lst[i]
            discounted[i] = running_add
        discounted -= discounted.mean()
        # Epsilon keeps a zero-variance episode (e.g. a single step) from
        # producing NaN returns that would poison the weights.
        discounted /= discounted.std() + 1e-8

        # Stack into one ndarray first; building a tensor from a list of
        # arrays is slow and sensitive to the observations' dtype.
        states = torch.as_tensor(np.asarray(self.s_lst, dtype=np.float32))
        output = self.net(states)
        onehot = self.to_onehot(self.a_lst, self.n_actions)
        # Cross-entropy of the taken actions, weighted by the returns.
        # Clamp so a probability of exactly 0 cannot produce inf * 0 = NaN.
        log_probs = torch.log(output.clamp_min(1e-8))
        neg = torch.sum(-log_probs * onehot, 1)
        loss = (neg * torch.as_tensor(discounted, dtype=torch.float32)).mean()

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Reset the buffers for the next episode.
        self.s_lst = []
        self.a_lst = []
        self.r_lst = []
        return discounted, loss

In [None]:
# Start every run from an empty TensorBoard log directory.
log_dir = './runs'
if os.path.exists(log_dir):
    try:
        # Needs `shutil` imported at the top of the notebook; without it this
        # line raised NameError, which the OSError handler does not catch.
        shutil.rmtree(log_dir)
        print(f'文件夹 {log_dir} 已成功删除。')
    except OSError as error:
        print(f'删除文件夹 {log_dir} 失败: {error}')
    # Recreate the directory so both branches leave the same state behind.
    os.makedirs(log_dir, exist_ok=True)
else:
    os.makedirs(log_dir)
    print(f'文件夹 {log_dir} 不存在，已创建文件夹 {log_dir}。')

In [None]:
# Train the policy-gradient agent on CartPole and log progress to TensorBoard.
summary_writer = SummaryWriter(log_dir=log_dir)
env = gym.make("CartPole-v1")
gamma = 0.9  # NOTE(review): unused here; the agent below is built with reward_decay=0.95
num_episodes = 500
pg = PolicyGradient(
    n_state=env.observation_space.shape[0],
    n_actions=env.action_space.n,
    learning_rate=0.01,
    reward_decay=0.95
)
net = ActionNet(4, 2)  # NOTE(review): not used by the training loop; `pg` owns its own network

# Exponential moving average of episode returns. Created once, outside the
# episode loop, so the average actually carries over between episodes.
reward_dict = {'running_reward': None}

for i in range(num_episodes):
    s, _ = env.reset()
    step = 0
    while True:
        a = pg.choose_action(s)
        ns, r, terminated, truncated, _ = env.step(a)
        step += 1
        pg.store_transition(s, a, r)

        # An episode ends on failure (terminated) or at the time limit
        # (truncated); ignoring the latter lets episodes overrun the limit.
        done = terminated or truncated
        if done:
            r_lst_sum = sum(pg.r_lst)

            if reward_dict['running_reward'] is None:
                reward_dict['running_reward'] = r_lst_sum
            else:
                reward_dict['running_reward'] = reward_dict['running_reward'] * 0.9 + r_lst_sum * 0.1

            print("episode:", i, "  reward:", int(reward_dict['running_reward']))
            vt, loss = pg.learn()
            summary_writer.add_scalar('reward', reward_dict['running_reward'], i)
            summary_writer.add_scalar('loss', loss.item(), i)
            break
        s = ns

# Push buffered events to disk and release the training environment.
summary_writer.flush()
env.close()
        

In [None]:
# Roll out one rendered episode with the trained policy and report its length.
env = gym.make("CartPole-v1", render_mode='human')
step = 0
try:
    s, _ = env.reset()
    while True:
        # choose_action takes the raw observation, exactly as in training.
        action = pg.choose_action(s)
        s, r, terminated, truncated, _ = env.step(action)
        step += 1
        # Stop on failure or when the time limit truncates the episode.
        done = terminated or truncated
        if done:
            break
    print('step:{}'.format(step))
finally:
    # Always close the environment, even if the rollout raises.
    env.close()

In [None]:
# Close `env` once more. The evaluation cell already calls env.close(), so this
# is presumably a manual safety net for an interrupted rollout.
env.close()