----

# Artificial Intelligence - COMPSCI4004 2025-2026

## Lab 9: Policy search (including function approximations)

----

### Q9.1 Introduction & Housekeeping 

This lab is a case study in which we apply ("tabular") policy search (with simple function approximation) to a specific problem, namely an OpenAI Gym compatible environment called CliffWalk.

Running the notebook:
 - **We recommend running this lab on Google Colab: https://colab.research.google.com/**
 - You will need to use an OLDER version of gym (0.21.0). If you are using Anaconda, you may run the following command in the console to change the gym version:
     

*conda install -c conda-forge gym==0.21.0*
 

----

- Let's import everything we need for this lab (assuming you have installed Open AI gym and Tensorflow correctly). Note: you might get some FutureWarnings when importing tensorflow which is expected/typical:

In [12]:
# You can install v1.15 tensorflow but we recommend running the notebook on colab as its current default is 1.x
# Be careful with python versions, keras and tensorflow!
#!pip install -q tensorflow==1.15
#!pip install -q keras==2.2.4

# The following 3 statements are un-commented for running on Google colab
!pip install setuptools==65.5.0
!pip install wheel==0.38.4
!pip install gym==0.21.0



In [13]:
import matplotlib.pyplot as plt
import time
import itertools
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
from collections import deque

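import numpy as np
import pandas as pd  # used by plot_episode_stats for the rolling mean
import sys
from io import StringIO  # used by CliffWalkingEnv._render in 'ansi' mode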
import os
import random
from collections import namedtuple
import collections
import copy

# Import the open AI gym
import gym
import keras
from keras.models import Sequential
#from keras.layers import Dense
from tensorflow.keras.optimizers import Adam
#from keras import backend as K

from keras import layers
from keras.models import Model
from keras import backend as K
from keras import utils as np_utils
from keras import optimizers
from keras import initializers

import tensorflow as tf

from gym.envs.toy_text import discrete

print(sys.version)


3.9.13 (main, Oct 13 2022, 21:23:06) [MSC v.1916 64 bit (AMD64)]


-----

### Define a few helper functions 

In [14]:
# define a couple of helper functions

EpisodeStats = namedtuple("Stats",["episode_lengths", "episode_rewards"])

def plot_episode_stats(stats, smoothing_window=10, noshow=False):
    # Plot the episode length over time
    fig1 = plt.figure(figsize=(10,5))
    plt.plot(stats.episode_lengths)
    plt.xlabel("Episode")
    plt.ylabel("Episode Length")
    plt.title("Episode Length over Time")
    if noshow:
        plt.close(fig1)
    else:
        plt.show()

    # Plot the episode reward over time
    fig2 = plt.figure(figsize=(10,5))
    rewards_smoothed = pd.Series(stats.episode_rewards).rolling(smoothing_window, min_periods=smoothing_window).mean()
    plt.plot(rewards_smoothed)
    plt.xlabel("Episode")
    plt.ylabel("Episode Reward (Smoothed)")
    plt.title("Episode Reward over Time (Smoothed over window size {})".format(smoothing_window))
    if noshow:
        plt.close(fig2)
    else:
        plt.show()

    # Plot time steps and episode number
    fig3 = plt.figure(figsize=(10,5))
    plt.plot(np.cumsum(stats.episode_lengths), np.arange(len(stats.episode_lengths)))
    plt.xlabel("Time Steps")
    plt.ylabel("Episode")
    plt.title("Episode per time step")
    if noshow:
        plt.close(fig3)
    else:
        plt.show()

    return fig1, fig2, fig3

----

## Q9.2 The Environment

We will start out with a relatively simple environment to demonstrate the policy search procedure. You can (optionally) choose to experiment with a more advanced/interesting example in a later task.

The task is to traverse a grid world from the start state (3,0) to the target state ('T') at (3,11) while avoiding the cliff ('C'): stepping onto a cliff cell yields a reward of -100 and ends the episode. Every other transition yields a reward of -1.

Let's define and load the environment and visualize it (see below):
    

In [15]:
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3

class CliffWalkingEnv(discrete.DiscreteEnv):

    metadata = {'render.modes': ['human', 'ansi']}

    def _limit_coordinates(self, coord):
        coord[0] = min(coord[0], self.shape[0] - 1)
        coord[0] = max(coord[0], 0)
        coord[1] = min(coord[1], self.shape[1] - 1)
        coord[1] = max(coord[1], 0)
        return coord

    def _calculate_transition_prob(self, current, delta):
        new_position = np.array(current) + np.array(delta)
        new_position = self._limit_coordinates(new_position).astype(int)
        new_state = np.ravel_multi_index(tuple(new_position), self.shape)
        reward = -100.0 if self._cliff[tuple(new_position)] else -1.0
        is_done = self._cliff[tuple(new_position)] or (tuple(new_position) == (3,11))
        return [(1.0, new_state, reward, is_done)]

    def __init__(self):
        self.shape = (4, 12)

        nS = np.prod(self.shape)
        nA = 4

        # Cliff Location
        self._cliff = np.zeros(self.shape, dtype=bool)
        self._cliff[3, 1:-1] = True

        # Calculate transition probabilities
        P = {}
        for s in range(nS):
            position = np.unravel_index(s, self.shape)
            P[s] = { a : [] for a in range(nA) }
            P[s][UP] = self._calculate_transition_prob(position, [-1, 0])
            P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1])
            P[s][DOWN] = self._calculate_transition_prob(position, [1, 0])
            P[s][LEFT] = self._calculate_transition_prob(position, [0, -1])

        # We always start in state (3, 0)
        isd = np.zeros(nS)
        isd[np.ravel_multi_index((3,0), self.shape)] = 1.0

        super(CliffWalkingEnv, self).__init__(nS, nA, P, isd)

    def render(self, mode='human', close=False):
        self._render(mode, close)

    def _render(self, mode='human', close=False):
        if close:
            return

        outfile = StringIO() if mode == 'ansi' else sys.stdout

        for s in range(self.nS):
            position = np.unravel_index(s, self.shape)
            # print(self.s)
            if self.s == s:
                output = " x "
            elif position == (3,11):
                output = " T "
                #print(self.s)
            elif self._cliff[position]:
                output = " C "
            else:
                output = " o "

            if position[1] == 0:
                output = output.lstrip() 
            if position[1] == self.shape[1] - 1:
                output = output.rstrip() 
                output += "\n"

            outfile.write(output)
        outfile.write("\n")


In [16]:
env = CliffWalkingEnv()

In [17]:
env.render()

o  o  o  o  o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o  o  o  o  o
x  C  C  C  C  C  C  C  C  C  C  T



Note: We start in the state (3,0) indicated by an x and we need to find a "good" route to the T state.

- <font color='dark-magenta'>TASK:</font> Inspect the source and make sure you understand the states, actions, rewards and transition probabilities. Hint: is this a stochastic environment?

We can, e.g., take a step and see what happens:

In [18]:
next_state, reward, done, _ = env.step(0) # 0 is UP
env.render()
print(next_state)
print(reward)
print(done)

o  o  o  o  o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o  o  o  o  o
x  o  o  o  o  o  o  o  o  o  o  o
o  C  C  C  C  C  C  C  C  C  C  T

24
-1.0
False


- <font color='dark-magenta'>TASK:</font> <font color="red">[descriptive]</font> What is the optimal policy (given that you have access to the full state-space and know where the target is)?

<div style="border:0.5px solid red"></div>
<font color="red">SOLUTION</font>

First, we see that it is NOT a stochastic environment: every transition has probability 1.0 (however, you still need to learn not to go over the cliff). We also observe that the starting state is (3,0), the bottom-left corner, i.e. state (0,0) is the top-left corner.

Furthermore, we can get a sense of the action and state representations by running the following standard (OpenAI Gym) commands:

In [19]:
print(env.action_space)
print(env.observation_space)

Discrete(4)
Discrete(48)


I.e. we have 4 discrete actions and 48 unique states (discrete).
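
To make the state numbering concrete, here is a minimal sketch in plain NumPy (independent of the environment class) that converts between the integer state index and (row, column) grid coordinates, in the same way `CliffWalkingEnv` does with `np.ravel_multi_index` and `np.unravel_index`. The helper names `to_index` and `to_position` are illustrative only and are not part of the lab code.

```python
import numpy as np

SHAPE = (4, 12)  # same grid shape as CliffWalkingEnv.shape


def to_index(row, col):
    """Grid position -> integer state index (row-major order)."""
    return int(np.ravel_multi_index((row, col), SHAPE))


def to_position(state):
    """Integer state index -> (row, col) grid position."""
    row, col = np.unravel_index(state, SHAPE)
    return int(row), int(col)


start = to_index(3, 0)    # bottom-left corner
goal = to_index(3, 11)    # bottom-right corner
print(start, goal)        # 36 47
print(to_position(24))    # (2, 0): one step UP from the start
```

This is why the first `env.step(0)` above returned state 24 and the second returned state 12: each UP move subtracts one row, i.e. 12 from the index.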

Example of taking an action in the environment:

In [20]:
next_state, reward, done, _ = env.step(0)
    
print(next_state)
print(reward)
print(done)

12
-1.0
False


- We can easily see that the optimal policy (along the edge of the cliff) is: up -> 11 x right -> 1 x down (i.e. 13 steps). Note that had the env been stochastic, this would have been different!
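
The step count can be checked with a small stand-alone replica of the transition rules. This is only a sketch: `step` and `rollout` are hypothetical helpers that mirror the logic of `CliffWalkingEnv` (clipping at the grid border, -100 and termination on the cliff, -1 otherwise), not the environment itself.

```python
SHAPE = (4, 12)
UP, RIGHT, DOWN, LEFT = 0, 1, 2, 3
DELTAS = {UP: (-1, 0), RIGHT: (0, 1), DOWN: (1, 0), LEFT: (0, -1)}


def step(position, action):
    """Deterministic transition that mirrors the rules of CliffWalkingEnv."""
    row = min(max(position[0] + DELTAS[action][0], 0), SHAPE[0] - 1)
    col = min(max(position[1] + DELTAS[action][1], 0), SHAPE[1] - 1)
    on_cliff = (row == 3 and 1 <= col <= 10)   # cells (3,1) .. (3,10)
    reward = -100.0 if on_cliff else -1.0
    done = on_cliff or (row, col) == (3, 11)
    return (row, col), reward, done


def rollout(actions, start=(3, 0)):
    """Apply a fixed action sequence and accumulate the (undiscounted) reward."""
    position, total, steps, done = start, 0.0, 0, False
    for action in actions:
        position, reward, done = step(position, action)
        total += reward
        steps += 1
        if done:
            break
    return position, total, steps, done


edge_path = [UP] + [RIGHT] * 11 + [DOWN]
print(rollout(edge_path))   # ((3, 11), -13.0, 13, True)
print(rollout([RIGHT]))     # ((3, 1), -100.0, 1, True): straight into the cliff
```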

<div style="border:0.5px solid red"></div>

## Q9.2 Policy Search (review)

- <font color='dark-magenta'>REVIEW</font>: Review policy search in the AIMA book (sec 21.5) or from the overview below:

Recall that policy search requires the following:

- A function which quantifies how good it is to take an action in a state; let's call it $Q_{\theta}(s, a)$ as in the AIMA book (others call it $h_{\theta}(s, a)$).
    - We will use a basic neural network defined in [tensorflow](https://www.tensorflow.org/) as this function approximator. The main reason for using tensorflow is that it provides the different types of neural network layers and can compute the gradients automatically (in this case, the gradients of the policy!).
    
    
- A way of mapping the $Q_{\theta}$ function to a policy that determines which action to take in a specific state.
    - We typically use the [softmax](https://en.wikipedia.org/wiki/Softmax_function) function for obtaining a differentiable policy, i.e:
    $$\pi (a|s,\theta )= \frac{{{e^{{Q_\theta }(s,a)}}}}{{\sum\limits_{a'}^{} {{e^{{Q_\theta }(s,a')}}} }}$$


- The goodness of the policy 
    - We use the basic idea of the expected utility and define the value/goodness of a policy, $\rho(\theta)$, as the *true* expected sum of future rewards under that (stochastic) policy.
    

- A method to update the policy to maximize the goodness, i.e., $\rho(\theta)$, of the policy:
    - It can seem challenging to change the policy parameters $\theta$ so as to maximize $\rho(\theta)$ in a way that ensures improvement. Goodness depends on both the action choices and the distribution of states in which those actions are chosen, and both depend on $\theta$. For a specific state, the effect of $\theta$ on the actions and reward can be computed relatively easily; however, the effect of $\theta$ on the state distribution depends on the unknown environment. So the main question is: how can we estimate the performance gradient with respect to the policy parameters when the gradient depends on the unknown effect of policy changes on the state distribution?
    - We rely on the policy gradient theorem, which leads to an estimate of the gradient after observing a single transition:
    $$\theta  \leftarrow \theta  + \alpha {G_t}\frac{{\nabla \pi \left( {{a_j}|{s_j},\theta } \right)}}{{\pi \left( {{a_j}|{s_j},\theta } \right)}}$$
    where $G_t$ is the discounted sum of future rewards from that state onwards (which can be computed after an episode has been completed); the indices $t$ and $j$ refer to the same time step.
    Note: we can also estimate the gradient based on multiple transitions and do a batch update (as in the lecture slides and book).
    The gradient with respect to the individual parameters (i.e. a vector) is the direction in parameter space that most increases the probability of repeating the action $a_j$ on future visits to state $s_j$. The update moves the parameter vector in this direction by an amount proportional to $G_t$ and inversely proportional to the action probability, $\pi(a_j|s_j,\theta)$. The former makes sense because it causes the parameters to move most in the directions that favor actions that yield the highest return. The latter makes sense because otherwise actions that are selected frequently are at an advantage (the updates will be more often in their direction) and might win out even if they do not yield the highest return (AIMA Chapter 21 and Sutton & Barto: Reinforcement Learning: An Introduction, Chapter 13).
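
The ingredients listed above can be sketched in a few lines of NumPy. This is an illustrative tabular version, assuming $\theta$ is simply a table holding $Q_\theta(s,a)$ directly (so that $\nabla_\theta \log \pi(a|s,\theta)$ for the row of state $s$ is `onehot(a) - pi`); the function names `softmax` and `reinforce_step` are made up for this sketch and it is not the TensorFlow estimator used later in the lab.

```python
import numpy as np


def softmax(q):
    """Numerically stable softmax over the last axis."""
    z = q - np.max(q, axis=-1, keepdims=True)
    e = np.exp(z)
    return e / np.sum(e, axis=-1, keepdims=True)


def reinforce_step(theta, state, action, G, alpha):
    """One update theta <- theta + alpha * G * grad log pi(action | state).

    theta has shape (n_states, n_actions) and stores Q_theta(s, a) directly.
    Note that grad(pi) / pi is the same quantity as grad(log pi).
    """
    pi = softmax(theta[state])
    grad_log_pi = -pi
    grad_log_pi[action] += 1.0            # onehot(action) - pi
    new_theta = theta.copy()
    new_theta[state] += alpha * G * grad_log_pi
    return new_theta


theta = np.zeros((48, 4))                 # all-zero table = uniform policy
before = softmax(theta[36])[0]            # P(UP | start state) = 0.25
theta = reinforce_step(theta, state=36, action=0, G=-13.0, alpha=0.01)
after = softmax(theta[36])[0]
print(before, after)                      # `after` is smaller because G is negative
```

Since every reward in CliffWalk is negative, each sampled action is pushed down; actions followed by a less negative return are pushed down less, which is what shifts probability towards them over many episodes.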


----

## Q9.3 Policy Approximation (and its gradients)

In this part we will implement/define the policy approximation/estimator in [tensorflow](https://www.tensorflow.org/learn). The code is carefully annotated (to help you solve the tasks below).

In [21]:
class NeuralNetworkPolicyEstimator():
    """ 
    A very basic MLP neural network approximator and estimator for policy search    
    
    The only tricky thing is the training/loss function and the specific neural network architecture
    """
    
    def __init__(self, alpha, n_actions, d_states, nn_config, verbose=False):        
        self.alpha = alpha    
        self.nn_config = nn_config                   
        self.n_actions = n_actions        
        self.d_states = d_states
        self.verbose=verbose # Print debug information        
        self.n_layers = len(nn_config)  # number of hidden layers        
        self.model = []
        self.__build_network(d_states, n_actions)
        self.__build_train_fn()
             

    def __build_network(self, input_dim, output_dim):
        """Create a base network using the Keras functional API"""
        self.X = layers.Input(shape=(input_dim,))
        net = self.X
        for h_dim in self.nn_config:  # use the instance's config rather than a global variable
            net = layers.Dense(h_dim)(net)
            net = layers.Activation("relu")(net)
        net = layers.Dense(output_dim, kernel_initializer=initializers.Zeros())(net)
        net = layers.Activation("softmax")(net)
        self.model = Model(inputs=self.X, outputs=net)

    def __build_train_fn(self):
        """ Create a custom train function
        It replaces `model.fit(X, y)` because we use the output of the model for training.
        Called using `self.train_fn([state, action_one_hot, target])`,
        which takes one training step on the model.
        
        Hint: you can think of K. as np.
        
        """
        # predefine a few variables
        action_onehot_placeholder   = K.placeholder(shape=(None, self.n_actions),name="action_onehot") # define a variable
        target                      = K.placeholder(shape=(None,), name="target") # define a variable       
        
        # this part defines the loss and is very important!
        action_prob        = self.model.output # the output of the neural network
        action_selected_prob        = K.sum(action_prob * action_onehot_placeholder, axis=1) # probability of the selected action
        log_action_prob             = K.log(action_selected_prob) # take the log
        loss = -log_action_prob * target # the loss we are trying to minimise
        loss = K.mean(loss)
        
        # defining the specific optimizer to use
        adam = optimizers.Adam(lr=self.alpha)# clipnorm=1000.0) # let's use a Keras optimiser called Adam
        updates = adam.get_updates(params=self.model.trainable_weights,loss=loss) # the parameter updates Adam computes from the gradients of the loss
            
        # create a handle to the optimiser function    
        self.train_fn = K.function(inputs=[self.model.input,action_onehot_placeholder,target],
                                   outputs=[],
                                   updates=updates) # return a function which, when called, takes a gradient step
      
    
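    def predict(self, s, a=None):
        """Return the action probabilities for state(s) s, or only those of action a."""
        if a is None:
            return self._predict_nn(s)
        else:
            # select the column of action a (predictions have shape (batch, n_actions))
            return self._predict_nn(s)[:, a]

    def _predict_nn(self, state_hat):
        """
        Forward pass through the network: ReLU hidden layers (if any)
        followed by a softmax output layer
        """
        x = self.model.predict(state_hat)
        return x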
  
    def update(self, states, actions, target):
        """
            states: one-hot encoded state(s), as passed to the network input
            actions: an integer representing the discrete action
            target: a real number representing the discounted future reward (reward-to-go)
        """
        action_onehot = np_utils.to_categorical(actions, num_classes=self.n_actions) # encodes the action as one-hot
        self.train_fn([states, action_onehot, target]) # call the custom optimiser which takes a gradient step
        return
        
    def new_episode(self):        
        self.t_episode  = 0.    

<br>

***Look through the annotated code above and address the following question.***

- <font color='dark-magenta'>NOTE</font>:  Make sure you appreciate what one-hot-encoding means (look online if you are in doubt).
- <font color='dark-magenta'>TASK</font>:  <font color="red">[descriptive]</font> What does the `target` in the code represent (from reading the code annotations/description)?

<div style="border:0.5px solid red"></div>
<font color="red">SOLUTION</font>

- `target` represents $G_t$ (the observed reward-to-go). This can be computed for any state in an observed sequence of states.
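
As an illustration, the reward-to-go for every step of a finished episode can be computed in a single backward pass over the rewards. The helper `rewards_to_go` below is a hypothetical stand-alone function, not part of the lab code, which computes the same quantity with an explicit sum for each step.

```python
def rewards_to_go(rewards, discount_factor=1.0):
    """Return [G_0, ..., G_{T-1}] with G_t = sum_k discount_factor**k * r_{t+k}."""
    returns = [0.0] * len(rewards)
    running = 0.0
    # Work backwards: G_t = r_t + discount_factor * G_{t+1}
    for t in reversed(range(len(rewards))):
        running = rewards[t] + discount_factor * running
        returns[t] = running
    return returns


print(rewards_to_go([-1.0, -1.0, -1.0]))          # [-3.0, -2.0, -1.0]
print(rewards_to_go([-1.0, -1.0, -100.0], 0.5))   # [-26.5, -51.0, -100.0]
```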

<div style="border:0.5px solid red"></div>

- <font color='dark-magenta'>TASK</font>: <font color="red">[descriptive]</font> What function approximator is used (i.e. what is the architecture of the neural network)? Hint: it is a rather simple neural network, but try to draw the structure anyway (you can extend the network at a later stage).

<div style="border:0.5px solid red"></div>
<font color="red">SOLUTION</font>

Assuming that the states are each described by a vector $s=[s_0, s_1, \ldots, s_K]$, the network can be visualized as (without bias terms):
<img src="nn_full.png" width='100%'>
Where we have indicated the linear activation as a straight line. 

However, we encode the states as one-hot (e.g. [1,0,0,0]), if we consider $\pi(a|s_0)$ we have:
<img src="nn_s0.png" width='100%'>
This means that only the weights related to the first input contribute, because all other elements of the s vector are zero.

I.e. the network is basically a look-up table with the values determined by the weights - but TF provides an easy way to find the gradients and update rules for the elements through the softmax function!
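
The look-up-table view can be checked numerically. The sketch below assumes a network without hidden layers (an empty `nn_config`, which is an assumption since the configuration is not shown in this section) and uses a random matrix `W` as a stand-in for the weights of the single dense layer; multiplying a one-hot state by `W` simply selects one row.

```python
import numpy as np

rng = np.random.default_rng(0)
n_states, n_actions = 48, 4
W = rng.normal(size=(n_states, n_actions))   # hypothetical dense-layer weights
b = np.zeros(n_actions)                      # bias (zero here, as in the drawing)

state = 36
state_oh = np.zeros((1, n_states))
state_oh[0, state] = 1.0                     # one-hot encoding of the state

logits = state_oh @ W + b                    # what a single dense layer computes
print(np.allclose(logits[0], W[state]))      # True: row `state` of W is looked up
```

A non-zero bias would add the same offset to the logits of every state, so the per-state values still live in the corresponding row of `W`.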

<br>
<br>

<div style="border:0.5px solid red"></div>

- <font color='dark-magenta'>TASK</font>: <font color="red">[descriptive]</font> What is the loss function (write it as an equation)? Assuming the specified optimizer (Adam) can compute the gradient of the loss with respect to $\theta$ (i.e. the weights), does this match the update equations in REINFORCE as specified in Q9.2? (Hint: this is not an easy question, so make sure you talk to the tutors or look over the answers when they are uploaded.)
    

<div style="border:0.5px solid red"></div>
<font color="red">SOLUTION</font>

Yes, it matches the REINFORCE updates.

The loss is (from the code):


`loss = -K.log(action_selected_prob) * target`


I.e.

$$\text{Loss} = -\log \pi(a_j | s_j, \theta) \, G_t$$ 

We rely on the defined optimizer (Adam) to take the gradient step, but we can validate that this step matches the REINFORCE update defined above. If we differentiate the loss with respect to $\theta$ we get:

$$\frac{\partial \text{Loss}}{\partial \theta} = -G_t\frac{\partial \ln \pi(a_j|s_j,\theta)}{\partial \theta} = -G_t\frac{\tfrac{\partial}{\partial \theta}\pi(a_j|s_j,\theta)}{\pi(a_j|s_j,\theta)}$$

where we exploit the identity $\nabla_{\theta} \log z = (\nabla_{\theta} z) / z$. A gradient-descent step on this loss, $\theta \leftarrow \theta - \alpha \, \partial \text{Loss} / \partial \theta$, is therefore $\theta \leftarrow \theta + \alpha G_t \nabla \pi(a_j|s_j,\theta) / \pi(a_j|s_j,\theta)$, which is the REINFORCE update: because the optimizer minimizes the loss, the negative sign in the loss gives the correct direction. (Adam additionally rescales each step adaptively, so the match is in the gradient, not in the exact step length.)
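
The derivative can also be sanity-checked numerically. The sketch below treats the logits `q` of a single state as the parameters (an assumption made to keep the example small; it does not use the Keras model) and compares the analytic gradient of the loss, $G_t(\pi - \text{onehot}(a))$, with a central finite-difference estimate.

```python
import numpy as np


def softmax(q):
    e = np.exp(q - np.max(q))
    return e / e.sum()


def loss(q, action, G):
    """Loss = -log pi(action) * G for a softmax policy over the logits q."""
    return -np.log(softmax(q)[action]) * G


def analytic_grad(q, action, G):
    """d Loss / d q = -G * (onehot(action) - pi) = G * (pi - onehot(action))."""
    grad = softmax(q)
    grad[action] -= 1.0
    return G * grad


def numeric_grad(q, action, G, eps=1e-6):
    """Central finite differences, one logit at a time."""
    grad = np.zeros_like(q)
    for i in range(len(q)):
        d = np.zeros_like(q)
        d[i] = eps
        grad[i] = (loss(q + d, action, G) - loss(q - d, action, G)) / (2 * eps)
    return grad


q = np.array([0.2, -0.5, 1.0, 0.1])
match = np.allclose(analytic_grad(q, 2, -13.0), numeric_grad(q, 2, -13.0), atol=1e-5)
print(match)   # True
```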

<br>

<div style="border:0.5px solid red"></div>

----

## Q9.4 REINFORCE - implementation

In this part we set up the main loop of the REINFORCE method, i.e., the iterations over the episodes and steps etc.

In [22]:
def reinforce(env, estimator_policy, num_episodes, discount_factor=1.0):
    """
    REINFORCE (Monte Carlo Policy Gradient) Algorithm. Optimizes the policy
    function approximator using policy gradient.
    
    Args:
        env: OpenAI environment.
        estimator_policy: Policy Function to be optimized         
        num_episodes: Number of episodes to run for
        discount_factor: reward discount factor
    
    Returns:
        An EpisodeStats object with two numpy arrays for episode_lengths and episode_rewards.
    
    Adapted from: https://github.com/dennybritz/reinforcement-learning/blob/master/PolicyGradient/CliffWalk%20REINFORCE%20with%20Baseline%20Solution.ipynb
    """

    # Keeps track of useful statistics
    stats = EpisodeStats(
        episode_lengths=np.zeros(num_episodes),
        episode_rewards=np.zeros(num_episodes))    
    
    Transition = collections.namedtuple("Transition", ["state", "action", "reward", "next_state", "done"])
    
    for i_episode in range(num_episodes):
        # Reset the environment to its start state
        state = env.reset()
        
        episode = []
        
        # One step in the environment
        for t in itertools.count():
            
            # Take a step
            state_oh = np.zeros((1,env.nS))
            state_oh[0,state] = 1.0                                   
            action_probs = estimator_policy.predict(state_oh)
            action_probs = action_probs.squeeze()
            
            action = np.random.choice(np.arange(len(action_probs)), p=action_probs)
            
            ##
            next_state, reward, done, _ = env.step(action)
            
            # Keep track of the transition
            episode.append(Transition(
              state=state, action=action, reward=reward, next_state=next_state, done=done))
            
            # Update statistics
            stats.episode_rewards[i_episode] += reward
            stats.episode_lengths[i_episode] = t
            
            # Print out which step we're on, useful for debugging.
            print("\rStep {} @ Episode {}/{} ({})".format(
                    t, i_episode + 1, num_episodes, stats.episode_rewards[i_episode - 1]), end="")            

            if done:
                break
                
            state = next_state
    
        # Go through the episode, step-by-step and make policy updates (note we sometimes use j for the individual steps)
        estimator_policy.new_episode()
        new_theta=[]
        for t, transition in enumerate(episode):                 
            state_oh = np.zeros((1,env.nS))
            state_oh[0,transition.state] = 1.0
            
            # The return, G_t, after this timestep; this is the target for the PolicyEstimator
            G_t = sum(discount_factor**i * tr.reward for i, tr in enumerate(episode[t:]))  # tr avoids shadowing the step index t
           
            # Update our policy estimator
            estimator_policy.update(state_oh, transition.action,np.array(G_t))            
         
    return stats

---
- <font color='dark-magenta'>TASK</font>: <font color="red">[pseudo-code]</font> Browse through the code and write down pseudo-code, without all the implementation details, which highlights the main loops in the programme (i.e. over episodes and steps). Specifically, identify how many transitions are actually passed to the gradient-based optimizer each time.

<div style="border:1.5px solid red"></div>
<font color="red">SOLUTION</font>

Pseudo-code: We'll leave this to the reader.

The main thing to notice is that each update is based on only one transition (from a given episode). In the book and slides we typically compute the gradient over multiple transitions/steps (i.e. an average gradient); however, updating based on only one step is perfectly valid in an online/stochastic gradient approach, although tuning the step size is sometimes more difficult.
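
For comparison, a batch version would first stack a whole episode into arrays and then take one gradient step on the mean loss. The sketch below only shows how such a batch could be assembled; `episode_to_batch` is a hypothetical helper. Because the placeholders in `__build_train_fn` are declared with a leading `None` dimension, the train function appears to accept batches, but passing these arrays to `update` is an untested assumption.

```python
import numpy as np


def episode_to_batch(states, actions, rewards, n_states, discount_factor=1.0):
    """Stack one episode into (one-hot states, actions, rewards-to-go) arrays."""
    T = len(states)
    states_oh = np.zeros((T, n_states))
    states_oh[np.arange(T), states] = 1.0      # one row per visited state
    returns = np.zeros(T)
    running = 0.0
    for t in reversed(range(T)):               # G_t = r_t + discount * G_{t+1}
        running = rewards[t] + discount_factor * running
        returns[t] = running
    return states_oh, np.asarray(actions), returns


# Example episode: start (36) -> UP -> 24 -> RIGHT -> 25 -> DOWN -> cliff
states_oh, actions, returns = episode_to_batch(
    states=[36, 24, 25], actions=[0, 1, 2],
    rewards=[-1.0, -1.0, -100.0], n_states=48)
print(states_oh.shape, actions.shape, returns)

# A batched update might then look like this (assumption, not verified here):
# estimator_policy.update(states_oh, actions, returns)
```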

<div style="border:0.5px solid red"></div>