### Machine Learning for Systems & Control 5SC28 2023-2024

# Exercise set for Lecture 7: Advanced Reinforcement Learning 

This exercise set illustrates two more advanced methods of $Q$-learning: basis function methods and deep $Q$-learning.

## Table of contents

1. <a href="#Exercise-1:-Basis-Function-$Q$-learning">Exercise 1: Basis Function $Q$-learning</a>
2. <a href="#Exercise-2:-Deep-$Q$-learning">Exercise 2: Deep $Q$-learning</a>


## Exercise 1: Basis Function $Q$-learning

As seen in the lecture, tabular $Q$-learning can be interpreted as minimizing the squared temporal difference (TD) error given by:

$$
 \frac{1}{2} \text{TD}^2 = \frac{1}{2} \left (\left (r_k + \gamma \max_u Q_\theta(x_{k+1},u) \right )  - Q_\theta(x_k,u_k) \right)^2
$$

This minimization problem can be used to formulate a steepest descent update rule for continuous $Q$-functions.  
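
As a small numerical illustration of the objective above, the sketch below evaluates the TD error for a linear $Q$-function. The feature vectors, `theta`, the reward, and `gamma` are hypothetical values chosen only for illustration; deriving the update rule itself is left to part a).

```python
import numpy as np

# Hypothetical numbers for illustration only (not from the exercise).
gamma = 0.9
phi_k = np.array([1.0, 0.0])       # features of x_k
phi_next = np.array([0.0, 1.0])    # features of x_{k+1}
theta = np.array([[0.5, 1.0],      # theta[:, u] is the parameter vector of action u
                  [2.0, 3.0]])
u_k, r_k = 0, 1.0

Q_now = phi_k @ theta[:, u_k]              # Q(x_k, u_k)
Q_next_max = np.max(phi_next @ theta)      # max_u Q(x_{k+1}, u)
td = (r_k + gamma * Q_next_max) - Q_now    # temporal-difference error
loss = 0.5 * td**2                         # the quantity to be minimized
print(td, loss)
```

Storing one column of `theta` per action mirrors the per-action parameter vectors $\theta_{u_k}$ used in part a).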

**a)** Derive the steepest descent update rule which minimizes $\frac{1}{2} \text{TD}^2$ for the parameters $\theta$ where the $Q$-function has the form of the following basis function expansion $$ Q_\theta(x_k,u_k) = \phi(x_k) \cdot \theta_{u_k} $$ where $\theta_{u_k}$ is a vector for a given input $u_k$. [By considering the discrete and finite set of possible actions, we associate a specific parameter vector $\theta_{u_k}$ with each possible action $u_k$. This formulation allows us to compute the derivative of $Q_\theta(x_k,u_k)$ with respect to the parameter of the corresponding action, capturing the specific characteristics and policies associated with each action].

**Answer a):** fill by student

**b)** Implement the RBF network function found below and validate the output for the 1D `my_gym` environment. (`scale` is the $\sigma_c$ parameter in the equation)

**c)** What happens when $\sigma_c$ gets close to zero? Does this look familiar?

**Answer c):** fill by student


In [None]:
import gymnasium as gym
import numpy as np
from matplotlib import pyplot as plt
class my_gym(gym.Env):
    def __init__(self, render_mode=None):
        super(my_gym,self).__init__()
        self.observation_space = gym.spaces.Box(np.array([0]).astype(np.float32),np.array([1]).astype(np.float32))    

def make_radial_basis_network(env,nvec,scale):
    # env: the given environment
    # nvec: the number of grid points in each dimension
    # scale: is the sigma_c in the equation
    if isinstance(nvec,int):
        nvec = [nvec]*env.observation_space.shape[0]
    
    # This creates a grid of points c_i from the lower bound to the upper bound with nvec samples in each dimension
    low, high = env.observation_space.low, env.observation_space.high # get upper and lower bound
    assert np.all(np.isfinite(low)) and np.all(np.isfinite(high)), f'infinite bounds on observation space are not permitted low={low}, high={high}'
    Xvec = [np.linspace(l,h,num=ni) for l,h,ni in zip(low,high,nvec)] # calculate the linspace in all directions
    c_points = np.array(np.meshgrid(*Xvec)) # meshgrid all the linspaces together (Nx, X1, X2, X3, ...) 
    c_points = np.moveaxis(c_points, 0, -1) #transform to (X1, X2, X3, ..., Nobs) 
    c_points = c_points.reshape((-1,c_points.shape[-1])) #flatten into the size (Nc, Nobs)
    dx = np.array([X[1]-X[0] for X in Xvec]) # spacing (related to the B matrix)
    
    def basis_fun(obs):
        #this function should return the vector containing all phi_i of all c_points
        obs = np.array(obs) #(Nobs)
        
        dis = (c_points-obs[None,:])/dx[None,:] #dim = (Nbasis, Nobs)
        # b) Fill this
    return basis_fun #returns a basis function


# b) Fill this
# c) Fill this


**d)** Implement the $\epsilon$-greedy $Q$-learning function provided below using the previously derived update rule. Test this setup on the Mountain Car problem with `nvec=10` and `scale=1.0` for the radial basis function. Visualize the episode lengths returned by the function (tune the other hyperparameters if needed). Additionally, use `visualize_theta` to plot the maximum $Q$ value in the state-space of the mountain car.

*tip: use `basis_fun(obs).shape[0]` and `env.action_space.n`*  

*tip: use the `roll_mean` function to smooth out the resulting episode lengths*

*Note: A time-out does not qualify as a terminal state, but it does require the environment to be reset. This functionality is already implemented. You can find an explanation [here](https://www.reddit.com/r/reinforcementlearning/comments/bb5mzl/d_confused_about_envis_done/).*
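
The distinction in the note above can be sketched as follows: the bootstrap term is dropped only for a true terminal state, while a time-out (truncation) still bootstraps from the next state. The helper `td_target` and its arguments are hypothetical names used for illustration and are not part of the provided code.

```python
import numpy as np

def td_target(reward, q_next, terminated, gamma=0.99):
    """Hypothetical helper: TD target for a single transition.

    q_next holds the Q-values of all actions in the next state.
    """
    if terminated:
        return reward                        # terminal state: no future value
    return reward + gamma * np.max(q_next)   # also used when the episode is only truncated

q_next = np.array([-3.0, -1.0, -2.0])
print(td_target(-1.0, q_next, terminated=True))
print(td_target(-1.0, q_next, terminated=False))
```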

In [None]:
import gymnasium.wrappers
import gymnasium.envs.classic_control
def argmax(a):
    #argmax([0,1,2,3]) -> 3
    #argmax([0,1,2,2]) -> 2 or 3 with equal probability of both (np.argmax would only return 2)
    #argmax([0,0,0,0]) -> 0, 1, 2 or 3 with equal probability of each (np.argmax would only return 0)
    a = np.array(a)
    return np.random.choice(np.arange(a.shape[0],dtype=int)[a==np.max(a)])

import numpy as np
def Qlearn(env, basis_fun, epsilon=0.1, alpha=0.1, gamma=0.99, nsteps=100_000, verbose=True):
    #theta = (Nbasis, Na)
    #basis_fun(state) -> (Nbasis)
    #Q(s,.) = basis_fun(state)@theta
    env_time = env
    while not isinstance(env_time,gym.wrappers.time_limit.TimeLimit):
        env_time = env_time.env
    ep_length = []
    ep_length_id = []
    
    
    obs, info = # d) Fill this
    #init theta:
    Nbasis = # d) Fill this
    theta = # d) Fill this
    
    Q = lambda s: basis_fun(s)@theta #short-hand such that you can call Q(obs)
    
    for z in range(nsteps):
        # d) Fill this
        obs_next, reward, terminated, truncated, info = # d) Fill this
        if terminated:
            # d) Fill this
        else:
            # d) Fill this
        #update theta
        # d) Fill this
        if terminated or truncated:
            if verbose: #print result only when verbose is set to True
                print(env_time._elapsed_steps, end=' ') 
            ep_length.append(env_time._elapsed_steps)#time-keeping
            ep_length_id.append(z)
            
            obs, info = # d) Fill this
        else:
            obs = # d) Fill this
    print()
    return theta, np.array(ep_length_id), np.array(ep_length)

def roll_mean(ar,start=400,N=25):
    s = 1-1/N
    k = start
    out = np.zeros(ar.shape)
    for i,a in enumerate(ar):
        k = s*k + (1-s)*a
        out[i] = k
    return out

def visualize_theta(env, theta, basis_fun):
    # for a given environment, theta matrix (Nbasis, Naction) and basis_fun(obs) -> (Nbasis,) 
    # it visualizes the max Q value in state-space.
    low, high = env.observation_space.low, env.observation_space.high
    nvec = [50,60]
    Xvec = [np.linspace(l,h,num=ni) for l,h,ni in zip(low,high,nvec)] # calculate the linspace in all directions
    c_points = np.array(np.meshgrid(*Xvec)) # meshgrid all the linspaces together (Nx, X1, X2, X3, ...) 
    c_points = np.moveaxis(c_points, 0, -1) #transform to (X1, X2, X3, ..., Nobs) 
    c_points = c_points.reshape((-1,c_points.shape[-1])) #flatten into the size (Nc, Nobs)
    maxtheta = np.array([np.max(basis_fun(ci)@theta) for ci in c_points]).reshape((nvec[1],nvec[0]))
    
    plt.contour(Xvec[0],Xvec[1],maxtheta)
    plt.xlabel('position')
    plt.ylabel('velocity')
    plt.colorbar()
    plt.show()

max_episode_steps = 400
env = gym.envs.classic_control.MountainCarEnv() 
env = gym.wrappers.time_limit.TimeLimit(env,max_episode_steps=max_episode_steps) 

# d) Fill this
    basis_fun = # d) Fill this
    theta, ep_length_id, ep_length = # d) Fill this


**e)** Adjust the `scale` and `nvec` parameters to optimize the agent's convergence towards the desired policy. Compare the tuned `scale` and `nvec` values to those used in the discretized state setup. Additionally, visualize the `theta` values using `visualize_theta` to gain insight into them and use this information to further refine the `scale` and `nvec`.

**Answer e):** fill by student


In [None]:
nvec = 10
max_episode_steps = 400
env = gym.envs.classic_control.MountainCarEnv() 
env = gym.wrappers.time_limit.TimeLimit(env,max_episode_steps=max_episode_steps) 

# e) Fill this


## Exercise 2: Deep $Q$-learning

Standard implementations of deep $Q$-learning in deep reinforcement learning (DRL) often fail. These failures can typically be attributed to factors such as:

- Instability
- Sparse reward functions
- Overfitting
- And more

To address these issues, various adjustments and techniques are necessary. However, the specific methods employed can vary depending on the problem domain and the preferences of users. These methods can be viewed as a toolbox of strategies tailored to different scenarios and challenges.

For this exercise, you will use the [Acrobot environment](https://gymnasium.farama.org/environments/classic_control/acrobot/).

![image-4.png](attachment:image-4.png)

A computationally efficient approach to implement the $Q$-function is to give it the same number of outputs as possible actions. However, this is only possible for discrete action spaces (i.e. $u_t \in \{0,...,n_{\text{actions}}-1\}$).

![image.png](attachment:image.png)


By structuring the Q function in this manner, we can compute all the values of the Q-functions for a given state $x_t$ in a single pass.
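
As a sketch of why this structure is convenient, the snippet below uses a random linear map as a stand-in for the network (an assumption for illustration; the exercise itself uses an ANN). One forward pass yields a `(batch, n_actions)` array, from which both the greedy action and the value of a chosen action can be read off.

```python
import numpy as np

rng = np.random.default_rng(0)
n_obs, n_actions, batch = 6, 3, 4           # Acrobot-like sizes, chosen for illustration
W = rng.normal(size=(n_obs, n_actions))     # stand-in for the Q-network

x = rng.normal(size=(batch, n_obs))         # a batch of states
Q_all = x @ W                               # all Q(x, u) in one pass: (batch, n_actions)

greedy = np.argmax(Q_all, axis=1)           # greedy action for every state
u = np.array([0, 2, 1, 1])                  # some (arbitrary) taken actions
Q_taken = Q_all[np.arange(batch), u]        # Q(x_t, u_t) for every sample
print(Q_all.shape, greedy.shape, Q_taken.shape)
```

The same integer-array indexing pattern appears later in the training loop, where $Q_\theta(x_t,u_t)$ is selected from the full network output.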

**a)** Construct an ANN to serve as our Q-function. Design it with one hidden layer, employ the tanh activation function, and include 40 nodes per layer.

*tip: use env.observation_space.shape[0] and env.action_space.n*  

In [None]:
from torch import nn
import gymnasium as gym
import gymnasium.wrappers
import gymnasium.envs.classic_control
import torch

class Qfunction(nn.Module):
    def __init__(self, env):
        super(Qfunction,self).__init__()
        # a) Fill this
    def forward(self, obs):
        # a) Fill this
max_episode_steps = 500
env = gym.envs.classic_control.AcrobotEnv()
env = gym.wrappers.time_limit.TimeLimit(env,max_episode_steps=max_episode_steps) 
Q = Qfunction(env)
#test validity:
obs, info = env.reset()
obs_tensor = torch.tensor(obs,dtype=torch.float32)[None,:] #convert to a torch tensor with size (1, Nobs=6)
print('obs_tensor = ', obs_tensor)
print('Q(x) = ',Q(obs_tensor)) #output #(1,Naction=3)


**b)** To visualize the $Q$-function make a `show(Q,env)` function which uses the policy of the $Q$-function 

$$ u_t = \pi(x_t) = \text{arg}\max_{u'} Q_\theta(x_t,u')$$

on a given environment `env`. Iterate until the end of the episode (i.e. until `done==True`) and render the results.

*tip: use `np.argmax` to get the element with the highest value, `time.sleep` to introduce a real-time delay and `break` to break from the while loop if done, and of course the `.render` method*


In [None]:
#visualize
import torch, time
import gymnasium as gym
import gymnasium.wrappers, gymnasium.envs.classic_control
import numpy as np
from matplotlib import pyplot as plt
from torch import nn

def show(Q,env):
    with torch.no_grad():
        #you can use Qfun(obs) as a shorthand for the q function.
        Qfun = lambda x: Q(torch.tensor(x[None,:],dtype=torch.float32))[0].numpy() #convert x to torch.tensor -> put in the Q function -> back to numpy
        try:
            # b) Fill this
        finally: #this will always run even when an error occurs
            env.close()

max_episode_steps = 250
env = gym.envs.classic_control.AcrobotEnv(render_mode='human')
env = gym.wrappers.time_limit.TimeLimit(env,max_episode_steps=max_episode_steps) 
Q = Qfunction(env)
# show(Q,env)


Due to the unstable nature of Deep Q-learning, we will be utilizing the rollout/replay buffer structure. This method utilizes a two-step process: 

 1. Apply the policy ($\epsilon$-greedy) to the environment `N_rollout` times and save the result of every step taken.
 [tip: one should save the following results: ($x_t$, $u_t$, $r_{t+1}$, $x_{t+1}$, $\text{terminal}_{t+1}$)]

 2. Use batch optimization to minimize the TD loss on the saved results:
 $$ Loss = \frac{1}{N_{rollout}} \sum_t \left (r_{t+1} + \gamma \max_{u'} Q_\theta(x_{t+1},u') (1-\text{terminal}_{t+1}) - Q_\theta(x_{t},u_{t}) \right )^2$$ 
 where the $\max_{u'} Q_\theta(x_{t+1},u')$ is excluded from the gradient calculation. 
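
The loss in step 2 can be sketched on dummy data as follows. The linear layer is only a stand-in for the $Q$-network and the random transitions are made up for illustration; the point is that the bootstrap term is computed under `torch.no_grad()`, so gradients flow only through $Q_\theta(x_t,u_t)$.

```python
import torch
from torch import nn

torch.manual_seed(0)
gamma = 0.98
Q = nn.Linear(6, 3)                        # stand-in Q-function: 6 observations, 3 actions

# Dummy batch of transitions (random values, for illustration only).
N = 5
x = torch.randn(N, 6)                      # x_t
x_next = torch.randn(N, 6)                 # x_{t+1}
u = torch.tensor([0, 1, 2, 1, 0])          # u_t
r = -torch.ones(N)                         # r_{t+1}
terminal = torch.tensor([0., 0., 1., 0., 0.])

with torch.no_grad():                      # the bootstrap term is treated as a constant
    max_q_next = Q(x_next).max(dim=1).values
target = r + gamma * max_q_next * (1 - terminal)

q_now = Q(x)[torch.arange(N), u]           # Q(x_t, u_t)
loss = torch.mean((target - q_now) ** 2)   # mean squared TD error over the batch
loss.backward()                            # gradients flow only through q_now
print(loss.item())
```

For the terminal sample the target reduces to the reward alone, which is what the factor $(1-\text{terminal}_{t+1})$ encodes.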

 
**c)** Finish the rollout function with the $\epsilon$-greedy algorithm. 

*tip: use the provided `Qfun`, which takes an `obs` and returns all the Q values as a simple NumPy array*

In [None]:
def rollout(Q, env, epsilon=0.1, N_rollout=10_000): 
    #save the following (use .append)
    Start_state = [] #hold an array of (x_t)
    Actions = [] #hold an array of (u_t)
    Rewards = [] #hold an array of (r_{t+1})
    End_state = [] #hold an array of (x_{t+1})
    Terminal = [] #hold an array of (terminal_{t+1})
    # Qfun( a numpy array of the obs) -> a numpy array of Q values
    Qfun = lambda x: Q(torch.tensor(x[None,:],dtype=torch.float32))[0].numpy() 
    with torch.no_grad():
        
        # c) Fill this
    #error checking:
    assert len(Start_state)==len(Actions)==len(Rewards)==len(End_state)==len(Terminal), f'error in lengths: {len(Start_state)}=={len(Actions)}=={len(Rewards)}=={len(End_state)}=={len(Terminal)}'
    return np.array(Start_state), np.array(Actions), np.array(Rewards), np.array(End_state), np.array(Terminal).astype(int)

max_episode_steps = 250
env = gym.envs.classic_control.AcrobotEnv()
env = gym.wrappers.time_limit.TimeLimit(env,max_episode_steps=max_episode_steps)
Q = Qfunction(env) 
Start_state, Actions, Rewards, End_state, Terminal = rollout(Q,env,N_rollout=300)
print(Start_state, Actions, Rewards, End_state, Terminal)

During training, we want to evaluate the policy resulting from the current $Q$-function, both to assess its performance and for early stopping. This is done by calculating the accumulated reward over an episode [note: averages will be taken later].

**d)** Finish writing `eval_Q` which returns the accumulated sum of the rewards for a single episode. ($\gamma=1$, $\epsilon=0$)

In [None]:
from matplotlib import pyplot as plt

def eval_Q(Q,env):
    with torch.no_grad():
        Qfun = lambda x: Q(torch.tensor(x[None,:],dtype=torch.float32))[0].numpy()
        # d) Fill this
max_episode_steps = 250
env = gym.envs.classic_control.AcrobotEnv()
env = gym.wrappers.time_limit.TimeLimit(env,max_episode_steps=max_episode_steps)
Q = Qfunction(env) 
# d) Fill this


Now we have written all the utility functions to implement a rollout deep Q-learning algorithm. 

**e)** Use the previously written utility functions to create the DQN Algorithm. This algorithm requires the following steps:

 1. Realize an $\epsilon$ factor which decays linearly from 1 to 0 with the number of iterations.
 2. Use the rollout function to produce the `Start_state, Actions, Rewards, End_state, Terminal` (do not forget to pass the hyperparameters)
 3. Use batch optimization on the Loss given by:
 $$Loss = \sum_t \left (r_{t+1} + \gamma \max_{u'} Q_{\theta\text{ no grad}} (x_{t+1},u') (1-\text{terminal}_{t+1}) - Q_\theta(x_{t},u_{t}) \right )^2$$
 where $\max_{u'} Q_{\theta\text{ no grad}} (x_{t+1},u')$ is excluded from the gradient calculation. (use `torch.no_grad()` or `.detach()`)
 
 4. Each epoch of the batch optimization evaluates the policy from the Q-function using the previously written `eval_Q(Q,env)` function and saves the result if it is the highest observed (averaged over `N_evals` episodes). 
 5. At the end of each iteration load the best performing Q function previously saved [this is already present].
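
One possible reading of step 1 is sketched below: a schedule that starts at 1 in the first iteration and reaches 0 in the last. The function name `linear_epsilon` is hypothetical, and whether the final iteration should be fully greedy is a design choice that the exercise does not fix.

```python
def linear_epsilon(iteration, n_iterations):
    """Hypothetical schedule: 1.0 at iteration 0, 0.0 at the last iteration."""
    if n_iterations <= 1:
        return 0.0                                   # a single iteration: act greedily
    return 1.0 - iteration / (n_iterations - 1)      # linear interpolation from 1 to 0

schedule = [linear_epsilon(i, 5) for i in range(5)]
print(schedule)  # [1.0, 0.75, 0.5, 0.25, 0.0]
```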

In [None]:


from copy import deepcopy
def DQN_rollout(Q, optimizer, env, gamma=0.98, use_target_net=False, N_iterations=21, N_rollout=20000, \
                N_epochs=10, batch_size=32, N_evals=10, target_net_update_feq=100):
    best = -float('inf')
    torch.save(Q.state_dict(),'Q-checkpoint')
    try:
        for iteration in range(N_iterations):
            epsilon = # e) Fill this
            print(f'rollout iteration {iteration} with epsilon={epsilon:.2%}...')
            
            #2. rollout
            # e) Fill this
            #Data conversion, no changes required
            convert = lambda x: [torch.tensor(xi,dtype=torch.float32) for xi in x]
            Start_state, Rewards, End_state, Terminal = convert([Start_state, Rewards, End_state, Terminal])
            Actions = Actions.astype(int)

            print('starting training on rollout information...')
            t = 0
            for epoch in range(N_epochs): 
                for i in range(batch_size,len(Start_state)+1,batch_size): 
                    if t%target_net_update_feq==0:
                        # g) Fill this
                        pass
                    t += 1
                    
                    Start_state_batch, Actions_batch, Rewards_batch, End_state_batch, Terminal_batch = [d[i-batch_size:i] for d in \
                                                                                                        [Start_state, Actions, Rewards, End_state, Terminal]] #e= # e) Fill this
                    
                    with torch.no_grad(): #3.
                        if use_target_net:
                            pass
                            # g) Fill this
                        else:
                            maxQ = # e) Fill this
                    
                    # action_index = np.stack((np.arange(batch_size),Actions_batch),axis=0)
                    # ids = np.arange(batch_size)
                    
                    Qnow = Q(Start_state_batch)
                    # print(f'{action_index.shape=}')
                    # print(f'{Qnow.shape=}')
                    Qnow = Qnow[np.arange(batch_size), Actions_batch] #Q(x_t,u_t) is given
                    # print(Rewards_batch.shape, maxQ.shape, Terminal_batch.shape, Qnow.shape)
                    # e) Fill this
                score = # e) Fill this
                print(f'iteration={iteration} epoch={epoch} Average Reward per episode:',score)
                if score>best:
                    best = score
                    print('################################# \n new best',best,'saving Q... \n#################################')
                    torch.save(Q.state_dict(),'Q-checkpoint')
            
            print('loading best result')
            Q.load_state_dict(torch.load('Q-checkpoint'))
    finally: #this will always run, even after a KeyboardInterrupt.
        print('loading best result')
        Q.load_state_dict(torch.load('Q-checkpoint'))

**f)** Apply your implementation to the `AcrobotEnv` environment. Experiment with the hyperparameters to observe their effects on the resulting performance. Evaluate the resulting policies visually using the `show` function and quantitatively using the `eval_Q` function, as demonstrated in the next two cells.

In [None]:
import torch
from torch import nn
import gymnasium as gym

max_episode_steps = 250
env = gym.envs.classic_control.AcrobotEnv()
env = gym.wrappers.time_limit.TimeLimit(env,max_episode_steps=max_episode_steps)


gamma = # f) Fill this
batch_size = # f) Fill this
N_iterations = # f) Fill this
N_rollout = # f) Fill this
N_epochs = # f) Fill this
N_evals = # f) Fill this
lr = 0.0005 #given

assert isinstance(env.action_space,gym.spaces.Discrete), 'action space requires to be discrete'
Q = Qfunction(env)
optimizer = torch.optim.Adam(Q.parameters(),lr=lr) #low learning rate
DQN_rollout(Q, optimizer, env, use_target_net=True, gamma=gamma, N_iterations=N_iterations, \
            N_rollout=N_rollout, N_epochs=N_epochs, N_evals=N_evals)

In [None]:
env_vis = gym.envs.classic_control.AcrobotEnv(render_mode='human')
env_vis = gym.wrappers.time_limit.TimeLimit(env_vis,max_episode_steps=max_episode_steps)

show(Q,env_vis)

In [None]:
from matplotlib import pyplot as plt
Rewards = [eval_Q(Q,env) for i in range(100)]
plt.plot(Rewards,'.')
plt.title(f'mean={np.mean(Rewards)}')
plt.xlabel('instance')
plt.ylabel('Reward per episode')
plt.show()

**g)** In the lecture, you learned about another method to enhance the stability of DQN by incorporating a target network that is updated only occasionally. Implement this in the function above, where the target network is updated after each epoch.

*tip: use `deepcopy` to make a copy of the current network*
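
The mechanics behind this tip can be sketched as follows, with a linear layer standing in for the $Q$-network and an in-place weight change standing in for an optimizer step (both are assumptions for illustration). The copy stays fixed while the online network changes, and is refreshed by copying again.

```python
from copy import deepcopy
import torch
from torch import nn

torch.manual_seed(0)
Q = nn.Linear(6, 3)                 # stand-in for the online Q-network
Q_target = deepcopy(Q)              # independent copy used for the TD targets

# Pretend an optimization step changed the online network.
with torch.no_grad():
    Q.weight.add_(1.0)

x_next = torch.randn(4, 6)
with torch.no_grad():
    max_q_next = Q_target(x_next).max(dim=1).values   # targets come from the copy

same_before_sync = torch.equal(Q.weight, Q_target.weight)   # the copy did not follow Q
Q_target = deepcopy(Q)                                       # occasional synchronization
same_after_sync = torch.equal(Q.weight, Q_target.weight)
print(same_before_sync, same_after_sync)
```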

**h)** Summarize the number of hyperparameters excluding the ANN structure. 

**Answer h):** fill by student


**i)** **Bonus:** If you're up for an additional challenge, you can rerun the code with any discrete-input environment. See [classic control](https://gymnasium.farama.org/environments/classic_control/) which includes:

 * [Cart Pole](https://gymnasium.farama.org/environments/classic_control/cart_pole/)
 * [Mountain Car](https://gymnasium.farama.org/environments/classic_control/mountain_car/)
 * and the design assignment (update the reward function)

You may need to adjust the hyperparameters to suit the chosen environment. Alternatively, you can create your own environment, which offers a more tailored approach. See: https://towardsdatascience.com/creating-a-custom-openai-gym-environment-for-stock-trading-be532be3910e

### Quick last note

It's worth mentioning that *state normalization* is a popular method in reinforcement learning, although it wasn't covered in this exercise set.
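
A minimal sketch of one common variant, min-max scaling of the observations to $[-1, 1]$ using known bounds, is shown below. The function name `normalize_obs` is hypothetical, and the bounds are assumed to be the Mountain Car position and velocity limits; normalizing with a running mean and variance is another frequently used option.

```python
import numpy as np

def normalize_obs(obs, low, high):
    """Hypothetical helper: scale each observation component to [-1, 1]."""
    obs, low, high = (np.asarray(a, dtype=np.float64) for a in (obs, low, high))
    return 2.0 * (obs - low) / (high - low) - 1.0

low = np.array([-1.2, -0.07])     # assumed Mountain Car bounds (position, velocity)
high = np.array([0.6, 0.07])
print(normalize_obs(low, low, high), normalize_obs(high, low, high))
print(normalize_obs([-0.3, 0.0], low, high))
```

In practice such a function would be applied to every observation before it is passed to the $Q$-function, which requires finite bounds on the observation space.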