In [None]:
import numpy as np
import gymnasium as gym
import torch
import torch.nn.functional as F

In this notebook, we will *naively* implement a neural network to approximate the Q function. Since value function approximation can handle large (even continuous) state spaces, we will work with a continuous state space, using the *CartPole* environment from Gymnasium (the successor to OpenAI Gym).

The core idea of Q-learning with function approximation is to learn a parameterized function $\hat{q}(s,a,w)$ that approximates the true optimal Q function (i.e., the Q function of the optimal policy). To achieve this, we use a neural network to represent $\hat{q}$, which turns the function approximation problem into an optimization problem. We use the mean squared error (MSE) as the loss function, so the objective function is given by

\begin{equation}
J=\mathbb{E}\left[\left(R+\gamma \max _{a \in \mathcal{A}\left(S^{\prime}\right)} \hat{q}\left(S^{\prime}, a, w\right)-\hat{q}(S, A, w)\right)^2\right]
\end{equation}

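Using gradient descent, and treating the TD target $R+\gamma \max_{a} \hat{q}(S^{\prime}, a, w)$ as a constant with respect to $w$ (the *semi-gradient* approximation), the gradient of the objective is

\begin{equation}
\nabla_w J=-2\,\mathbb{E}\left[\left(R+\gamma \max_{a \in \mathcal{A}\left(S^{\prime}\right)} \hat{q}\left(S^{\prime}, a, w\right)-\hat{q}(S, A, w)\right) \nabla_w \hat{q}(S, A, w)\right]
\end{equation}

Replacing the expectation with a single sample and absorbing the constant factor $2$ into the learning rate $\alpha_t$, the gradient-descent step $w_{t+1} \gets w_t - \tfrac{\alpha_t}{2} \nabla_{w_t} J$ updates the weights of the Q network by

\begin{equation}
w_{t+1} \gets w_t + \alpha_t \left(r_{t+1}+\gamma \max_{a^{\prime} \in \mathcal{A}(s_{t+1})} \hat{q}\left(s_{t+1}, a^{\prime}, w_t\right)-\hat{q}(s_t, a_t, w_t)\right) \nabla_{w_t} \hat{q}(s_t, a_t, w_t)
\end{equation}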

Thanks to the *autograd* (automatic differentiation) feature of **PyTorch**, we can easily implement the above process (i.e., solving the optimization problem).

Finally, we update the policy using the learnt Q function. Note that the greedy (exploitation) part of this $\varepsilon$-greedy policy acts as the target policy, while the full $\varepsilon$-greedy policy is the behavior policy.

\begin{equation}
\begin{aligned}
& \pi_{t+1}\left(a \mid s_t\right)=1-\frac{\varepsilon}{|\mathcal{A}(s_t)|}(|\mathcal{A}(s_t)|-1) \text { if } a=\arg \max _{a^{\prime} \in \mathcal{A}\left(s_t\right)} \hat{q}\left(s_t, a^{\prime}, w_{t+1}\right) \\
& \pi_{t+1}\left(a \mid s_t\right)=\frac{\varepsilon}{|\mathcal{A}(s_t)|} \text { otherwise }
\end{aligned}
\end{equation}


In [None]:
class QNN_Agent():
    """Epsilon-greedy Q-learning agent whose Q function is a neural network.

    Actions are represented by integer indices ``0 .. action_dim - 1``. The
    network ``Q_func`` is called with a float32 observation tensor and its
    output is indexed by action, i.e. it yields one Q value per action.
    """
    
    def __init__(self,
                 Q_func:torch.nn.Module,
                 action_dim:int,
                 optimizer:torch.optim.Optimizer,
                 epsilon:float = 0.1,
                 gamma:float = 0.9,
                 device:torch.device = torch.device("cpu")
                 ) -> None:
        """Store the network, its optimizer and the learning hyper-parameters.

        Args:
            Q_func: Network mapping an observation to the Q values of all actions.
            action_dim: Number of discrete actions.
            optimizer: Optimizer used to update the parameters of ``Q_func``.
            epsilon: Probability of taking a uniformly random (exploratory) action.
            gamma: Discount factor applied to the bootstrapped next-state value.
            device: Device on which observation tensors are created.
        """
        self.device = device
        self.action_dim = action_dim
        
        self.Q_func = Q_func
        self.criteria = torch.nn.MSELoss()
        self.optimizer = optimizer
        
        self.epsilon = epsilon
        self.gamma = gamma

    def get_target_action(self,obs:np.ndarray) -> int:
        """Return the greedy action (arg-max of the predicted Q values) for ``obs``.

        The observation arrives as a numpy array and is converted to a float32
        tensor on ``self.device`` before it is fed to the network.
        """
        obs_tensor = torch.tensor(obs,dtype=torch.float32,device=self.device)
        # Pure inference: no autograd graph is needed to pick an action.
        with torch.no_grad():
            Q_list = self.Q_func(obs_tensor)
        
        # argmax yields a 0-dim tensor; .item() converts it to a Python int.
        return int(torch.argmax(Q_list).item())

    def get_behavior_action(self,obs:np.ndarray) -> int:
        """Return an epsilon-greedy action: random with probability ``epsilon``, greedy otherwise."""
        if np.random.uniform(0,1) < self.epsilon:
            # np.random.choice returns a numpy integer; convert to a plain int
            # so both branches return the same type.
            return int(np.random.choice(self.action_dim))
        return self.get_target_action(obs)
    
    def Q_star_approximation(self,
                             obs:np.ndarray,
                             action:int,
                             reward:float,
                             next_obs:np.ndarray,
                             terminated:bool) -> None:
        """Perform one semi-gradient Q-learning step on a single transition.

        The TD target ``reward + gamma * max_a Q(next_obs, a)`` is treated as a
        constant: it is computed without gradient tracking, so the loss is only
        differentiated through ``Q(obs, action)``. When ``terminated`` is true
        the bootstrap term is dropped and the target is just ``reward``.

        Args:
            obs: Observation in which ``action`` was taken.
            action: Index of the action that was taken.
            reward: Reward received for the transition.
            next_obs: Observation reached after the transition.
            terminated: Whether ``next_obs`` is a terminal state.
        """
        obs_tensor = torch.tensor(obs,dtype=torch.float32,device=self.device)
        next_obs_tensor = torch.tensor(next_obs,dtype=torch.float32,device=self.device)

        current_Q = self.Q_func(obs_tensor)[action]

        # The target must not contribute gradients; otherwise the update would
        # also push Q(next_obs, .) towards Q(obs, action) (residual gradient)
        # instead of following the semi-gradient Q-learning rule.
        with torch.no_grad():
            next_Q_max = self.Q_func(next_obs_tensor).max()
            TD_target = float(reward) + (1.0 - float(terminated)) * self.gamma * next_Q_max

        loss = self.criteria(current_Q,TD_target)
        # Now, we directly use gradient descent to optimize the loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

Here, we use a simple MLP with 2 hidden layers to approximate the Q function.

In [None]:
class Q_Network(torch.nn.Module):
    """Multi-layer perceptron used as the Q-function approximator.

    Architecture: ``obs_dim -> 64 -> 64 -> action_dim`` with a ReLU after each
    of the two hidden layers and a linear output (one value per action).
    Replace this class to experiment with other network structures.
    """
    def __init__(self,obs_dim:int,action_dim) -> None:
        super().__init__()
        hidden_width = 64
        # Layers are created in input-to-output order.
        self.fc1 = torch.nn.Linear(obs_dim,hidden_width)
        self.fc2 = torch.nn.Linear(hidden_width,hidden_width)
        self.fc3 = torch.nn.Linear(hidden_width,action_dim)
            
    def forward(self,x:torch.Tensor) -> torch.Tensor:
        """Map observation tensor ``x`` to the Q values of all actions."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [None]:
class TrainManager():
    """Build a Q-network agent for a Gymnasium environment and run training/testing episodes."""

    def __init__(self,
                 env:gym.Env,
                 episode_num:int = 1000,
                 lr:float = 0.001,
                 gamma:float = 0.9,
                 epsilon:float = 0.1) -> None:
        """Create the Q network, its Adam optimizer and the agent.

        Args:
            env: Environment with a discrete action space (``action_space.n`` is read).
            episode_num: Number of training episodes run by ``train``.
            lr: Learning rate of the Adam optimizer.
            gamma: Discount factor passed to the agent.
            epsilon: Exploration probability passed to the agent.
        """
        # The device is automatically selected according to the availability of GPU.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # If you want to force CPU, use the line below instead.
        # self.device = torch.device("cpu")
        
        self.env = env
        self.episode_num = episode_num
        # Size of the flattened observation vector = input width of the network.
        obs_dim = gym.spaces.utils.flatdim(env.observation_space) 
        action_dim = env.action_space.n 
        Q_func = Q_Network(obs_dim,action_dim).to(self.device)
        optimizer = torch.optim.Adam(Q_func.parameters(),lr=lr)
        self.agent = QNN_Agent(Q_func = Q_func,
                               action_dim = action_dim,
                               optimizer = optimizer,
                               epsilon = epsilon,
                               gamma = gamma,
                               device = self.device)
    
    def train_episode(self,is_render:bool=False) -> float:
        """Run one episode with the epsilon-greedy policy, learning after every step.

        Args:
            is_render: If true, ``env.render()`` is called after every step.

        Returns:
            The sum of rewards collected during the episode.
        """
        total_reward = 0 
        obs,_ = self.env.reset() 
        while True:
            action = self.agent.get_behavior_action(obs) 
            next_obs, reward, terminated, truncated, _ = self.env.step(action) 
            total_reward += reward 
            # Only a genuine termination removes the bootstrap term; when the
            # episode is merely truncated the next state still has value.
            self.agent.Q_star_approximation(obs,action,reward,next_obs,terminated)
            obs = next_obs
            if is_render:
                self.env.render()
                                
            # The episode is over when the environment terminates OR truncates it;
            # stepping further without a reset is not allowed by the Gymnasium API.
            if terminated or truncated:
                break
            
        return total_reward       

    def test_episode(self) -> float:
        """Run one episode with the greedy (target) policy, without learning.

        Returns:
            The sum of rewards collected during the episode.
        """
        total_reward = 0 
        obs,_ = self.env.reset() 
        while True:
            action = self.agent.get_target_action(obs) 
            next_obs, reward, terminated, truncated, _ = self.env.step(action) 
            obs = next_obs
            total_reward += reward
            self.env.render()
            # Stop on termination as well as truncation (e.g. a step limit).
            if terminated or truncated:
                break
            
        return total_reward
            
    def train(self) -> None:     
        """Train for ``episode_num`` episodes, running a greedy test episode every 100 episodes."""
        for e in range(self.episode_num):
            episode_reward = self.train_episode()
            print('Episode %s: Total Reward = %.2f'%(e,episode_reward)) 
            
            if e%100 == 0: 
                test_reward = self.test_episode()
                print('Test Total Reward = %.2f'%(test_reward))

In [None]:
if __name__ == "__main__":
    # Create the CartPole environment and hand it to the training manager
    # together with the hyper-parameters of the run.
    env = gym.make('CartPole-v1')
    Manger = TrainManager(
        env=env,
        episode_num=1000,
        lr=0.001,
        gamma=0.9,
        epsilon=0.1,
    )
    # Run the whole training loop.
    Manger.train()