# Contents  
- How to check the MDP information
- Why do we need Reinforcement Learning?
  - Random policy
  - Hand-crafted policy
- RL policy, Rollout function

Writer: KukJin Kim.  
Email: kukjinkim@korea.ac.kr

#### Install required packages

In [1]:
!pip install numpy gym

Collecting gym
  Downloading gym-0.26.2.tar.gz (721 kB)
     ---------------------------------------- 0.0/721.7 kB ? eta -:--:--
     - -------------------------------------- 30.7/721.7 kB ? eta -:--:--
     ------ ------------------------------- 122.9/721.7 kB 2.4 MB/s eta 0:00:01
     ---------- --------------------------- 204.8/721.7 kB 1.8 MB/s eta 0:00:01
     ------------- ------------------------ 256.0/721.7 kB 2.2 MB/s eta 0:00:01
     ------------------- ------------------ 368.6/721.7 kB 1.9 MB/s eta 0:00:01
     ------------------------------ ------- 583.7/721.7 kB 2.6 MB/s eta 0:00:01
     ------------------------------ ------- 583.7/721.7 kB 2.6 MB/s eta 0:00:01
     -------------------------------------- 721.7/721.7 kB 2.2 MB/s eta 0:00:00
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata 

#### Import packages

In [2]:
import numpy as np
import time
import os
import gym
from gym.spaces import Discrete
from IPython.display import clear_output


# 1. First step for RL: Check the MDP information

![](2023-03-27-18-51-41.png)

In class, we learned that reinforcement learning is a kind of black box that takes MDP as input and outputs optimal policies.  
The first step for reinforcement learning is to define an MDP or to check the information in the defined MDP.

MDP is defined as follows. $<\cal {S, A, P, R}, \gamma>$

![](2023-03-27-20-33-27.png)

If we know the reward and the transition probability here, we say we know the information about the environment.  
Then we can do planning. We can obtain the best policy without having to interact with an environment.  
For most problems in reality, the transition probability is unknown. If we don't know the environment, we can try a model-free approach.  


##### The core information we use is the dimension of the state space (observation space) and the action space.  
We can know the state and action infomation of the environment by env.observation_space and env.action_space

### FrozenLake environment
![](2023-03-27-20-51-22.png)

In [3]:
env = gym.make('FrozenLake-v1')

In [4]:
# state (observation) and action space information
print(env.observation_space)
print(env.action_space)

Discrete(16)
Discrete(4)


Let's guess the information of other environments
### CartPole
State: angle of a pole, x location, velocity, raw image, or a combination of above factors  
Action: move left, move right

![](2023-03-27-20-52-19.png)  

### Pendulum
State: angle of a pendulum, angular velocity, raw image, or a combination of above factors 
Action: rotate left, rotate right, amount of force

![](2023-03-27-20-52-42.png)

#### Check the state and action dimension by using env.observation_space and env.action_space ###

In [6]:
env2 = gym.make('CartPole-v1')
### print observation_space and action_space
print(env2.observation_space)
print(env2.action_space)

print()

env3 = gym.make('Pendulum-v1')
### print observation_space and action_space
print(env3.observation_space)
print(env3.action_space)

Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
Discrete(2)

Box([-1. -1. -8.], [1. 1. 8.], (3,), float32)
Box(-2.0, 2.0, (1,), float32)


What's the output? If you look at the information in CartPole and Pendulum, you can see that observation space and action space are configured as objects called Box.  
Observation and action can be discrete or continuous. In continuous cases, it is represented through the Box object.  

# 2. Why do we need Reinforcement Learning?

Now we'll see why reinforcement learning is useful in sequential decision making.  
Let's assume that we want to solve the simple gridworld environment that we learned in class  
The implementations of the environment are below


In [7]:
class Environment:

    def __init__(self, env_config=None, *args, **kwargs) -> None:
        self.grid_size = env_config['grid_size']
        self.out_reward = env_config['out_reward']
        self.step_reward = env_config['step_reward']
        self.goal_reward = env_config['goal_reward']
        self.max_step = env_config['max_step']
        self.goal = env_config['goal']
        self.action_space = Discrete(4)
        self.observation_space = Discrete(self.grid_size*self.grid_size)
        self.seeker = (0, 0)
        self.info = {'seeker': self.seeker, 'goal': self.goal}
        self.timestep = 0

    def reset(self):
        self.seeker = (0, 0) # row, col
        self.timestep = 0
        return self.get_observation()
    
    def get_observation(self):
        return self.grid_size * self.seeker[0] + self.seeker[1]
    
    def get_reward(self):
        return self.goal_reward if self.seeker == self.goal else 0
    
    def is_done(self):
        if self.timestep == self.max_step:
            return True
        return self.seeker == self.goal

    def check_pos(self, seeker):
        is_out = False
        if seeker[0] < 0 or seeker[0] > self.grid_size - 1 or \
            seeker[1] < 0 or seeker[1] > self.grid_size - 1: 
            is_out = True
        return is_out

    def step(self, action):
        self.timestep += 1
        reward = 0
        is_out = False

        if action == 0: # move left
            self.seeker = (self.seeker[0], self.seeker[1] - 1)
            is_out =  self.check_pos(self.seeker)
            if is_out:
                self.seeker = (self.seeker[0], self.seeker[1] + 1)

        elif action == 1: # move right
            self.seeker = (self.seeker[0], self.seeker[1] + 1)
            is_out =  self.check_pos(self.seeker)
            if is_out:
                self.seeker = (self.seeker[0], self.seeker[1] - 1)

        elif action == 2: # move up
            self.seeker = (self.seeker[0] - 1, self.seeker[1])
            is_out =  self.check_pos(self.seeker)
            if is_out:
                self.seeker = (self.seeker[0] + 1, self.seeker[1])
                
        elif action == 3: # move down
            self.seeker = (self.seeker[0] + 1, self.seeker[1])
            is_out =  self.check_pos(self.seeker)
            if is_out:
                self.seeker = (self.seeker[0] - 1, self.seeker[1])
        else:
            raise ValueError("Invalid action")

        if is_out:
            reward = self.out_reward
        else:
            reward = self.get_reward() + self.step_reward

        return self.get_observation(), reward, self.is_done(), self.info

    def render(self, *args, **kwaargs):
        os.system('cls' if os.name == 'nt' else 'clear')
        clear_output(wait=True)
        grid_row = ['| ' for _ in range(self.grid_size)] 
        grid = [grid_row + ["|\n"] for _ in range(self.grid_size)]
        grid[self.goal[0]][self.goal[1]] = '|G'
        grid[self.seeker[0]][self.seeker[1]] = '|A'
        print(''.join([''.join(grid_row) for grid_row in grid]))


class Gridworld(Environment, gym.Env):
    def __init__(self, env_config, *args, **kwargs) -> None:
        super().__init__(env_config=env_config, *args, **kwargs)

grid size, goal reward, and max_step are defined in dictionaries. We can change the environment by modifying these values.

### Action:  
0: LEFT  
1: RIGHT  
2: UP   
3: DOWN  

In [8]:
from enum import IntEnum

class Action(IntEnum):
    LEFT = 0
    RIGHT = 1
    UP = 2
    DOWN = 3

In [9]:
env_config = {"grid_size": 4,
              "goal": (3, 3),
             "goal_reward": 1,
             "step_reward": -1,
             "out_reward": -5,
             "max_step": 20 }
env = Gridworld(env_config)
env.render()

|A| | | |
| | | | |
| | | | |
| | | |G|



In [10]:
env.step(Action.RIGHT)
env.render()

| |A| | |
| | | | |
| | | | |
| | | |G|



In [11]:
env.step(Action.DOWN)
env.render()

| | | | |
| |A| | |
| | | | |
| | | |G|



## 2.1 Random policy  
We learned that we need a policy to solve decision problems.  
First, you need to implement the policy that behave randomly in your environment.  
Random policy is considered as baseline.

## Pop Quiz 1. Implement the random_policy function
- random_policy function returns random action (0~3)
- To return a random action, utilize the `random` built-in library` and action information
- You can get the size of action dimension by using `env.action_space.n`
- Hint: use `random.randint(min, max)`

In [14]:
import random

def random_policy(env):
    ### Implement here ###
    action = random.randint(0, env.action_space.n - 1)
    return action

Run the below block to see whether your random_policy works well

In [15]:
render_time = 0.5
render = True
for episode in range(0, 2):
    state = env.reset()
    done = False
    total_reward = 0
    current_step = 0
    while not done:
        action = random_policy(env)
        # ! You can also use the env.action_space.sample() function
        # action = env.action_space.sample()
        
        next_state, reward, done, info = env.step(action)
        current_step += 1
        
        total_reward += reward
        state = next_state
        if render:
            time.sleep(render_time)
            env.render()
            print(Action(action))
            print(f"current episode: {episode}")
            print(f"steps: {current_step}")
            print(f"agent pos. x: {env.seeker[0]:>2d}, y: {env.seeker[1]:>2d}")
            print(f"goal pos.  x: {env.goal[0]:>2d}, y: {env.goal[1]:>2d}")
            print(f"total reward: {total_reward:>2d}")

| | | | |
| | | | |
| | | | |
| | | |A|

Action.RIGHT
current episode: 1
steps: 8
agent pos. x:  3, y:  3
goal pos.  x:  3, y:  3
total reward: -7


## 2.2 Hand-crafted policy   
In the above code block, the random policy mostly does not reach the destination well.  
We know that the optimal policy should be made as shown in the figure below. Let's implement hand-crafted policy!  

![](2023-03-27-22-16-45.png)

## Pop Quiz 2. Implement the hand-crafted policy
- Hand-crafted policy takes state as input (0~14) and returns a optimal action (0~3)
- To return the optimal action, utilize the `if, elif, else` conditional statement and enum class `Action`
- Use `random.choice(list)` built-in function for the cases (0, 1, 2, 4, 5, 6, 8, 9, 10) to return random action


In [16]:
from enum import IntEnum

class Action(IntEnum):
    LEFT = 0
    RIGHT = 1
    UP = 2
    DOWN = 3

In [17]:
action_list = [Action.RIGHT, Action.DOWN]
action = random.choice(action_list)
print(action)

Action.RIGHT


In [18]:
def hand_crafted_policy(state):
    if state in [0, 1, 2, 4, 5, 6, 8, 9, 10]:
        action = random.choice(action_list)
    elif state in [3, 7, 11]:
        action = Action.DOWN
    else:
        action = Action.RIGHT
    return action

Run the below block to see whether your hand_crafted_policy works well

In [19]:
render_time = 0.1
render = True
for episode in range(0, 10):
    state = env.reset()
    done = False
    total_reward = 0
    current_step = 0
    while not done:
        action = hand_crafted_policy(state)
        next_state, reward, done, info = env.step(action)
        current_step += 1
        total_reward += reward
        state = next_state
        if render:
            time.sleep(render_time)
            env.render()
            print(Action(action))
            print(f"current episode: {episode}")
            print(f"steps: {current_step}")
            print(f"agent pos. x: {env.seeker[0]:>2d}, y: {env.seeker[1]:>2d}")
            print(f"goal pos.  x: {env.goal[0]:>2d}, y: {env.goal[1]:>2d}")
            print(f"total reward: {total_reward:>2d}")

| | | | |
| | | | |
| | | | |
| | | |A|

Action.RIGHT
current episode: 9
steps: 6
agent pos. x:  3, y:  3
goal pos.  x:  3, y:  3
total reward: -5


If you implemented it well, your policy will be performed successfully in a gridworld environment.  
But let's think about the following situation. Grid size is increased doubly and destination location is moved to left side.  

In [20]:
env_config = {"grid_size": 8,
              "goal": (7, 0),
             "goal_reward": 1,
             "step_reward": -1,
             "out_reward": -5,
             "max_step": 50 }
env = Gridworld(env_config)
env.render()



|A| | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
|G| | | | | | | |



In [21]:
env_config = {"grid_size": 16,
              "goal": (13, 11),
             "goal_reward": 1,
             "step_reward": -1,
             "out_reward": -5,
             "max_step": 250 }
env = Gridworld(env_config)
env.render()

|A| | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | |G| | | | |
| | | | | | | | | | | | | | | | |
| | | | | | | | | | | | | | | | |



##### Of course, we can create manual policies for changing environments, but we can't solve every problem in reality in this way because of limited time and cost. If we require to solve high-dimensional or continuous domain problem, we need an automated methodology.   
### That's reinforcement learning!! 😎

# 3. RL policy, Rollout function

We will practice Q-Learning of reinforcement learning. You don't have to understand everything because you'll soon learn the detailed principles in future lecture.  
So let's implement RL Policy. In the above code blocks, the policy is implemented as a function such like `random_policy` and `hand_crafted_policy`  
But policy is typically implemented through `stateful class`.  

In [22]:
class Policy:
    def __init__(self, env): # Q-table
        self.state_action_table = [
            [0.0 for _ in range(env.action_space.n)] for _ in range(env.observation_space.n)
            ]
        self.action_space = env.action_space
    
    def get_action(self, state, explore=True, epsilon=0.1): # epsilon-greedy
        if explore and random.uniform(0,1) < epsilon:
            return self.action_space.sample()
        return np.argmax(self.state_action_table[state])

    def save(self, num): 
        name = f"policy{num}.npy"
        np.save(name, self.state_action_table)
    
    def load(self, npy_path):
         self.state_action_table = np.load(npy_path)

To reduce code repetition, let's make the abstract function of the the agent interactions.  
We will make the `rollout` function that is the process of getting samples while the agent interacts with the environment.  
The rollout function takes input such as policy, environment, and maximum length of episode and then returns the agent's trajectory.  

![](2023-03-27-23-21-30.png)

In [23]:
def rollout(env, policy, T, *args, **kwargs):
    render = kwargs["render"]
    epsilon = kwargs["epsilon"]
    render_time = kwargs["render_time"]
    explore = kwargs["explore"]
    episode = kwargs["episode"]
    
    experiences = []
    state = env.reset()
    done = False
    total_reward = 0
    current_step = 0
    while not done:
        action = policy.get_action(state, explore, epsilon)
        next_state, reward, done, info = env.step(action)
        experiences.append([state, action, reward, next_state, done])
        current_step += 1
        total_reward += reward
        state = next_state
        if render:
            time.sleep(render_time)
            env.render()
            print(Action(action))            
            print(f"current episode: {episode}")
            print(f"current step: {current_step}")
            print(f"agent pos. x: {env.seeker[0]:>2d}, y: {env.seeker[1]:>2d}")
            print(f"goal pos.  x: {env.goal[0]:>2d}, y: {env.goal[1]:>2d}")
            print(f"total reward: {total_reward:>2d}")
            
        if current_step == T:
            break
    return experiences

#### update_policy: Update the policy by using Q-learning equation
#### train_policy: Iterate update_policy over the episodes.
#### evaluate_policy: Evaluate and visualize to see if the learning went well.

In [24]:
def update_policy(policy, trajectory, weight=0.1, discount_factor=0.9):
    random.shuffle(trajectory)
    for state, action, reward, next_state, done in trajectory:
        next_max = np.max(policy.state_action_table[next_state])
        value = policy.state_action_table[state][action]
        new_value = (1 - weight) * value + weight * (reward + discount_factor * next_max)
        policy.state_action_table[state][action] = new_value

def train_policy(env, policy, T=20, num_episodes=10000, weight=0.1, discount_factor=0.9, epsilon=0.5):
    for e in range(1, num_episodes+1):
        trajectory = rollout(env, policy, T, render=False, render_time=0.0, explore=True, epsilon=epsilon, episode=e)
        update_policy(policy, trajectory, weight, discount_factor)
        
    policy.save(e)
    return policy

def evaluate_policy(env, policy, T, npy_path, num_episodes=5):
    policy.load(npy_path)
    steps = 0
    total_reward_lst = []
    avg_score = 0
    for e in range(num_episodes):
        experiences = rollout(env, policy, T, render=True, render_time=0.1, explore=False, epsilon=0, episode=e)
        total_reward = 0
        for transition in experiences:
            total_reward += transition[2]
        total_reward_lst.append(total_reward)
        steps += len(experiences)
    
    avg_score = sum(total_reward_lst) / len(total_reward_lst)
    return steps / num_episodes, avg_score, total_reward_lst

In [25]:
env_config = {"grid_size": 8,
              "goal": (6, 6),
             "goal_reward": 1,
             "step_reward": -1,
             "out_reward": -5,
             "max_step": 40 }
env = Gridworld(env_config)
env.render()

|A| | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | |G| |
| | | | | | | | |



In [27]:
untrained_policy = Policy(env)
trained_policy = train_policy(env, untrained_policy, env_config["max_step"], num_episodes=10000)
print(np.round(trained_policy.state_action_table, 2))

[[-11.18  -6.86 -11.18  -6.86]
 [ -7.18  -6.51 -10.86  -6.51]
 [ -6.86  -6.13 -10.51  -6.13]
 [ -6.51  -5.7  -10.13  -5.7 ]
 [ -6.13  -5.22  -9.7   -5.22]
 [ -5.7   -4.69  -9.22  -4.69]
 [ -5.22  -5.22  -8.69  -4.1 ]
 [ -4.69  -9.22  -9.22  -4.69]
 [-10.86  -6.51  -7.18  -6.51]
 [ -6.86  -6.13  -6.86  -6.13]
 [ -6.51  -5.7   -6.51  -5.7 ]
 [ -6.13  -5.22  -6.13  -5.22]
 [ -5.7   -4.69  -5.7   -4.69]
 [ -5.22  -4.1   -5.22  -4.1 ]
 [ -4.69  -4.69  -4.69  -3.44]
 [ -4.1   -8.69  -5.22  -4.1 ]
 [-10.51  -6.13  -6.86  -6.13]
 [ -6.51  -5.7   -6.51  -5.7 ]
 [ -6.13  -5.22  -6.13  -5.22]
 [ -5.7   -4.69  -5.7   -4.69]
 [ -5.22  -4.1   -5.22  -4.1 ]
 [ -4.69  -3.44  -4.69  -3.44]
 [ -4.1   -4.1   -4.1   -2.71]
 [ -3.44  -8.1   -4.69  -3.44]
 [-10.13  -5.7   -6.51  -5.7 ]
 [ -6.13  -5.22  -6.13  -5.22]
 [ -5.7   -4.69  -5.7   -4.69]
 [ -5.22  -4.1   -5.22  -4.1 ]
 [ -4.69  -3.44  -4.69  -3.44]
 [ -4.1   -2.71  -4.1   -2.71]
 [ -3.44  -3.44  -3.44  -1.9 ]
 [ -2.71  -7.44  -4.1   -2.71]
 [ -9.7 

In [28]:
avg_steps, avg_score, total_reward_lst = evaluate_policy(env, trained_policy, env_config["max_step"], "policy10000.npy")

| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | | | |
| | | | | | |A| |
| | | | | | | | |

Action.DOWN
current episode: 4
current step: 12
agent pos. x:  6, y:  6
goal pos.  x:  6, y:  6
total reward: -11


In [29]:
print(f"avg_steps: {avg_steps}, avg_score: {avg_score}")


avg_steps: 12.0, avg_score: -11.0
