# 1
载入包以及配置

In [10]:
import torch
import gym
import numpy as py
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
import numpy as np
from copy import deepcopy
import os

## 如何建立网络估计reward 


class Config:
    def __init__(self, agent_class=None, env_class=None, env_args=None):
        self.agent_class = agent_class
        self.if_off_policy = True

        self.env_class = env_class
        self.env_agrs = env_args
        if env_args is None:
            env_args = {'env_name': None, 'state_dim': None, 'action_dim': None, 'if_discrete': None}
        self.env_name = env_args['env_name']
        self.state_dim = env_args['state_dim']
        self.action_dim = env_args['action_dim']
        self.if_discrete = env_args['if_discrete']
    ### 奖励函数建立
        self.gamma = 0.99
        self.reward_scale = 1.0
    #### 训练
        self.net_dims = (64,32)
        self.batch_size = int(64)
        self.horizon_len = int(512)
        self.buffer_size = int(1e6)
        self.repeat_times = 1
        self.learning_rate = 6e-5  # 2 ** -14 ~= 6e-5
        self.soft_update_tau = 5e-3  # 2 ** -8 ~= 5e-3
        ### device
        self.cwd = None
        self.break_step = +np.inf
        self.eval_times = int(32)
        self.eval_per_step = int(10000)
    
    def init_before_training(self):
        if self.cwd is None:
            self.cwd = f'./{self.env_name}_{self.agent_class.__name[5:]}'
        os.makedirs(self.cwd,exist_ok=True)
    


# 2
建立Q 神经网络

In [11]:
## 建立QNet
class QNet(nn.Module): 
    def __init__(self, n_hidden:int, n_dim: int, state_dim: int, action_dim: int,epls_rate):
        super().__init__()
        net_list = []
        net_list.extend([nn.Linear(2,n_dim),nn.ReLU()])
        for i in range (n_hidden):
            net_list.extend([nn.Linear(n_dim,n_dim)],nn.ReLU())
        net_list.append(nn.Linear(n_dim,1))
        self.net = nn.Sequential(*net_list)
        self.action_dim = action_dim
        self.epls_rate = epls_rate

    def forward(self, state:torch.Tensor) -> torch.Tensor:
        return self.net(state)
    def get_action(self, state: torch.Tensor) -> torch.Tensor:
        if self.epls_rate < torch.rand(1):
            torch.argmax(self.net(state),dim=1,keepdim=True )
        else:
            action = torch.randint(self.action_dim, size = (state.shape[0],1))
        return action
    

建立环境以及智能体

In [12]:
def get_gym_env_args(env, if_print: bool) -> dict:
    if {'unwrapped', 'observation_space', 'action_space', 'spec'}.issubset(dir(env)):  # isinstance(env, gym.Env):
        env_name = env.unwrapped.spec.id
        state_shape = env.observation_space.shape
        state_dim = state_shape[0] if len(state_shape) == 1 else state_shape  # sometimes state_dim is a list
        if_discrete = isinstance(env.action_space, gym.spaces.Discrete)
        action_dim = env.action_space.n if if_discrete else env.action_space.shape[0]
    else:
        env_name = env.env_name
        state_dim = env.state_dim
        action_dim = env.action_dim
        if_discrete = env.if_discrete
    env_args = {'env_name': env_name, 'state_dim': state_dim, 'action_dim': action_dim, 'if_discrete': if_discrete}
    print(f"env_args = {repr(env_args)}") if if_print else None
    return env_args

def build_env(env_class = None, env_agrs = None):
    if env_class.__module__ == 'gym_envs.registration':
        assert '0.18.0' <= gym.__version__ <= '0.25.2'
        env = env_class(id = env_agrs['env_name'])
    for attr_str in ('env_name', 'state_dim', 'action_dim', 'if_discrete'):
        setattr(env,attr_str,env_agrs[attr_str])
    return env

class AgentBase:
    def __init__(self, net_dims, state_dim: int, action_dim: int, args: Config = Config()):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = args.gamma
        self.batch_size = args.batch_size
        self.repeat_times = args.repeat_times
        self.reward_scale = args.reward_scale
        self.learning_rate = args.learning_rate
        self.soft_update_tau = args.soft_update_tau
        self.last_state = None
        self.device  = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        act_class = getattr(self, "act_class", None)
        cri_class = getattr(self, "cri_class",None)
        self.act = self.act_target = act_class(net_dims,state_dim,action_dim).to(self.device)
        self.cri_class = self.cri_target = cri_class(net_dims,state_dim,action_dim).to(self.device) \
            if cri_class else self.act
        
        self.act_optimizer = optim.Adam(self.act.parameters(),self.learning_rate)
        self.cri_optimizer = optim.Adam(self.cri_class.parameters(),self.learning_rate) \
            if cri_class else self.act_optimizer
        self.loss = nn.SmoothL1Loss()
    
    @staticmethod
    def optimizer_update(optimizer, object: torch.Tensor):
        optimizer.zero_grad()
        object.backward()
        optimizer.step()
    
    @staticmethod
    def soft_update(target_net: nn.Module, current_net: nn.Module , tau: float):
        for tar, cur in zip(target_net.parameters(), current_net.parameters()):
            tar.data.copy_(cur.data * tau + tar.data * (1.0 - tau))

        


    

        




design the agent
$$

\delta \hat{R_{\theta}} = \frac{1}{N}\sum_{n=1}^{N}\sum_{t=1}^{T_{n}} (R(\epsilon^{n}) -b) \delta \log p_{\theta}(a_{t}^{n}|s_{t}^{n}) \\

$$

In [14]:
class AgentDQN(AgentBase):
    def __init__(self, net_dims, state_dim: int, action_dim: int, args: Config = Config()):
        self.act_class = getattr(self, "act_class", QNet)
        self.cir_class = getattr(self,"cri_class", None)
        AgentBase().__init__(self,net_dims, state_dim, action_dim,  args)
        self.act_target = self.cir_target = deepcopy(self.act)
        self.act.explore_rate = getattr(args, "explore_rate", 0.25)
        ### 与环境交互
    def explore_env(self,env,horizon_len: int, if_random: bool = False) :
        ### 初始化
        states = torch.zeros((horizon_len,self.state_dim),dtype=torch.float32).to(self.device)
        actions = torch.zeros((horizon_len,1), dtype=torch.int32).to(self.device)
        rewards = torch.ones(horizon_len, dtype=torch.float32).to(self.device)
        dones = torch.zeros(horizon_len, dtype=torch.bool).to(self.device)

        ary_state = self.last_state

        get_action = self.act.get_action
        for i in range(horizon_len):
            state = torch.as_tensor(ary_state, dtype=torch.float32, device=self.device)
            ###greedy- eplison
            if if_random:
                action = torch.randint(self.action_dim, size = (1,))[0]
            else:
                action = get_action(state.unsqueeze(0))[0,0]
            ary_action = action.detach().cpu().numpy()
            ary_state, reward, done, _ = env.step(ary_action)
            if done:
                ary_state = env.reset()
            states[i] = state
            actions[i] = action
            rewards[i] = reward
            dones[i] = done
        self.last_state = ary_state
        rewards = (rewards * self.reward_scale).unsqueeze(1)
        undones = (1.0 - dones.type(torch.float32)).unsqueeze(1)
        return states, actions, rewards, undones

    def get_obj_critic(self, buffer, batch_size: int):
        with torch.no_grad():
            state, action, reward, undone, next_state = buffer.sample(batch_size)
            next_q = self.cri_target(next_state).max(dim=1, keepdim=True)[0]
            
            q_label = reward + undone * self.gamma * next_q
        q_value = self.cri(state).gather(1, action.long())
        obj_critic = self.criterion(q_value, q_label)
        return obj_critic, q_value.mean()
    
    def update_net(self, buffer):
        obj_critics = 0.0
        q_values = 0.0

        update_times = int(buffer.cursize * self.repeat_times / self.batch_size)
        assert update_times >= 1
        for i in range (update_times):
            obj_critic, q_values = self.get_obj_critic(buffer, self.batch_size)
            self.optimizer_update(self.cri_optimizer, obj_critic)
            self.soft_update(self.cri_target, self.cri, self.soft_update_tau)
            
            obj_critics += obj_critic.item()
            q_values += q_values.item()
        return obj_critic / update_times, q_values / update_times
class ReplayBuffer:
    def __init__(self, max_size: int, state_dim: int, action_dim:int):
        self.p = 0 # pointer
        self.if_full = False
        self.cur_size = 0
        self.max_size = max_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.states = torch.empty((max_size, state_dim), dtype=torch.float32, device=self.device)
        self.actions = torch.empty((max_size, action_dim), dtype=torch.float32, device=self.device)
        self.rewards = torch.empty((max_size, 1), dtype=torch.float32, device=self.device)
        self.undones = torch.empty((max_size, 1), dtype=torch.float32, device=self.device)
    
    def update(self, items):
        states, actions, rewards, undones = items
        p = self.p + rewards.shape[0]
        if p > self.max_size:
            self.if_full = True
            p0 = self.p
            p1 = self.max_size
            p2 = self.max_size - self.p
            p = p - self.max_size

            self.states[p0:p1], self.states[0:p] = states[:p2], states[-p:]
            

    


IndentationError: expected an indented block (3219204704.py, line 78)