## Assignment XX: Markov Decision Process

Markov Decision Process (MDP) is a mathematical tool to model and solve a sequential decision-making problem.

Every RL task has an MDP: An RL agent interacts with the environment through its actions and the environment responds to the agent with a reward signal and transits the agent from the current state to the next state. The agent-environment interaction leads to the following trajectory.

<p align="center">
  <img src="https://raw.githubusercontent.com/poudel-bibek/AI-Assignments/main/Images_videos/MDP/1.png")
"/>
</p>

<p align="center">
  <em>Figure 1: Agent-Environment interaction trajectory. S=State, A=Action, R=reward.</em>
</p>


As an example, let's go through the following Grid World, where an agent starts at state S1 and tries to reach the goal state S10 as fast as possible. The black blocks are obstacles.

<p align="center">
  <img src="https://raw.githubusercontent.com/poudel-bibek/AI-Assignments/main/Images_videos/MDP/2.jpg")
"/>
</p>

<p align="center">
  <em>Figure 2: Grid World with 10 states. </em>
</p>


Here is the MDP of the Grid World.

- __States:__ agent's current position
- __Actions:__ LEFT, RIGHT, DOWN, UP. Note that not all states allow successful executions of all the actions, e.g., S1 only allows RIGHT and DOWN.
- __Transition function:__ defines how the environment responses to the agent's actions. This grid-world has a deterministic transition function.
- __Reward function:__ The agent receives a negative reward (penalty) of -1 at every time step (a time sensitive task). Thus, the goal of the agent is to receive as low penalty as possible.

<p align="center">
  <img src="https://raw.githubusercontent.com/poudel-bibek/AI-Assignments/main/Images_videos/MDP/3.jpg")
"/>
</p>

<p align="center">
  <em>Figure 3: Transition function of Grid World. </em>
</p>

Now, let's set-up this MDP using [OpenAI Gym](https://gymnasium.farama.org/). Please carefully study the code and make sure you understand it.



In [1]:
# Legacy gym
import gym
import numpy as np

import warnings
warnings.filterwarnings("ignore")

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.


In [2]:
class GridWorld(gym.Env):
  def __init__(self, ):

    # A simple reward function
    self.step_reward = -1 # Every time-step incurs a -1 reward
    self.obstacle_reward = -10 # Obstacles have a high negative reward -10
    self.goal_reward = 10 # Goal state has a high positive reward

    # Set the boundary of the grid
    self.x_min = 0
    self.y_min = 0
    self.x_max = 3 # 0, 1, 2, 3
    self.y_max = 2 # 0, 1, 2

    # Action space
    self.action_space = gym.spaces.Discrete(4)
    self.state_space = [(i,j) for i in range(self.x_max+1) for j in range(self.y_max+1)]

    # Special places in state space
    self.obstacle_locations = [(2,1),(3,1)]
    self.goal_location = [(self.x_max, self.y_max)]
    self.start_location = (self.x_min, self.y_min)

    # Transition function
    self.transition_function = self.create_transition_function()

  def create_transition_function(self, ):
    # Deterministic transition function, rewards included
    transitions = {}
    for state in self.state_space:
      for action in range(self.action_space.n):
        x, y = state
        # Move up
        if action == 0:
          y = max(self.y_min, y-1)

        # Move down
        elif action == 1:
          y = min(self.y_max, y+1)

        # Move Left
        elif action == 2:
          x = max(x-1, self.x_min)

        # Move right
        elif action == 3:
          x = min(x+1, self.x_max)

        next_state = (x,y)
        reward = self.step_reward

        # Reward based on if we transition to special states
        if next_state in self.obstacle_locations:
          reward += self.obstacle_reward

        elif next_state in self.goal_location:
          reward += self.goal_reward

        transitions[(state, action)] = next_state, reward
    return transitions

  def step(self, agent_location, action):
    # Current location coordinates
    (x,y) = agent_location

    # Apply the action
    next_location, reward = self.transition_function.get(((x,y),action))
    #print(f"Current Location = {agent_location} Next location = {next_location}, Reward = {reward}")

    done = self.check_terminal(next_location) # Check whether the next state is terminal or not

    return next_location, reward, done, {}

  def reset(self, ):
    return self.start_location

  def check_terminal(self, state):
    # What to make the terminal states?
    # Both goal and obstacle states are terminal because there is no notion of `future rewards' from them
    if state in [(self.x_max, self.y_max)]:
      return True

    elif state in self.obstacle_locations:
      return True

    else:
      return False


---
# Exercise 1.

Please finish the following task.

Write code to advance the environment step by step. Output the current state, reward, and the next state of the agent taking the following steps: RIGHT, DOWN, RIGHT. Also output a flag (in binary) to indiciate whether the current episode is terminated or not (for this problem an episode will terminate if the agent either hits an obstacle or reaches the goal state).

In [16]:
################ WRITE YOUR CODE HERE ################
steps = [3,1,3]
terminator_flag = False # False not terminated, True terminated

env = GridWorld()
location = env.reset()

for step in steps:
  next_location, reward, terminator_flag, info = env.step(location, step)
  print("State: ", location)
  print("Reward: ", reward)
  print("Next State: ", next_location)
  if terminator_flag:
    print(1)
  else:
    print(0)
  location = next_location


State:  (0, 0)
Reward:  -1
Next State:  (1, 0)
0
State:  (1, 0)
Reward:  -1
Next State:  (1, 1)
0
State:  (1, 1)
Reward:  -11
Next State:  (2, 1)
1


---
# Exercise 2.

Please finish the following task.

Write a complete MDP for a simpler version of the Grid World below.

<p align="center">
  <img src="https://raw.githubusercontent.com/poudel-bibek/AI-Assignments/main/Images_videos/MDP/4.jpg")
"/>
</p>

<p align="center">
  <em>Figure 4: A Grid World with 7 states.</em>
</p>


You need to inherit the `gym.Env` class and implement the five functions given below.

- __init__
  - Define a reward function (step reward, goal reward, obstacle reward)
  - Define action space and state space of the environment
  - Define the grid boundary, location of the obstacle and goal states

- __create_transition_function__
  - Define what (State, Action) combination leads to what (Next State, Reward).
  - Establishe the coordinate system for the environment

- __step__
  - Advance the environrment by one step including:
    - Update the agent's location
    - Return the next state and reward for the last action taken
    - Check if the agent has reached a terminal state

- __reset__
  - Reset the agent's location (to S1) and return it

- __check_terminal__
  - Define terminal states
  - Returns True if a given state is a terminal state



You can use the code skeleton below to get started.

```
class YourOwnGridWorld(gym.Env):
  def __init__(self, ):
    ###      YOUR CODE HERE       ###
    pass

  def create_transition_function(self, ):
    ###      YOUR CODE HERE       ###
    pass

  def step(self, agent_location, action):
    ###      YOUR CODE HERE       ###
    pass

  def reset(self, ):
    ###      YOUR CODE HERE       ###
    pass

  def check_terminal(self, state):
    ###      YOUR CODE HERE       ###
    pass

```




In [17]:
class YourOwnGridWorld(gym.Env):
  def __init__(self, ):

    # A simple reward function
    self.step_reward = -1 # Every time-step incurs a -1 reward
    self.obstacle_reward = -10 # Obstacles have a high negative reward -10
    self.goal_reward = 10 # Goal state has a high positive reward

    # Set the boundary of the grid
    self.x_min = 0
    self.y_min = 0
    self.x_max = 2 # 0, 1, 2, 3
    self.y_max = 2 # 0, 1, 2

    # Action space
    self.action_space = gym.spaces.Discrete(4)
    self.state_space = [(i,j) for i in range(self.x_max+1) for j in range(self.y_max+1)]

    # Special places in state space
    self.obstacle_locations = [(0,1),(0,2)]
    self.goal_location = [(self.x_max, self.y_max)]
    self.start_location = (self.x_min, self.y_min)

    # Transition function
    self.transition_function = self.create_transition_function()

  def create_transition_function(self, ):
    # Deterministic transition function, rewards included
    transitions = {}
    for state in self.state_space:
      for action in range(self.action_space.n):
        x, y = state
        # Move up
        if action == 0:
          y = max(self.y_min, y-1)

        # Move down
        elif action == 1:
          y = min(self.y_max, y+1)

        # Move Left
        elif action == 2:
          x = max(x-1, self.x_min)

        # Move right
        elif action == 3:
          x = min(x+1, self.x_max)

        next_state = (x,y)
        reward = self.step_reward

        # Reward based on if we transition to special states
        if next_state in self.obstacle_locations:
          reward += self.obstacle_reward

        elif next_state in self.goal_location:
          reward += self.goal_reward

        transitions[(state, action)] = next_state, reward
    return transitions

  def step(self, agent_location, action):
    # Current location coordinates
    (x,y) = agent_location

    # Apply the action
    next_location, reward = self.transition_function.get(((x,y),action))
    #print(f"Current Location = {agent_location} Next location = {next_location}, Reward = {reward}")

    done = self.check_terminal(next_location) # Check whether the next state is terminal or not

    return next_location, reward, done, {}

  def reset(self, ):
    return self.start_location

  def check_terminal(self, state):
    # What to make the terminal states?
    # Both goal and obstacle states are terminal because there is no notion of `future rewards' from them
    if state in [(self.x_max, self.y_max)]:
      return True

    elif state in self.obstacle_locations:
      return True

    else:
      return False

### Testing Your Solution

From Figure 4, we know that the shortest path from the starting state to the goal state is <S1, S2, S3, S5, S7>. The following code snippet, which traces the shortest path trajectory, can be used to test your MDP design.

Note that you need to replace ACTION RIGHT and ACTION DOWN with the numbers you assigned for these two actions in your MDP.


In [18]:
your_env = YourOwnGridWorld()

initial_observation = your_env.reset()
print(initial_observation)

# Take a step Right
first_step, _, _, _ = your_env.step(initial_observation, 3)
print(first_step)

# Take another step Right
second_step, _, _, _ = your_env.step(first_step, 3)
print(second_step)

# Take a step Down
third_step, _, _, _ = your_env.step(second_step, 1)
print(third_step)

# Take another step Down
fourth_step, _, _, _ = your_env.step(third_step, 1)
print(fourth_step)

(0, 0)
(1, 0)
(2, 0)
(2, 1)
(2, 2)
