# 43008: Reinforcement Learning

## Week 4: Solving MDPs, Part 2
* Applying the Policy Iteration Algorithm to an OpenAI Gym Environment

### What you will learn
* Use the Policy Iteration algorithm on an OpenAI Gym environment
* Visualize the agent following the policy

In [1]:
!pip install gym
!pip install numpy==1.23.5



In [2]:
from __future__ import print_function
from __future__ import division
from gym import wrappers
import time
import scipy
import numpy as np
import gym

# import other required libraries/packages
from gym.wrappers.record_video import RecordVideo
import glob
import io
import base64
from IPython.display import HTML
from IPython import display as ipythondisplay

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.




In [4]:
# Rendering dependencies <-- Required for Google Colab

!apt-get install xvfb
!pip install gym pyvirtualdisplay > /dev/null 2>&1

!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.15).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.


In [5]:
from IPython.display import HTML
from pyvirtualdisplay import Display
from IPython import display as ipythondisplay

display = Display(visible=0, size=(1400, 900))
display.start()

# Utility functions to enable video recording of gym environment and displaying it
# To enable video, just do "env = wrap_env(env)"

# Function to Show video after rendering
def show_video():
  mp4list = glob.glob('video/*.mp4')
  if len(mp4list) > 0:
    mp4 = mp4list[0]
    video = io.open(mp4, 'r+b').read()
    encoded = base64.b64encode(video)
    ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
  else:
    print("Could not find video")

# Wrapper for the environment to record the video
def wrap_env(env):
  env = RecordVideo(env, './video',  episode_trigger = lambda episode_number: True)
  return env

In [None]:
env = wrap_env(gym.make('FrozenLake-v1', new_step_api=True))

# Display info about the environment
print("Action space: ", env.action_space)
print("Observation space: ", env.observation_space)

# Policy Iteration
Policy iteration alternates between two steps: policy evaluation, which computes the value function of the current policy, and policy improvement, which redefines the policy to act greedily with respect to that value function. The two steps repeat until the policy stops changing. For a finite MDP, policy iteration is guaranteed to converge to the optimal policy, and it often takes fewer iterations to converge than the value-iteration algorithm.
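
As a minimal sketch of this loop, the cell below runs policy iteration on a hypothetical three-state, two-action MDP that is not part of the course material. Its transition table `P` only mimics the `(probability, next state, reward, done)` format of `env.P`. The names `evaluate`, `improve`, and `q_values` are illustrative, and the evaluation step solves the linear system $(I - \gamma P_\pi) v = r_\pi$ directly instead of iterating, which assumes $\gamma < 1$ so that the system is well conditioned.

```python
import numpy as np

# Hypothetical toy MDP (an assumption for illustration only).
# P[s][a] = [(probability, next_state, reward, done), ...], as in env.P.
P = {
    0: {0: [(1.0, 0, 0.0, False)],
        1: [(0.8, 1, 0.0, False), (0.2, 0, 0.0, False)]},
    1: {0: [(1.0, 0, 0.0, False)],
        1: [(0.8, 2, 1.0, True), (0.2, 1, 0.0, False)]},
    2: {0: [(1.0, 2, 0.0, True)],   # absorbing goal state, zero reward
        1: [(1.0, 2, 0.0, True)]},
}
n_states, n_actions, gamma = 3, 2, 0.9


def q_values(v, s):
    """One-step lookahead: expected return of each action in state s."""
    return np.array([
        sum(p * (r + gamma * v[s2]) for p, s2, r, _ in P[s][a])
        for a in range(n_actions)
    ])


def evaluate(policy):
    """Exact policy evaluation by solving (I - gamma * P_pi) v = r_pi."""
    P_pi = np.zeros((n_states, n_states))
    r_pi = np.zeros(n_states)
    for s in range(n_states):
        for p, s2, r, _ in P[s][policy[s]]:
            P_pi[s, s2] += p
            r_pi[s] += p * r
    return np.linalg.solve(np.eye(n_states) - gamma * P_pi, r_pi)


def improve(v):
    """Greedy policy with respect to the value function v."""
    return np.array([int(np.argmax(q_values(v, s))) for s in range(n_states)])


policy = np.zeros(n_states, dtype=int)   # start with action 0 everywhere
for iteration in range(1, 101):
    v = evaluate(policy)                 # policy evaluation
    new_policy = improve(v)              # policy improvement
    if np.array_equal(new_policy, policy):
        break                            # policy is stable
    policy = new_policy

print("Converged after", iteration, "iterations:", policy, np.round(v, 4))
```

Solving the linear system gives the exact value function in one step, which is practical for small state spaces. The iterative evaluation used in this notebook avoids building the matrix and also behaves sensibly when `gamma = 1`.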

In [6]:
def compute_policy_v(env, policy, gamma):
    """ Iteratively evaluate the value-function under policy.
    Alternatively, we could formulate a set of linear equations in terms of v[s]
    and solve them to find the value function.
    """
    v = np.zeros(env.observation_space.n)
    eps = 1e-10
    while True:
        prev_v = np.copy(v)
        for s in range(env.observation_space.n):
            policy_a = policy[s]
            v[s] = sum([p * (r + gamma * prev_v[s_]) for p, s_, r, _ in env.P[s][policy_a]])
        if (np.sum((np.fabs(prev_v - v))) <= eps):
            # value converged
            break
    return v

def extract_policy(env, v, gamma):
    """ Extract the greedy policy with respect to a given value-function """
    policy = np.zeros(env.observation_space.n)
    for s in range(env.observation_space.n):
        q_sa = np.zeros(env.action_space.n)
        for a in range(env.action_space.n):
            for next_sr in env.P[s][a]:
                # next_sr is a tuple of (probability, next state, reward, done)
                p, s_, r, _ = next_sr
                q_sa[a] += (p * (r + gamma * v[s_]))
        policy[s] = np.argmax(q_sa)
    return policy

def evaluate_policy(env, policy, gamma,  n = 100):
    """ Evaluates a policy by running it n times.
    returns:
    average total reward
    """
    scores = [
            run_episode(env, policy, gamma=gamma, render = False)
            for _ in range(n)]
    return np.mean(scores)

def run_episode(env, policy, gamma, render = False):
    """ Evaluates policy by using it to run an episode and finding its
    total reward.
    args:
    env: gym environment.
    policy: the policy to be used.
    gamma: discount factor.
    render: boolean to turn rendering on/off.
    returns:
    total reward: real value of the total reward received by agent under policy.
    """
    obs = env.reset()
    total_reward = 0
    step_idx = 0
    while True:
        if render:
            env.render()
        obs, reward, done , _ = env.step(int(policy[obs]))
        # Accumulate the discounted reward for this step
        total_reward += (gamma ** step_idx * reward)
        step_idx += 1
        if done:
            break
    return total_reward

def policy_iteration(env, gamma, max_iterations=100000):
    """
    This function implements the policy iteration algorithm.
    """
    start = time.time()
    policy = np.random.choice(env.action_space.n, size=(env.observation_space.n))  # initialize a random policy
    ########################### Your Code Here ###########################
    for i in range(max_iterations):
        # Policy evaluation: value function of the current policy
        old_policy_v = compute_policy_v(env, policy, gamma)
        # Policy improvement: act greedily with respect to that value function
        new_policy = extract_policy(env, old_policy_v, gamma)
        if (np.all(policy == new_policy)):
            # The policy is stable, so the algorithm has converged
            end = time.time()
            print("Policy iteration took {0} seconds.".format(end - start))
            print ('Policy-Iteration converged at step %d.' %(i+1))
            break
        policy = new_policy
    ########################### End of your code #########################
    return policy


if __name__ == '__main__':
    np.random.seed(1111)
    env_name  = 'FrozenLake-v1'
    #for gamma in [.9, .95, .99, .9999, 1]:
    for gamma in [1]:
        print("-"*10, "Gamma={0}".format(gamma) ,"-"*10)
        env = wrap_env(gym.make(env_name, new_step_api=True))
        optimal_policy = policy_iteration(env, gamma=gamma)
        scores = evaluate_policy(env, optimal_policy, gamma=gamma)
        print('Average scores = ', np.mean(scores))

---------- Gamma=1 ----------


Policy iteration took 0.4939260482788086 seconds.
Policy-Iteration converged at step 7.


Average scores =  0.7


In [7]:
# Show the recorded video of the Environment
show_video()

In [8]:
optimal_policy.reshape(4,4)

array([[0., 3., 3., 3.],
       [0., 0., 0., 0.],
       [3., 1., 0., 0.],
       [0., 2., 1., 0.]])
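
The integer actions in this array are easier to read as directions. The sketch below uses FrozenLake's action encoding (0 = Left, 1 = Down, 2 = Right, 3 = Up) to print a policy as a grid of arrows. The helper `policy_to_arrows` and the `ARROWS` table are illustrative names, not part of Gym, and `example_policy` simply copies the values shown above so that the cell runs on its own.

```python
import numpy as np

# FrozenLake action encoding: 0 = Left, 1 = Down, 2 = Right, 3 = Up.
ARROWS = {0: "←", 1: "↓", 2: "→", 3: "↑"}


def policy_to_arrows(policy, shape=(4, 4)):
    """Render a flat policy vector as a text grid of arrows."""
    grid = np.asarray(policy).astype(int).reshape(shape)
    return "\n".join(
        " ".join(ARROWS[int(a)] for a in row) for row in grid
    )


# Values copied from the array printed above.
example_policy = np.array([0., 3., 3., 3.,
                           0., 0., 0., 0.,
                           3., 1., 0., 0.,
                           0., 2., 1., 0.])
print(policy_to_arrows(example_policy))
```

In the default 4x4 map, the entries for the holes and the goal carry no meaning because the episode ends in those cells, so the action recorded there is arbitrary.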

In [9]:
total_reward = run_episode(env, optimal_policy, gamma=gamma, render=True)
print('Total reward under the optimal policy = ', total_reward)

Total reward under the optimal policy =  1.0
