<a href="https://colab.research.google.com/github/eya-methnani/Assignment-1---Deep-Reinforcement-Learning-Course/blob/main/Assignment1_DRL_course.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
"""A simple world model

Simple deterministic MDP is made of 6 grids (states)
---------------------------------
|         |          |          |
|  Start  |          |  Goal    |
|         |          |          |
---------------------------------
|         |          |          |
|         |          |  Hole    |
|         |          |          |
---------------------------------

"""

from collections import deque
import numpy as np
import argparse
import os
import time
from termcolor import colored


class QWorld:
    def __init__(self):
        """Simulated deterministic world made of 6 states.
        """
        # 4 actions
        # 0 - Left, 1 - Down, 2 - Right, 3 - Up
        self.col = 4

        # 6 states
        self.row = 6

        # setup the environment
        self.init_transition_table()
        self.init_reward_table()

        # reset the environment
        self.reset()

    def reset(self):
        """start of episode"""
        self.state = 0
        self.count = 0
        return self.state

    def is_in_win_state(self):
        """agent wins when the goal is reached"""
        return self.state == 2


    def init_reward_table(self):
        """
        0 - Left, 1 - Down, 2 - Right, 3 - Up
        ----------------
        | 0 | 0 | 100  |
        ----------------
        | 0 | 0 | -100 |
        ----------------
        """
        #############################################
        #TODO-- fill in the reward table
        #############################################
        self.reward_table = np.zeros([self.row, self.col])
        # To be completed

        self.reward_table[0, 2] = 100  # Moving right at state 0 to Goal
        self.reward_table[1, 2] = 100  # Moving right at state 1 to Goal
        self.reward_table[3, 2] = -100 # Moving right at state 3 to Hole
        self.reward_table[4, 2] = -100 # Moving right at state 4 to Hole



    def init_transition_table(self):
        """
        actions:
        0 - Left, 1 - Down, 2 - Right, 3 - Up

        states:
        -------------
        | 0 | 1 | 2 |
        -------------
        | 3 | 4 | 5 |
        -------------
        """
        self.transition_table = np.zeros([self.row, self.col],
                                         dtype=int)

        self.transition_table[0, 0] = 0
        self.transition_table[0, 1] = 3
        self.transition_table[0, 2] = 1
        self.transition_table[0, 3] = 0

        #############################################
        #TODO-- complete the transition_table
        #############################################
        # to be completed

         # State 1 transitions
        self.transition_table[1, 0] = 0  # Left to state 0
        self.transition_table[1, 1] = 4  # Down to state 4
        self.transition_table[1, 2] = 2  # Right to state 2 (Goal)
        self.transition_table[1, 3] = 1  # Up stays at 1

        # State 2 (Goal) transitions
        self.transition_table[2, :] = 2  # Any action stays at Goal

        # State 3 transitions
        self.transition_table[3, 0] = 3  # Left stays at 3
        self.transition_table[3, 1] = 3  # Down stays at 3
        self.transition_table[3, 2] = 4  # Right to state 4
        self.transition_table[3, 3] = 0  # Up to state 0

        # State 4 transitions
        self.transition_table[4, 0] = 3  # Left to state 3
        self.transition_table[4, 1] = 4  # Down stays at 4
        self.transition_table[4, 2] = 5  # Right to state 5 (Hole)
        self.transition_table[4, 3] = 1  # Up to state 1

        # State 5 (Hole) transitions
        self.transition_table[5, :] = 5  # Any action stays at Hole


    def step(self, action):
        """execute the action on the environment
        Argument:
            action (tensor): An action in Action space
        Returns:
            next_state (tensor): next env state
            reward (float): reward received by the agent
            done (Bool): whether the terminal state
                is reached
        """
        # determine the next_state given state and action
        next_state = self.transition_table[self.state, action]
        # done is True if next_state is Goal or Hole
        #############################################
        #TODO
        #############################################



         # Check if the agent reached Goal (state 2) or Hole (state 5)
        done = next_state in [2, 5]



        # reward given the state and action
        reward = self.reward_table[self.state, action]
        # the enviroment is now in new state
        self.state = next_state
        self.count+=1
        return next_state, reward, done




    def print_cell(self, row=0):
        """UI to display agent moving on the grid"""
        print("")
        for i in range(13):
            j = i - 2
            if j in [0, 4, 8]:
                if j == 8:
                    if self.state == 2 and row == 0:
                        marker = "\033[4mG\033[0m"
                    elif self.state == 5 and row == 1:
                        marker = "\033[4mH\033[0m"
                    else:
                        marker = 'G' if row == 0 else 'H'
                    color = self.state == 2 and row == 0
                    color = color or (self.state == 5 and row == 1)
                    color = 'red' if color else 'blue'
                    print(colored(marker, color), end='')
                elif self.state in [0, 1, 3, 4]:
                    cell = [(0, 0, 0), (1, 0, 4), (3, 1, 0), (4, 1, 4)]
                    marker = '_' if (self.state, row, j) in cell else ' '
                    print(colored(marker, 'red'), end='')
                else:
                    print(' ', end='')
            elif i % 4 == 0:
                    print('|', end='')
            else:
                print(' ', end='')
        print("")


    def print_world(self, action):
        """UI to display mode and action of agent"""
        actions = { 0: "(Left)", 1: "(Down)", 2: "(Right)", 3: "(Up)" }
        if self.count==0:
          print("Start Game")
        else:
          print("Action : ", actions[action])
        for _ in range(13):
            print('-', end='')
        self.print_cell()
        for _ in range(13):
            print('-', end='')
        self.print_cell(row=1)
        for _ in range(13):
            print('-', end='')
        print("")




In [5]:
def print_episode(episode, delay=1):
    """UI to display episode count
    Arguments:
        episode (int): episode number
        delay (int): sec delay

    """
    os.system('clear')
    for _ in range(13):
        print('=', end='')
    print("")
    print("Episode ", episode)
    for _ in range(13):
        print('=', end='')
    print("")
    time.sleep(delay)

def print_status(q_world, done,action, delay=1):
    """UI to display the world,
        delay of 1 sec for ease of understanding
    """
    os.system('clear')
    q_world.print_world(action)
    if done:
        print("-------EPISODE DONE--------")
        delay *= 2
    time.sleep(delay)

In [6]:
#instantiate the environment
q_world = QWorld()


# **TODO:**


A) Complete the code in the cells above (the parts to be completed are marked with 'TODO')

B) Once part A) is complete, implement now each of the following situations in **a separate cell**. For the sake of illustration, Situation 1 is already implemented for you to give you a hint on how to answer each part. You could implement somthing similar for Situation 2, Situation 3 and Situation 4.

**Situation 1:**take agent to goal in 2 steps. Print the episode name and display the grid for each step taken.

**Situation 2:** take agent to H in 3 steps. Print the episode name and display the grid for each step taken.

**Situation 3:** implement the following trajectory: down-right-up-right. what's the cumulative reward (assume the discount factor is 1)? Compare with the cumulative reward of the episode from **Situation 1** and comment.
Make sure the cumulative reward is printed when the cell is executed.

**Situation 4:** Implement an agent that takes random actions at each step and stops only when the task is solved (note that your agent may need to go through multiple episodes before your it is able to reach the goal). After how many episodes it solved the task? (the number of episodes should be displayed automatically each time you run the cell)

In [7]:
# Situation 1
# print initial grid --before taking any action--
state = q_world.reset()
done = False
episode = 1
delay = 0
print_episode(episode=episode, delay=0)

# print initial status of the board
print_status(q_world, done, 0, delay=delay)


# to take the agent to GOAL (G) in two steps, the agent needs
# to go right then go right again

# recall the actions:
# 0 - Left, 1 - Down, 2 - Right, 3 - Up

# 1- Go right
action=2
next_state, reward, done = q_world.step(action)
print_status(q_world, done,action, delay=delay)

# 1- Go right again
action=2
next_state, reward, done = q_world.step(action)
print_status(q_world, done,action, delay=delay)


Episode  1
Start Game
-------------
| _ |   | G |
-------------
|   |   | H |
-------------
Action :  (Right)
-------------
|   | _ | G |
-------------
|   |   | H |
-------------
Action :  (Right)
-------------
|   |   | [4mG[0m |
-------------
|   |   | H |
-------------
-------EPISODE DONE--------


In [8]:
# Implement Situation 2
# TODO

# Situation 2: Take the agent to Hole (H) in 3 steps
state = q_world.reset()
done = False
episode = 2
delay = 0

# Print Episode and Initial Grid
print_episode(episode=episode, delay=0)
print_status(q_world, done, 0, delay=delay)

# 1- Go Down
action = 1  # Down
next_state, reward, done = q_world.step(action)
print_status(q_world, done, action, delay=delay)

# 2- Go Right
action = 2  # Right
next_state, reward, done = q_world.step(action)
print_status(q_world, done, action, delay=delay)

# 3- Go Right again
action = 2  # Right
next_state, reward, done = q_world.step(action)
print_status(q_world, done, action, delay=delay)


Episode  2
Start Game
-------------
| _ |   | G |
-------------
|   |   | H |
-------------
Action :  (Down)
-------------
|   |   | G |
-------------
| _ |   | H |
-------------
Action :  (Right)
-------------
|   |   | G |
-------------
|   | _ | H |
-------------
Action :  (Right)
-------------
|   |   | G |
-------------
|   |   | [4mH[0m |
-------------
-------EPISODE DONE--------


In [9]:
# Implement Situation 3
# TODO

# Situation 3: Down → Right → Up → Right
state = q_world.reset()
done = False
episode = 3
delay = 0
cumulative_reward = 0

# Print Episode and Initial Grid
print_episode(episode=episode, delay=0)
print_status(q_world, done, 0, delay=delay)

# 1- Go Down
action = 1  # Down
next_state, reward, done = q_world.step(action)
cumulative_reward += reward
print_status(q_world, done, action, delay=delay)

# 2- Go Right
action = 2  # Right
next_state, reward, done = q_world.step(action)
cumulative_reward += reward
print_status(q_world, done, action, delay=delay)

# 3- Go Up
action = 3  # Up
next_state, reward, done = q_world.step(action)
cumulative_reward += reward
print_status(q_world, done, action, delay=delay)

# 4- Go Right
action = 2  # Right
next_state, reward, done = q_world.step(action)
cumulative_reward += reward
print_status(q_world, done, action, delay=delay)

# Print the cumulative reward
print("Cumulative Reward for Situation 3:", cumulative_reward)

# Compare with Situation 1 (Goal reward is +100)
print("Cumulative Reward for Situation 1: 100")
if cumulative_reward < 100:
    print("The trajectory in Situation 3 is suboptimal compared to Situation 1.")
else:
    print("The trajectory in Situation 3 matches Situation 1.")


Episode  3
Start Game
-------------
| _ |   | G |
-------------
|   |   | H |
-------------
Action :  (Down)
-------------
|   |   | G |
-------------
| _ |   | H |
-------------
Action :  (Right)
-------------
|   |   | G |
-------------
|   | _ | H |
-------------
Action :  (Up)
-------------
|   | _ | G |
-------------
|   |   | H |
-------------
Action :  (Right)
-------------
|   |   | [4mG[0m |
-------------
|   |   | H |
-------------
-------EPISODE DONE--------
Cumulative Reward for Situation 3: 0.0
Cumulative Reward for Situation 1: 100
The trajectory in Situation 3 is suboptimal compared to Situation 1.


In [10]:
# Implement Situation 4
# TODO

import random

# Situation 4: Random actions until the task is solved
episode = 0
delay = 0
solved = False

while not solved:
    state = q_world.reset()
    done = False
    episode += 1

    # Print the episode header
    print_episode(episode=episode, delay=0)

    # Run the episode
    while not done:
        action = random.choice([0, 1, 2, 3])  # Random action
        next_state, reward, done = q_world.step(action)
        print_status(q_world, done, action, delay=delay)

        # Check if the task is solved
        if done and next_state == 2:  # Goal state
            solved = True
            print(f"Task solved in {episode} episodes!")
            break


Episode  1
Action :  (Down)
-------------
|   |   | G |
-------------
| _ |   | H |
-------------
Action :  (Right)
-------------
|   |   | G |
-------------
|   | _ | H |
-------------
Action :  (Down)
-------------
|   |   | G |
-------------
|   | _ | H |
-------------
Action :  (Down)
-------------
|   |   | G |
-------------
|   | _ | H |
-------------
Action :  (Down)
-------------
|   |   | G |
-------------
|   | _ | H |
-------------
Action :  (Up)
-------------
|   | _ | G |
-------------
|   |   | H |
-------------
Action :  (Up)
-------------
|   | _ | G |
-------------
|   |   | H |
-------------
Action :  (Left)
-------------
| _ |   | G |
-------------
|   |   | H |
-------------
Action :  (Down)
-------------
|   |   | G |
-------------
| _ |   | H |
-------------
Action :  (Left)
-------------
|   |   | G |
-------------
| _ |   | H |
-------------
Action :  (Left)
-------------
|   |   | G |
-------------
| _ |   | H |
-------------
Action :  (Left)
-------------
|   