In [2]:
import random
import numpy as np
import matplotlib.pyplot as plt 

<h1> Problem Definition</h1>

In this exercise, we will find the optimal policy for an instance of GridWorld.

<h2> GridWorld's instance rules </h2>

These exercises are based on the Gridworld game, which comprises the following rules:<br>
- There is a square grid with $4$ tiles along each side. <br>
- The player (agent) starts in an initial tile and is free to move throughout the grid in 4 different directions: North (N), South (S), East (E), and West (W). <br>
- At the boundary tiles, an attempt to move outside the grid results in returning to the current tile. <br>
- A few tiles in the grid, called "walls", are not accessible and have the same effect as the grid's boundary. <br>
- A few tiles in the grid, called "crowd", will cause severe time penalties when being traversed. <br>
- There is exactly one terminal tile. <br>
- Our goal is to reach the terminal tile as quickly as possible.

We will use the following grid:

In [3]:
# Print the layout of the GridWorld instance: each accessible tile shows its
# (row, column) label, "####" marks a wall, and the second line of every row
# names the special tiles ("start", "crowd", "end").
# The \033[..m sequences are ANSI colour escapes wrapped around the special tiles.
print("---------------------------------")
print("|\033[96m%s\t\033[0m|%s\t|%s\t|%s\t|"      %("(1,1)", "(1,2)", "(1,3)", "(1,4)"))
print("|\033[96m%s\t\033[0m|%s\t|%s\t|%s\t|"      %("start", "     ", "     ", "     "))
print("---------------------------------")
print("|####\t|\033[91m%s\t\033[0m|####\t|%s\t|"  %("(2,2)", "(2,4)"))
print("|####\t|\033[91m%s\t\033[0m|####\t|%s\t|"  %("crowd", "     "))
print("---------------------------------")
print("|\033[92m%s\t\033[0m|%s\t|####\t|%s\t|"    %("(3,1)", "(3,2)", "(3,4)"))
print("|\033[92m%s\t\033[0m|%s\t|####\t|%s\t|"    %("end",   "     ", "     "))
print("---------------------------------")
print("|\033[91m%s\t\033[0m|%s\t|%s\t|%s\t|"      %("(4,1)", "(4,2)", "(4,3)", "(4,4)"))
print("|\033[91m%s\t\033[0m|%s\t|%s\t|%s\t|"      %("crowd", "     ", "     ", "     "))
print("---------------------------------")

---------------------------------
|[96m(1,1)	[0m|(1,2)	|(1,3)	|(1,4)	|
|[96mstart	[0m|     	|     	|     	|
---------------------------------
|####	|[91m(2,2)	[0m|####	|(2,4)	|
|####	|[91mcrowd	[0m|####	|     	|
---------------------------------
|[92m(3,1)	[0m|(3,2)	|####	|(3,4)	|
|[92mend	[0m|     	|####	|     	|
---------------------------------
|[91m(4,1)	[0m|(4,2)	|(4,3)	|(4,4)	|
|[91mcrowd	[0m|     	|     	|     	|
---------------------------------


<h2> MDP model </h2>

The first step is to represent the problem as an MDP with the following components: <br>
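- Set of states: $\mathcal S=\{(x,y)\}_{x=1,\dots,K,\ y=1,\dots,K}$ with $K=4$ (each tile is a state; $x$ is the row index counted from the top and $y$ is the column index counted from the left, matching the labels in the grid above) <br>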
- According to the provided grid, the states are:
  - Initial state: $(1,1)$ 
  - Terminal state: $(3,1)$
  - "Wall" states: $(2,1)$, $(2,3)$, $(3,3)$
  - "Crowd" states: $(4,1)$, $(2,2)$
  - All other states are "Simple"
- Set of actions $\mathcal A=\{\uparrow, \downarrow, \rightarrow, \leftarrow\}$ (the same for each state) <br>
- Each action $a\in\mathcal A$ is a function mapping one state to another, i.e., $a: \mathcal S \rightarrow \mathcal S$, and can be defined as follows:
  - $\uparrow(x,y) = \begin{cases} (x,y), & \text{if (1) state } (x,y) \text{ is terminal or (2) state } (x-1,y) \text{ is outside the grid or a ``wall'' tile} \\ (x-1,y), & \text{otherwise} \end{cases}$
  - $\downarrow(x,y) = \begin{cases} (x,y), & \text{if (1) state } (x,y) \text{ is terminal or (2) state } (x+1,y) \text{ is outside the grid or a ``wall'' tile} \\ (x+1,y), & \text{otherwise} \end{cases}$
  - $\rightarrow(x,y) = \begin{cases} (x,y), & \text{if (1) state } (x,y) \text{ is terminal or (2) state } (x,y+1) \text{ is outside the grid or a ``wall'' tile} \\ (x,y+1), & \text{otherwise} \end{cases}$
  - $\leftarrow(x,y) = \begin{cases} (x,y), & \text{if (1) state } (x,y) \text{ is terminal or (2) state } (x,y-1) \text{ is outside the grid or a ``wall'' tile} \\ (x,y-1), & \text{otherwise} \end{cases}$
- Discount Factor: $\gamma$
- Because, in this exercise, all actions lead deterministically to a single state, we consider: <br>
  - Transition probabilities are $1$ if the next state is a result of the action and $0$ otherwise, i.e., $\mathcal P_{ss'}^{a} = \begin{cases} 1, & \text{ if } a(s)=s' \\ 0, & \text{otherwise}\end{cases}$<br>
  - There are no random factors affecting the rewards, so they have constant values, i.e., $\mathcal R_s^a = \mathbb{E} [ R_{t+1} |\, S_t=s, A_t=a] = R_s^a$, where $R_s^a=\begin{cases} 0 & \text{if } s \text{ is a "Terminal" tile (regardless of the action } a) \\ -5 & \text{if } s \text{ is not terminal and } a(s) \text{ is a "Crowd" tile} \\ -1 & \text{if } s \text{ is not terminal and } a(s) \text{ is a "Simple" or "Terminal" tile} \end{cases}$

The MDP can be implemented as a Python class as follows: 

In [4]:
class MDP:
    """Deterministic GridWorld instance expressed as a Markov Decision Process.

    States are ``(row, column)`` tuples, 1-indexed from the top-left tile.
    Wall tiles are not part of the state set.

    Attributes:
        k: grid side length.
        g: discount factor.
        S: list of accessible (non-wall) states.
        walls, terminal, crowds, simple: lists of tiles of each category.
        A: list of action names.
        Tp: dict mapping ``(s, s', a)`` to 1 if action ``a`` moves ``s`` to ``s'``, else 0.
        R: dict mapping ``(s, a)`` to the reward of taking ``a`` in ``s``.
    """

    # (row, column) displacement produced by each action
    _STEPS = {"up": (-1, 0), "down": (1, 0), "right": (0, 1), "left": (0, -1)}

    def __init__(self, grid_size=4, gamma=1.0):
        """Build the state set, action set, transition table and reward table.

        Args:
            grid_size: side length of the square grid.
            gamma: discount factor.

        Raises:
            ValueError: if the grid is too small to contain the hard-coded
                wall, terminal and crowd tiles.
        """
        self.k = grid_size  # grid side
        self.g = gamma      # discount factor

        self.walls = [(2,1), (2,3), (3,3)]
        self.terminal = [(3,1)]
        self.crowds = [(4,1), (2,2)]

        # Fail early with a readable message instead of an opaque
        # "list.remove(x): x not in list" further down.
        outside = [tile for tile in self.walls + self.terminal + self.crowds
                   if not self._inside(tile)]
        if outside:
            raise ValueError(
                "grid_size=%r is too small for the predefined tiles %r" % (grid_size, outside)
            )

        # set of states: every tile of the grid except the walls
        self.S = [(i+1,j+1) for i in range(self.k) for j in range(self.k)]
        for wall in self.walls:
            self.S.remove(wall)

        # "simple" tiles are the states that are neither terminal nor crowd
        self.simple = [s for s in self.S]
        for pair in self.terminal + self.crowds:
            self.simple.remove(pair)

        # set of actions
        self.A  = ["up", "down", "right", "left"]

        # successor of every (state, action) pair, computed once and reused below
        successor = {(s, a): self.move(s, a) for s in self.S for a in self.A}

        # definition of transition probabilities (deterministic: 1 for the successor, 0 otherwise)
        self.Tp = {}
        for s in self.S:
            for ss in self.S:
                for a in self.A:
                    self.Tp[(s,ss,a)] = 1 if successor[(s, a)] == ss else 0

        # definition of expected reward R
        self.R = {}
        for s in self.S:
            for a in self.A:
                if s in self.terminal:
                    # no reward is collected once the terminal tile is reached
                    self.R[(s,a)] = 0
                elif successor[(s, a)] in self.crowds:
                    self.R[(s,a)] = -5
                else:
                    # the successor is a simple tile or the terminal tile
                    self.R[(s,a)] = -1

    def _inside(self, tile):
        """Return True if ``tile`` lies within the k-by-k grid."""
        return 1 <= tile[0] <= self.k and 1 <= tile[1] <= self.k

    # support function "action" that help define the transition probabilities Tp
    def move(self, s, a):
        """Return the state reached by taking action ``a`` in state ``s``.

        The agent stays in ``s`` when ``s`` is terminal, or when the target
        tile is a wall or lies outside the grid.

        Raises:
            ValueError: if ``s`` is not terminal and ``a`` is not a known action.
        """
        if s in self.terminal:
            return s
        if a not in self._STEPS:
            raise ValueError("unknown action: %r" % (a,))
        d_row, d_col = self._STEPS[a]
        target = (s[0] + d_row, s[1] + d_col)
        if target in self.walls or not self._inside(target):
            return s
        return target





<h1>Solution Methods</h1>

Now, we are going to use different RL methods to solve the problem.
As a reference, we will start with a model-based technique called Value Iteration.

<h2> Model-Based Solution: Value Iteration </h2>

Here is the code for the Value Iteration algorithm:

In [5]:
# Value Iteration Algorithm
# Based on Value Iteration Algorithm (Sutton and Barto - Section 4.4)
# New feature:
#    - additional stop criterion based on maximum number of iteration
def value_iteration(mdp, theta=1.0e-6, max_iter=10):
    """Estimate the optimal state values and a greedy policy by value iteration.

    Args:
        mdp: object exposing ``S`` (states), ``A`` (actions), ``terminal``,
            ``Tp[(s, s', a)]`` (transition probabilities), ``R[(s, a)]``
            (expected rewards) and ``g`` (discount factor).
        theta: convergence threshold; the iteration stops once the largest
            value change within a single sweep is below ``theta``.
        max_iter: maximum number of sweeps over the state set (at least one
            sweep is always performed).

    Returns:
        A pair ``(v, Pi)`` where ``v[s]`` is the estimated optimal value of
        ``s`` and ``Pi[(s, a)]`` is 1.0 for the greedy action in ``s`` and 0
        for every other action.
    """
    def backup(s, a):
        # expected one-step return of taking action a in state s under the current v
        return sum(mdp.Tp[(s,ss,a)]*(mdp.R[(s,a)] + mdp.g*v[ss]) for ss in mdp.S)

    # initialization
    v = {s: 0.0 if s in mdp.terminal else -100 for s in mdp.S}

    t = 0
    while True:
        t += 1
        # Delta must be reset for every sweep; otherwise the large change of
        # the first sweep keeps it above theta forever and the convergence
        # test can never trigger.
        Delta = 0
        for s in mdp.S:
            v_temp = v[s]
            v[s] = max(backup(s, a) for a in mdp.A)
            Delta = max(Delta, abs(v_temp - v[s]))
        # t counts completed sweeps, so ">=" performs at most max_iter sweeps
        if t >= max_iter or Delta < theta:
            break

    # greedy policy extraction
    Pi = {}
    for s in mdp.S:
        max_value = float("-inf")
        max_action = ""
        for a in mdp.A:
            value = backup(s, a)
            # ">=" keeps the last action among ties
            if value >= max_value:
                max_value = value
                max_action = a
        for a in mdp.A:
            Pi[(s,a)] = 1.0 if a == max_action else 0

    return v, Pi

In [19]:
# Build the GridWorld MDP (undiscounted) and set the stopping parameters
# passed to value_iteration below.
mdp = MDP(gamma=1.0)
theta = 1.0e-6
max_iteration = 1000

# compute optimal value v(s) and optimal policy Pi(s) for each state s of the MDP
v, Pi_VI = value_iteration(mdp, theta, max_iteration)

# Collapse the (state, action) -> probability table into a state -> action
# mapping (the action whose entry equals 1) for printing.
Pi = {}
for s in mdp.S:
    for a in mdp.A:
        if Pi_VI[(s,a)] == 1:
            Pi[s] = a

# print the grid with the optimal values and policies obtained by Value Iteration
# (for every grid row: first the values, then the actions; "####" marks a wall)
print("---------------------------------")
print("|\033[96m%s\t\033[0m|%s\t|%s\t|%s\t|"      %(v[(1,1)], v[(1,2)], v[(1,3)], v[(1,4)]))
print("|\033[96m%s\t\033[0m|%s\t|%s\t|%s\t|"      %(Pi[(1,1)], Pi[(1,2)], Pi[(1,3)], Pi[(1,4)]))
print("---------------------------------")
print("|####\t|\033[91m%s\t\033[0m|####\t|%s\t|"  %(v[(2,2)], v[(2,4)]))
print("|####\t|\033[91m%s\t\033[0m|####\t|%s\t|"  %(Pi[(2,2)], Pi[(2,4)]))
print("---------------------------------")
print("|\033[92m%s\t\033[0m|%s\t|####\t|%s\t|"    %(v[(3,1)], v[(3,2)], v[(3,4)]))
print("|\033[92m%s\t\033[0m|%s\t|####\t|%s\t|"    %(Pi[(3,1)], Pi[(3,2)], Pi[(3,4)]))
print("---------------------------------")
print("|\033[91m%s\t\033[0m|%s\t|%s\t|%s\t|"      %(v[(4,1)], v[(4,2)], v[(4,3)], v[(4,4)]))
print("|\033[91m%s\t\033[0m|%s\t|%s\t|%s\t|"      %(Pi[(4,1)], Pi[(4,2)], Pi[(4,3)], Pi[(4,4)]))
print("---------------------------------")


---------------------------------
|[96m-8.0	[0m|-7.0	|-8.0	|-7.0	|
|[96mright	[0m|down	|left	|down	|
---------------------------------
|####	|[91m-2.0	[0m|####	|-6.0	|
|####	|[91mdown	[0m|####	|down	|
---------------------------------
|[92m0.0	[0m|-1.0	|####	|-5.0	|
|[92mleft	[0m|left	|####	|down	|
---------------------------------
|[91m-1.0	[0m|-2.0	|-3.0	|-4.0	|
|[91mup	[0m|up	|left	|left	|
---------------------------------


<h2>Model-free Solutions</h2>

Here we are going to explore a few approaches to solve the problem without the MDP.
We start by introducing the GridWorld simulator.

<h3> GridWorld Simulator </h3>

Agent class:

In [7]:
class Agent:
    """Player of the GridWorld simulator.

    Keeps a reference to the environment, the current location, the list of
    all grid tiles (walls included) and the list of available actions.
    """

    def __init__(self, env, init_loc=(1,1)):
        """Create an agent placed at ``init_loc`` inside ``env``.

        Args:
            env: environment object; only its side length ``env.k`` is read here.
            init_loc: starting ``(row, column)`` tile, ``(1,1)`` by default.
        """
        self.env = env
        self.loc = init_loc     # tile currently occupied by the agent

        # every tile of the k-by-k grid, enumerated row by row (1-indexed)
        side = self.env.k
        self.S = [(row, col) for row in range(1, side + 1) for col in range(1, side + 1)]

        # available actions and their count
        self.A = ["up", "down", "right", "left"]
        self.m = len(self.A)

Environment class:

In [8]:
class Environment:
    """GridWorld simulator: tile layout, deterministic moves and observed rewards.

    Tiles are ``(row, column)`` tuples, 1-indexed from the top-left tile.

    Attributes:
        k: grid side length.
        tiles: list of accessible (non-wall) tiles.
        walls, terminal, crowds, simple: lists of tiles of each category.
        A: list of action names.
        R: dict mapping ``(tile, action)`` to the observed reward.
    """

    # (row, column) displacement produced by each action
    _STEPS = {"up": (-1, 0), "down": (1, 0), "right": (0, 1), "left": (0, -1)}

    def __init__(self, grid_size=4):
        """Build the tile sets, the action set and the reward table.

        Args:
            grid_size: side length of the square grid.

        Raises:
            ValueError: if the grid is too small to contain the hard-coded
                wall, terminal and crowd tiles.
        """
        self.k = grid_size  # grid side

        self.walls = [(2,1), (2,3), (3,3)]
        self.terminal = [(3,1)]
        self.crowds = [(4,1), (2,2)]

        # Fail early with a readable message instead of an opaque
        # "list.remove(x): x not in list" further down.
        outside = [tile for tile in self.walls + self.terminal + self.crowds
                   if not self._inside(tile)]
        if outside:
            raise ValueError(
                "grid_size=%r is too small for the predefined tiles %r" % (grid_size, outside)
            )

        # set of states: every tile of the grid except the walls
        self.tiles = [(i+1,j+1) for i in range(self.k) for j in range(self.k)]
        for wall in self.walls:
            self.tiles.remove(wall)

        # "simple" tiles are the accessible tiles that are neither terminal nor crowd
        self.simple = [s for s in self.tiles]
        for pair in self.terminal + self.crowds:
            self.simple.remove(pair)

        # set of actions
        self.A  = ["up", "down", "right", "left"]

        # definition of observed reward
        self.R = {}
        for s in self.tiles:
            for a in self.A:
                if s in self.terminal:
                    # no reward is collected once the terminal tile is reached
                    self.R[(s,a)] = 0
                elif self.move(s, a) in self.crowds:
                    self.R[(s,a)] = -5
                else:
                    # the successor is a simple tile or the terminal tile
                    self.R[(s,a)] = -1

    def _inside(self, tile):
        """Return True if ``tile`` lies within the k-by-k grid."""
        return 1 <= tile[0] <= self.k and 1 <= tile[1] <= self.k

    # support function "action" that help define the observed reward
    def move(self, s, a):
        """Return the tile reached by taking action ``a`` from tile ``s``.

        The agent stays on ``s`` when ``s`` is terminal, or when the target
        tile is a wall or lies outside the grid.

        Raises:
            ValueError: if ``s`` is not terminal and ``a`` is not a known action.
        """
        if s in self.terminal:
            return s
        if a not in self._STEPS:
            raise ValueError("unknown action: %r" % (a,))
        d_row, d_col = self._STEPS[a]
        target = (s[0] + d_row, s[1] + d_col)
        if target in self.walls or not self._inside(target):
            return s
        return target

<h3>Monte-Carlo Learning </h3>

Here is an implementation of the Monte-Carlo methods:

In [9]:
class MC:
    """Monte-Carlo prediction and control on the GridWorld simulator."""

    def __init__(self, _gamma, _agent, _environment):
        """Store the discount factor, the agent and the environment."""
        self.gamma = _gamma
        self.agent = _agent
        self.env = _environment

    # This function simulates episodes where the agent explores the tiles of the GridWorld's instance
    # At every time step t, we compute (s_t, a_t, r_t+1, s_t+1)
    def generate_episode(self, policy):
        """Simulate one episode from tile (1,1) until a terminal tile is reached.

        Args:
            policy: dict mapping ``(state, action)`` to the probability of
                choosing ``action`` in ``state``.

        Returns:
            List of ``(s, a, r, s')`` tuples, one per time step.

        Note:
            The loop only ends when a terminal tile is reached, so a policy
            that never reaches one makes this call run forever.
        """
        episode = []
        self.agent.loc = (1,1)
        while not self.agent.loc in self.env.terminal:
            # decide the next action based on the provided policy
            next_action = np.random.choice(self.agent.A, p=[policy[(self.agent.loc, a)] for a in self.agent.A])
            # collect the reward related to the performed action and add tuple (s,a,r,s')
            episode.append((self.agent.loc, next_action, self.env.R[(self.agent.loc, next_action)], self.env.move(self.agent.loc, next_action)))
            # move the agent to the new location based on the performed action
            self.agent.loc = self.env.move(self.agent.loc, next_action)

        return episode

    # Monte-Carlo Prediction Algorithm
    # Based on Every-Visit MC Prediction Algorithm (Sutton and Barto - Section 5.1) and adapted with David Silver's slides ideas
    def prediction(self, policy, num_episodes):
        """Every-visit Monte-Carlo evaluation of ``policy``.

        Args:
            policy: dict mapping ``(state, action)`` to a probability.
            num_episodes: total number of simulated episodes.

        Returns:
            A pair ``(V, Q)``: estimates of the state-value and action-value
            functions. Terminal states get 0.0; states (resp. state-action
            pairs) never visited get the placeholder ``-num_episodes``.
        """
        N_state = {s: 0 for s in self.agent.S}
        N_action = {(s,a): 0 for s in self.agent.S for a in self.agent.A}
        S_state = {s: 0.0 for s in self.agent.S}
        S_action = {(s,a): 0.0 for s in self.agent.S for a in self.agent.A}

        for k in range(num_episodes):
            episode = self.generate_episode(policy)
            G = 0.0
            # walk the episode backwards so that G is the return following each step
            for (s,a,r,ss) in reversed(episode):
                G = self.gamma*G + r
                N_state[s] += 1
                S_state[s] += G
                N_action[(s,a)] += 1
                S_action[(s,a)] += G

        V = {}
        for s in self.agent.S:
            if s in self.env.terminal:
                V[s] = 0.0
            elif N_state[s] > 0:
                V[s] = S_state[s]/N_state[s]
            else:
                V[s] = -num_episodes

        Q = {}
        for s in self.agent.S:
            for a in self.agent.A:
                if s in self.env.terminal:
                    Q[(s,a)] = 0.0
                elif N_action[(s,a)] > 0:
                    Q[(s,a)] =  S_action[(s,a)]/N_action[(s,a)]
                else:
                    Q[(s,a)] = -num_episodes

        return V, Q

    # Monte-Carlo Q-Value Iteration Algorithm
    # Based on on-policy MC Control (Sutton and Barto - Section 5.4), using the every-visit prediction above
    def control(self, Pi_init, num_iterations, num_episodes, epsilon):
        """Alternate MC evaluation and epsilon-greedy improvement.

        Args:
            Pi_init: initial policy, dict ``(state, action) -> probability``.
            num_iterations: number of evaluation/improvement rounds.
            num_episodes: number of episodes used by each evaluation.
            epsilon: exploration probability of the epsilon-greedy policy.

        Returns:
            The policy obtained after the last improvement step
            (``Pi_init`` itself when ``num_iterations`` is 0).
        """
        Pi_opt = Pi_init
        for i in range(num_iterations):
            v,q = self.prediction(Pi_opt, num_episodes)
            Pi_opt = self.epsilon_greedy(epsilon, q)
        return Pi_opt

    def epsilon_greedy(self, epsilon, q):
        """Return the epsilon-greedy policy with respect to ``q``.

        For every state the greedy action (first one among ties) gets
        probability ``epsilon/m + 1 - epsilon`` and every other action gets
        ``epsilon/m``, where ``m`` is the number of actions.
        """
        Pi = {}
        for s in self.agent.S:
            # max() always selects an action, even when every q-value is very
            # negative (e.g. the -num_episodes placeholder of unvisited pairs),
            # so the probabilities of each state always sum to 1.
            a_max = max(self.agent.A, key=lambda a: q[(s, a)])

            for a in self.agent.A:
                if a == a_max:
                    Pi[(s,a)] = epsilon/self.agent.m + 1 - epsilon
                else:
                    Pi[(s,a)] = epsilon/self.agent.m

        return Pi

# Instantiate the simulator used by the Monte-Carlo experiments below:
# undiscounted returns, a 4x4 environment and an agent starting at (1,1).
gamma = 1.0
env = Environment(grid_size=4)
agent = Agent(env=env,  init_loc=(1,1))

# Monte-Carlo learner bound to this agent/environment pair
mc = MC(gamma, agent, env)

Testing MC prediction for RANDOM policy

In [10]:
Pi_RANDOM = { (s, a): 1.0/agent.m for s in agent.S for a in agent.A } # uniformly random policy: every action has probability 1/m
num_episodes = 10000                                                 # number of episodes
v_MC, Q_MC = mc.prediction(Pi_RANDOM, num_episodes)

# print the grid with the state values of the random policy estimated by MC prediction
print("---------------------------------")
print("|\033[96m%.1f\t\033[0m|%.1f\t|%.1f\t|%.1f\t|"      %(v_MC[(1,1)], v_MC[(1,2)], v_MC[(1,3)], v_MC[(1,4)]))
print("---------------------------------")
print("|####\t|\033[91m%.1f\t\033[0m|####\t|%.1f\t|"  %(v_MC[(2,2)], v_MC[(2,4)]))
print("---------------------------------")
print("|\033[92m%.1f\t\033[0m|%.1f\t|####\t|%.1f\t|"    %(v_MC[(3,1)], v_MC[(3,2)], v_MC[(3,4)]))
print("---------------------------------")
print("|\033[91m%.1f\t\033[0m|%.1f\t|%.1f\t|%.1f\t|"      %(v_MC[(4,1)], v_MC[(4,2)], v_MC[(4,3)], v_MC[(4,4)]))
print("---------------------------------")

---------------------------------
|[96m-95.2	[0m|-91.2	|-97.7	|-99.6	|
---------------------------------
|####	|[91m-72.4	[0m|####	|-98.2	|
---------------------------------
|[92m0.0	[0m|-41.9	|####	|-92.0	|
---------------------------------
|[91m-29.3	[0m|-48.6	|-67.7	|-81.3	|
---------------------------------


Testing MC prediction for the Value Iteration's policy

In [11]:
num_episodes = 10000                                                  # number of episodes
# evaluate the policy produced by Value Iteration (Pi_VI) with MC prediction
v_MC, Q_MC = mc.prediction(Pi_VI, num_episodes)

# print the grid with the state values of the Value Iteration policy estimated by MC prediction
# (tiles never visited during the episodes show the placeholder -num_episodes)
print("---------------------------------")
print("|\033[96m%.1f\t\033[0m|%.1f\t|%.1f\t|%.1f\t|"      %(v_MC[(1,1)], v_MC[(1,2)], v_MC[(1,3)], v_MC[(1,4)]))
print("---------------------------------")
print("|####\t|\033[91m%.1f\t\033[0m|####\t|%.1f\t|"  %(v_MC[(2,2)], v_MC[(2,4)]))
print("---------------------------------")
print("|\033[92m%.1f\t\033[0m|%.1f\t|####\t|%.1f\t|"    %(v_MC[(3,1)], v_MC[(3,2)], v_MC[(3,4)]))
print("---------------------------------")
print("|\033[91m%.1f\t\033[0m|%.1f\t|%.1f\t|%.1f\t|"      %(v_MC[(4,1)], v_MC[(4,2)], v_MC[(4,3)], v_MC[(4,4)]))
print("---------------------------------")

---------------------------------
|[96m-8.0	[0m|-7.0	|-10000.0	|-10000.0	|
---------------------------------
|####	|[91m-2.0	[0m|####	|-10000.0	|
---------------------------------
|[92m0.0	[0m|-1.0	|####	|-10000.0	|
---------------------------------
|[91m-10000.0	[0m|-10000.0	|-10000.0	|-10000.0	|
---------------------------------


Testing MC learning for policy optimization

In [12]:
num_iterations = 10                                                  # number of evaluation/improvement rounds
num_episodes = 1000                                                  # number of episodes
epsilon = 0.04                                                       # exploration probability of the epsilon-greedy policy

# improve the random policy with MC control, then evaluate the resulting policy
Pi_opt = mc.control(Pi_RANDOM, num_iterations, num_episodes, epsilon)
# NOTE(review): v_opt and Q_opt are not used anywhere else in the visible notebook
v_opt, Q_opt = mc.prediction(Pi_opt, num_episodes)

# print one episode sampled from the learned (epsilon-greedy) policy as: state, action, reward, next state
for (s,a,r,ss) in mc.generate_episode(Pi_opt):
    print(s, a, r, ss)

(1, 1) right -1 (1, 2)
(1, 2) down -5 (2, 2)
(2, 2) down -1 (3, 2)
(3, 2) left -1 (3, 1)


<h3>Temporal-Difference Learning</h3>

Here is the class implementing the TD-based methods:

In [13]:
class TD:
    """Temporal-Difference prediction and control on the GridWorld simulator."""

    def __init__(self, _gamma, _agent, _environment):
        """Store the discount factor, the agent and the environment."""
        self.gamma = _gamma
        self.agent = _agent
        self.env = _environment

    # This function simulates episodes where the agent explores the tiles of the GridWorld's instance
    # At every time step t, we compute (s_t, a_t, r_t+1, s_t+1)
    def generate_episode(self, policy):
        """Simulate one episode from tile (1,1) until a terminal tile is reached.

        Args:
            policy: dict mapping ``(state, action)`` to the probability of
                choosing ``action`` in ``state``.

        Returns:
            List of ``(s, a, r, s')`` tuples, one per time step.

        Note:
            The loop only ends when a terminal tile is reached, so a policy
            that never reaches one makes this call run forever.
        """
        episode = []
        self.agent.loc = (1,1)
        while not self.agent.loc in self.env.terminal:
            # decide the next action based on the provided policy
            next_action = np.random.choice(self.agent.A, p=[policy[(self.agent.loc, a)] for a in self.agent.A])
            # collect the reward related to the performed action and add tuple (s,a,r,s')
            episode.append((self.agent.loc, next_action, self.env.R[(self.agent.loc, next_action)], self.env.move(self.agent.loc, next_action)))
            # move the agent to the new location based on the performed action
            self.agent.loc = self.env.move(self.agent.loc, next_action)

        return episode

    # TD(0) Prediction  Algorithm
    # Based on the Tabular TD(0) Algorithm (Sutton and Barto - Section 6.1)
    def prediction(self, pi, alpha, num_episodes):
        """Tabular TD(0) evaluation of the policy ``pi``.

        Args:
            pi: policy to be evaluated, dict ``(state, action) -> probability``.
            alpha: learning rate.
            num_episodes: total number of simulated episodes.

        Returns:
            Dict with the estimated state value of every state in
            ``agent.S``. Terminal states stay at 0; states never visited keep
            their initial value of -100.
        """
        V = {s: 0 if s in self.env.terminal else -100 for s in self.agent.S}

        for k in range(num_episodes):
            self.agent.loc = (1,1)
            while not self.agent.loc in self.env.terminal:
                S = self.agent.loc
                A = np.random.choice(self.agent.A, p=[pi[(S, a)] for a in self.agent.A])
                R = self.env.R[(S, A)]
                Sprime = self.env.move(S, A)

                # move V[S] towards the one-step bootstrapped target
                V[S] = V[S] + alpha*(R + self.gamma*V[Sprime] - V[S])

                self.agent.loc = Sprime

        return V

    def sarsa(self, alpha, epsilon, num_episodes):
        """On-policy TD control (Sarsa) with an epsilon-greedy behaviour policy.

        Args:
            alpha: learning rate.
            epsilon: exploration probability.
            num_episodes: total number of simulated episodes.

        Returns:
            Dict mapping ``(state, action)`` to the learned action value.
        """
        Q = {(s,a): 0.0 if s in self.env.terminal else -100.0 for s in self.agent.S for a in self.agent.A}

        for k in range(num_episodes):
            S = (1,1)
            A = np.random.choice(self.agent.A, p=self.epsilon_greedy(S, Q, epsilon))

            while not S in self.env.terminal:
                R = self.env.R[(S, A)]
                Sprime = self.env.move(S, A)
                Aprime = np.random.choice(self.agent.A, p=self.epsilon_greedy(Sprime, Q, epsilon))

                # the target bootstraps on the action actually selected in S'
                Q[(S,A)] += alpha*(R + self.gamma*Q[(Sprime,Aprime)] - Q[(S,A)])

                S = Sprime
                A = Aprime

        return Q

    def q_learning(self, alpha, epsilon, num_episodes):
        """Off-policy TD control (Q-learning) with an epsilon-greedy behaviour policy.

        Args:
            alpha: learning rate.
            epsilon: exploration probability.
            num_episodes: total number of simulated episodes.

        Returns:
            Dict mapping ``(state, action)`` to the learned action value.
        """
        Q = {(s,a): 0.0 if s in self.env.terminal else -100.0 for s in self.agent.S for a in self.agent.A}

        for k in range(num_episodes):
            S = (1,1)
            while not S in self.env.terminal:
                A = np.random.choice(self.agent.A, p=self.epsilon_greedy(S, Q, epsilon))
                R = self.env.R[(S, A)]
                Sprime = self.env.move(S, A)

                # TD error = R + gamma * max_a Q(S', a) - Q(S, A).
                # Only the bootstrapped term is discounted; Q(S, A) must stay
                # outside the max and outside the gamma factor.
                best_next = max(Q[(Sprime,a)] for a in self.agent.A)
                Q[(S,A)] += alpha*(R + self.gamma*best_next - Q[(S,A)])

                S = Sprime

        return Q

    def epsilon_greedy(self, S, Q, epsilon):
        """Return the epsilon-greedy action probabilities for state ``S``.

        The greedy action (first one among ties) gets probability
        ``epsilon/m + 1 - epsilon`` and every other action ``epsilon/m``,
        where ``m`` is the number of actions.

        Returns:
            List of probabilities in the order of ``agent.A``.
        """
        # max() always selects an action, whatever the magnitude of the
        # Q-values, so the returned probabilities always sum to 1.
        a_max = max(self.agent.A, key=lambda a: Q[(S, a)])
        explore = epsilon/self.agent.m

        return [explore + 1 - epsilon if a == a_max else explore for a in self.agent.A]
    

# Instantiate the simulator used by the Temporal-Difference experiments below:
# undiscounted returns, a 4x4 environment and an agent starting at (1,1).
gamma = 1.0
env = Environment(grid_size=4)
agent = Agent(env=env,  init_loc=(1,1))

# Temporal-Difference learner bound to this agent/environment pair
td = TD(gamma, agent, env)

Testing TD Prediction with RANDOM policy

In [14]:
# NOTE(review): with alpha = 1.0 the TD(0) update V[S] + alpha*(target - V[S])
# replaces V[S] by the latest one-step target, so no averaging takes place;
# under the random policy a smaller learning rate may be intended - confirm.
alpha = 1.0
num_episodes = 10000
v_TD_RANDOM = td.prediction(Pi_RANDOM, alpha, num_episodes)

# print the grid with the state values of the random policy estimated by TD(0) prediction
print("---------------------------------")
print("|\033[96m%.1f\t\033[0m|%.1f\t|%.1f\t|%.1f\t|"      %(v_TD_RANDOM[(1,1)], v_TD_RANDOM[(1,2)], v_TD_RANDOM[(1,3)], v_TD_RANDOM[(1,4)]))
print("---------------------------------")
print("|####\t|\033[91m%.1f\t\033[0m|####\t|%.1f\t|"  %(v_TD_RANDOM[(2,2)], v_TD_RANDOM[(2,4)]))
print("---------------------------------")
print("|\033[92m%.1f\t\033[0m|%.1f\t|####\t|%.1f\t|"    %(v_TD_RANDOM[(3,1)], v_TD_RANDOM[(3,2)], v_TD_RANDOM[(3,4)]))
print("---------------------------------")
print("|\033[91m%.1f\t\033[0m|%.1f\t|%.1f\t|%.1f\t|"      %(v_TD_RANDOM[(4,1)], v_TD_RANDOM[(4,2)], v_TD_RANDOM[(4,3)], v_TD_RANDOM[(4,4)]))
print("---------------------------------")


---------------------------------
|[96m-28.0	[0m|-117.0	|-78.0	|-47.0	|
---------------------------------
|####	|[91m-20.0	[0m|####	|-28.0	|
---------------------------------
|[92m0.0	[0m|-1.0	|####	|-39.0	|
---------------------------------
|[91m-1.0	[0m|-20.0	|-9.0	|-24.0	|
---------------------------------


Testing TD Prediction with Value Iteration Policy

In [15]:
alpha = 1.0             # learning rate
num_episodes = 10000    # number of episodes
# evaluate the policy produced by Value Iteration (Pi_VI) with TD(0) prediction
v_TD_VI = td.prediction(Pi_VI, alpha, num_episodes)

# print the grid with the state values of the Value Iteration policy estimated by TD(0) prediction
# (tiles never visited during the episodes keep their initial value of -100)
print("---------------------------------")
print("|\033[96m%.1f\t\033[0m|%.1f\t|%.1f\t|%.1f\t|"      %(v_TD_VI[(1,1)], v_TD_VI[(1,2)], v_TD_VI[(1,3)], v_TD_VI[(1,4)]))
print("---------------------------------")
print("|####\t|\033[91m%.1f\t\033[0m|####\t|%.1f\t|"  %(v_TD_VI[(2,2)], v_TD_VI[(2,4)]))
print("---------------------------------")
print("|\033[92m%.1f\t\033[0m|%.1f\t|####\t|%.1f\t|"    %(v_TD_VI[(3,1)], v_TD_VI[(3,2)], v_TD_VI[(3,4)]))
print("---------------------------------")
print("|\033[91m%.1f\t\033[0m|%.1f\t|%.1f\t|%.1f\t|"      %(v_TD_VI[(4,1)], v_TD_VI[(4,2)], v_TD_VI[(4,3)], v_TD_VI[(4,4)]))
print("---------------------------------")


---------------------------------
|[96m-8.0	[0m|-7.0	|-100.0	|-100.0	|
---------------------------------
|####	|[91m-2.0	[0m|####	|-100.0	|
---------------------------------
|[92m0.0	[0m|-1.0	|####	|-100.0	|
---------------------------------
|[91m-100.0	[0m|-100.0	|-100.0	|-100.0	|
---------------------------------


Testing (On-Policy) TD Control with Sarsa

In [16]:
alpha = 1.0             # learning rate
epsilon = 0.004         # exploration probability of the epsilon-greedy behaviour policy
num_episodes = 1000     # number of episodes

# learn the action values with on-policy TD control (Sarsa)
Q = td.sarsa(alpha, epsilon, num_episodes)

# Derive the deterministic greedy policy from Q.
# max() always selects an action (the first one among ties), whatever the
# magnitude of the Q-values, so every state gets exactly one action with
# probability 1.
Pi_Sarsa = {}
for s in td.agent.S:
    a_max = max(td.agent.A, key=lambda a: Q[(s,a)])
    for a in td.agent.A:
        Pi_Sarsa[(s,a)] = 1.0 if a == a_max else 0.0

# print one episode following the greedy policy as: state, action, reward, next state
for (s,a,r,ss) in td.generate_episode(Pi_Sarsa):
    print(s, a, r, ss)

(1, 1) right -1 (1, 2)
(1, 2) down -5 (2, 2)
(2, 2) down -1 (3, 2)
(3, 2) left -1 (3, 1)


Testing (Off-Policy) TD Control with Q-Learning

In [17]:
alpha = 1.0             # learning rate
epsilon = 0.004         # exploration probability of the epsilon-greedy behaviour policy
num_episodes = 1000     # number of episodes

# learn the action values with off-policy TD control (Q-learning)
Q = td.q_learning(alpha, epsilon, num_episodes)

# Derive the deterministic greedy policy from Q.
# max() always selects an action (the first one among ties), whatever the
# magnitude of the Q-values, so every state gets exactly one action with
# probability 1.
Pi_QLearning = {}
for s in td.agent.S:
    a_max = max(td.agent.A, key=lambda a: Q[(s,a)])
    for a in td.agent.A:
        Pi_QLearning[(s,a)] = 1.0 if a == a_max else 0.0

# print one episode following the greedy policy as: state, action, reward, next state
# (generated through the TD learner itself, so this cell does not depend on the
# Monte-Carlo object `mc` defined in an earlier section)
for (s,a,r,ss) in td.generate_episode(Pi_QLearning):
    print(s, a, r, ss)

(1, 1) right -1 (1, 2)
(1, 2) down -5 (2, 2)
(2, 2) down -1 (3, 2)
(3, 2) left -1 (3, 1)
