In [4]:
import gym
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from matplotlib import pyplot as plt

class DQN(nn.Module):
    """Fully connected Q-network: one output value per action.

    The hidden layers widen from 128 up to 1024 units and narrow back down to
    128, each followed by a ReLU. The final linear layer has no activation.
    """

    def __init__(self, in_dim, out_dim):
        """Build the layer stack.

        Args:
            in_dim: Size of the input feature vector.
            out_dim: Number of outputs produced by the last linear layer.
        """
        super().__init__()
        hidden_widths = (128, 256, 512, 1024, 512, 256, 128)

        # Layers are created strictly in forward order so that parameter
        # names (model.0, model.2, ...) and initialisation order are stable.
        stack = []
        fan_in = in_dim
        for width in hidden_widths:
            stack.append(nn.Linear(fan_in, width))
            stack.append(nn.ReLU())
            fan_in = width
        stack.append(nn.Linear(fan_in, out_dim))

        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through the sequential stack and return the result."""
        outputs = self.model(x)
        return outputs
class DQNAgent(object):
    """Online trainer for a ``DQN`` network on a Gym environment.

    Every environment step triggers one gradient update on the single most
    recent transition (there is no replay buffer). The bootstrap action is
    chosen by the online network and evaluated by a periodically synced
    target network.
    """

    def __init__(self, env_id="LunarLander-v2"):
        """Create the environment and select the compute device.

        Args:
            env_id: Environment id passed to ``gym.make``.
        """
        self.env = gym.make(env_id)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _to_tensor(self, observation):
        """Copy an observation into a float32 tensor on ``self.device``."""
        return torch.tensor(observation, dtype=torch.float32, device=self.device)

    def _reset_env(self):
        """Reset the environment and return only the initial observation.

        ``reset`` returns ``(observation, info)`` in Gym >= 0.26 and the bare
        observation in older releases; both shapes are accepted.
        """
        result = self.env.reset()
        if (
            isinstance(result, tuple)
            and len(result) == 2
            and isinstance(result[1], dict)
        ):
            return result[0]
        return result

    def _step_env(self, action):
        """Advance one step and normalise the result across Gym versions.

        Returns:
            A tuple ``(observation, reward, terminated, truncated)`` where
            ``reward`` is a Python float and the two flags are bools.
        """
        result = self.env.step(action)
        if len(result) == 5:
            observation, reward, terminated, truncated, _ = result
        else:
            observation, reward, done, info = result
            # Pre-0.26 Gym folds time-limit cut-offs into ``done`` and marks
            # them with this info key (set by the TimeLimit wrapper).
            hit_time_limit = bool(info.get("TimeLimit.truncated", False))
            truncated = bool(done) and hit_time_limit
            terminated = bool(done) and not hit_time_limit
        return observation, float(reward), bool(terminated), bool(truncated)

    @staticmethod
    def _save_curve(xs, ys, label, path):
        """Plot ``ys`` against ``xs`` and write the figure to ``path``."""
        plt.figure(figsize=(12, 6))
        plt.plot(xs, ys, label=label)
        plt.xlabel("epoch")
        plt.legend()
        plt.savefig(path, dpi=400)
        plt.close()

    def solve(self, *, epochs=200, gamma=0.9, epsilon=0.0, lr=0.0001,
              target_update=200, checkpoint_every=2000):
        """Train the online network and return the last step's loss.

        The defaults reproduce the previously hard-coded settings. Note that
        with those defaults the target network is re-synced only once (after
        episode 200) and no checkpoint or plot is written, because
        ``checkpoint_every`` exceeds ``epochs``.

        Args:
            epochs: Number of episodes to run.
            gamma: Discount factor applied to the bootstrapped value.
            epsilon: Probability of taking a uniformly random action.
            lr: Initial Adam learning rate (decayed by 0.7 every 50 episodes).
            target_update: Sync the target network every this many episodes;
                must be a positive integer.
            checkpoint_every: Save ``model{i}.pth`` and the loss/reward plots
                every this many episodes; must be a positive integer.

        Returns:
            The loss tensor of the final training step, or ``0`` if no step
            was executed (``epochs <= 0``).

        Side effects:
            Prints the device, writes ``good_model{i}.pth`` whenever an
            episode's total reward exceeds 200, writes periodic checkpoints
            and plots, and closes the environment when training ends or fails.
        """
        print(self.device)
        in_dim = self.env.observation_space.shape[0]
        n_actions = self.env.action_space.n

        main_model = DQN(in_dim, n_actions).to(self.device)
        target_model = DQN(in_dim, n_actions).to(self.device)
        # Start the target network with the same parameters as the online one.
        target_model.load_state_dict(main_model.state_dict())
        # eval() only switches the module to inference mode; gradients for the
        # target path are suppressed by torch.no_grad() below.
        target_model.eval()

        criterion = nn.MSELoss()
        optimizer = torch.optim.Adam(main_model.parameters(), lr=lr)
        scheduler = StepLR(optimizer, step_size=50, gamma=0.7)

        losses = []
        rewards = []
        episodes = []
        last_loss = 0

        try:
            for episode in range(epochs):
                # Begin from the freshly reset observation, not from the
                # previous episode's terminal state.
                state = self._to_tensor(self._reset_env())
                episode_reward = 0.0
                loss_sum = 0.0
                steps = 0
                episode_over = False

                while not episode_over:
                    steps += 1

                    # One forward pass serves both action selection and the
                    # Q(s, a) term of the loss (weights do not change between).
                    q_values = main_model(state)
                    if torch.rand(1).item() > epsilon:
                        action = int(torch.argmax(q_values))
                    else:
                        action = int(torch.randint(high=n_actions, size=(1,)).item())

                    observation, reward, terminated, truncated = self._step_env(action)
                    episode_over = terminated or truncated
                    episode_reward += reward
                    next_state = self._to_tensor(observation)

                    # TD target: reward only at a true terminal state;
                    # otherwise bootstrap from the target network using the
                    # action the online network prefers in the next state.
                    target = torch.tensor(
                        reward, dtype=torch.float32, device=self.device
                    )
                    if not terminated:
                        with torch.no_grad():
                            next_action = torch.argmax(main_model(next_state))
                            target = target + gamma * target_model(next_state)[next_action]

                    last_loss = criterion(q_values[action], target)
                    # .item() keeps the fractional part that int() discarded.
                    loss_sum += last_loss.item()
                    optimizer.zero_grad()
                    last_loss.backward()
                    optimizer.step()

                    state = next_state

                losses.append(loss_sum / steps)
                rewards.append(episode_reward)
                episodes.append(episode)

                if episode_reward > 200:
                    torch.save(main_model, f'good_model{episode}.pth')
                scheduler.step()

                # Periodically copy the online parameters into the target
                # network (counted in episodes, not environment steps).
                if (episode + 1) % target_update == 0:
                    target_model.load_state_dict(main_model.state_dict())

                if (episode + 1) % checkpoint_every == 0:
                    torch.save(main_model, f'model{episode}.pth')
                    self._save_curve(episodes, losses, "loss", f"model{episode}loss.png")
                    self._save_curve(episodes, rewards, "reward", f"model{episode}reward.png")
        finally:
            # Close once, after training, instead of after every episode.
            self.env.close()

        return last_loss


Complete
