<a href="https://colab.research.google.com/github/MoseT1/KoreaUniversity-Assignments/blob/main/Lab02_MDP_(practice).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Lab for Markov Decision Process
1. GridWorld: a simple, finite-state MDP
2. MDPs in Gymnasium

### Install & import required packages

In [None]:
!pip install numpy gymnasium

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1


In [None]:
import os
import time

import numpy as np

import gymnasium as gym
from gym.spaces import Discrete
from IPython.display import clear_output


# Recall: an MDP is defined as the tuple $\mathcal{M} = \langle \mathcal{S}, \mathcal{A}, \mathcal{P}, \mathcal{R}, \gamma \rangle$
- If we know the reward function $\mathcal{R}$ and the transition probability $\mathcal{P}$, we say that we know the information about the environment, or that the environment model is known. In that case, we can do planning -- we can obtain the best policy without having to interact with an environment.  

- However, in most real cases, that information is unknown. If we don't know the environment, we can try a model-free approach (one that does not rely on an environment model), which learns by trial and error.

### Reinforcement Learning (RL) can do model-free learning!

# MDP: GridWorld
- This is a simple finite-state MDP you encountered in the previous lecture. Please read carefully, and understand how the state, action, reward, and transition probability are defined.


In [None]:
class Environment:
    """Square grid world with one agent (the "seeker") and one goal cell.

    Observations are cell indices ``grid_size * row + col``. Actions are the
    integers 0 (left), 1 (right), 2 (up) and 3 (down). An episode ends when
    the seeker stands on the goal or ``max_step`` steps have been taken.
    """

    # (row, col) displacement for each action code, indexed by the action.
    _MOVES = ((0, -1), (0, 1), (-1, 0), (1, 0))

    def __init__(self, env_config=None, *args, **kwargs) -> None:
        """Read the settings from ``env_config`` and place the seeker at (0, 0).

        Args:
            env_config: mapping with the keys ``grid_size``, ``out_reward``,
                ``step_reward``, ``goal_reward``, ``max_step`` and ``goal``
                (``goal`` is a ``(row, col)`` tuple).

        Raises:
            TypeError: if ``env_config`` is not given.
            KeyError: if one of the required keys is missing.
        """
        if env_config is None:
            raise TypeError("env_config is required (a mapping with the grid settings)")
        self.grid_size = env_config['grid_size']
        self.out_reward = env_config['out_reward']
        self.step_reward = env_config['step_reward']
        self.goal_reward = env_config['goal_reward']
        self.max_step = env_config['max_step']
        self.goal = env_config['goal']
        # `gym` is the notebook's `import gymnasium as gym`; using its spaces keeps
        # the environment on one library instead of also requiring legacy `gym`.
        self.action_space = gym.spaces.Discrete(len(self._MOVES))
        self.observation_space = gym.spaces.Discrete(self.grid_size * self.grid_size)
        self.seeker = (0, 0)  # row, col
        self.timestep = 0
        self._sync_info()

    def _sync_info(self):
        """Rebuild ``self.info`` so it reflects the current seeker position."""
        self.info = {'seeker': self.seeker, 'goal': self.goal}

    def reset(self):
        """Move the seeker back to (0, 0), zero the step counter, return the observation."""
        self.seeker = (0, 0)  # row, col
        self.timestep = 0
        self._sync_info()
        return self.get_observation()

    def get_observation(self):
        """Return the seeker's cell index, ``grid_size * row + col``."""
        return self.grid_size * self.seeker[0] + self.seeker[1]

    def get_reward(self):
        """Return ``goal_reward`` when the seeker is on the goal, otherwise 0."""
        return self.goal_reward if self.seeker == self.goal else 0

    def is_done(self):
        """Return True once the step budget is used up or the goal is reached."""
        return self.timestep >= self.max_step or self.seeker == self.goal

    def check_pos(self, seeker):
        """Return True when the ``(row, col)`` position lies outside the grid."""
        row, col = seeker[0], seeker[1]
        last = self.grid_size - 1
        return row < 0 or row > last or col < 0 or col > last

    def step(self, action):
        """Apply one action and return ``(observation, reward, done, info)``.

        A move that would leave the grid keeps the seeker in place and yields
        ``out_reward``; any other move yields ``get_reward() + step_reward``.

        Raises:
            ValueError: if ``action`` is not 0, 1, 2 or 3. The action is checked
                before any state changes, so a rejected call does not consume
                a timestep.
        """
        # Membership in range() falls back to ==, so numpy integers and IntEnum
        # members are accepted just like plain ints.
        if action not in range(len(self._MOVES)):
            raise ValueError("Invalid action")

        self.timestep += 1
        d_row, d_col = self._MOVES[int(action)]
        candidate = (self.seeker[0] + d_row, self.seeker[1] + d_col)

        if self.check_pos(candidate):
            # Bumped into the wall: stay put and pay the out-of-grid penalty.
            reward = self.out_reward
        else:
            self.seeker = candidate
            reward = self.get_reward() + self.step_reward

        # Keep the returned info in step with the new position.
        self._sync_info()
        return self.get_observation(), reward, self.is_done(), self.info

    def render(self, *args, **kwargs):
        """Clear the screen/cell output and print the grid ('A' = seeker, 'G' = goal)."""
        os.system('cls' if os.name == 'nt' else 'clear')
        clear_output(wait=True)
        grid = [['| '] * self.grid_size + ["|\n"] for _ in range(self.grid_size)]
        grid[self.goal[0]][self.goal[1]] = '|G'
        # Drawn after the goal so the seeker is shown when both share a cell.
        grid[self.seeker[0]][self.seeker[1]] = '|A'
        print(''.join(''.join(cells) for cells in grid))


class Gridworld(Environment, gym.Env):
    """Grid world exposed as a ``gym.Env`` subclass; all behaviour comes from ``Environment``."""

    def __init__(self, env_config, *args, **kwargs) -> None:
        """Forward the configuration and any extra arguments to ``Environment``."""
        # Pass env_config positionally: mixing ``env_config=...`` with ``*args``
        # raises "got multiple values for argument" as soon as an extra
        # positional argument is supplied.
        super().__init__(env_config, *args, **kwargs)

  and should_run_async(code)


By inspecting the environment, you can see that it has 4 actions:

0: LEFT  
1: RIGHT  
2: UP   
3: DOWN


In [None]:
from enum import IntEnum

class Action(IntEnum):
    """Readable names for the four integer action codes."""

    # Unpacking range(4) gives LEFT=0, RIGHT=1, UP=2, DOWN=3, in that order.
    LEFT, RIGHT, UP, DOWN = range(4)

Note that you can change the grid size, goal reward, and max_step of the environment. Let's start with a simple setting.

In [None]:
# 4x4 grid with the goal in the bottom-right cell.
env_config = dict(
    grid_size=4,
    goal=(3, 3),
    goal_reward=1,
    step_reward=-1,
    out_reward=-5,
    max_step=20,
)
env = Gridworld(env_config)
# render shows the grid map
env.render()

|A| | | |
| | | | |
| | | | |
| | | |G|



SyntaxError: incomplete input (<ipython-input-20-e852b39f3303>, line 1)

Check what happens after applying action.

In [None]:
# Action 0 = LEFT: from the top-left start cell this runs into the wall.
env.step(Action.LEFT)
env.render()

|A| | | |
| | | | |
| | | | |
| | | |G|



In [None]:
# Action 3 = DOWN: moves the agent one row down.
env.step(Action.DOWN)
env.render()

| | | | |
|A| | | |
| | | | |
| | | |G|



## 1. Random policy  

We learned that we need a policy $\pi$ to solve an MDP. Let's first try a random policy and see how it performs.

## TODO 1. Implement the random policy
- implement the random_policy function
  - Input: env
  - Return: random action (0~3)

### Hints
- Utilize the built-in `random` library and the action information
  - There is a built-in function `random.randint(min, max)`
  - You can get the size of action dimension by using `env.action_space.n`
- You can also use the `env.action_space.sample()` function

In [None]:
import random

def random_policy(env):
    """Return an action chosen uniformly at random from the env's action space.

    Args:
        env: environment exposing ``action_space.n``, the number of discrete actions.

    Returns:
        An integer in ``0 .. env.action_space.n - 1``.
    """
    # random.randint includes both end points, hence the "- 1".
    return random.randint(0, env.action_space.n - 1)



# Test your random policy

In [None]:
# Run two episodes with the random policy, redrawing the grid after every step.
render_time = 0.5  # seconds to pause before each redraw
render = True
for episode in range(0, 2):
    state = env.reset()
    done = False
    total_reward = 0
    current_step = 0
    while not done:
        action = random_policy(env)

        # env.step returns (observation, reward, done, info)
        next_state, reward, done, info = env.step(action)
        current_step += 1

        total_reward += reward
        state = next_state
        if render:
            time.sleep(render_time)
            env.render()
            print(Action(action))
            print(f"current episode: {episode}")
            print(f"steps: {current_step}")
            # NOTE: env.seeker / env.goal are (row, col), so the value labelled "x" is the row index.
            print(f"agent pos. x: {env.seeker[0]:>2d}, y: {env.seeker[1]:>2d}")
            print(f"goal pos.  x: {env.goal[0]:>2d}, y: {env.goal[1]:>2d}")
            print(f"total reward: {total_reward:>2d}")


| | | | |
| | | | |
| | | | |
| | | |A|

Action.DOWN
current episode: 1
steps: 10
agent pos. x:  3, y:  3
goal pos.  x:  3, y:  3
total reward: -17


In [None]:
# Show the state index left in `state` by the last step of the loop above.
state

14

## 2. Manual policy   

Did you see the performance of random policy? It fails to reach the destination in most cases.  Let's implement manual policy!

## TODO 2. Implement the manual policy
- Implement the manual_policy function
  - Input: state
  - Return: appropriate action
  
## Hints
- Utilize the `if, elif, else` conditional statement and enum class `Action`
- Utilize `random.choice(list)` built-in function

In [None]:
# example) random.choice
action_list = [1,3]
action = random.choice(action_list)
print(action)

3


<enum 'Action'>


  and should_run_async(code)


In [None]:
def manual_policy(env):
    """Hand-written policy for the 4x4 grid whose goal is the bottom-right cell.

    Args:
        env: despite its name, the test loop passes the current *state index*
            (``4 * row + col``). An environment object exposing
            ``get_observation()`` is accepted as well; its current observation
            is then used as the state.

    Returns:
        1 (RIGHT) or 3 (DOWN): DOWN in the last column, RIGHT in the last row,
        and a random choice between the two everywhere else.
    """
    # Use the argument itself rather than whatever global `state` happens to hold.
    state = env.get_observation() if hasattr(env, "get_observation") else env

    grid_size = 4
    last = grid_size - 1
    row, col = divmod(state, grid_size)

    if row < last and col < last:
        # Both directions bring the agent closer to the goal.
        return random.choice([1, 3])
    if row < last:
        # Last column (states 3, 7, 11): only DOWN makes progress.
        return 3
    # Last row (states 12..15): only RIGHT makes progress.
    return 1

  and should_run_async(code)


# Test your manual policy

In [None]:
# Run ten episodes with the manual policy, redrawing the grid after every step.
render_time = 0.5 # this must be set large enough so that Colab does not skip the printouts
render = True
for episode in range(0, 10):
    state = env.reset()
    done = False
    total_reward = 0
    current_step = 0
    while not done:
        # The policy is called with the current state index.
        action = manual_policy(state)
        # env.step returns (observation, reward, done, info)
        next_state, reward, done, info = env.step(action)
        current_step += 1
        total_reward += reward
        state = next_state
        if render:
            time.sleep(render_time)
            env.render()
            print(Action(action))
            print(f"current episode: {episode}")
            print(f"steps: {current_step}")
            # NOTE: env.seeker / env.goal are (row, col), so the value labelled "x" is the row index.
            print(f"agent pos. x: {env.seeker[0]:>2d}, y: {env.seeker[1]:>2d}")
            print(f"goal pos.  x: {env.goal[0]:>2d}, y: {env.goal[1]:>2d}")
            print(f"total reward: {total_reward:>2d}")

| | | | |
| | | | |
| | | | |
| | | |A|

Action.RIGHT
current episode: 9
steps: 6
agent pos. x:  3, y:  3
goal pos.  x:  3, y:  3
total reward: -5


# Well done!
- Now, how about this?

In [None]:
# 8x8 grid with the goal in the bottom-left cell.
env_config = dict(
    grid_size=8,
    goal=(7, 0),
    goal_reward=1,
    step_reward=-1,
    out_reward=-5,
    max_step=50,
)
env = Gridworld(env_config)
env.render()

|A| | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
|G| | | | | | | |



and this?

In [None]:
# 16x16 grid with the goal at row 13, column 11.
env_config = dict(
    grid_size=16,
    goal=(13, 11),
    goal_reward=1,
    step_reward=-1,
    out_reward=-5,
    max_step=250,
)
env = Gridworld(env_config)
env.render()

|A| | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | |G| | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |



# Of course, you can implement manual policies for those environments.
- Is it meaningful?
- what happens if it goes high-dimensional, or more complex, and ...

### Resources are limited - instead of designing policies manually, we can make the computer learn the policy.
### That's what reinforcement learning is about!


# 3. RL policy

We will see how reinforcement learning works here. This algorithm is called Q-learning, and it has not yet appeared in the lecture (so don't be stressed if it is difficult to understand!)

### The policy is provided as follows:

In [None]:
class Policy:
    """Epsilon-greedy policy backed by a table of action values (one row per state)."""

    def __init__(self, env):
        """Create an all-zero |S| x |A| table sized from the env's discrete spaces."""
        n_states = env.observation_space.n
        n_actions = env.action_space.n
        self.state_action_table = np.zeros((n_states, n_actions))
        self.action_space = env.action_space

    def get_action(self, state, explore=True, epsilon=0.1):
        """Pick an action for ``state``.

        With ``explore`` enabled, a random action is sampled from the action
        space with probability ``epsilon``; otherwise the action with the
        largest table value for ``state`` is returned.
        """
        if explore:
            # Draw the random number only when exploration is enabled.
            if random.uniform(0, 1) < epsilon:
                return self.action_space.sample()
        action_values = self.state_action_table[state, :]
        return np.argmax(action_values)

    def save(self, num):
        """Write the table to ``policy{num}.npy`` in the current directory."""
        np.save(f"policy{num}.npy", self.state_action_table)

    def load(self, npy_path):
        """Replace the table with the array stored at ``npy_path``."""
        self.state_action_table = np.load(npy_path)

# Rollout function

In reinforcement learning, `rollout` refers to the process of getting samples (=experiences) via agent-environment interactions.
- Inputs: policy, environment, maximum length of episode (T), ... etc
- Return: Trajectory (experiences, i.e. a list of (s, a, r, s', d) transitions, one per step)

In [None]:
def rollout(env, policy, T, *args, **kwargs):
    """Run one episode of ``policy`` in ``env`` and collect the transitions.

    Args:
        env: environment with ``reset()`` and ``step(action)``; ``step`` must
            return ``(next_state, reward, done, info)``.
        policy: object with ``get_action(state, explore, epsilon)``.
        T: the episode is cut off once exactly ``T`` steps have been taken.

    Keyword options (each falls back to a default when omitted):
        render (bool): redraw and print progress after every step. Default False.
        epsilon (float): exploration rate handed to the policy. Default 0.1.
        render_time (float): seconds to sleep before each redraw. Default 0.0.
        explore (bool): whether the policy may explore. Default True.
        episode (int): episode number shown in the printout. Default 0.

    Returns:
        A list of ``[state, action, reward, next_state, done]`` entries, one per step.
    """
    # .get() with defaults: leaving an option out no longer raises KeyError.
    render = kwargs.get("render", False)
    epsilon = kwargs.get("epsilon", 0.1)
    render_time = kwargs.get("render_time", 0.0)
    explore = kwargs.get("explore", True)
    episode = kwargs.get("episode", 0)

    experiences = []
    state = env.reset()
    done = False
    total_reward = 0
    current_step = 0
    while not done:
        action = policy.get_action(state, explore, epsilon)
        next_state, reward, done, _ = env.step(action)
        experiences.append([state, action, reward, next_state, done])
        current_step += 1
        total_reward += reward
        state = next_state
        if render:
            time.sleep(render_time)
            env.render()
            print(Action(action))
            print(f"current episode: {episode}")
            print(f"current step: {current_step}")
            print(f"agent pos. x: {env.seeker[0]:>2d}, y: {env.seeker[1]:>2d}")
            print(f"goal pos.  x: {env.goal[0]:>2d}, y: {env.goal[1]:>2d}")
            # No "d" type code: integers print exactly as before, and float
            # rewards no longer raise ValueError.
            print(f"total reward: {total_reward:>2}")

        if current_step == T:
            break
    return experiences

# Update functions
These functions are used to train and evaluate the Q-learning agent.
- update_policy: Update the policy by using Q-learning equation
- train_policy: Iterate update_policy over the episodes.
- evaluate_policy: Evaluate and visualize to see if the learning went well.

In [None]:
def update_policy(policy, trajectory, weight=0.1, discount_factor=0.9):
    """Apply one Q-learning update per transition, in random order.

    Args:
        policy: object whose ``state_action_table`` is a 2-D array indexed
            ``[state, action]``; it is updated in place.
        trajectory: iterable of ``(state, action, reward, next_state, done)``
            entries. It is not modified.
        weight: learning rate applied to each update.
        discount_factor: discount applied to the best next-state value.
    """
    table = policy.state_action_table
    # Shuffle a copy so the caller's trajectory keeps its original order.
    transitions = list(trajectory)
    random.shuffle(transitions)
    # `done` is unpacked but not used: every transition bootstraps from next_state.
    for state, action, reward, next_state, done in transitions:
        next_max = np.max(table[next_state, :])
        value = table[state, action]
        table[state, action] = (1 - weight) * value + weight * (reward + discount_factor * next_max)

def train_policy(env, policy, T=20, num_episodes=10000, weight=0.1, discount_factor=0.9, epsilon=0.5):
    """Train ``policy`` for ``num_episodes`` episodes, then save it.

    Each episode collects one exploring rollout of at most ``T`` steps and
    feeds it to ``update_policy``. Afterwards ``policy.save(num_episodes)`` is
    called.

    Returns:
        The same ``policy`` object that was passed in (it is updated in place).
    """
    for episode in range(1, num_episodes + 1):
        trajectory = rollout(env, policy, T, render=False, render_time=0.0,
                             explore=True, epsilon=epsilon, episode=episode)
        update_policy(policy, trajectory, weight, discount_factor)

    # Use num_episodes rather than the loop variable, which is unbound when the
    # loop body never runs (num_episodes=0); the value is the same otherwise.
    policy.save(num_episodes)
    return policy

def evaluate_policy(env, policy, T, npy_path, num_episodes=5):
    """Load a saved table into ``policy`` and run rendered, non-exploring rollouts.

    Args:
        env: environment to evaluate in.
        policy: policy object; ``policy.load(npy_path)`` is called first.
        T: step limit passed to each rollout.
        npy_path: path handed to ``policy.load``.
        num_episodes: number of evaluation episodes; must be at least 1.

    Returns:
        ``(average steps per episode, average total reward, list of total rewards)``.

    Raises:
        ValueError: if ``num_episodes`` is smaller than 1 (the averages would be undefined).
    """
    # Fail early with a clear message instead of a ZeroDivisionError after
    # the file has already been loaded.
    if num_episodes < 1:
        raise ValueError("num_episodes must be at least 1")

    policy.load(npy_path)
    total_steps = 0
    total_reward_lst = []
    for e in range(num_episodes):
        experiences = rollout(env, policy, T, render=True, render_time=0.1,
                              explore=False, epsilon=0, episode=e)
        # Each experience is [state, action, reward, next_state, done]; index 2 is the reward.
        total_reward_lst.append(sum(transition[2] for transition in experiences))
        total_steps += len(experiences)

    avg_score = sum(total_reward_lst) / len(total_reward_lst)
    return total_steps / num_episodes, avg_score, total_reward_lst

In [None]:
# 8x8 grid with the goal at row 6, column 6; used for the Q-learning run below.
env_config = dict(
    grid_size=8,
    goal=(6, 6),
    goal_reward=1,
    step_reward=-1,
    out_reward=-5,
    max_step=40,
)
env = Gridworld(env_config)
env.render()

|A| | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | |G| |
| | | | | | | | |



# Training a policy

In [None]:
# NOTE: train_policy updates the policy it is given and returns that same object,
# so after this cell `untrained_policy` and `trained_policy` are one (trained) policy.
untrained_policy = Policy(env)
trained_policy = train_policy(env, untrained_policy, env_config["max_step"], num_episodes=10000)

# Evaluate trained policy
- Check the Action-Values

In [None]:
# check action-values at state 0
print('Action-Values: ', [(Action(i), a) for i, a in enumerate(np.round(trained_policy.state_action_table[0, :], 2))])
# check action-values at state 14
print('Action-Values: ', [(Action(i), a) for i, a in enumerate(np.round(trained_policy.state_action_table[-2, :], 2))])

Action-Values:  [(<Action.LEFT: 0>, -11.18), (<Action.RIGHT: 1>, -6.86), (<Action.UP: 2>, -11.18), (<Action.DOWN: 3>, -6.86)]
Action-Values:  [(<Action.LEFT: 0>, -1.88), (<Action.RIGHT: 1>, -1.87), (<Action.UP: 2>, 0.0), (<Action.DOWN: 3>, -4.89)]


- Check the performance

In [None]:
# "policy10000.npy" is the file written at the end of training above:
# Policy.save names it policy<num>.npy and train_policy passes the final episode number (10000).
avg_steps, avg_score, total_reward_lst = evaluate_policy(env, trained_policy, env_config["max_step"], "policy10000.npy")

| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | |A| |
| | | | | | | | |

Action.DOWN
current episode: 4
current step: 12
agent pos. x:  6, y:  6
goal pos.  x:  6, y:  6
total reward: -11


In [None]:
# Average episode length and average total reward over the evaluation episodes.
print(f"avg_steps: {avg_steps}, avg_score: {avg_score}")

avg_steps: 12.0, avg_score: -11.0


# Gymnasium provides various MDPs!
- You can check their configurations as well
- If you are curious, you can see how they are implemented; Gymnasium is an open-source library!

In [None]:
# toy text - Frozenlake
env = gym.make('FrozenLake-v1')

# state (observation) and action space information
print(env.observation_space)
print(env.action_space)

Discrete(16)
Discrete(4)


In [None]:
env2 = gym.make('CartPole-v1')
### print observation_space and action_space
print(env2.observation_space)
print(env2.action_space)

env3 = gym.make('Pendulum-v1')
### print observation_space and action_space
print(env3.observation_space)
print(env3.action_space)


# Get used to Gymnasium
- We will use it throughout the course.