In [None]:
import itertools
import numpy as np
import pandas as pd
import random

Actions encoded as:

(0,1) -> right

(0,-1) -> left

(1,0) -> down

(-1,0) -> up

In [None]:
TERMINAL_STATES = [(3,3)]
START_STATE = (0,0)

class GridWorldEnv:
  def __init__(self, N=10, M=10):
    self.N = N
    self.M = M
    self.observation_space = list(itertools.product(range(N), range(M)))
    self.action_space = [(0,1), (0,-1), (1,0), (-1,0)]

    self.terminal_states = TERMINAL_STATES
    self.reset()

  def reset(self):
    self.state = START_STATE
    self.terminated = False
    self.total_reward = 0

  def _get_transition_probability(self, start_state, action, end_state):
    if start_state in self.terminal_states:
      return 0

    expected_state = tuple(np.array(start_state) + np.array(action))
    if expected_state == end_state:
      return 1

    if expected_state not in self.observation_space and start_state == end_state:
      return 1

    return 0

  def _get_reward(self, start_state, action, end_state):
    if end_state in self.terminal_states:
      return 10
    else:
      return -1

  def step(self, action):
    """Takes the action that agent took as the param and returns the next
    state, one-step reward and other necessary attributes.
    """
    # check if terminal state is reached
    if self.state in self.terminal_states:
      self.terminated = True
      reward = np.nan
      return self.state, reward, self.terminated

    max_prob = 0
    current_state = self.state

    for possible_state in self.observation_space:
      p = self._get_transition_probability(current_state, action, possible_state)
      if p > max_prob:
        next_state = possible_state
        max_prob = p

    reward = self._get_reward(current_state, action, next_state)

    self.state = next_state
    self.total_reward += reward

    return self.state, reward, self.terminated

Random steps in Gridworld

In [None]:
gw = GridWorldEnv(5,5)

gw.state

(0, 0)

In [None]:
print("current state: ", gw.state)
current_action = (0,1) # move right

next_state, reward, is_terminated = gw.step(current_action)

print("action: ", current_action)
print("Next state: ", next_state)

current state:  (0, 1)
action:  (0, 1)
Next state:  (0, 2)


In [None]:
gw.reset()
is_terminated = gw.terminated

# i = 0
while not is_terminated:
  current_state = gw.state
  action = random.choices(gw.action_space)[0]
  next_state, reward, is_terminated = gw.step(action)
  # print(current_state, action, next_state, reward)
  # i+=1

print("total reward: ", gw.total_reward)

total reward:  -89


In [None]:
class RandomActionAgent:
  def __init__(self, env):
    self.env = env

  def policy(self):
    action = random.choices(self.env.action_space)[0]
    return action

In [None]:
agent = RandomActionAgent(gw)

gw.reset()
is_terminated = gw.terminated

while not is_terminated:
  current_state = gw.state
  action = agent.policy()
  next_state, reward, is_terminated = gw.step(action)
  # print(current_state, action, next_state, reward)

print("total reward: ", gw.total_reward)

total reward:  -50


### Monte Carlo Prediction - Policy Evaluation

In [None]:
from tqdm import tqdm

In [None]:
gw = GridWorldEnv(5,5)
random_act_agent = RandomActionAgent(gw)

V = dict(zip(gw.observation_space, np.zeros(gw.N*gw.M)))
# G = V.copy()

# history = []
gw.reset()

for iter in tqdm(range(10000)):
  history = []
  gw.reset()
  # start from a random state to start the episode
  gw.state = random.choices(gw.observation_space)[0]

  # Simulate one episode
  while not gw.terminated:
    current_state = gw.state
    state, reward , _ = gw.step(random_act_agent.policy())
    history.append((current_state,reward))

  # update the state values according to the returns received in the
  # current episode
  G = 0
  for i in range(len(history)-2, -1, -1):
    G = history[i][1] + 1 * G
    V[history[i][0]] = V[history[i][0]] + 0.01 * (G - V[history[i][0]])

np.array(list(V.values())).reshape(5,5).round()

100%|██████████| 10000/10000 [00:50<00:00, 198.07it/s]


array([[-38., -39., -39., -37., -37.],
       [-42., -42., -35., -29., -32.],
       [-39., -35., -23., -14., -27.],
       [-34., -34., -18.,   0., -18.],
       [-34., -33., -17., -10., -28.]])

### Monte Carlo Control - Policy improvement

In [None]:
# Initialize the environment
gw = GridWorldEnv(4, 4)

# Initialize state-action values Q(s, a)
Q = dict()
for state in gw.observation_space:
    for action in gw.action_space:
        Q[(state, action)] = np.random.random()

print(Q)

# Number of episodes
num_episodes = 10000
epsilon = 0.1  # Epsilon-greedy exploration

# Monte Carlo On-Policy Prediction
for episode in tqdm(range(num_episodes)):
    gw.reset()
    episode_history = []

    # start from a random state to start the episode
    gw.state = random.choice(gw.observation_space)

    while not gw.terminated:
        current_state = gw.state

        # Epsilon-greedy policy
        if random.random() < epsilon:
            action = random.choice(gw.action_space)
        else:
            action = max(gw.action_space, key=lambda a: Q[(current_state, a)])

        next_state, reward, _ = gw.step(action)
        episode_history.append((current_state, action, reward))

    # Update state-action values based on the returns received in the episode
    G = 0
    for t in range(len(episode_history) - 2, -1, -1):
        state_t, action_t, reward = episode_history[t]
        G = reward + 1 * G
        # N[(state_t, action_t)] += 1
        Q[(state_t, action_t)] += 0.01*(G - Q[(state_t, action_t)])# / N[(state_t, action_t)]

    # if episode % 100 == 0:
    #   print(Q)

Q
# # Display the optimal state-action values
# optimal_Q_values = np.array([Q[(state, action)] for state in gw.observation_space for action in gw.action_space])
# optimal_Q_values = optimal_Q_values.reshape((gw.N, gw.M, len(gw.action_space))).round(1)
# print(optimal_Q_values)

{((0, 0), (0, 1)): 0.32065784611432013, ((0, 0), (0, -1)): 0.006855008274554986, ((0, 0), (1, 0)): 0.5842309815769173, ((0, 0), (-1, 0)): 0.21585278235800054, ((0, 1), (0, 1)): 0.9569594138334169, ((0, 1), (0, -1)): 0.6461938302008559, ((0, 1), (1, 0)): 0.11208893759184091, ((0, 1), (-1, 0)): 0.6238019361300273, ((0, 2), (0, 1)): 0.5915255875274025, ((0, 2), (0, -1)): 0.9137597827662408, ((0, 2), (1, 0)): 0.28752730579175567, ((0, 2), (-1, 0)): 0.7269461522173922, ((0, 3), (0, 1)): 0.7079917546430635, ((0, 3), (0, -1)): 0.7135315647078353, ((0, 3), (1, 0)): 0.6525679313801847, ((0, 3), (-1, 0)): 0.4217868400422119, ((1, 0), (0, 1)): 0.20052148897076227, ((1, 0), (0, -1)): 0.7505430984981386, ((1, 0), (1, 0)): 0.036579153423505484, ((1, 0), (-1, 0)): 0.589615502755193, ((1, 1), (0, 1)): 0.7421528323746497, ((1, 1), (0, -1)): 0.5781145364031902, ((1, 1), (1, 0)): 0.08370644773613267, ((1, 1), (-1, 0)): 0.593669156257704, ((1, 2), (0, 1)): 0.5023458129160917, ((1, 2), (0, -1)): 0.32406388

100%|██████████| 10000/10000 [00:03<00:00, 2675.93it/s]


{((0, 0), (0, 1)): 4.466818163620727,
 ((0, 0), (0, -1)): 1.027006496327798,
 ((0, 0), (1, 0)): -0.6194654928705995,
 ((0, 0), (-1, 0)): 0.10435860043112477,
 ((0, 1), (0, 1)): -2.2842879651791206,
 ((0, 1), (0, -1)): 1.3276435841110203,
 ((0, 1), (1, 0)): 5.585481075993956,
 ((0, 1), (-1, 0)): -8.59438145520026,
 ((0, 2), (0, 1)): 6.690672676122251,
 ((0, 2), (0, -1)): -6.735555680317456,
 ((0, 2), (1, 0)): 0.42048021258915325,
 ((0, 2), (-1, 0)): -4.780906224845939,
 ((0, 3), (0, 1)): -1.532805788414846,
 ((0, 3), (0, -1)): 1.5319047940761978,
 ((0, 3), (1, 0)): 7.802903578577042,
 ((0, 3), (-1, 0)): 2.209794867438643,
 ((1, 0), (0, 1)): -4.83838277345856,
 ((1, 0), (0, -1)): -18.580337646613838,
 ((1, 0), (1, 0)): -1.3631571182034856,
 ((1, 0), (-1, 0)): 3.4101373752912574,
 ((1, 1), (0, 1)): 2.315373497432388,
 ((1, 1), (0, -1)): -1.2604222931914084,
 ((1, 1), (1, 0)): 6.732552549037726,
 ((1, 1), (-1, 0)): 2.2211430795730096,
 ((1, 2), (0, 1)): -14.710218378292222,
 ((1, 2), (0, -

In [None]:
# start from state (0,0) and take actions based on the learned policy

gw.reset()

while not gw.terminated:
  current_state = gw.state
  action = max(gw.action_space, key=lambda a: Q[(current_state, a)])
  next_state, reward,_ = gw.step(action)
  print(current_state, action, reward)

print()
print("total reward: ", gw.total_reward)

(0, 0) (0, 1) -1
(0, 1) (1, 0) -1
(1, 1) (1, 0) -1
(2, 1) (1, 0) -1
(3, 1) (0, 1) -1
(3, 2) (0, 1) 10
(3, 3) (1, 0) nan

total reward:  5


In [1]:
class MonteCarloAgant:
  def __init__(self,env):
    self.env = env
    self.reset()

  def reset(self):
    self.trained = False
    gw = self.env

    # Initialize state-action values Q(s, a)
    self.Q = dict()
    for state in gw.observation_space:
        for action in gw.action_space:
            self.Q[(state, action)] = np.random.random()

  def train(self, num_episodes = 10000,epsilon = 0.1):

    Q = self.Q.copy()
    gw = self.env

    # Monte Carlo On-Policy Prediction
    for episode in tqdm(range(num_episodes)):
        gw.reset()
        episode_history = []

        # start from a random state to start the episode
        gw.state = random.choice(gw.observation_space)

        while not gw.terminated:
            current_state = gw.state

            # Epsilon-greedy policy
            if random.random() < epsilon:
                action = random.choice(gw.action_space)
            else:
                action = max(gw.action_space, key=lambda a: Q[(current_state, a)])

            next_state, reward, _ = gw.step(action)
            episode_history.append((current_state, action, reward))

        # Update state-action values based on the returns received in the episode
        G = 0
        for t in range(len(episode_history) - 2, -1, -1):
            state_t, action_t, reward = episode_history[t]
            G = reward + 1 * G
            # N[(state_t, action_t)] += 1
            Q[(state_t, action_t)] += 0.01*(G - Q[(state_t, action_t)])# / N[(state_t, action_t)]

    self.Q = Q.copy()
    self.trained = True


  def policy(self):
    gw= self.env
    current_state = self.env.state

    if self.trained:
      action = max(self.env.action_space, key=lambda a: self.Q[(current_state, a)])
    else:
      action = random.choice(gw.action_space)
    return action


In [None]:
gw = GridWorldEnv(5,5)
agent = MonteCarloAgant(gw)

gw.reset()
is_terminated = gw.terminated

agent.train()
gw.reset()

while not is_terminated:
  current_state = gw.state
  action = agent.policy()
  next_state, reward, is_terminated = gw.step(action)
  print(current_state, action, next_state, reward)

print("\ntotal reward: ", gw.total_reward)

100%|██████████| 10000/10000 [00:09<00:00, 1109.37it/s]

(0, 0) (1, 0) (1, 0) -1
(1, 0) (0, 1) (1, 1) -1
(1, 1) (0, 1) (1, 2) -1
(1, 2) (1, 0) (2, 2) -1
(2, 2) (0, 1) (2, 3) -1
(2, 3) (1, 0) (3, 3) 10
(3, 3) (1, 0) (3, 3) nan

total reward:  5





## Task
1. Create a `MonteCarloAgent` class, the `train()` method should perform the training though monte carlo control method.

2. Train an agent using the dynamic programming method (use `class DPAgent` and it's `.train()` method to train. ref. Lab 04).
3. Evaluate the agent traned in #2 using the Monte Carlo Prediction method and validate whether the predicted state values are equal to the optimal values obtained in #3 though dynamic programming.