In [1]:
"""A simple world model

Simple deterministic MDP is made of 6 grids (states)
---------------------------------
|         |          |          |
|  Start  |          |  Goal    |
|         |          |          |
---------------------------------
|         |          |          |
|         |          |  Hole    |
|         |          |          |
---------------------------------

"""

from collections import deque
import numpy as np
import argparse
import os
import time
from termcolor import colored


class QWorld:
    def __init__(self):
        """Simulated deterministic world made of 6 states.
        """
        # 4 actions
        # 0 - Left, 1 - Down, 2 - Right, 3 - Up
        self.col = 4

        # 6 states
        self.row = 6

        # setup the environment
        self.init_transition_table()
        self.init_reward_table()

        # reset the environment
        self.reset()

    def reset(self):
        """start of episode"""
        self.state = 0
        self.count = 0
        return self.state

    def is_in_win_state(self):
        """agent wins when the goal is reached"""
        return self.state == 2
    
#The reward table stores the resulting reward corresponding to pairs of (states, actions).

    def init_reward_table(self):
        """
        0 - Left, 1 - Down, 2 - Right, 3 - Up
        ----------------
        | 0 | 0 | 100  |
        ----------------
        | 0 | 0 | -100 |
        ----------------
        """
        #############################################
        #TODO-- fill in the reward table
        #############################################
        self.reward_table = np.zeros([self.row, self.col])
        # To be completed
        #Action 0 : Left
        self.reward_table[0, 0] = 0
        self.reward_table[1, 0] = 0
        self.reward_table[2, 0] = 0
        self.reward_table[3, 0] = 0
        self.reward_table[4, 0] = 0
        self.reward_table[5, 0] = 0

        # Action 1 : Down
        self.reward_table[0, 1] = 0
        self.reward_table[1, 1] = 0
        self.reward_table[2, 1] = -100
        self.reward_table[3, 1] = 0
        self.reward_table[4, 1] = 0
        self.reward_table[5, 1] = 0

        # Action 2 : Right
        self.reward_table[0, 2] = 0
        self.reward_table[1, 2] = 100
        self.reward_table[2, 2] = 0
        self.reward_table[3, 2] = 0
        self.reward_table[4, 2] = -100
        self.reward_table[5, 2] = 0

        # Action 3 : Up
        self.reward_table[0, 3] = 0
        self.reward_table[1, 3] = 0
        self.reward_table[2, 3] = 0
        self.reward_table[3, 3] = 0
        self.reward_table[4, 3] = 0
        self.reward_table[5, 3] = 100


#The transition table stores the resulting states corresponding to pairs of (states, actions).
    def init_transition_table(self):
        """
        actions:
        0 - Left, 1 - Down, 2 - Right, 3 - Up

        states:
        -------------
        | 0 | 1 | 2 |
        -------------
        | 3 | 4 | 5 |
        -------------
        """
        self.transition_table = np.zeros([self.row, self.col],
                                         dtype=int)

        self.transition_table[0, 0] = 0
        self.transition_table[0, 1] = 3
        self.transition_table[0, 2] = 1
        self.transition_table[0, 3] = 0

        #############################################
        #TODO-- complete the transition_table
        #############################################
        # to be completed
        self.transition_table[1, 0] = 0
        self.transition_table[1, 1] = 4
        self.transition_table[1, 2] = 2
        self.transition_table[1, 3] = 1

        self.transition_table[2, 0] = 1
        self.transition_table[2, 1] = 5
        self.transition_table[2, 2] = 2
        self.transition_table[2, 3] = 2

        self.transition_table[3, 0] = 3
        self.transition_table[3, 1] = 3
        self.transition_table[3, 2] = 4
        self.transition_table[3, 3] = 0

        self.transition_table[4, 0] = 3
        self.transition_table[4, 1] = 4
        self.transition_table[4, 2] = 5
        self.transition_table[4, 3] = 1

        self.transition_table[5, 0] = 4
        self.transition_table[5, 1] = 5
        self.transition_table[5, 2] = 5
        self.transition_table[5, 3] = 2


    def step(self, action):
        """execute the action on the environment
        Argument:
            action (tensor): An action in Action space
        Returns:
            next_state (tensor): next env state
            reward (float): reward received by the agent
            done (Bool): whether the terminal state
                is reached
        """
        # determine the next_state given state and action
        next_state = self.transition_table[self.state, action]
        # done is True if next_state is Goal or Hole
        #############################################
        #TODO
        #############################################
        done =  self.is_in_win_state()

        # reward given the state and action
        reward = self.reward_table[self.state, action]
        # the enviroment is now in new state
        self.state = next_state
        self.count+=1
        return next_state, reward, done




    def print_cell(self, row=0):
        """UI to display agent moving on the grid"""
        print("")
        for i in range(13):
            j = i - 2
            if j in [0, 4, 8]:
                if j == 8:
                    if self.state == 2 and row == 0:
                        marker = "\033[4mG\033[0m"
                    elif self.state == 5 and row == 1:
                        marker = "\033[4mH\033[0m"
                    else:
                        marker = 'G' if row == 0 else 'H'
                    color = self.state == 2 and row == 0
                    color = color or (self.state == 5 and row == 1)
                    color = 'red' if color else 'blue'
                    print(colored(marker, color), end='')
                elif self.state in [0, 1, 3, 4]:
                    cell = [(0, 0, 0), (1, 0, 4), (3, 1, 0), (4, 1, 4)]
                    marker = '_' if (self.state, row, j) in cell else ' '
                    print(colored(marker, 'red'), end='')
                else:
                    print(' ', end='')
            elif i % 4 == 0:
                    print('|', end='')
            else:
                print(' ', end='')
        print("")


    def print_world(self, action):
        """UI to display mode and action of agent"""
        actions = { 0: "(Left)", 1: "(Down)", 2: "(Right)", 3: "(Up)" }
        if self.count==0:
          print("Start Game")
        else:
          print("Action : ", actions[action])
        for _ in range(13):
            print('-', end='')
        self.print_cell()
        for _ in range(13):
            print('-', end='')
        self.print_cell(row=1)
        for _ in range(13):
            print('-', end='')
        print("")




In [2]:
def print_episode(episode, delay=1):
    """UI to display episode count
    Arguments:
        episode (int): episode number
        delay (int): sec delay

    """
    os.system('clear')
    for _ in range(13):
        print('=', end='')
    print("")
    print("Episode ", episode)
    for _ in range(13):
        print('=', end='')
    print("")
    time.sleep(delay)

def print_status(q_world, done,action, delay=1):
    """UI to display the world,
        delay of 1 sec for ease of understanding
    """
    os.system('clear')
    q_world.print_world(action)
    if done:
        print("-------EPISODE DONE--------")
        delay *= 2
    time.sleep(delay)

In [3]:
#instantiate the environment
q_world = QWorld()


# **TODO:**


A) Complete the code in the cells above (the parts to be completed are marked with 'TODO')

B) Once part A) is complete, implement now each of the following situations in **a separate cell**. For the sake of illustration, Situation 1 is already implemented for you to give you a hint on how to answer each part. You could implement somthing similar for Situation 2, Situation 3 and Situation 4.

**Situation 1:**take agent to goal in 2 steps. Print the episode name and display the grid for each step taken.

**Situation 2:** take agent to H in 3 steps. Print the episode name and display the grid for each step taken.

**Situation 3:** implement the following trajectory: down-right-up-right. what's the cumulative reward (assume the discount factor is 1)? Compare with the cumulative reward of the episode from **Situation 1** and comment.
Make sure the cumulative reward is printed when the cell is executed.

**Situation 4:** Implement an agent that takes random actions at each step and stops only when the task is solved (note that your agent may need to go through multiple episodes before your it is able to reach the goal). After how many episodes it solved the task? (the number of episodes should be displayed automatically each time you run the cell)

In [8]:
# Situation 1
# #initialize the env
state = q_world.reset()
done = False
episode = 1
delay = 0
print_episode(episode=episode, delay=0)

# print initial status of the board
print_status(q_world, done, 0, delay=delay)


# to take the agent to GOAL (G) in two steps, the agent needs
# to go right then go right again

# recall the actions:
# 0 - Left, 1 - Down, 2 - Right, 3 - Up

# 1- Go right
action=2
next_state, reward, done = q_world.step(action)
print_status(q_world, done,action, delay=delay)

# 2- Go right again
action=2
next_state, reward, done = q_world.step(action)
print_status(q_world, done,action, delay=delay)
cumulative_reward_situation1 = reward
print(f' The cumulative Reward for Situation 1 is equal to: {cumulative_reward_situation1}')

Episode  1
Start Game
-------------
| [31m_[0m | [31m [0m | [34mG[0m |
-------------
| [31m [0m | [31m [0m | [34mH[0m |
-------------
Action :  (Right)
-------------
| [31m [0m | [31m_[0m | [34mG[0m |
-------------
| [31m [0m | [31m [0m | [34mH[0m |
-------------
Action :  (Right)
-------------
|   |   | [31m[4mG[0m[0m |
-------------
|   |   | [34mH[0m |
-------------
 The cumulative Reward for Situation 1 is equal to: 100.0


In [9]:
# Implement Situation 2
# TODO
# #initialize the env
state = q_world.reset()
done = False
episode = 2  # You can choose a different episode number
delay = 0
print_episode(episode=episode, delay=0)

# print initial status of the board
print_status(q_world, done, 0, delay=delay)

# To take the agent to H in three steps, the agent needs
# to go down, right, then right

# 1- Go down
action = 1
next_state, reward, done = q_world.step(action)
print_status(q_world, done, action, delay=delay)

# 2- Go right
action = 2
next_state, reward, done = q_world.step(action)
print_status(q_world, done, action, delay=delay)

# 3- Go right
action = 2
next_state, reward, done = q_world.step(action)
print_status(q_world, done, action, delay=delay)
cumulative_reward_situation2 = reward
print(f' The cumulative Reward for Situation 2 is equal to: {cumulative_reward_situation2}')

Episode  2
Start Game
-------------
| [31m_[0m | [31m [0m | [34mG[0m |
-------------
| [31m [0m | [31m [0m | [34mH[0m |
-------------
Action :  (Down)
-------------
| [31m [0m | [31m [0m | [34mG[0m |
-------------
| [31m_[0m | [31m [0m | [34mH[0m |
-------------
Action :  (Right)
-------------
| [31m [0m | [31m [0m | [34mG[0m |
-------------
| [31m [0m | [31m_[0m | [34mH[0m |
-------------
Action :  (Right)
-------------
|   |   | [34mG[0m |
-------------
|   |   | [31m[4mH[0m[0m |
-------------
 The cumulative Reward for Situation 2 is equal to: -100.0


In [10]:
# Implement Situation 3
# TODO
# Situation 3
#initialize the env
state = q_world.reset()
done = False
episode = 0 
delay = 0
print_episode(episode=episode, delay=0)

print_status(q_world, done, 0, delay=delay)

# Trajectory: down-right-up-right
# 1- Go down
action = 1
next_state, reward, done = q_world.step(action)
print_status(q_world, done, action, delay=delay)

# 2- Go right
action = 2
next_state, reward, done = q_world.step(action)
print_status(q_world, done, action, delay=delay)

# 3- Go up
action = 3
next_state, reward, done = q_world.step(action)
print_status(q_world, done, action, delay=delay)

# 4- Go right
action = 2
next_state, reward, done = q_world.step(action)
print_status(q_world, done, action, delay=delay)

# Print cumulative reward
cumulative_reward_situation3 = reward
print(f' The cumulative Reward for Situation 3 is equal to: {cumulative_reward_situation3}')


Episode  0
Start Game
-------------
| [31m_[0m | [31m [0m | [34mG[0m |
-------------
| [31m [0m | [31m [0m | [34mH[0m |
-------------
Action :  (Down)
-------------
| [31m [0m | [31m [0m | [34mG[0m |
-------------
| [31m_[0m | [31m [0m | [34mH[0m |
-------------
Action :  (Right)
-------------
| [31m [0m | [31m [0m | [34mG[0m |
-------------
| [31m [0m | [31m_[0m | [34mH[0m |
-------------
Action :  (Up)
-------------
| [31m [0m | [31m_[0m | [34mG[0m |
-------------
| [31m [0m | [31m [0m | [34mH[0m |
-------------
Action :  (Right)
-------------
|   |   | [31m[4mG[0m[0m |
-------------
|   |   | [34mH[0m |
-------------
 The cumulative Reward for Situation 3 is equal to: 100.0


The cumulative Reward for Situation 1 and 3 are both equal to: 100.0

In [15]:
# Implement Situation 4
# TODO
# Situation 4
import random

# checking if the task is solved
def is_challenge_completed():
    return q_world.is_in_win_state()

# initialize the env
state = q_world.reset()
done = False
episode = 0
delay = 0

total_episodes = 0

# Continue taking random actions until the task is solved
while not is_challenge_completed():
    # Choose a random action (0: Left, 1: Down, 2: Right, 3: Up)
    random_action = random.randint(0, 3)
    
    # Execute the action in the environment
    next_state, reward, done = q_world.step(random_action)

    # Increment episode count
    total_episodes += 1
    
# Print the number of episodes it took to solve the challenge
print(f'Situation 4 took {total_episodes} episodes to complete.')


Situation 4 took 11 episodes to complete.
