In [1]:
pip install gym[toy_text]

Note: you may need to restart the kernel to use updated packages.


# Dependencies
These are my visualization tools. Originally I kept them in a separate `.py` file and imported them, but Google Colab does not support that.
So this cell has to be run before anything else.

In [6]:
import matplotlib.pyplot as plt
import ipywidgets as widgets
from ipywidgets import interactive
from IPython.display import display
import numpy as np
import gym

def visq(q):
    """Show a Q-table as four heatmaps, one per action, on a shared colour scale.

    `q` is indexed as q[state] for the states 0..63, and every entry must
    provide `.get(action, default)`; an action missing from an entry is
    drawn as 0. States are laid out row-major on an 8x8 grid.
    """
    fig, axes = plt.subplots(2, 2, figsize=(10, 10))

    # Raw values grouped per action (a missing action counts as 0).
    per_action = [[q[cell].get(move, 0) for cell in range(64)]
                  for move in range(4)]

    # Global extremes, so that all four panels share one colour range.
    lowest = min(value for values in per_action for value in values)
    highest = max(value for values in per_action for value in values)

    for move, (panel, values) in enumerate(zip(axes.ravel(), per_action)):
        # Row-major reshape: state s lands on cell (s // 8, s % 8).
        heat = np.array(values, dtype=float).reshape(8, 8)
        image = panel.imshow(heat, cmap='hot', interpolation='nearest',
                             vmin=lowest, vmax=highest)
        panel.set_title(f'Action {move}')
        plt.colorbar(image, ax=panel)

    plt.tight_layout()
    plt.show()


def intq(qq):
    """Display a slider that browses a sequence of Q-table snapshots.

    `qq` is a sequence of Q-tables; moving the slider to position i redraws
    the heatmaps of qq[i] through `visq`.
    """
    step_picker = widgets.IntSlider(
        value=0, min=0, max=len(qq) - 1, description='iterations')

    # The keyword name `i` must match the one handed to `interactive` below.
    def show_snapshot(i):
        visq(qq[i])

    display(interactive(show_snapshot, i=step_picker))


def visualize_value_and_policy(v, policy):
    """Draw the state values as a heatmap with the policy's arrows on top.

    `v` maps state -> value and `policy` maps state -> action for the 64
    states of the 8x8 grid (row-major). Actions 0/1/2/3 are drawn as
    left/down/right/up; any other action leaves its cell without an arrow.
    """
    side = 8

    # Heatmap data: only the states present in `v` are filled, the rest stay 0.
    v_grid = np.zeros((side, side))
    for cell in v:
        v_grid[divmod(cell, side)] = v[cell]

    # Arrow anchors: column index along x, row index along y.
    cols, rows = np.meshgrid(np.arange(side), np.arange(side))

    # (action, x step, y step); y grows downwards once the axis is inverted.
    moves = ((0, -1, 0), (1, 0, 1), (2, 1, 0), (3, 0, -1))
    step_x = np.zeros((side, side), dtype=int)
    step_y = np.zeros((side, side), dtype=int)
    for cell in range(side * side):
        chosen = policy[cell]
        for code, delta_x, delta_y in moves:
            if chosen == code:
                step_x[divmod(cell, side)] = delta_x
                step_y[divmod(cell, side)] = delta_y
                break

    # Heatmap first, then the arrows of the policy on the same axes.
    fig, ax = plt.subplots()
    heat = ax.matshow(v_grid, cmap='hot')
    plt.colorbar(heat)
    ax.quiver(cols, rows, step_x, step_y, angles='xy',
              scale_units='xy', scale=3, color='blue')
    ax.set_xlim(-1, 8)
    ax.set_ylim(-1, 8)
    # set_ylim put -1 at the bottom; flip so that row 0 is drawn at the top.
    plt.gca().invert_yaxis()
    plt.show()


def intvp(vv, pp):
    """Display a slider that browses value/policy snapshots side by side.

    `vv` and `pp` are parallel sequences of value functions and policies;
    the slider covers the indices present in both, and position i draws
    vv[i] together with pp[i].
    """
    last_index = min(len(vv), len(pp)) - 1
    step_picker = widgets.IntSlider(
        value=0, min=0, max=last_index, description='iterations')

    # The keyword name `i` must match the one handed to `interactive` below.
    def show_snapshot(i):
        visualize_value_and_policy(vv[i], pp[i])

    display(interactive(show_snapshot, i=step_picker))

# Loading DiscreteEnv

In [4]:
# `gym` and the plotting helpers come from the Dependencies cell above, so the
# former `from pt1_plot import *` is gone: that module is not shipped with the
# notebook (and cannot be imported on Colab), which made this cell fail on a
# fresh kernel.
print(gym.__version__)

# We will load a DiscreteEnv and retrieve the probability and reward information
# NOTE(review): the gym FrozenLake docs say that desc=None together with
# map_name=None generates a *random* 8x8 map, so results may differ between
# kernel restarts. Pass map_name="8x8" if the standard map is intended; confirm.
env = gym.make("FrozenLake8x8-v1", desc=None, map_name=None, is_slippery=True)

0.26.2


# Policy computation
env.P[state][action] = [(probability, nextstate, reward, done), ...]

We search for a stationary policy, since the evaluation runs over a horizon of $T=1000$ steps,
by which point the episode has almost surely terminated.

In [5]:
alpha = 1  # discount factor applied to the next state's value (1 = undiscounted)
states = range(len(env.P))


def _backup(transitions, values, discount):
    """Expected one-step return of a list of transitions.

    `transitions` holds (probability, next_state, reward, done) tuples as
    stored in env.P[state][action]. A finished transition contributes its
    reward only; otherwise the discounted value of the next state is added.
    """
    return sum(
        prob * (reward if done else reward + discount * values[next_state])
        for prob, next_state, reward, done in transitions
    )


# initializing v, q, policy
v = {state: 0 for state in states}
policy = {state: 0 for state in states}  # assume that 0 is an action
q = {state: {action: 0 for action in range(len(env.P[state]))} for state in states}

# per-iteration snapshots, kept for plotting
vv = []
pp = []

for i in range(300):
    # Evaluation sweep for the current policy. `v` is updated in place, so
    # states later in the sweep already see the refreshed values.
    # (The MC version would sample the transitions instead of weighting them
    # by their probability; the same applies to the q function.)
    for state in states:
        v[state] = _backup(env.P[state][policy[state]], v, alpha)

    # Action values under the refreshed `v`. The same backup is used as for
    # `v`, so the discount and the `done` handling can no longer drift apart
    # (the q update used to ignore both).
    for state in states:
        for action in range(len(env.P[state])):
            q[state][action] = _backup(env.P[state][action], v, alpha)

    # Greedy improvement; `max` keeps the first maximal action, i.e. ties
    # still resolve to the lowest action index.
    for state in states:
        policy[state] = max(q[state], key=q[state].get)

    vv.append(v.copy())
    pp.append(policy.copy())

sol = list(pp[-1].items())
print(sol)
print(vv[-1][0])
intvp(vv, pp)

[(0, 0), (1, 0), (2, 2), (3, 3), (4, 2), (5, 2), (6, 2), (7, 2), (8, 0), (9, 1), (10, 0), (11, 0), (12, 2), (13, 2), (14, 2), (15, 2), (16, 0), (17, 0), (18, 2), (19, 1), (20, 3), (21, 2), (22, 2), (23, 2), (24, 1), (25, 1), (26, 1), (27, 0), (28, 0), (29, 2), (30, 3), (31, 2), (32, 3), (33, 3), (34, 3), (35, 3), (36, 1), (37, 0), (38, 0), (39, 2), (40, 3), (41, 0), (42, 3), (43, 3), (44, 3), (45, 3), (46, 1), (47, 2), (48, 3), (49, 0), (50, 0), (51, 0), (52, 0), (53, 0), (54, 2), (55, 2), (56, 2), (57, 0), (58, 0), (59, 1), (60, 2), (61, 0), (62, 0), (63, 0)]
0.8958558845705116


interactive(children=(IntSlider(value=0, description='iterations', max=299), Output()), _dom_classes=('widget-…

# Policy evaluation: here's where YOU also code
Insert here your code to evaluate
the total expected rewards over the planning horizon T
if one follows your policy. Do the same for a random policy (i.e. the
sample policy given above). As a sanity check, your policy should get an
expected reward of at least the one obtained by the random policy!

In [7]:
# Planning horizon: maximum number of steps simulated per episode.
T = 1000

# Both policies are time-indexed tables: policy[t][state] -> action.

# Baseline: an independently sampled action for every (step, state) pair.
random_policy = {
    step: {cell: env.action_space.sample()
           for cell in range(env.observation_space.n)}
    for step in range(T)
}

# Computed solution: the last policy snapshot, repeated at every step.
sol_policy = {
    step: {cell: pp[-1][cell]
           for cell in range(env.observation_space.n)}
    for step in range(T)
}

def MC_eval(pol, nsim=10**3, seed=None):
    """Monte Carlo estimate of the expected total reward of a policy.

    Relies on the notebook-level `env` and horizon `T`.

    Parameters
    ----------
    pol : mapping
        Time-indexed policy; pol[t][state] is the action taken in `state`
        at step t, for t in 0..T-1.
    nsim : int
        Number of episodes to simulate.
    seed : int or None
        Optional seed handed to the first `env.reset()` so that a run can be
        reproduced; None (the default) leaves the environment unseeded.

    Returns
    -------
    float
        Average undiscounted return over the `nsim` episodes (0 when
        `nsim` is not positive).
    """
    if nsim <= 0:
        # No episode requested: keep the historical result instead of dividing by 0.
        return 0

    grand_total = 0.0
    for episode in range(nsim):
        # Seeding the first reset is enough; the later resets then continue
        # from the environment's already initialised random generator.
        reset_kwargs = {"seed": seed} if (episode == 0 and seed is not None) else {}
        state, _info = env.reset(**reset_kwargs)
        for t in range(T):
            action = pol[t][state]
            state, reward, terminated, _truncated, _info = env.step(action)
            grand_total += reward
            # If the MDP is stuck in a terminal state, the episode ends here.
            # The truncation flag is deliberately ignored, as before, so that
            # the horizon T alone bounds the episode length.
            if terminated:
                break
        # The shared `env` is intentionally not closed here: it is reset again
        # for the next episode and reused by later calls.

    # Divide once at the end; adding total/nsim per episode accumulated
    # rounding error (e.g. 0.8990000000000007 instead of 0.899).
    return grand_total / nsim

# Sanity check: the computed policy should score at least as well as the random one.
print("MC_eval of random policy: {}".format(MC_eval(random_policy)))
print("MC_eval of sol policy: {}".format(MC_eval(sol_policy)))

MC_eval of random policy: 0.0
MC_eval of sol policy: 0.8990000000000007
